1
# Copyright (C) 2006-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the BzrDir facility and any format specific tests.
19
For interface contract tests, see tests/per_bzr_dir.
35
revision as _mod_revision,
39
transport as _mod_transport,
46
from bzrlib.errors import (
48
NoColocatedBranchSupport,
50
UnsupportedFormatError,
52
from bzrlib.tests import (
54
TestCaseWithMemoryTransport,
55
TestCaseWithTransport,
58
from bzrlib.tests import(
62
from bzrlib.tests.test_http import TestWithTransport_pycurl
63
from bzrlib.transport import (
67
from bzrlib.transport.http._urllib import HttpTransport_urllib
68
from bzrlib.transport.nosmart import NoSmartTransportDecorator
69
from bzrlib.transport.readonly import ReadonlyTransportDecorator
70
from bzrlib.repofmt import knitrepo, knitpack_repo
73
class TestDefaultFormat(TestCase):
    """Check that the default control directory format can be replaced."""

    def test_get_set_default_format(self):
        """Swapping the default format changes what BzrDir.create builds."""
        old_format = bzrdir.BzrDirFormat.get_default_format()
        # default is BzrDirMetaFormat1
        self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
        controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
        # creating a bzr dir should now create an instrumented dir.
        try:
            result = bzrdir.BzrDir.create('memory:///')
            self.assertIsInstance(result, SampleBzrDir)
        finally:
            # The default is set on the class, so restore it even when an
            # assertion above fails; otherwise it leaks into later tests.
            controldir.ControlDirFormat._set_default_format(old_format)
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
89
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
    """Placeholder bzr dir format that the tests register as deprecated."""
93
class TestFormatRegistry(TestCase):
95
def make_format_registry(self):
96
my_format_registry = controldir.ControlDirFormatRegistry()
97
my_format_registry.register('deprecated', DeprecatedBzrDirFormat,
98
'Some format. Slower and unawesome and deprecated.',
100
my_format_registry.register_lazy('lazy', 'bzrlib.tests.test_bzrdir',
101
'DeprecatedBzrDirFormat', 'Format registered lazily',
103
bzrdir.register_metadir(my_format_registry, 'knit',
104
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
105
'Format using knits',
107
my_format_registry.set_default('knit')
108
bzrdir.register_metadir(my_format_registry,
110
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
111
'Experimental successor to knit. Use at your own risk.',
112
branch_format='bzrlib.branch.BzrBranchFormat6',
114
bzrdir.register_metadir(my_format_registry,
116
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
117
'Experimental successor to knit. Use at your own risk.',
118
branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
119
my_format_registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
120
'Old format. Slower and does not support things. ', hidden=True)
121
my_format_registry.register_lazy('hiddenlazy', 'bzrlib.tests.test_bzrdir',
122
'DeprecatedBzrDirFormat', 'Format registered lazily',
123
deprecated=True, hidden=True)
124
return my_format_registry
126
def test_format_registry(self):
    """Each registered key builds a format object of the expected kind."""
    registry = self.make_format_registry()
    # Keys registered for DeprecatedBzrDirFormat, lazily or directly.
    for key in ('lazy', 'deprecated'):
        made = registry.make_bzrdir(key)
        self.assertIsInstance(made, DeprecatedBzrDirFormat)
    # 'default' and 'knit' must both use the knit repository format.
    for key in ('default', 'knit'):
        made = registry.make_bzrdir(key)
        self.assertIsInstance(made.repository_format,
                              knitrepo.RepositoryFormatKnit1)
    branch6 = registry.make_bzrdir('branch6')
    self.assertIsInstance(branch6.get_branch_format(),
                          bzrlib.branch.BzrBranchFormat6)
142
def test_get_help(self):
    """get_help() returns the help text stored for each key."""
    registry = self.make_format_registry()
    expected_help = [
        ('lazy', 'Format registered lazily'),
        ('knit', 'Format using knits'),
        ('default', 'Format using knits'),
        ('deprecated', 'Some format. Slower and unawesome and deprecated.'),
    ]
    for key, help_text in expected_help:
        self.assertEqual(help_text, registry.get_help(key))
153
def test_help_topic(self):
154
topics = help_topics.HelpTopicRegistry()
155
registry = self.make_format_registry()
156
topics.register('current-formats', registry.help_topic,
158
topics.register('other-formats', registry.help_topic,
160
new = topics.get_detail('current-formats')
161
rest = topics.get_detail('other-formats')
162
experimental, deprecated = rest.split('Deprecated formats')
163
self.assertContainsRe(new, 'formats-help')
164
self.assertContainsRe(new,
165
':knit:\n \(native\) \(default\) Format using knits\n')
166
self.assertContainsRe(experimental,
167
':branch6:\n \(native\) Experimental successor to knit')
168
self.assertContainsRe(deprecated,
169
':lazy:\n \(native\) Format registered lazily\n')
170
self.assertNotContainsRe(new, 'hidden')
172
def test_set_default_repository(self):
173
default_factory = bzrdir.format_registry.get('default')
174
old_default = [k for k, v in bzrdir.format_registry.iteritems()
175
if v == default_factory and k != 'default'][0]
176
bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
178
self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
179
bzrdir.format_registry.get('default'))
181
repository.format_registry.get_default().__class__,
182
knitrepo.RepositoryFormatKnit3)
184
bzrdir.format_registry.set_default_repository(old_default)
186
def test_aliases(self):
187
a_registry = controldir.ControlDirFormatRegistry()
188
a_registry.register('deprecated', DeprecatedBzrDirFormat,
189
'Old format. Slower and does not support stuff',
191
a_registry.register('deprecatedalias', DeprecatedBzrDirFormat,
192
'Old format. Slower and does not support stuff',
193
deprecated=True, alias=True)
194
self.assertEqual(frozenset(['deprecatedalias']), a_registry.aliases())
197
class SampleBranch(bzrlib.branch.Branch):
198
"""A dummy branch for guess what, dummy use."""
200
def __init__(self, dir):
204
class SampleRepository(bzrlib.repository.Repository):
207
def __init__(self, dir):
211
class SampleBzrDir(bzrdir.BzrDir):
212
"""A sample BzrDir implementation to allow testing static methods."""
214
def create_repository(self, shared=False):
    """See BzrDir.create_repository."""
    # A fixed marker string stands in for a real repository object.
    repository_marker = "A repository"
    return repository_marker
218
def open_repository(self):
    """See BzrDir.open_repository."""
    sample_repository = SampleRepository(self)
    return sample_repository
222
def create_branch(self, name=None):
    """See BzrDir.create_branch.

    :param name: Colocated branch name; only None is accepted.
    :raises NoColocatedBranchSupport: if a branch name is given.
    :return: A SampleBranch built from this directory.
    """
    # Only refuse named (colocated) branches; raising unconditionally
    # would make the return below unreachable.
    if name is not None:
        raise NoColocatedBranchSupport(self)
    return SampleBranch(self)
228
def create_workingtree(self):
    """See BzrDir.create_workingtree.

    :return: The marker string 'A tree', which
        test_create_standalone_working_tree compares against.
    """
    return "A tree"
233
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
236
this format is initializable, unsupported to aid in testing the
237
open and open_downlevel routines.
240
def get_format_string(self):
    """See BzrDirFormat.get_format_string()."""
    format_string = "Sample .bzr dir format."
    return format_string
244
def initialize_on_transport(self, t):
    """Create a bzr dir.

    :param t: Transport rooted where the control directory is created.
    :return: A SampleBzrDir for that transport and this format.
    """
    # The control directory has to exist before its format marker can be
    # written into it.
    t.mkdir('.bzr')
    t.put_bytes('.bzr/branch-format', self.get_format_string())
    return SampleBzrDir(t, self)
250
def is_supported(self):
    """Report this format as unsupported.

    test_register_unregister_format expects BzrDir.open and
    BzrDir.open_containing to refuse this format as unsupported.
    """
    return False
253
def open(self, transport, _found=None):
    """Pretend to open the directory and return a fixed marker string."""
    opened_marker = "opened branch."
    return opened_marker
257
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
    """Metadir format variant identified by its own format string."""

    # Static so it can be called on the class itself (as test_find_format
    # does when registering with the prober) as well as on instances.
    @staticmethod
    def get_format_string():
        """Return the marker string that identifies this test format."""
        return "Test format 1"
264
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
    """Second metadir format variant with a distinct format string."""

    # Static so it can be called on the class itself (as test_find_format
    # does when registering with the prober) as well as on instances.
    @staticmethod
    def get_format_string():
        """Return the marker string that identifies this test format."""
        return "Test format 2"
271
class TestBzrDirFormat(TestCaseWithTransport):
272
"""Tests for the BzrDirFormat facility."""
274
def test_find_format(self):
275
# is the right format object found for a branch?
276
# create a branch with a few known format objects.
277
bzrdir.BzrProber.formats.register(BzrDirFormatTest1.get_format_string(),
279
self.addCleanup(bzrdir.BzrProber.formats.remove,
280
BzrDirFormatTest1.get_format_string())
281
bzrdir.BzrProber.formats.register(BzrDirFormatTest2.get_format_string(),
283
self.addCleanup(bzrdir.BzrProber.formats.remove,
284
BzrDirFormatTest2.get_format_string())
285
t = self.get_transport()
286
self.build_tree(["foo/", "bar/"], transport=t)
287
def check_format(format, url):
288
format.initialize(url)
289
t = _mod_transport.get_transport(url)
290
found_format = bzrdir.BzrDirFormat.find_format(t)
291
self.assertIsInstance(found_format, format.__class__)
292
check_format(BzrDirFormatTest1(), "foo")
293
check_format(BzrDirFormatTest2(), "bar")
295
def test_find_format_nothing_there(self):
    """find_format() raises NotBranchError when there is nothing to find."""
    here = _mod_transport.get_transport('.')
    self.assertRaises(NotBranchError, bzrdir.BzrDirFormat.find_format, here)
300
def test_find_format_unknown_format(self):
    """An unrecognised branch-format file raises UnknownFormatError."""
    t = self.get_transport()
    # The control directory has to exist before the (empty) format marker
    # can be written into it.
    t.mkdir('.bzr')
    t.put_bytes('.bzr/branch-format', '')
    self.assertRaises(UnknownFormatError,
                      bzrdir.BzrDirFormat.find_format,
                      _mod_transport.get_transport('.'))
308
def test_register_unregister_format(self):
    """A registered but unsupported format opens only via open_unsupported.

    Once the format is removed from the prober, open_unsupported fails too.
    """
    sample_format = SampleBzrDirFormat()
    url = self.get_url()
    # make a bzrdir
    sample_format.initialize(url)
    # register a format for it.
    bzrdir.BzrProber.formats.register(sample_format.get_format_string(),
                                      sample_format)
    try:
        # which bzrdir.Open will refuse (not supported)
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
        # which bzrdir.open_containing will refuse (not supported)
        self.assertRaises(UnsupportedFormatError,
                          bzrdir.BzrDir.open_containing, url)
        # but open_downlevel will work
        t = _mod_transport.get_transport(url)
        self.assertEqual(sample_format.open(t),
                         bzrdir.BzrDir.open_unsupported(url))
    finally:
        # unregister the format, even if an assertion above failed, so the
        # registration cannot leak into later tests.
        bzrdir.BzrProber.formats.remove(sample_format.get_format_string())
    # now open_downlevel should fail too.
    self.assertRaises(UnknownFormatError,
                      bzrdir.BzrDir.open_unsupported, url)
327
def test_create_branch_and_repo_uses_default(self):
328
format = SampleBzrDirFormat()
329
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
331
self.assertTrue(isinstance(branch, SampleBranch))
333
def test_create_branch_and_repo_under_shared(self):
    """Under a shared repository the new branch dir holds no repository."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    child_url = self.get_url('child')
    branch = bzrdir.BzrDir.create_branch_and_repo(child_url,
                                                  format=knit_format)
    # No repository was created next to the branch.
    self.assertRaises(errors.NoRepositoryPresent,
                      branch.bzrdir.open_repository)
343
def test_create_branch_and_repo_under_shared_force_new(self):
344
# creating a branch and repo in a shared repo can be forced to
346
format = bzrdir.format_registry.make_bzrdir('knit')
347
self.make_repository('.', shared=True, format=format)
348
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
351
branch.bzrdir.open_repository()
353
def test_create_standalone_working_tree(self):
354
format = SampleBzrDirFormat()
355
# note this is deliberately readonly, as this failure should
356
# occur before any writes.
357
self.assertRaises(errors.NotLocalUrl,
358
bzrdir.BzrDir.create_standalone_workingtree,
359
self.get_readonly_url(), format=format)
360
tree = bzrdir.BzrDir.create_standalone_workingtree('.',
362
self.assertEqual('A tree', tree)
364
def test_create_standalone_working_tree_under_shared_repo(self):
365
# create standalone working tree always makes a repo.
366
format = bzrdir.format_registry.make_bzrdir('knit')
367
self.make_repository('.', shared=True, format=format)
368
# note this is deliberately readonly, as this failure should
369
# occur before any writes.
370
self.assertRaises(errors.NotLocalUrl,
371
bzrdir.BzrDir.create_standalone_workingtree,
372
self.get_readonly_url('child'), format=format)
373
tree = bzrdir.BzrDir.create_standalone_workingtree('child',
375
tree.bzrdir.open_repository()
377
def test_create_branch_convenience(self):
    """Outside a repository we get a repository, a branch and a tree."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    branch = bzrdir.BzrDir.create_branch_convenience('.', format=knit_format)
    control = branch.bzrdir
    # Both opens succeeding without raising is the assertion here.
    control.open_workingtree()
    control.open_repository()
384
def test_create_branch_convenience_possible_transports(self):
    """Check that the optional 'possible_transports' is recognized"""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    reusable = [self.get_transport()]
    branch = bzrdir.BzrDir.create_branch_convenience(
        '.', format=knit_format, possible_transports=reusable)
    control = branch.bzrdir
    # Both opens succeeding without raising is the assertion here.
    control.open_workingtree()
    control.open_repository()
393
def test_create_branch_convenience_root(self):
394
"""Creating a branch at the root of a fs should work."""
395
self.vfs_transport_factory = memory.MemoryServer
396
# outside a repo the default convenience output is a repo+branch_tree
397
format = bzrdir.format_registry.make_bzrdir('knit')
398
branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
400
self.assertRaises(errors.NoWorkingTree,
401
branch.bzrdir.open_workingtree)
402
branch.bzrdir.open_repository()
404
def test_create_branch_convenience_under_shared_repo(self):
405
# inside a repo the default convenience output is a branch+ follow the
407
format = bzrdir.format_registry.make_bzrdir('knit')
408
self.make_repository('.', shared=True, format=format)
409
branch = bzrdir.BzrDir.create_branch_convenience('child',
411
branch.bzrdir.open_workingtree()
412
self.assertRaises(errors.NoRepositoryPresent,
413
branch.bzrdir.open_repository)
415
def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
    """force_new_tree=False yields no working tree under a shared repo."""
    # inside a repo the default convenience output is a branch+ follow the
    # repo tree policy but we can override that
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    branch = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_tree=False, format=knit_format)
    control = branch.bzrdir
    self.assertRaises(errors.NoWorkingTree, control.open_workingtree)
    self.assertRaises(errors.NoRepositoryPresent, control.open_repository)
427
def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
428
# inside a repo the default convenience output is a branch+ follow the
430
format = bzrdir.format_registry.make_bzrdir('knit')
431
repo = self.make_repository('.', shared=True, format=format)
432
repo.set_make_working_trees(False)
433
branch = bzrdir.BzrDir.create_branch_convenience('child',
435
self.assertRaises(errors.NoWorkingTree,
436
branch.bzrdir.open_workingtree)
437
self.assertRaises(errors.NoRepositoryPresent,
438
branch.bzrdir.open_repository)
440
def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
    """force_new_tree=True overrides a repository no-trees policy."""
    # inside a repo the default convenience output is a branch+ follow the
    # repo tree policy but we can override that
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    shared_repo = self.make_repository('.', shared=True, format=knit_format)
    shared_repo.set_make_working_trees(False)
    branch = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_tree=True, format=knit_format)
    control = branch.bzrdir
    # The tree opens despite the policy; there is still no repository here.
    control.open_workingtree()
    self.assertRaises(errors.NoRepositoryPresent, control.open_repository)
452
def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
    """force_new_repo=True makes a repository next to the new branch."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    branch = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_repo=True, format=knit_format)
    control = branch.bzrdir
    # Both opens succeeding without raising is the assertion here.
    control.open_repository()
    control.open_workingtree()
463
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
465
def test_acquire_repository_standalone(self):
    """The default acquisition policy should create a standalone branch."""
    control = self.make_bzrdir('.')
    policy = control.determine_repository_policy()
    repo, _is_new = policy.acquire_repository()
    # The repository sits in the same directory and is not shared.
    repo_base = repo.bzrdir.root_transport.base
    self.assertEqual(repo_base, control.root_transport.base)
    self.assertFalse(repo.is_shared())
474
def test_determine_stacking_policy(self):
    """A child dir picks up the parent's default stack-on URL."""
    parent = self.make_bzrdir('.')
    child = self.make_bzrdir('child')
    parent.get_config().set_default_stack_on('http://example.org')
    policy = child.determine_repository_policy()
    self.assertEqual('http://example.org', policy._stack_on)
481
def test_determine_stacking_policy_relative(self):
    """A relative stack-on value is kept, with the parent base as its pwd."""
    parent = self.make_bzrdir('.')
    child = self.make_bzrdir('child')
    parent.get_config().set_default_stack_on('child2')
    policy = child.determine_repository_policy()
    self.assertEqual('child2', policy._stack_on)
    self.assertEqual(parent.root_transport.base, policy._stack_on_pwd)
490
def prepare_default_stacking(self, child_format='1.6'):
    """Make a parent dir whose default stack-on target is a 'child' branch.

    :param child_format: Format name used to create the 'child' branch.
    :return: (child_branch, transport cloned from the parent to 'child2').
    """
    parent = self.make_bzrdir('.')
    child = self.make_branch('child', format=child_format)
    parent.get_config().set_default_stack_on(child.base)
    child2_transport = parent.transport.clone('child2')
    return child, child2_transport
497
def test_clone_on_transport_obeys_stacking_policy(self):
    """clone_on_transport() stacks the clone on the configured default."""
    source, target_transport = self.prepare_default_stacking()
    cloned = source.bzrdir.clone_on_transport(target_transport)
    stacked_on = cloned.open_branch().get_stacked_on_url()
    self.assertEqual(source.base, stacked_on)
503
def test_default_stacking_with_stackable_branch_unstackable_repo(self):
504
# Make stackable source branch with an unstackable repo format.
505
source_bzrdir = self.make_bzrdir('source')
506
knitpack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
507
source_branch = bzrlib.branch.BzrBranchFormat7().initialize(
509
# Make a directory with a default stacking policy
510
parent_bzrdir = self.make_bzrdir('parent')
511
stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
512
parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
513
# Clone source into directory
514
target = source_bzrdir.clone(self.get_url('parent/target'))
516
def test_sprout_obeys_stacking_policy(self):
    """sprout() stacks the new branch on the configured default."""
    source, target_transport = self.prepare_default_stacking()
    sprouted = source.bzrdir.sprout(target_transport.base)
    stacked_on = sprouted.open_branch().get_stacked_on_url()
    self.assertEqual(source.base, stacked_on)
522
def test_clone_ignores_policy_for_unsupported_formats(self):
    """Cloning a 'pack-0.92' branch leaves the clone unstackable."""
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    cloned = source.bzrdir.clone_on_transport(target_transport)
    cloned_branch = cloned.open_branch()
    self.assertRaises(errors.UnstackableBranchFormat,
                      cloned_branch.get_stacked_on_url)
529
def test_sprout_ignores_policy_for_unsupported_formats(self):
    """Sprouting a 'pack-0.92' branch leaves the result unstackable."""
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    sprouted = source.bzrdir.sprout(target_transport.base)
    sprouted_branch = sprouted.open_branch()
    self.assertRaises(errors.UnstackableBranchFormat,
                      sprouted_branch.get_stacked_on_url)
536
def test_sprout_upgrades_format_if_stacked_specified(self):
537
child_branch, new_child_transport = self.prepare_default_stacking(
538
child_format='pack-0.92')
539
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
541
self.assertEqual(child_branch.bzrdir.root_transport.base,
542
new_child.open_branch().get_stacked_on_url())
543
repo = new_child.open_repository()
544
self.assertTrue(repo._format.supports_external_lookups)
545
self.assertFalse(repo.supports_rich_root())
547
def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
    """An explicit stacked_on gives the clone a stackable repository."""
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    stack_on_url = source.bzrdir.root_transport.base
    cloned = source.bzrdir.clone_on_transport(target_transport,
                                              stacked_on=stack_on_url)
    self.assertEqual(stack_on_url, cloned.open_branch().get_stacked_on_url())
    cloned_repo = cloned.open_repository()
    # External lookups are supported, but rich roots are not.
    self.assertTrue(cloned_repo._format.supports_external_lookups)
    self.assertFalse(cloned_repo.supports_rich_root())
558
def test_sprout_upgrades_to_rich_root_format_if_needed(self):
559
child_branch, new_child_transport = self.prepare_default_stacking(
560
child_format='rich-root-pack')
561
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
563
repo = new_child.open_repository()
564
self.assertTrue(repo._format.supports_external_lookups)
565
self.assertTrue(repo.supports_rich_root())
567
def test_add_fallback_repo_handles_absolute_urls(self):
    """_add_fallback() runs cleanly when stack_on is a branch base URL."""
    stack_on_branch = self.make_branch('stack_on', format='1.6')
    target_repo = self.make_repository('repo', format='1.6')
    policy = bzrdir.UseExistingRepository(target_repo, stack_on_branch.base)
    # Not raising is the assertion here.
    policy._add_fallback(target_repo)
573
def test_add_fallback_repo_handles_relative_urls(self):
    """_add_fallback() runs cleanly when stack_on is '.' plus a pwd."""
    stack_on_branch = self.make_branch('stack_on', format='1.6')
    target_repo = self.make_repository('repo', format='1.6')
    policy = bzrdir.UseExistingRepository(target_repo, '.',
                                          stack_on_branch.base)
    # Not raising is the assertion here.
    policy._add_fallback(target_repo)
579
def test_configure_relative_branch_stacking_url(self):
580
stack_on = self.make_branch('stack_on', format='1.6')
581
stacked = self.make_branch('stack_on/stacked', format='1.6')
582
policy = bzrdir.UseExistingRepository(stacked.repository,
584
policy.configure_branch(stacked)
585
self.assertEqual('..', stacked.get_stacked_on_url())
587
def test_relative_branch_stacking_to_absolute(self):
    """With a readonly-URL pwd, '.' is stored as that absolute URL."""
    self.make_branch('stack_on', format='1.6')
    stacked = self.make_branch('stack_on/stacked', format='1.6')
    stack_on_url = self.get_readonly_url('stack_on')
    policy = bzrdir.UseExistingRepository(stacked.repository, '.',
                                          stack_on_url)
    policy.configure_branch(stacked)
    self.assertEqual(stack_on_url, stacked.get_stacked_on_url())
597
class ChrootedTests(TestCaseWithTransport):
598
"""A support class that provides readonly urls outside the local namespace.
600
This is done by checking if self.transport_server is a MemoryServer. if it
601
is then we are chrooted already, if it is not then an HttpServer is used
606
super(ChrootedTests, self).setUp()
607
if not self.vfs_transport_factory == memory.MemoryServer:
608
self.transport_readonly_server = http_server.HttpServer
610
def local_branch_path(self, branch):
    """Return the os.path.realpath() of the local path for branch.base."""
    local_path = urlutils.local_path_from_url(branch.base)
    return os.path.realpath(local_path)
613
def test_open_containing(self):
    """open_containing() finds the bzrdir and reports the relative path."""
    locations = ('', 'g/p/q')
    # Before a bzrdir exists, nothing can be found from either location.
    for location in locations:
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
                          self.get_readonly_url(location))
    bzrdir.BzrDir.create(self.get_url())
    # Afterwards the relative path is whatever lies below the bzrdir.
    for location in locations:
        branch, relpath = bzrdir.BzrDir.open_containing(
            self.get_readonly_url(location))
        self.assertEqual(location, relpath)
624
def test_open_containing_tree_branch_or_repository_empty(self):
    """With nothing created, the lookup raises NotBranchError."""
    lookup = bzrdir.BzrDir.open_containing_tree_branch_or_repository
    self.assertRaises(errors.NotBranchError, lookup,
                      self.get_readonly_url(''))
629
def test_open_containing_tree_branch_or_repository_all(self):
630
self.make_branch_and_tree('topdir')
631
tree, branch, repo, relpath = \
632
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
634
self.assertEqual(os.path.realpath('topdir'),
635
os.path.realpath(tree.basedir))
636
self.assertEqual(os.path.realpath('topdir'),
637
self.local_branch_path(branch))
639
osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
640
repo.bzrdir.transport.local_abspath('repository'))
641
self.assertEqual(relpath, 'foo')
643
def test_open_containing_tree_branch_or_repository_no_tree(self):
644
self.make_branch('branch')
645
tree, branch, repo, relpath = \
646
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
648
self.assertEqual(tree, None)
649
self.assertEqual(os.path.realpath('branch'),
650
self.local_branch_path(branch))
652
osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
653
repo.bzrdir.transport.local_abspath('repository'))
654
self.assertEqual(relpath, 'foo')
656
def test_open_containing_tree_branch_or_repository_repo(self):
657
self.make_repository('repo')
658
tree, branch, repo, relpath = \
659
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
661
self.assertEqual(tree, None)
662
self.assertEqual(branch, None)
664
osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
665
repo.bzrdir.transport.local_abspath('repository'))
666
self.assertEqual(relpath, '')
668
def test_open_containing_tree_branch_or_repository_shared_repo(self):
669
self.make_repository('shared', shared=True)
670
bzrdir.BzrDir.create_branch_convenience('shared/branch',
671
force_new_tree=False)
672
tree, branch, repo, relpath = \
673
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
675
self.assertEqual(tree, None)
676
self.assertEqual(os.path.realpath('shared/branch'),
677
self.local_branch_path(branch))
679
osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
680
repo.bzrdir.transport.local_abspath('repository'))
681
self.assertEqual(relpath, '')
683
def test_open_containing_tree_branch_or_repository_branch_subdir(self):
684
self.make_branch_and_tree('foo')
685
self.build_tree(['foo/bar/'])
686
tree, branch, repo, relpath = \
687
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
689
self.assertEqual(os.path.realpath('foo'),
690
os.path.realpath(tree.basedir))
691
self.assertEqual(os.path.realpath('foo'),
692
self.local_branch_path(branch))
694
osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
695
repo.bzrdir.transport.local_abspath('repository'))
696
self.assertEqual(relpath, 'bar')
698
def test_open_containing_tree_branch_or_repository_repo_subdir(self):
699
self.make_repository('bar')
700
self.build_tree(['bar/baz/'])
701
tree, branch, repo, relpath = \
702
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
704
self.assertEqual(tree, None)
705
self.assertEqual(branch, None)
707
osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
708
repo.bzrdir.transport.local_abspath('repository'))
709
self.assertEqual(relpath, 'baz')
711
def test_open_containing_from_transport(self):
    """open_containing_from_transport() mirrors open_containing()."""
    locations = ('', 'g/p/q')
    # Before a bzrdir exists, nothing can be found from either location.
    for location in locations:
        self.assertRaises(
            NotBranchError,
            bzrdir.BzrDir.open_containing_from_transport,
            _mod_transport.get_transport(self.get_readonly_url(location)))
    bzrdir.BzrDir.create(self.get_url())
    # Afterwards the relative path is whatever lies below the bzrdir.
    for location in locations:
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
            _mod_transport.get_transport(self.get_readonly_url(location)))
        self.assertEqual(location, relpath)
726
def test_open_containing_tree_or_branch(self):
727
self.make_branch_and_tree('topdir')
728
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
730
self.assertEqual(os.path.realpath('topdir'),
731
os.path.realpath(tree.basedir))
732
self.assertEqual(os.path.realpath('topdir'),
733
self.local_branch_path(branch))
734
self.assertIs(tree.bzrdir, branch.bzrdir)
735
self.assertEqual('foo', relpath)
736
# opening from non-local should not return the tree
737
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
738
self.get_readonly_url('topdir/foo'))
739
self.assertEqual(None, tree)
740
self.assertEqual('foo', relpath)
742
self.make_branch('topdir/foo')
743
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
745
self.assertIs(tree, None)
746
self.assertEqual(os.path.realpath('topdir/foo'),
747
self.local_branch_path(branch))
748
self.assertEqual('', relpath)
750
def test_open_tree_or_branch(self):
    """open_tree_or_branch() gives a tree only for local dirs with one."""
    self.make_branch_and_tree('topdir')
    tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
    expected_topdir = os.path.realpath('topdir')
    self.assertEqual(expected_topdir, os.path.realpath(tree.basedir))
    self.assertEqual(expected_topdir, self.local_branch_path(branch))
    self.assertIs(tree.bzrdir, branch.bzrdir)
    # opening from non-local should not return the tree
    readonly_topdir = self.get_readonly_url('topdir')
    tree, branch = bzrdir.BzrDir.open_tree_or_branch(readonly_topdir)
    self.assertEqual(None, tree)
    # a branch made without a working tree
    self.make_branch('topdir/foo')
    tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
    self.assertIs(tree, None)
    self.assertEqual(os.path.realpath('topdir/foo'),
                     self.local_branch_path(branch))
769
def test_open_from_transport(self):
    """The opened bzrdir's root transport is the transport passed in."""
    # transport pointing at bzrdir should give a bzrdir with root transport
    # set to the given transport
    bzrdir.BzrDir.create(self.get_url())
    root = self.get_transport()
    opened = bzrdir.BzrDir.open_from_transport(root)
    self.assertEqual(root.base, opened.root_transport.base)
    self.assertIsInstance(opened, bzrdir.BzrDir)
778
def test_open_from_transport_no_bzrdir(self):
    """Opening where no bzrdir was created raises NotBranchError."""
    root = self.get_transport()
    self.assertRaises(NotBranchError,
                      bzrdir.BzrDir.open_from_transport, root)
782
def test_open_from_transport_bzrdir_in_parent(self):
    """A bzrdir in the parent is not opened from a 'subdir' transport."""
    bzrdir.BzrDir.create(self.get_url())
    subdir_transport = self.get_transport().clone('subdir')
    self.assertRaises(NotBranchError,
                      bzrdir.BzrDir.open_from_transport, subdir_transport)
789
def test_sprout_recursive(self):
    """Sprouting a tree with a nested tree reference brings the subtree."""
    tree = self.make_branch_and_tree('tree1',
                                     format='dirstate-with-subtree')
    sub_tree = self.make_branch_and_tree('tree1/subtree',
                                         format='dirstate-with-subtree')
    sub_tree.set_root_id('subtree-root')
    tree.add_reference(sub_tree)
    self.build_tree(['tree1/subtree/file'])
    # Version the new file; otherwise it is only an unversioned file on
    # disk and the existence check on the sprouted copy cannot hold.
    sub_tree.add('file')
    tree.commit('Initial commit')
    tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
    # Take the lock that the cleanup below releases.
    tree2.lock_read()
    self.addCleanup(tree2.unlock)
    self.assertPathExists('tree2/subtree/file')
    self.assertEqual('tree-reference', tree2.kind('subtree-root'))
805
def test_cloning_metadir(self):
    """Ensure that cloning metadir is suitable"""
    control = self.make_bzrdir('bzrdir')
    # Only checks that the call completes on a bare control directory.
    control.cloning_metadir()
    knit_branch = self.make_branch('branch', format='knit')
    cloning_format = knit_branch.bzrdir.cloning_metadir()
    self.assertIsInstance(cloning_format.workingtree_format,
                          workingtree_4.WorkingTreeFormat6)
814
def test_sprout_recursive_treeless(self):
    """Sprout after destroying the working tree; see bug #634470."""
    subtree_format = 'dirstate-with-subtree'
    tree = self.make_branch_and_tree('tree1', format=subtree_format)
    sub_tree = self.make_branch_and_tree('tree1/subtree',
                                         format=subtree_format)
    tree.add_reference(sub_tree)
    self.build_tree(['tree1/subtree/file'])
    tree.commit('Initial commit')
    # The following line forces the orphaning to reveal bug #634470
    branch_config = tree.branch.get_config()
    branch_config.set_user_option('bzr.transform.orphan_policy', 'move')
    tree.bzrdir.destroy_workingtree()
    # FIXME: subtree/.bzr is left here which allows the test to pass (or
    # fail :-( ) -- vila 20100909
    shared_repo = self.make_repository('repo', shared=True,
                                       format=subtree_format)
    shared_repo.set_make_working_trees(False)
    # FIXME: we just deleted the workingtree and now we want to use it ????
    # At a minimum, we should use tree.branch below (but this fails too
    # currently) or stop calling this test 'treeless'. Specifically, I've
    # turn the line below into an assertRaises when 'subtree/.bzr' is
    # orphaned and sprout tries to access the branch there (which is left
    # by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
    # [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
    # #634470. -- vila 20100909
    self.assertRaises(errors.NotBranchError,
                      tree.bzrdir.sprout, 'repo/tree2')
    # self.assertPathExists('repo/tree2/subtree')
    # self.assertPathDoesNotExist('repo/tree2/subtree/file')
845
def make_foo_bar_baz(self):
    """Create three branches and return their control directories.

    'foo' is made with BzrDir.create_branch_convenience, 'foo/bar' is
    nested inside it and 'baz' is made next to 'foo'.

    :return: a (foo, bar, baz) tuple holding the bzrdir of each branch;
        the find_bzrdirs/find_branches tests unpack exactly these three.
    """
    foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
    bar = self.make_branch('foo/bar').bzrdir
    baz = self.make_branch('baz').bzrdir
    return foo, bar, baz
851
def test_find_bzrdirs(self):
    """find_bzrdirs() reports baz, foo and bar, in that order."""
    foo, bar, baz = self.make_foo_bar_baz()
    root_transport = self.get_transport()
    expected = [baz, foo, bar]
    found = bzrdir.BzrDir.find_bzrdirs(root_transport)
    self.assertEqualBzrdirs(expected, found)
856
def make_fake_permission_denied_transport(self, transport, paths):
    """Create a transport that raises PermissionDenied for some paths.

    :param transport: the transport backing the path filtering server.
    :param paths: the paths for which errors.PermissionDenied is raised.
    :return: a (path_filter_server, path_filter_transport) tuple. The
        server is started here and stopped by a registered cleanup.
    """
    def deny_listed_paths(path):
        # Refuse the listed paths and hand any other path back unchanged.
        if path in paths:
            raise errors.PermissionDenied(path)
        return path

    path_filter_server = pathfilter.PathFilteringServer(
        transport, deny_listed_paths)
    path_filter_server.start_server()
    self.addCleanup(path_filter_server.stop_server)
    path_filter_transport = pathfilter.PathFilteringTransport(
        path_filter_server, '.')
    return (path_filter_server, path_filter_transport)
869
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
    """Assert that the user_url of every given bzrdir ends with a suffix.

    :param expect_url_suffix: the suffix each url has to end with.
    :param actual_bzrdirs: an iterable of objects exposing user_url.
    """
    for found_dir in actual_bzrdirs:
        found_url = found_dir.user_url
        self.assertEndsWith(found_url, expect_url_suffix)
874
def test_find_bzrdirs_permission_denied(self):
    """find_bzrdirs() copes with paths that raise PermissionDenied.

    With access to 'foo' denied, every bzrdir found must have a url ending
    in '/baz/', both over the path filtering transport and over a smart
    server backed by the path filtering server.
    """
    foo, bar, baz = self.make_foo_bar_baz()
    base_transport = self.get_transport()
    filter_server, filter_transport = (
        self.make_fake_permission_denied_transport(base_transport, ['foo']))
    # Directly over the path filtering transport.
    found_directly = bzrdir.BzrDir.find_bzrdirs(filter_transport)
    self.assertBranchUrlsEndWith('/baz/', found_directly)
    # Through a smart server sitting on top of the filtering server.
    smart_transport = self.make_smart_server(
        '.', backing_server=filter_server)
    found_via_smart = bzrdir.BzrDir.find_bzrdirs(smart_transport)
    self.assertBranchUrlsEndWith('/baz/', found_via_smart)
888
def test_find_bzrdirs_list_current(self):
    """find_bzrdirs() only sees the entries list_current reports."""
    def list_current(transport):
        # Hide 'baz' from the directory listing.
        return [s for s in transport.list_dir('') if s != 'baz']

    foo, bar, baz = self.make_foo_bar_baz()
    t = self.get_transport()
    # test_find_bzrdirs expects [baz, foo, bar]; with 'baz' hidden by
    # list_current only foo and bar remain.
    self.assertEqualBzrdirs(
        [foo, bar],
        bzrdir.BzrDir.find_bzrdirs(t, list_current=list_current))
898
def test_find_bzrdirs_evaluate(self):
    """find_bzrdirs() yields the values produced by the evaluate callback.

    The callback returns a (flag, value) pair whose value is the base url
    of the bzrdir; the flag is True only when no repository is present.
    """
    def evaluate(bzrdir):
        # NOTE(review): the flag presumably tells find_bzrdirs whether to
        # keep searching below this bzrdir -- confirm against its API.
        try:
            bzrdir.open_repository()
        except NoRepositoryPresent:
            return True, bzrdir.root_transport.base
        else:
            return False, bzrdir.root_transport.base

    foo, bar, baz = self.make_foo_bar_baz()
    t = self.get_transport()
    self.assertEqual([baz.root_transport.base, foo.root_transport.base],
                     list(bzrdir.BzrDir.find_bzrdirs(t, evaluate=evaluate)))
912
def assertEqualBzrdirs(self, first, second):
    """Assert that two series of bzrdirs have pairwise equal base urls.

    :param first: an iterable of bzrdirs.
    :param second: an iterable of bzrdirs; both may be one-shot iterators.
    """
    # Materialize both sides so len() works and each is consumed only once.
    first = list(first)
    second = list(second)
    self.assertEqual(len(first), len(second))
    for x, y in zip(first, second):
        self.assertEqual(x.root_transport.base, y.root_transport.base)
919
def test_find_branches(self):
    """find_branches() lists baz, foo and bar, in that order.

    The search is run from the root, which holds a shared repository, and
    then from 'foo', where no top-level repository exists.
    """
    self.make_repository('', shared=True)
    foo, bar, baz = self.make_foo_bar_baz()
    # A control directory without a branch, created next to 'foo/bar'.
    self.make_bzrdir('foo/qux')
    t = self.get_transport()
    found = bzrdir.BzrDir.find_branches(t)
    for position, control_dir in enumerate((baz, foo, bar)):
        self.assertEqual(control_dir.root_transport.base,
                         found[position].base)

    # ensure this works without a top-level repo
    found = bzrdir.BzrDir.find_branches(t.clone('foo'))
    for position, control_dir in enumerate((foo, bar)):
        self.assertEqual(control_dir.root_transport.base,
                         found[position].base)
935
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """find_branches() in a tree holding a branch whose repository is gone."""

    def test_find_bzrdirs_missing_repo(self):
        """Every branch found has a url ending in '/baz/'.

        'arepo/abranch' is created inside a shared repository whose control
        directory is then deleted, so opening that branch raises
        NoRepositoryPresent.
        """
        t = self.get_transport()
        shared_repo = self.make_repository('arepo', shared=True)
        orphan_url = shared_repo.user_url + '/abranch'
        bzrdir.BzrDir.create(orphan_url).create_branch()
        t.delete_tree('arepo/.bzr')
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.Branch.open, orphan_url)
        self.make_branch('baz')
        for found_branch in bzrdir.BzrDir.find_branches(t):
            self.assertEndsWith(found_branch.user_url, '/baz/')
950
class TestMeta1DirFormat(TestCaseWithTransport):
    """Tests specific to the meta1 dir format."""

    def test_right_base_dirs(self):
        """Component transports are fixed children of the control transport.

        'branch', 'repository' and 'checkout' are expected whether the
        transport is requested with None or with a concrete format.
        """
        control_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # The transport of the control directory itself.
        t = control_dir.transport
        branch_base = t.clone('branch').base
        self.assertEqual(branch_base,
                         control_dir.get_branch_transport(None).base)
        self.assertEqual(
            branch_base,
            control_dir.get_branch_transport(
                bzrlib.branch.BzrBranchFormat5()).base)
        repository_base = t.clone('repository').base
        self.assertEqual(repository_base,
                         control_dir.get_repository_transport(None).base)
        repository_format = repository.format_registry.get_default()
        self.assertEqual(
            repository_base,
            control_dir.get_repository_transport(repository_format).base)
        checkout_base = t.clone('checkout').base
        self.assertEqual(checkout_base,
                         control_dir.get_workingtree_transport(None).base)
        self.assertEqual(
            checkout_base,
            control_dir.get_workingtree_transport(
                workingtree_3.WorkingTreeFormat3()).base)

    def test_meta1dir_uses_lockdir(self):
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
        control_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        t = control_dir.transport
        self.assertIsDirectory('branch-lock', t)

    def test_comparison(self):
        """Equality and inequality behave properly.

        Metadirs should compare equal iff they have the same repo, branch and
        tree formats.
        """
        mydir = bzrdir.format_registry.make_bzrdir('knit')
        self.assertEqual(mydir, mydir)
        self.assertFalse(mydir != mydir)
        otherdir = bzrdir.format_registry.make_bzrdir('knit')
        self.assertEqual(otherdir, mydir)
        self.assertFalse(otherdir != mydir)
        otherdir2 = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
        self.assertNotEqual(otherdir2, mydir)
        self.assertFalse(otherdir2 == mydir)

    def test_needs_conversion_different_working_tree(self):
        """A 'knit' tree needs converting to reach the 'dirstate' format."""
        # meta1dirs need a conversion if any element is not the default.
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
        tree = self.make_branch_and_tree('tree', format='knit')
        self.assertTrue(tree.bzrdir.needs_format_conversion(new_format))

    def test_initialize_on_format_uses_smart_transport(self):
        """Initializing over a smart server gives a RemoteBzrDir in 2 RPCs."""
        self.setup_smart_server_with_call_log()
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
        transport = self.get_transport('target')
        transport.ensure_base()
        self.reset_smart_call_log()
        instance = new_format.initialize_on_transport(transport)
        self.assertIsInstance(instance, remote.RemoteBzrDir)
        rpc_count = len(self.hpss_calls)
        # This figure represents the amount of work to perform this use
        # case. It is entirely ok to reduce this number if a test fails due
        # to rpc_count being too low. If rpc_count increases, more network
        # roundtrips have become necessary for this use case. Please do not
        # adjust this number upwards without agreement from bzr's network
        # support maintainers.
        self.assertEqual(2, rpc_count)
1016
class NonLocalTests(TestCaseWithTransport):
    """Tests for bzrdir static behaviour on non local paths."""

    def setUp(self):
        """Serve the test urls from a memory server."""
        super(NonLocalTests, self).setUp()
        self.vfs_transport_factory = memory.MemoryServer

    def test_create_branch_convenience(self):
        """The convenience method gives a repository but no working tree."""
        # outside a repo the default convenience output is a repo+branch_tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience(
            self.get_url('foo'), format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_force_tree_not_local_fails(self):
        """Forcing a tree at this url raises NotLocalUrl, creating nothing."""
        # outside a repo the default convenience output is a repo+branch_tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_branch_convenience,
                          self.get_url('foo'),
                          force_new_tree=True,
                          format=format)
        t = self.get_transport()
        self.assertFalse(t.has('foo'))

    def test_clone(self):
        """Cloning a local branch with a tree to a non local url works.

        The clone has a branch and a repository but no working tree.
        """
        # clone into a nonlocal path works
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience('local',
                                                         format=format)
        branch.bzrdir.open_workingtree()
        result = branch.bzrdir.clone(self.get_url('remote'))
        self.assertRaises(errors.NoWorkingTree,
                          result.open_workingtree)
        result.open_branch()
        result.open_repository()

    def test_checkout_metadir(self):
        """checkout_metadir() picks WorkingTreeFormat4 for this branch."""
        # checkout_metadir has reasonable working tree format even when no
        # working tree is present
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
        checkout_format = my_bzrdir.checkout_metadir()
        self.assertIsInstance(checkout_format.workingtree_format,
                              workingtree_4.WorkingTreeFormat4)
1065
class TestHTTPRedirections(object):
    """Test redirection between two http servers.

    This MUST be used by daughter classes that also inherit from
    TestCaseWithTwoWebservers.

    We can't inherit directly from TestCaseWithTwoWebservers or the
    test framework will try to create an instance which cannot
    run, its implementation being incomplete.

    Daughter classes supply _transport and _qualified_url(host, port).
    """

    def create_transport_readonly_server(self):
        """Return the redirecting http server used as readonly server."""
        # We don't set the http protocol version, relying on the default
        return http_utils.HTTPServerRedirecting()

    def create_transport_secondary_server(self):
        """Return the redirecting http server used as secondary server."""
        # We don't set the http protocol version, relying on the default
        return http_utils.HTTPServerRedirecting()

    def setUp(self):
        """Make the old (secondary) server redirect to the new one."""
        super(TestHTTPRedirections, self).setUp()
        # The redirections will point to the new server
        self.new_server = self.get_readonly_server()
        # The requests to the old server will be redirected
        self.old_server = self.get_secondary_server()
        # Configure the redirections
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)

    def test_loop(self):
        """Opening a bzrdir through a redirection loop raises NotBranchError."""
        # Both servers redirect to each other creating a loop
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
        # Starting from either server should loop
        old_url = self._qualified_url(self.old_server.host,
                                      self.old_server.port)
        oldt = self._transport(old_url)
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_from_transport, oldt)
        new_url = self._qualified_url(self.new_server.host,
                                      self.new_server.port)
        newt = self._transport(new_url)
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_from_transport, newt)

    def test_qualifier_preserved(self):
        """The redirected bzrdir keeps the transport class it started with."""
        self.make_branch_and_tree('branch')
        old_url = self._qualified_url(self.old_server.host,
                                      self.old_server.port)
        start = self._transport(old_url).clone('branch')
        bdir = bzrdir.BzrDir.open_from_transport(start)
        # Redirection should preserve the qualifier, hence the transport
        # class itself.
        self.assertIsInstance(bdir.root_transport, type(start))
1119
class TestHTTPRedirections_urllib(TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for urllib implementation"""

    _transport = HttpTransport_urllib

    def _qualified_url(self, host, port):
        """Return the permitted 'http+urllib' url for host and port."""
        result = 'http+urllib://%s:%s' % (host, port)
        self.permit_url(result)
        # The tests build their transports from this url.
        return result
1132
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
                                  TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for pycurl implementation"""

    def _qualified_url(self, host, port):
        """Return the permitted 'http+pycurl' url for host and port."""
        result = 'http+pycurl://%s:%s' % (host, port)
        self.permit_url(result)
        # The tests build their transports from this url.
        return result
1143
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
                                   http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the nosmart decorator"""

    _transport = NoSmartTransportDecorator

    def _qualified_url(self, host, port):
        """Return the permitted 'nosmart+http' url for host and port."""
        result = 'nosmart+http://%s:%s' % (host, port)
        self.permit_url(result)
        # The tests build their transports from this url.
        return result
1155
class TestHTTPRedirections_readonly(TestHTTPRedirections,
                                    http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the readonly decorator"""

    _transport = ReadonlyTransportDecorator

    def _qualified_url(self, host, port):
        """Return the permitted 'readonly+http' url for host and port."""
        result = 'readonly+http://%s:%s' % (host, port)
        self.permit_url(result)
        # The tests build their transports from this url.
        return result
1167
class TestDotBzrHidden(TestCaseWithTransport):
    """A freshly created control directory does not show up in a listing."""

    # Command used to list the current directory.
    # NOTE(review): the non-win32 default was lost in the source; 'ls' is
    # presumed from the attribute name -- confirm.
    ls = ['ls']
    if sys.platform == 'win32':
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']

    def get_ls(self):
        """Run the listing command and return its output lines.

        The test fails if the command exits with a non-zero status.
        """
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        out, err = f.communicate()
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
                         % (self.ls, err))
        return out.splitlines()

    def test_dot_bzr_hidden(self):
        """Only 'a' is listed after creating a bzrdir from a local path."""
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')
        bzrdir.BzrDir.create('.')
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())

    def test_dot_bzr_hidden_with_url(self):
        """Only 'a' is listed after creating a bzrdir from a local url."""
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')
        bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())
1196
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
    """BzrDirMetaFormat1 variant whose directories open as _TestBzrDir.

    Used by TestBzrDirSprout.
    """

    def _open(self, transport):
        opened_dir = _TestBzrDir(transport, self)
        return opened_dir
1203
class _TestBzrDir(bzrdir.BzrDirMeta1):
    """Test BzrDir implementation for TestBzrDirSprout.

    When created a _TestBzrDir already has repository and a branch. The branch
    is a test double as well.
    """

    def __init__(self, *args, **kwargs):
        super(_TestBzrDir, self).__init__(*args, **kwargs)
        # The instrumented branch handed out by open_branch().
        self.test_branch = _TestBranch(self.transport)
        self.test_branch.repository = self.create_repository()

    def open_branch(self, unsupported=False):
        """Return the test double branch, whatever 'unsupported' says."""
        return self.test_branch

    def cloning_metadir(self, require_stacking=False):
        """Return a fresh _TestBzrDirFormat, ignoring 'require_stacking'."""
        return _TestBzrDirFormat()
1222
class _TestBranchFormat(bzrlib.branch.BranchFormat):
    """Branch format of the _TestBranch double used by TestBzrDirSprout."""
1226
class _TestBranch(bzrlib.branch.Branch):
    """Test Branch implementation for TestBzrDirSprout.

    The names of the instrumented methods are appended to self.calls as
    they are invoked.
    """

    def __init__(self, transport, *args, **kwargs):
        self._format = _TestBranchFormat()
        self._transport = transport
        self.base = transport.base
        super(_TestBranch, self).__init__(*args, **kwargs)
        # Log of instrumented calls; TestBzrDirSprout expects it to start
        # out empty.
        self.calls = []
        # Value handed back by get_parent() until set_parent() is called.
        self._parent = None

    def sprout(self, *args, **kwargs):
        """Log the call and return a new double on the same transport."""
        self.calls.append('sprout')
        return _TestBranch(self._transport)

    def copy_content_into(self, destination, revision_id=None):
        """Log the call; nothing is copied."""
        self.calls.append('copy_content_into')

    def last_revision(self):
        return _mod_revision.NULL_REVISION

    def get_parent(self):
        """Return the value last given to set_parent(), None initially."""
        return self._parent

    def _get_config(self):
        return config.TransportConfig(self._transport, 'branch.conf')

    def set_parent(self, parent):
        self._parent = parent

    def lock_read(self):
        return lock.LogicalLockResult(self.unlock)

    def unlock(self):
        # NOTE(review): no-op reconstructed for the callback that
        # lock_read() hands to LogicalLockResult -- confirm against the
        # base class.
        return
1263
class TestBzrDirSprout(TestCaseWithMemoryTransport):
    """Tests for BzrDir.sprout."""

    def test_sprout_uses_branch_sprout(self):
        """BzrDir.sprout calls Branch.sprout.

        Usually, BzrDir.sprout should delegate to the branch's sprout method
        for part of the work.  This allows the source branch to control the
        choice of format for the new branch.

        There are exceptions, but this tests avoids them:
          - if there's no branch in the source bzrdir,
          - or if the stacking has been requested and the format needs to be
            overridden to satisfy that.
        """
        # Make an instrumented bzrdir.
        t = self.get_transport('source')
        # Create the base directory before initializing on the transport,
        # as test_initialize_on_format_uses_smart_transport does.
        t.ensure_base()
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
        # The instrumented bzrdir has a test_branch attribute that logs calls
        # made to the branch contained in that bzrdir.  Initially the test
        # branch exists but no calls have been made to it.
        self.assertEqual([], source_bzrdir.test_branch.calls)

        # Sprout the bzrdir.
        target_url = self.get_url('target')
        source_bzrdir.sprout(target_url, recurse='no')

        # The bzrdir called the branch's sprout method.
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)

    def test_sprout_parent(self):
        """The parent of a sprouted branch is the branch it came from."""
        grandparent_tree = self.make_branch('grandparent')
        parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
        branch_tree = parent.bzrdir.sprout('branch').open_branch()
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1300
class TestBzrDirHooks(TestCaseWithMemoryTransport):
    """Tests for the 'pre_open' and 'post_repo_init' BzrDir hooks."""

    def test_pre_open_called(self):
        """'pre_open' receives the transport the bzrdir is opened from."""
        calls = []
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
        transport = self.get_transport('foo')
        url = transport.base
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
        self.assertEqual([transport.base], [t.base for t in calls])

    def test_pre_open_actual_exceptions_raised(self):
        """An error raised by a 'pre_open' hook reaches the caller as is."""
        # A one-element list serves as a counter the closure can update.
        count = [0]

        def fail_once(transport):
            # Only the first invocation fails.
            count[0] += 1
            if count[0] == 1:
                raise errors.BzrError("fail")
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
        transport = self.get_transport('foo')
        url = transport.base
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
        self.assertEqual('fail', err._preformatted_string)

    def test_post_repo_init(self):
        """'post_repo_init' fires once, with a RepoInitHookParams."""
        from bzrlib.bzrdir import RepoInitHookParams
        calls = []
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
                                               calls.append, None)
        self.make_repository('foo')
        self.assertLength(1, calls)
        params = calls[0]
        self.assertIsInstance(params, RepoInitHookParams)
        self.assertTrue(hasattr(params, 'bzrdir'))
        self.assertTrue(hasattr(params, 'repository'))

    def test_post_repo_init_hook_repr(self):
        """The repr of the hook parameters names RepoInitHookParams."""
        param_reprs = []
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
            lambda params: param_reprs.append(repr(params)), None)
        self.make_repository('foo')
        self.assertLength(1, param_reprs)
        param_repr = param_reprs[0]
        self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1344
class TestGenerateBackupName(TestCaseWithMemoryTransport):
    """Tests for the backup names proposed by a BzrDir."""
    # FIXME: This may need to be unified with test_osutils.TestBackupNames or
    # moved to per_bzrdir or per_transport for better coverage ?

    def setUp(self):
        """Create a bzrdir at the test url and open it from the transport."""
        super(TestGenerateBackupName, self).setUp()
        self._transport = self.get_transport()
        bzrdir.BzrDir.create(self.get_url(),
                             possible_transports=[self._transport])
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)

    def test_deprecated_generate_backup_name(self):
        """generate_backup_name() is deprecated since 2.3.0."""
        self.applyDeprecated(
            symbol_versioning.deprecated_in((2, 3, 0)),
            self._bzrdir.generate_backup_name, 'whatever')

    # NOTE(review): the header of this test was lost in the source; the name
    # is reconstructed -- confirm.
    def test_new(self):
        """The first backup name proposed for 'a' is 'a.~1~'."""
        self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))

    def test_exiting(self):
        """An existing 'a.~1~' makes 'a.~2~' the next proposed name."""
        self._transport.put_bytes("a.~1~", "some content")
        self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))