1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the BzrDir facility and any format specific tests.
19
For interface contract tests, see tests/per_bzr_dir.
38
from bzrlib.errors import (NotBranchError,
40
UnsupportedFormatError,
42
from bzrlib.tests import (
44
TestCaseWithMemoryTransport,
45
TestCaseWithTransport,
48
from bzrlib.tests import(
52
from bzrlib.tests.test_http import TestWithTransport_pycurl
53
from bzrlib.transport import (
57
from bzrlib.transport.http._urllib import HttpTransport_urllib
58
from bzrlib.transport.nosmart import NoSmartTransportDecorator
59
from bzrlib.transport.readonly import ReadonlyTransportDecorator
60
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
63
class TestDefaultFormat(TestCase):
65
def test_get_set_default_format(self):
66
old_format = bzrdir.BzrDirFormat.get_default_format()
67
# default is BzrDirFormat6
68
self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
69
bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
70
# creating a bzr dir should now create an instrumented dir.
72
result = bzrdir.BzrDir.create('memory:///')
73
self.failUnless(isinstance(result, SampleBzrDir))
75
bzrdir.BzrDirFormat._set_default_format(old_format)
76
self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
79
class TestFormatRegistry(TestCase):
81
def make_format_registry(self):
82
my_format_registry = bzrdir.BzrDirFormatRegistry()
83
my_format_registry.register('weave', bzrdir.BzrDirFormat6,
84
'Pre-0.8 format. Slower and does not support checkouts or shared'
85
' repositories', deprecated=True)
86
my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
87
'BzrDirFormat6', 'Format registered lazily', deprecated=True)
88
my_format_registry.register_metadir('knit',
89
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
92
my_format_registry.set_default('knit')
93
my_format_registry.register_metadir(
95
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
96
'Experimental successor to knit. Use at your own risk.',
97
branch_format='bzrlib.branch.BzrBranchFormat6',
99
my_format_registry.register_metadir(
101
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
102
'Experimental successor to knit. Use at your own risk.',
103
branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
104
my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
105
'Pre-0.8 format. Slower and does not support checkouts or shared'
106
' repositories', hidden=True)
107
my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
108
'BzrDirFormat6', 'Format registered lazily', deprecated=True,
110
return my_format_registry
112
def test_format_registry(self):
113
my_format_registry = self.make_format_registry()
114
my_bzrdir = my_format_registry.make_bzrdir('lazy')
115
self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
116
my_bzrdir = my_format_registry.make_bzrdir('weave')
117
self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
118
my_bzrdir = my_format_registry.make_bzrdir('default')
119
self.assertIsInstance(my_bzrdir.repository_format,
120
knitrepo.RepositoryFormatKnit1)
121
my_bzrdir = my_format_registry.make_bzrdir('knit')
122
self.assertIsInstance(my_bzrdir.repository_format,
123
knitrepo.RepositoryFormatKnit1)
124
my_bzrdir = my_format_registry.make_bzrdir('branch6')
125
self.assertIsInstance(my_bzrdir.get_branch_format(),
126
bzrlib.branch.BzrBranchFormat6)
128
def test_get_help(self):
129
my_format_registry = self.make_format_registry()
130
self.assertEqual('Format registered lazily',
131
my_format_registry.get_help('lazy'))
132
self.assertEqual('Format using knits',
133
my_format_registry.get_help('knit'))
134
self.assertEqual('Format using knits',
135
my_format_registry.get_help('default'))
136
self.assertEqual('Pre-0.8 format. Slower and does not support'
137
' checkouts or shared repositories',
138
my_format_registry.get_help('weave'))
140
def test_help_topic(self):
141
topics = help_topics.HelpTopicRegistry()
142
registry = self.make_format_registry()
143
topics.register('current-formats', registry.help_topic,
145
topics.register('other-formats', registry.help_topic,
147
new = topics.get_detail('current-formats')
148
rest = topics.get_detail('other-formats')
149
experimental, deprecated = rest.split('Deprecated formats')
150
self.assertContainsRe(new, 'formats-help')
151
self.assertContainsRe(new,
152
':knit:\n \(native\) \(default\) Format using knits\n')
153
self.assertContainsRe(experimental,
154
':branch6:\n \(native\) Experimental successor to knit')
155
self.assertContainsRe(deprecated,
156
':lazy:\n \(native\) Format registered lazily\n')
157
self.assertNotContainsRe(new, 'hidden')
159
def test_set_default_repository(self):
160
default_factory = bzrdir.format_registry.get('default')
161
old_default = [k for k, v in bzrdir.format_registry.iteritems()
162
if v == default_factory and k != 'default'][0]
163
bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
165
self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
166
bzrdir.format_registry.get('default'))
168
repository.RepositoryFormat.get_default_format().__class__,
169
knitrepo.RepositoryFormatKnit3)
171
bzrdir.format_registry.set_default_repository(old_default)
173
def test_aliases(self):
174
a_registry = bzrdir.BzrDirFormatRegistry()
175
a_registry.register('weave', bzrdir.BzrDirFormat6,
176
'Pre-0.8 format. Slower and does not support checkouts or shared'
177
' repositories', deprecated=True)
178
a_registry.register('weavealias', bzrdir.BzrDirFormat6,
179
'Pre-0.8 format. Slower and does not support checkouts or shared'
180
' repositories', deprecated=True, alias=True)
181
self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
184
class SampleBranch(bzrlib.branch.Branch):
185
"""A dummy branch for guess what, dummy use."""
187
def __init__(self, dir):
191
class SampleRepository(bzrlib.repository.Repository):
194
def __init__(self, dir):
198
class SampleBzrDir(bzrdir.BzrDir):
199
"""A sample BzrDir implementation to allow testing static methods."""
201
def create_repository(self, shared=False):
202
"""See BzrDir.create_repository."""
203
return "A repository"
205
def open_repository(self):
206
"""See BzrDir.open_repository."""
207
return SampleRepository(self)
209
def create_branch(self):
210
"""See BzrDir.create_branch."""
211
return SampleBranch(self)
213
def create_workingtree(self):
214
"""See BzrDir.create_workingtree."""
218
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
221
this format is initializable, unsupported to aid in testing the
222
open and open_downlevel routines.
225
def get_format_string(self):
226
"""See BzrDirFormat.get_format_string()."""
227
return "Sample .bzr dir format."
229
def initialize_on_transport(self, t):
230
"""Create a bzr dir."""
232
t.put_bytes('.bzr/branch-format', self.get_format_string())
233
return SampleBzrDir(t, self)
235
def is_supported(self):
238
def open(self, transport, _found=None):
239
return "opened branch."
242
class TestBzrDirFormat(TestCaseWithTransport):
243
"""Tests for the BzrDirFormat facility."""
245
def test_find_format(self):
246
# is the right format object found for a branch?
247
# create a branch with a few known format objects.
248
# this is not quite the same as
249
t = get_transport(self.get_url())
250
self.build_tree(["foo/", "bar/"], transport=t)
251
def check_format(format, url):
252
format.initialize(url)
253
t = get_transport(url)
254
found_format = bzrdir.BzrDirFormat.find_format(t)
255
self.failUnless(isinstance(found_format, format.__class__))
256
check_format(bzrdir.BzrDirFormat5(), "foo")
257
check_format(bzrdir.BzrDirFormat6(), "bar")
259
def test_find_format_nothing_there(self):
260
self.assertRaises(NotBranchError,
261
bzrdir.BzrDirFormat.find_format,
264
def test_find_format_unknown_format(self):
265
t = get_transport(self.get_url())
267
t.put_bytes('.bzr/branch-format', '')
268
self.assertRaises(UnknownFormatError,
269
bzrdir.BzrDirFormat.find_format,
272
def test_register_unregister_format(self):
273
format = SampleBzrDirFormat()
276
format.initialize(url)
277
# register a format for it.
278
bzrdir.BzrDirFormat.register_format(format)
279
# which bzrdir.Open will refuse (not supported)
280
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
281
# which bzrdir.open_containing will refuse (not supported)
282
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
283
# but open_downlevel will work
284
t = get_transport(url)
285
self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
286
# unregister the format
287
bzrdir.BzrDirFormat.unregister_format(format)
288
# now open_downlevel should fail too.
289
self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
291
def test_create_branch_and_repo_uses_default(self):
292
format = SampleBzrDirFormat()
293
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
295
self.assertTrue(isinstance(branch, SampleBranch))
297
def test_create_branch_and_repo_under_shared(self):
298
# creating a branch and repo in a shared repo uses the
300
format = bzrdir.format_registry.make_bzrdir('knit')
301
self.make_repository('.', shared=True, format=format)
302
branch = bzrdir.BzrDir.create_branch_and_repo(
303
self.get_url('child'), format=format)
304
self.assertRaises(errors.NoRepositoryPresent,
305
branch.bzrdir.open_repository)
307
def test_create_branch_and_repo_under_shared_force_new(self):
308
# creating a branch and repo in a shared repo can be forced to
310
format = bzrdir.format_registry.make_bzrdir('knit')
311
self.make_repository('.', shared=True, format=format)
312
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
315
branch.bzrdir.open_repository()
317
def test_create_standalone_working_tree(self):
318
format = SampleBzrDirFormat()
319
# note this is deliberately readonly, as this failure should
320
# occur before any writes.
321
self.assertRaises(errors.NotLocalUrl,
322
bzrdir.BzrDir.create_standalone_workingtree,
323
self.get_readonly_url(), format=format)
324
tree = bzrdir.BzrDir.create_standalone_workingtree('.',
326
self.assertEqual('A tree', tree)
328
def test_create_standalone_working_tree_under_shared_repo(self):
329
# create standalone working tree always makes a repo.
330
format = bzrdir.format_registry.make_bzrdir('knit')
331
self.make_repository('.', shared=True, format=format)
332
# note this is deliberately readonly, as this failure should
333
# occur before any writes.
334
self.assertRaises(errors.NotLocalUrl,
335
bzrdir.BzrDir.create_standalone_workingtree,
336
self.get_readonly_url('child'), format=format)
337
tree = bzrdir.BzrDir.create_standalone_workingtree('child',
339
tree.bzrdir.open_repository()
341
def test_create_branch_convenience(self):
342
# outside a repo the default convenience output is a repo+branch_tree
343
format = bzrdir.format_registry.make_bzrdir('knit')
344
branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
345
branch.bzrdir.open_workingtree()
346
branch.bzrdir.open_repository()
348
def test_create_branch_convenience_possible_transports(self):
349
"""Check that the optional 'possible_transports' is recognized"""
350
format = bzrdir.format_registry.make_bzrdir('knit')
351
t = self.get_transport()
352
branch = bzrdir.BzrDir.create_branch_convenience(
353
'.', format=format, possible_transports=[t])
354
branch.bzrdir.open_workingtree()
355
branch.bzrdir.open_repository()
357
def test_create_branch_convenience_root(self):
358
"""Creating a branch at the root of a fs should work."""
359
self.vfs_transport_factory = memory.MemoryServer
360
# outside a repo the default convenience output is a repo+branch_tree
361
format = bzrdir.format_registry.make_bzrdir('knit')
362
branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
364
self.assertRaises(errors.NoWorkingTree,
365
branch.bzrdir.open_workingtree)
366
branch.bzrdir.open_repository()
368
def test_create_branch_convenience_under_shared_repo(self):
369
# inside a repo the default convenience output is a branch+ follow the
371
format = bzrdir.format_registry.make_bzrdir('knit')
372
self.make_repository('.', shared=True, format=format)
373
branch = bzrdir.BzrDir.create_branch_convenience('child',
375
branch.bzrdir.open_workingtree()
376
self.assertRaises(errors.NoRepositoryPresent,
377
branch.bzrdir.open_repository)
379
def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
380
# inside a repo the default convenience output is a branch+ follow the
381
# repo tree policy but we can override that
382
format = bzrdir.format_registry.make_bzrdir('knit')
383
self.make_repository('.', shared=True, format=format)
384
branch = bzrdir.BzrDir.create_branch_convenience('child',
385
force_new_tree=False, format=format)
386
self.assertRaises(errors.NoWorkingTree,
387
branch.bzrdir.open_workingtree)
388
self.assertRaises(errors.NoRepositoryPresent,
389
branch.bzrdir.open_repository)
391
def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
392
# inside a repo the default convenience output is a branch+ follow the
394
format = bzrdir.format_registry.make_bzrdir('knit')
395
repo = self.make_repository('.', shared=True, format=format)
396
repo.set_make_working_trees(False)
397
branch = bzrdir.BzrDir.create_branch_convenience('child',
399
self.assertRaises(errors.NoWorkingTree,
400
branch.bzrdir.open_workingtree)
401
self.assertRaises(errors.NoRepositoryPresent,
402
branch.bzrdir.open_repository)
404
def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
405
# inside a repo the default convenience output is a branch+ follow the
406
# repo tree policy but we can override that
407
format = bzrdir.format_registry.make_bzrdir('knit')
408
repo = self.make_repository('.', shared=True, format=format)
409
repo.set_make_working_trees(False)
410
branch = bzrdir.BzrDir.create_branch_convenience('child',
411
force_new_tree=True, format=format)
412
branch.bzrdir.open_workingtree()
413
self.assertRaises(errors.NoRepositoryPresent,
414
branch.bzrdir.open_repository)
416
def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
417
# inside a repo the default convenience output is overridable to give
419
format = bzrdir.format_registry.make_bzrdir('knit')
420
self.make_repository('.', shared=True, format=format)
421
branch = bzrdir.BzrDir.create_branch_convenience('child',
422
force_new_repo=True, format=format)
423
branch.bzrdir.open_repository()
424
branch.bzrdir.open_workingtree()
427
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
429
def test_acquire_repository_standalone(self):
430
"""The default acquisition policy should create a standalone branch."""
431
my_bzrdir = self.make_bzrdir('.')
432
repo_policy = my_bzrdir.determine_repository_policy()
433
repo, is_new = repo_policy.acquire_repository()
434
self.assertEqual(repo.bzrdir.root_transport.base,
435
my_bzrdir.root_transport.base)
436
self.assertFalse(repo.is_shared())
438
def test_determine_stacking_policy(self):
439
parent_bzrdir = self.make_bzrdir('.')
440
child_bzrdir = self.make_bzrdir('child')
441
parent_bzrdir.get_config().set_default_stack_on('http://example.org')
442
repo_policy = child_bzrdir.determine_repository_policy()
443
self.assertEqual('http://example.org', repo_policy._stack_on)
445
def test_determine_stacking_policy_relative(self):
446
parent_bzrdir = self.make_bzrdir('.')
447
child_bzrdir = self.make_bzrdir('child')
448
parent_bzrdir.get_config().set_default_stack_on('child2')
449
repo_policy = child_bzrdir.determine_repository_policy()
450
self.assertEqual('child2', repo_policy._stack_on)
451
self.assertEqual(parent_bzrdir.root_transport.base,
452
repo_policy._stack_on_pwd)
454
def prepare_default_stacking(self, child_format='1.6'):
455
parent_bzrdir = self.make_bzrdir('.')
456
child_branch = self.make_branch('child', format=child_format)
457
parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
458
new_child_transport = parent_bzrdir.transport.clone('child2')
459
return child_branch, new_child_transport
461
def test_clone_on_transport_obeys_stacking_policy(self):
462
child_branch, new_child_transport = self.prepare_default_stacking()
463
new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
464
self.assertEqual(child_branch.base,
465
new_child.open_branch().get_stacked_on_url())
467
def test_default_stacking_with_stackable_branch_unstackable_repo(self):
468
# Make stackable source branch with an unstackable repo format.
469
source_bzrdir = self.make_bzrdir('source')
470
pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
471
source_branch = bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
472
# Make a directory with a default stacking policy
473
parent_bzrdir = self.make_bzrdir('parent')
474
stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
475
parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
476
# Clone source into directory
477
target = source_bzrdir.clone(self.get_url('parent/target'))
479
def test_sprout_obeys_stacking_policy(self):
480
child_branch, new_child_transport = self.prepare_default_stacking()
481
new_child = child_branch.bzrdir.sprout(new_child_transport.base)
482
self.assertEqual(child_branch.base,
483
new_child.open_branch().get_stacked_on_url())
485
def test_clone_ignores_policy_for_unsupported_formats(self):
486
child_branch, new_child_transport = self.prepare_default_stacking(
487
child_format='pack-0.92')
488
new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
489
self.assertRaises(errors.UnstackableBranchFormat,
490
new_child.open_branch().get_stacked_on_url)
492
def test_sprout_ignores_policy_for_unsupported_formats(self):
493
child_branch, new_child_transport = self.prepare_default_stacking(
494
child_format='pack-0.92')
495
new_child = child_branch.bzrdir.sprout(new_child_transport.base)
496
self.assertRaises(errors.UnstackableBranchFormat,
497
new_child.open_branch().get_stacked_on_url)
499
def test_sprout_upgrades_format_if_stacked_specified(self):
500
child_branch, new_child_transport = self.prepare_default_stacking(
501
child_format='pack-0.92')
502
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
504
self.assertEqual(child_branch.bzrdir.root_transport.base,
505
new_child.open_branch().get_stacked_on_url())
506
repo = new_child.open_repository()
507
self.assertTrue(repo._format.supports_external_lookups)
508
self.assertFalse(repo.supports_rich_root())
510
def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
511
child_branch, new_child_transport = self.prepare_default_stacking(
512
child_format='pack-0.92')
513
new_child = child_branch.bzrdir.clone_on_transport(new_child_transport,
514
stacked_on=child_branch.bzrdir.root_transport.base)
515
self.assertEqual(child_branch.bzrdir.root_transport.base,
516
new_child.open_branch().get_stacked_on_url())
517
repo = new_child.open_repository()
518
self.assertTrue(repo._format.supports_external_lookups)
519
self.assertFalse(repo.supports_rich_root())
521
def test_sprout_upgrades_to_rich_root_format_if_needed(self):
522
child_branch, new_child_transport = self.prepare_default_stacking(
523
child_format='rich-root-pack')
524
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
526
repo = new_child.open_repository()
527
self.assertTrue(repo._format.supports_external_lookups)
528
self.assertTrue(repo.supports_rich_root())
530
def test_add_fallback_repo_handles_absolute_urls(self):
531
stack_on = self.make_branch('stack_on', format='1.6')
532
repo = self.make_repository('repo', format='1.6')
533
policy = bzrdir.UseExistingRepository(repo, stack_on.base)
534
policy._add_fallback(repo)
536
def test_add_fallback_repo_handles_relative_urls(self):
537
stack_on = self.make_branch('stack_on', format='1.6')
538
repo = self.make_repository('repo', format='1.6')
539
policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
540
policy._add_fallback(repo)
542
def test_configure_relative_branch_stacking_url(self):
543
stack_on = self.make_branch('stack_on', format='1.6')
544
stacked = self.make_branch('stack_on/stacked', format='1.6')
545
policy = bzrdir.UseExistingRepository(stacked.repository,
547
policy.configure_branch(stacked)
548
self.assertEqual('..', stacked.get_stacked_on_url())
550
def test_relative_branch_stacking_to_absolute(self):
551
stack_on = self.make_branch('stack_on', format='1.6')
552
stacked = self.make_branch('stack_on/stacked', format='1.6')
553
policy = bzrdir.UseExistingRepository(stacked.repository,
554
'.', self.get_readonly_url('stack_on'))
555
policy.configure_branch(stacked)
556
self.assertEqual(self.get_readonly_url('stack_on'),
557
stacked.get_stacked_on_url())
560
class ChrootedTests(TestCaseWithTransport):
561
"""A support class that provides readonly urls outside the local namespace.
563
This is done by checking if self.transport_server is a MemoryServer. if it
564
is then we are chrooted already, if it is not then an HttpServer is used
569
super(ChrootedTests, self).setUp()
570
if not self.vfs_transport_factory == memory.MemoryServer:
571
self.transport_readonly_server = http_server.HttpServer
573
def local_branch_path(self, branch):
574
return os.path.realpath(urlutils.local_path_from_url(branch.base))
576
def test_open_containing(self):
577
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
578
self.get_readonly_url(''))
579
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
580
self.get_readonly_url('g/p/q'))
581
control = bzrdir.BzrDir.create(self.get_url())
582
branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url(''))
583
self.assertEqual('', relpath)
584
branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url('g/p/q'))
585
self.assertEqual('g/p/q', relpath)
587
def test_open_containing_tree_branch_or_repository_empty(self):
588
self.assertRaises(errors.NotBranchError,
589
bzrdir.BzrDir.open_containing_tree_branch_or_repository,
590
self.get_readonly_url(''))
592
def test_open_containing_tree_branch_or_repository_all(self):
593
self.make_branch_and_tree('topdir')
594
tree, branch, repo, relpath = \
595
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
597
self.assertEqual(os.path.realpath('topdir'),
598
os.path.realpath(tree.basedir))
599
self.assertEqual(os.path.realpath('topdir'),
600
self.local_branch_path(branch))
602
osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
603
repo.bzrdir.transport.local_abspath('repository'))
604
self.assertEqual(relpath, 'foo')
606
def test_open_containing_tree_branch_or_repository_no_tree(self):
607
self.make_branch('branch')
608
tree, branch, repo, relpath = \
609
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
611
self.assertEqual(tree, None)
612
self.assertEqual(os.path.realpath('branch'),
613
self.local_branch_path(branch))
615
osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
616
repo.bzrdir.transport.local_abspath('repository'))
617
self.assertEqual(relpath, 'foo')
619
def test_open_containing_tree_branch_or_repository_repo(self):
620
self.make_repository('repo')
621
tree, branch, repo, relpath = \
622
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
624
self.assertEqual(tree, None)
625
self.assertEqual(branch, None)
627
osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
628
repo.bzrdir.transport.local_abspath('repository'))
629
self.assertEqual(relpath, '')
631
def test_open_containing_tree_branch_or_repository_shared_repo(self):
632
self.make_repository('shared', shared=True)
633
bzrdir.BzrDir.create_branch_convenience('shared/branch',
634
force_new_tree=False)
635
tree, branch, repo, relpath = \
636
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
638
self.assertEqual(tree, None)
639
self.assertEqual(os.path.realpath('shared/branch'),
640
self.local_branch_path(branch))
642
osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
643
repo.bzrdir.transport.local_abspath('repository'))
644
self.assertEqual(relpath, '')
646
def test_open_containing_tree_branch_or_repository_branch_subdir(self):
647
self.make_branch_and_tree('foo')
648
self.build_tree(['foo/bar/'])
649
tree, branch, repo, relpath = \
650
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
652
self.assertEqual(os.path.realpath('foo'),
653
os.path.realpath(tree.basedir))
654
self.assertEqual(os.path.realpath('foo'),
655
self.local_branch_path(branch))
657
osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
658
repo.bzrdir.transport.local_abspath('repository'))
659
self.assertEqual(relpath, 'bar')
661
def test_open_containing_tree_branch_or_repository_repo_subdir(self):
662
self.make_repository('bar')
663
self.build_tree(['bar/baz/'])
664
tree, branch, repo, relpath = \
665
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
667
self.assertEqual(tree, None)
668
self.assertEqual(branch, None)
670
osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
671
repo.bzrdir.transport.local_abspath('repository'))
672
self.assertEqual(relpath, 'baz')
674
def test_open_containing_from_transport(self):
675
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
676
get_transport(self.get_readonly_url('')))
677
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
678
get_transport(self.get_readonly_url('g/p/q')))
679
control = bzrdir.BzrDir.create(self.get_url())
680
branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
681
get_transport(self.get_readonly_url('')))
682
self.assertEqual('', relpath)
683
branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
684
get_transport(self.get_readonly_url('g/p/q')))
685
self.assertEqual('g/p/q', relpath)
687
def test_open_containing_tree_or_branch(self):
688
self.make_branch_and_tree('topdir')
689
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
691
self.assertEqual(os.path.realpath('topdir'),
692
os.path.realpath(tree.basedir))
693
self.assertEqual(os.path.realpath('topdir'),
694
self.local_branch_path(branch))
695
self.assertIs(tree.bzrdir, branch.bzrdir)
696
self.assertEqual('foo', relpath)
697
# opening from non-local should not return the tree
698
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
699
self.get_readonly_url('topdir/foo'))
700
self.assertEqual(None, tree)
701
self.assertEqual('foo', relpath)
703
self.make_branch('topdir/foo')
704
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
706
self.assertIs(tree, None)
707
self.assertEqual(os.path.realpath('topdir/foo'),
708
self.local_branch_path(branch))
709
self.assertEqual('', relpath)
711
def test_open_tree_or_branch(self):
712
self.make_branch_and_tree('topdir')
713
tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
714
self.assertEqual(os.path.realpath('topdir'),
715
os.path.realpath(tree.basedir))
716
self.assertEqual(os.path.realpath('topdir'),
717
self.local_branch_path(branch))
718
self.assertIs(tree.bzrdir, branch.bzrdir)
719
# opening from non-local should not return the tree
720
tree, branch = bzrdir.BzrDir.open_tree_or_branch(
721
self.get_readonly_url('topdir'))
722
self.assertEqual(None, tree)
724
self.make_branch('topdir/foo')
725
tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
726
self.assertIs(tree, None)
727
self.assertEqual(os.path.realpath('topdir/foo'),
728
self.local_branch_path(branch))
730
def test_open_from_transport(self):
731
# transport pointing at bzrdir should give a bzrdir with root transport
732
# set to the given transport
733
control = bzrdir.BzrDir.create(self.get_url())
734
transport = get_transport(self.get_url())
735
opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
736
self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
737
self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
739
def test_open_from_transport_no_bzrdir(self):
740
transport = get_transport(self.get_url())
741
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
744
def test_open_from_transport_bzrdir_in_parent(self):
745
control = bzrdir.BzrDir.create(self.get_url())
746
transport = get_transport(self.get_url())
747
transport.mkdir('subdir')
748
transport = transport.clone('subdir')
749
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
752
def test_sprout_recursive(self):
753
tree = self.make_branch_and_tree('tree1',
754
format='dirstate-with-subtree')
755
sub_tree = self.make_branch_and_tree('tree1/subtree',
756
format='dirstate-with-subtree')
757
sub_tree.set_root_id('subtree-root')
758
tree.add_reference(sub_tree)
759
self.build_tree(['tree1/subtree/file'])
761
tree.commit('Initial commit')
762
tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
764
self.addCleanup(tree2.unlock)
765
self.failUnlessExists('tree2/subtree/file')
766
self.assertEqual('tree-reference', tree2.kind('subtree-root'))
768
def test_cloning_metadir(self):
769
"""Ensure that cloning metadir is suitable"""
770
bzrdir = self.make_bzrdir('bzrdir')
771
bzrdir.cloning_metadir()
772
branch = self.make_branch('branch', format='knit')
773
format = branch.bzrdir.cloning_metadir()
774
self.assertIsInstance(format.workingtree_format,
775
workingtree.WorkingTreeFormat3)
777
def test_sprout_recursive_treeless(self):
778
tree = self.make_branch_and_tree('tree1',
779
format='dirstate-with-subtree')
780
sub_tree = self.make_branch_and_tree('tree1/subtree',
781
format='dirstate-with-subtree')
782
tree.add_reference(sub_tree)
783
self.build_tree(['tree1/subtree/file'])
785
tree.commit('Initial commit')
786
tree.bzrdir.destroy_workingtree()
787
repo = self.make_repository('repo', shared=True,
788
format='dirstate-with-subtree')
789
repo.set_make_working_trees(False)
790
tree.bzrdir.sprout('repo/tree2')
791
self.failUnlessExists('repo/tree2/subtree')
792
self.failIfExists('repo/tree2/subtree/file')
794
def make_foo_bar_baz(self):
795
foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
796
bar = self.make_branch('foo/bar').bzrdir
797
baz = self.make_branch('baz').bzrdir
800
def test_find_bzrdirs(self):
801
foo, bar, baz = self.make_foo_bar_baz()
802
transport = get_transport(self.get_url())
803
self.assertEqualBzrdirs([baz, foo, bar],
804
bzrdir.BzrDir.find_bzrdirs(transport))
806
def test_find_bzrdirs_list_current(self):
807
def list_current(transport):
808
return [s for s in transport.list_dir('') if s != 'baz']
810
foo, bar, baz = self.make_foo_bar_baz()
811
transport = get_transport(self.get_url())
812
self.assertEqualBzrdirs([foo, bar],
813
bzrdir.BzrDir.find_bzrdirs(transport,
814
list_current=list_current))
817
def test_find_bzrdirs_evaluate(self):
818
def evaluate(bzrdir):
820
repo = bzrdir.open_repository()
821
except NoRepositoryPresent:
822
return True, bzrdir.root_transport.base
824
return False, bzrdir.root_transport.base
826
foo, bar, baz = self.make_foo_bar_baz()
827
transport = get_transport(self.get_url())
828
self.assertEqual([baz.root_transport.base, foo.root_transport.base],
829
list(bzrdir.BzrDir.find_bzrdirs(transport,
832
def assertEqualBzrdirs(self, first, second):
834
second = list(second)
835
self.assertEqual(len(first), len(second))
836
for x, y in zip(first, second):
837
self.assertEqual(x.root_transport.base, y.root_transport.base)
839
def test_find_branches(self):
840
root = self.make_repository('', shared=True)
841
foo, bar, baz = self.make_foo_bar_baz()
842
qux = self.make_bzrdir('foo/qux')
843
transport = get_transport(self.get_url())
844
branches = bzrdir.BzrDir.find_branches(transport)
845
self.assertEqual(baz.root_transport.base, branches[0].base)
846
self.assertEqual(foo.root_transport.base, branches[1].base)
847
self.assertEqual(bar.root_transport.base, branches[2].base)
849
# ensure this works without a top-level repo
850
branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
851
self.assertEqual(foo.root_transport.base, branches[0].base)
852
self.assertEqual(bar.root_transport.base, branches[1].base)
855
class TestMeta1DirFormat(TestCaseWithTransport):
856
"""Tests specific to the meta1 dir format."""
858
def test_right_base_dirs(self):
859
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
861
branch_base = t.clone('branch').base
862
self.assertEqual(branch_base, dir.get_branch_transport(None).base)
863
self.assertEqual(branch_base,
864
dir.get_branch_transport(bzrlib.branch.BzrBranchFormat5()).base)
865
repository_base = t.clone('repository').base
866
self.assertEqual(repository_base, dir.get_repository_transport(None).base)
867
self.assertEqual(repository_base,
868
dir.get_repository_transport(weaverepo.RepositoryFormat7()).base)
869
checkout_base = t.clone('checkout').base
870
self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
871
self.assertEqual(checkout_base,
872
dir.get_workingtree_transport(workingtree.WorkingTreeFormat3()).base)
874
def test_meta1dir_uses_lockdir(self):
875
"""Meta1 format uses a LockDir to guard the whole directory, not a file."""
876
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
878
self.assertIsDirectory('branch-lock', t)
880
def test_comparison(self):
881
"""Equality and inequality behave properly.
883
Metadirs should compare equal iff they have the same repo, branch and
886
mydir = bzrdir.format_registry.make_bzrdir('knit')
887
self.assertEqual(mydir, mydir)
888
self.assertFalse(mydir != mydir)
889
otherdir = bzrdir.format_registry.make_bzrdir('knit')
890
self.assertEqual(otherdir, mydir)
891
self.assertFalse(otherdir != mydir)
892
otherdir2 = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
893
self.assertNotEqual(otherdir2, mydir)
894
self.assertFalse(otherdir2 == mydir)
896
def test_needs_conversion_different_working_tree(self):
897
# meta1dirs need an conversion if any element is not the default.
898
new_format = bzrdir.format_registry.make_bzrdir('dirstate')
899
tree = self.make_branch_and_tree('tree', format='knit')
900
self.assertTrue(tree.bzrdir.needs_format_conversion(
903
def test_initialize_on_format_uses_smart_transport(self):
904
self.setup_smart_server_with_call_log()
905
new_format = bzrdir.format_registry.make_bzrdir('dirstate')
906
transport = self.get_transport('target')
907
transport.ensure_base()
908
self.reset_smart_call_log()
909
instance = new_format.initialize_on_transport(transport)
910
self.assertIsInstance(instance, remote.RemoteBzrDir)
911
rpc_count = len(self.hpss_calls)
912
# This figure represent the amount of work to perform this use case. It
913
# is entirely ok to reduce this number if a test fails due to rpc_count
914
# being too low. If rpc_count increases, more network roundtrips have
915
# become necessary for this use case. Please do not adjust this number
916
# upwards without agreement from bzr's network support maintainers.
917
self.assertEqual(2, rpc_count)
920
class TestFormat5(TestCaseWithTransport):
921
"""Tests specific to the version 5 bzrdir format."""
923
def test_same_lockfiles_between_tree_repo_branch(self):
924
# this checks that only a single lockfiles instance is created
925
# for format 5 objects
926
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
927
def check_dir_components_use_same_lock(dir):
928
ctrl_1 = dir.open_repository().control_files
929
ctrl_2 = dir.open_branch().control_files
930
ctrl_3 = dir.open_workingtree()._control_files
931
self.assertTrue(ctrl_1 is ctrl_2)
932
self.assertTrue(ctrl_2 is ctrl_3)
933
check_dir_components_use_same_lock(dir)
934
# and if we open it normally.
935
dir = bzrdir.BzrDir.open(self.get_url())
936
check_dir_components_use_same_lock(dir)
938
def test_can_convert(self):
939
# format 5 dirs are convertable
940
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
941
self.assertTrue(dir.can_convert_format())
943
def test_needs_conversion(self):
944
# format 5 dirs need a conversion if they are not the default,
946
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
947
# don't need to convert it to itself
948
self.assertFalse(dir.needs_format_conversion(bzrdir.BzrDirFormat5()))
949
# do need to convert it to the current default
950
self.assertTrue(dir.needs_format_conversion(
951
bzrdir.BzrDirFormat.get_default_format()))
954
class TestFormat6(TestCaseWithTransport):
955
"""Tests specific to the version 6 bzrdir format."""
957
def test_same_lockfiles_between_tree_repo_branch(self):
958
# this checks that only a single lockfiles instance is created
959
# for format 6 objects
960
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
961
def check_dir_components_use_same_lock(dir):
962
ctrl_1 = dir.open_repository().control_files
963
ctrl_2 = dir.open_branch().control_files
964
ctrl_3 = dir.open_workingtree()._control_files
965
self.assertTrue(ctrl_1 is ctrl_2)
966
self.assertTrue(ctrl_2 is ctrl_3)
967
check_dir_components_use_same_lock(dir)
968
# and if we open it normally.
969
dir = bzrdir.BzrDir.open(self.get_url())
970
check_dir_components_use_same_lock(dir)
972
def test_can_convert(self):
973
# format 6 dirs are convertable
974
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
975
self.assertTrue(dir.can_convert_format())
977
def test_needs_conversion(self):
978
# format 6 dirs need an conversion if they are not the default.
979
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
980
self.assertTrue(dir.needs_format_conversion(
981
bzrdir.BzrDirFormat.get_default_format()))
984
class NotBzrDir(bzrlib.bzrdir.BzrDir):
985
"""A non .bzr based control directory."""
987
def __init__(self, transport, format):
988
self._format = format
989
self.root_transport = transport
990
self.transport = transport.clone('.not')
993
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
994
"""A test class representing any non-.bzr based disk format."""
996
def initialize_on_transport(self, transport):
997
"""Initialize a new .not dir in the base directory of a Transport."""
998
transport.mkdir('.not')
999
return self.open(transport)
1001
def open(self, transport):
1002
"""Open this directory."""
1003
return NotBzrDir(transport, self)
1006
def _known_formats(self):
1007
return set([NotBzrDirFormat()])
1010
def probe_transport(self, transport):
1011
"""Our format is present if the transport ends in '.not/'."""
1012
if transport.has('.not'):
1013
return NotBzrDirFormat()
1016
class TestNotBzrDir(TestCaseWithTransport):
1017
"""Tests for using the bzrdir api with a non .bzr based disk format.
1019
If/when one of these is in the core, we can let the implementation tests
1023
def test_create_and_find_format(self):
1024
# create a .notbzr dir
1025
format = NotBzrDirFormat()
1026
dir = format.initialize(self.get_url())
1027
self.assertIsInstance(dir, NotBzrDir)
1029
bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
1031
found = bzrlib.bzrdir.BzrDirFormat.find_format(
1032
get_transport(self.get_url()))
1033
self.assertIsInstance(found, NotBzrDirFormat)
1035
bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
1037
def test_included_in_known_formats(self):
1038
bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
1040
formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1041
for format in formats:
1042
if isinstance(format, NotBzrDirFormat):
1044
self.fail("No NotBzrDirFormat in %s" % formats)
1046
bzrlib.bzrdir.BzrDirFormat.unregister_control_format(NotBzrDirFormat)
1049
class NonLocalTests(TestCaseWithTransport):
1050
"""Tests for bzrdir static behaviour on non local paths."""
1053
super(NonLocalTests, self).setUp()
1054
self.vfs_transport_factory = memory.MemoryServer
1056
def test_create_branch_convenience(self):
1057
# outside a repo the default convenience output is a repo+branch_tree
1058
format = bzrdir.format_registry.make_bzrdir('knit')
1059
branch = bzrdir.BzrDir.create_branch_convenience(
1060
self.get_url('foo'), format=format)
1061
self.assertRaises(errors.NoWorkingTree,
1062
branch.bzrdir.open_workingtree)
1063
branch.bzrdir.open_repository()
1065
def test_create_branch_convenience_force_tree_not_local_fails(self):
1066
# outside a repo the default convenience output is a repo+branch_tree
1067
format = bzrdir.format_registry.make_bzrdir('knit')
1068
self.assertRaises(errors.NotLocalUrl,
1069
bzrdir.BzrDir.create_branch_convenience,
1070
self.get_url('foo'),
1071
force_new_tree=True,
1073
t = get_transport(self.get_url('.'))
1074
self.assertFalse(t.has('foo'))
1076
def test_clone(self):
1077
# clone into a nonlocal path works
1078
format = bzrdir.format_registry.make_bzrdir('knit')
1079
branch = bzrdir.BzrDir.create_branch_convenience('local',
1081
branch.bzrdir.open_workingtree()
1082
result = branch.bzrdir.clone(self.get_url('remote'))
1083
self.assertRaises(errors.NoWorkingTree,
1084
result.open_workingtree)
1085
result.open_branch()
1086
result.open_repository()
1088
def test_checkout_metadir(self):
1089
# checkout_metadir has reasonable working tree format even when no
1090
# working tree is present
1091
self.make_branch('branch-knit2', format='dirstate-with-subtree')
1092
my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
1093
checkout_format = my_bzrdir.checkout_metadir()
1094
self.assertIsInstance(checkout_format.workingtree_format,
1095
workingtree.WorkingTreeFormat3)
1098
class TestHTTPRedirections(object):
1099
"""Test redirection between two http servers.
1101
This MUST be used by daughter classes that also inherit from
1102
TestCaseWithTwoWebservers.
1104
We can't inherit directly from TestCaseWithTwoWebservers or the
1105
test framework will try to create an instance which cannot
1106
run, its implementation being incomplete.
1109
def create_transport_readonly_server(self):
1110
return http_utils.HTTPServerRedirecting()
1112
def create_transport_secondary_server(self):
1113
return http_utils.HTTPServerRedirecting()
1116
super(TestHTTPRedirections, self).setUp()
1117
# The redirections will point to the new server
1118
self.new_server = self.get_readonly_server()
1119
# The requests to the old server will be redirected
1120
self.old_server = self.get_secondary_server()
1121
# Configure the redirections
1122
self.old_server.redirect_to(self.new_server.host, self.new_server.port)
1124
def test_loop(self):
1125
# Both servers redirect to each other creating a loop
1126
self.new_server.redirect_to(self.old_server.host, self.old_server.port)
1127
# Starting from either server should loop
1128
old_url = self._qualified_url(self.old_server.host,
1129
self.old_server.port)
1130
oldt = self._transport(old_url)
1131
self.assertRaises(errors.NotBranchError,
1132
bzrdir.BzrDir.open_from_transport, oldt)
1133
new_url = self._qualified_url(self.new_server.host,
1134
self.new_server.port)
1135
newt = self._transport(new_url)
1136
self.assertRaises(errors.NotBranchError,
1137
bzrdir.BzrDir.open_from_transport, newt)
1139
def test_qualifier_preserved(self):
1140
wt = self.make_branch_and_tree('branch')
1141
old_url = self._qualified_url(self.old_server.host,
1142
self.old_server.port)
1143
start = self._transport(old_url).clone('branch')
1144
bdir = bzrdir.BzrDir.open_from_transport(start)
1145
# Redirection should preserve the qualifier, hence the transport class
1147
self.assertIsInstance(bdir.root_transport, type(start))
1150
class TestHTTPRedirections_urllib(TestHTTPRedirections,
1151
http_utils.TestCaseWithTwoWebservers):
1152
"""Tests redirections for urllib implementation"""
1154
_transport = HttpTransport_urllib
1156
def _qualified_url(self, host, port):
1157
result = 'http+urllib://%s:%s' % (host, port)
1158
self.permit_url(result)
1163
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
1164
TestHTTPRedirections,
1165
http_utils.TestCaseWithTwoWebservers):
1166
"""Tests redirections for pycurl implementation"""
1168
def _qualified_url(self, host, port):
1169
result = 'http+pycurl://%s:%s' % (host, port)
1170
self.permit_url(result)
1174
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
1175
http_utils.TestCaseWithTwoWebservers):
1176
"""Tests redirections for the nosmart decorator"""
1178
_transport = NoSmartTransportDecorator
1180
def _qualified_url(self, host, port):
1181
result = 'nosmart+http://%s:%s' % (host, port)
1182
self.permit_url(result)
1186
class TestHTTPRedirections_readonly(TestHTTPRedirections,
1187
http_utils.TestCaseWithTwoWebservers):
1188
"""Tests redirections for readonly decoratror"""
1190
_transport = ReadonlyTransportDecorator
1192
def _qualified_url(self, host, port):
1193
result = 'readonly+http://%s:%s' % (host, port)
1194
self.permit_url(result)
1198
class TestDotBzrHidden(TestCaseWithTransport):
1201
if sys.platform == 'win32':
1202
ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
1205
f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
1206
stderr=subprocess.PIPE)
1207
out, err = f.communicate()
1208
self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
1210
return out.splitlines()
1212
def test_dot_bzr_hidden(self):
1213
if sys.platform == 'win32' and not win32utils.has_win32file:
1214
raise TestSkipped('unable to make file hidden without pywin32 library')
1215
b = bzrdir.BzrDir.create('.')
1216
self.build_tree(['a'])
1217
self.assertEquals(['a'], self.get_ls())
1219
def test_dot_bzr_hidden_with_url(self):
1220
if sys.platform == 'win32' and not win32utils.has_win32file:
1221
raise TestSkipped('unable to make file hidden without pywin32 library')
1222
b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
1223
self.build_tree(['a'])
1224
self.assertEquals(['a'], self.get_ls())
1227
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
1228
"""Test BzrDirFormat implementation for TestBzrDirSprout."""
1230
def _open(self, transport):
1231
return _TestBzrDir(transport, self)
1234
class _TestBzrDir(bzrdir.BzrDirMeta1):
1235
"""Test BzrDir implementation for TestBzrDirSprout.
1237
When created a _TestBzrDir already has repository and a branch. The branch
1238
is a test double as well.
1241
def __init__(self, *args, **kwargs):
1242
super(_TestBzrDir, self).__init__(*args, **kwargs)
1243
self.test_branch = _TestBranch()
1244
self.test_branch.repository = self.create_repository()
1246
def open_branch(self, unsupported=False):
1247
return self.test_branch
1249
def cloning_metadir(self, require_stacking=False):
1250
return _TestBzrDirFormat()
1253
class _TestBranchFormat(bzrlib.branch.BranchFormat):
1254
"""Test Branch format for TestBzrDirSprout."""
1257
class _TestBranch(bzrlib.branch.Branch):
1258
"""Test Branch implementation for TestBzrDirSprout."""
1260
def __init__(self, *args, **kwargs):
1261
self._format = _TestBranchFormat()
1262
super(_TestBranch, self).__init__(*args, **kwargs)
1266
def sprout(self, *args, **kwargs):
1267
self.calls.append('sprout')
1268
return _TestBranch()
1270
def copy_content_into(self, destination, revision_id=None):
1271
self.calls.append('copy_content_into')
1273
def get_parent(self):
1276
def set_parent(self, parent):
1277
self._parent = parent
1280
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1282
def test_sprout_uses_branch_sprout(self):
1283
"""BzrDir.sprout calls Branch.sprout.
1285
Usually, BzrDir.sprout should delegate to the branch's sprout method
1286
for part of the work. This allows the source branch to control the
1287
choice of format for the new branch.
1289
There are exceptions, but this tests avoids them:
1290
- if there's no branch in the source bzrdir,
1291
- or if the stacking has been requested and the format needs to be
1292
overridden to satisfy that.
1294
# Make an instrumented bzrdir.
1295
t = self.get_transport('source')
1297
source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
1298
# The instrumented bzrdir has a test_branch attribute that logs calls
1299
# made to the branch contained in that bzrdir. Initially the test
1300
# branch exists but no calls have been made to it.
1301
self.assertEqual([], source_bzrdir.test_branch.calls)
1304
target_url = self.get_url('target')
1305
result = source_bzrdir.sprout(target_url, recurse='no')
1307
# The bzrdir called the branch's sprout method.
1308
self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
1310
def test_sprout_parent(self):
1311
grandparent_tree = self.make_branch('grandparent')
1312
parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
1313
branch_tree = parent.bzrdir.sprout('branch').open_branch()
1314
self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1317
class TestBzrDirHooks(TestCaseWithMemoryTransport):
1319
def test_pre_open_called(self):
1321
bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
1322
transport = self.get_transport('foo')
1323
url = transport.base
1324
self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
1325
self.assertEqual([transport.base], [t.base for t in calls])
1327
def test_pre_open_actual_exceptions_raised(self):
1329
def fail_once(transport):
1332
raise errors.BzrError("fail")
1333
bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
1334
transport = self.get_transport('foo')
1335
url = transport.base
1336
err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1337
self.assertEqual('fail', err._preformatted_string)