1
# Copyright (C) 2006-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the BzrDir facility and any format specific tests.
19
For interface contract tests, see tests/per_bzr_dir.
34
revision as _mod_revision,
38
transport as _mod_transport,
44
from bzrlib.errors import (
46
NoColocatedBranchSupport,
48
UnsupportedFormatError,
50
from bzrlib.tests import (
52
TestCaseWithMemoryTransport,
53
TestCaseWithTransport,
56
from bzrlib.tests import(
60
from bzrlib.tests.test_http import TestWithTransport_pycurl
61
from bzrlib.transport import (
65
from bzrlib.transport.http._urllib import HttpTransport_urllib
66
from bzrlib.transport.nosmart import NoSmartTransportDecorator
67
from bzrlib.transport.readonly import ReadonlyTransportDecorator
68
from bzrlib.repofmt import knitrepo, knitpack_repo
71
class TestDefaultFormat(TestCase):
73
def test_get_set_default_format(self):
74
old_format = bzrdir.BzrDirFormat.get_default_format()
75
# default is BzrDirMetaFormat1
76
self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
77
controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
78
# creating a bzr dir should now create an instrumented dir.
80
result = bzrdir.BzrDir.create('memory:///')
81
self.assertIsInstance(result, SampleBzrDir)
83
controldir.ControlDirFormat._set_default_format(old_format)
84
self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
87
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
88
"""A deprecated bzr dir format."""
91
class TestFormatRegistry(TestCase):
93
def make_format_registry(self):
94
my_format_registry = controldir.ControlDirFormatRegistry()
95
my_format_registry.register('deprecated', DeprecatedBzrDirFormat,
96
'Some format. Slower and unawesome and deprecated.',
98
my_format_registry.register_lazy('lazy', 'bzrlib.tests.test_bzrdir',
99
'DeprecatedBzrDirFormat', 'Format registered lazily',
101
bzrdir.register_metadir(my_format_registry, 'knit',
102
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
103
'Format using knits',
105
my_format_registry.set_default('knit')
106
bzrdir.register_metadir(my_format_registry,
108
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
109
'Experimental successor to knit. Use at your own risk.',
110
branch_format='bzrlib.branch.BzrBranchFormat6',
112
bzrdir.register_metadir(my_format_registry,
114
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
115
'Experimental successor to knit. Use at your own risk.',
116
branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
117
my_format_registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
118
'Old format. Slower and does not support things. ', hidden=True)
119
my_format_registry.register_lazy('hiddenlazy', 'bzrlib.tests.test_bzrdir',
120
'DeprecatedBzrDirFormat', 'Format registered lazily',
121
deprecated=True, hidden=True)
122
return my_format_registry
124
def test_format_registry(self):
    # Each key looked up below must build a format object of the
    # expected class.
    registry = self.make_format_registry()
    # Keys that map straight onto the deprecated sample format.
    for key in ('lazy', 'deprecated'):
        made = registry.make_bzrdir(key)
        self.assertIsInstance(made, DeprecatedBzrDirFormat)
    # 'default' and 'knit' must both carry the Knit1 repository format.
    for key in ('default', 'knit'):
        made = registry.make_bzrdir(key)
        self.assertIsInstance(made.repository_format,
                              knitrepo.RepositoryFormatKnit1)
    branch6_format = registry.make_bzrdir('branch6')
    self.assertIsInstance(branch6_format.get_branch_format(),
                          bzrlib.branch.BzrBranchFormat6)
140
def test_get_help(self):
    # get_help() must return the help text associated with each key;
    # 'default' is expected to report the same text as 'knit'.
    registry = self.make_format_registry()
    expected_help = [
        ('lazy', 'Format registered lazily'),
        ('knit', 'Format using knits'),
        ('default', 'Format using knits'),
        ('deprecated', 'Some format. Slower and unawesome and deprecated.'),
    ]
    for key, help_text in expected_help:
        self.assertEqual(help_text, registry.get_help(key))
151
def test_help_topic(self):
152
topics = help_topics.HelpTopicRegistry()
153
registry = self.make_format_registry()
154
topics.register('current-formats', registry.help_topic,
156
topics.register('other-formats', registry.help_topic,
158
new = topics.get_detail('current-formats')
159
rest = topics.get_detail('other-formats')
160
experimental, deprecated = rest.split('Deprecated formats')
161
self.assertContainsRe(new, 'formats-help')
162
self.assertContainsRe(new,
163
':knit:\n \(native\) \(default\) Format using knits\n')
164
self.assertContainsRe(experimental,
165
':branch6:\n \(native\) Experimental successor to knit')
166
self.assertContainsRe(deprecated,
167
':lazy:\n \(native\) Format registered lazily\n')
168
self.assertNotContainsRe(new, 'hidden')
170
def test_set_default_repository(self):
171
default_factory = bzrdir.format_registry.get('default')
172
old_default = [k for k, v in bzrdir.format_registry.iteritems()
173
if v == default_factory and k != 'default'][0]
174
bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
176
self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
177
bzrdir.format_registry.get('default'))
179
repository.format_registry.get_default().__class__,
180
knitrepo.RepositoryFormatKnit3)
182
bzrdir.format_registry.set_default_repository(old_default)
184
def test_aliases(self):
185
a_registry = controldir.ControlDirFormatRegistry()
186
a_registry.register('deprecated', DeprecatedBzrDirFormat,
187
'Old format. Slower and does not support stuff',
189
a_registry.register('deprecatedalias', DeprecatedBzrDirFormat,
190
'Old format. Slower and does not support stuff',
191
deprecated=True, alias=True)
192
self.assertEqual(frozenset(['deprecatedalias']), a_registry.aliases())
195
class SampleBranch(bzrlib.branch.Branch):
196
"""A dummy branch for guess what, dummy use."""
198
def __init__(self, dir):
202
class SampleRepository(bzrlib.repository.Repository):
205
def __init__(self, dir):
209
class SampleBzrDir(bzrdir.BzrDir):
210
"""A sample BzrDir implementation to allow testing static methods."""
212
def create_repository(self, shared=False):
    """See BzrDir.create_repository.

    This sample implementation ignores ``shared`` and hands back a fixed
    marker string instead of a repository object.
    """
    marker = "A repository"
    return marker
216
def open_repository(self):
    """See BzrDir.open_repository.

    Returns a new SampleRepository constructed with this control dir.
    """
    sample_repo = SampleRepository(self)
    return sample_repo
220
def create_branch(self, name=None):
221
"""See BzrDir.create_branch."""
223
raise NoColocatedBranchSupport(self)
224
return SampleBranch(self)
226
def create_workingtree(self):
227
"""See BzrDir.create_workingtree."""
231
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
234
this format is initializable, unsupported to aid in testing the
235
open and open_downlevel routines.
238
def get_format_string(self):
    """See BzrDirFormat.get_format_string().

    Returns the fixed identifying string of this sample format.
    """
    format_string = "Sample .bzr dir format."
    return format_string
242
def initialize_on_transport(self, t):
243
"""Create a bzr dir."""
245
t.put_bytes('.bzr/branch-format', self.get_format_string())
246
return SampleBzrDir(t, self)
248
def is_supported(self):
251
def open(self, transport, _found=None):
    # Sample implementation: both arguments are ignored and a fixed
    # marker string is returned in place of an opened object.
    marker = "opened branch."
    return marker
255
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
258
def get_format_string():
259
return "Test format 1"
262
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
265
def get_format_string():
266
return "Test format 2"
269
class TestBzrDirFormat(TestCaseWithTransport):
270
"""Tests for the BzrDirFormat facility."""
272
def test_find_format(self):
273
# is the right format object found for a branch?
274
# create a branch with a few known format objects.
275
bzrdir.BzrProber.formats.register(BzrDirFormatTest1.get_format_string(),
277
self.addCleanup(bzrdir.BzrProber.formats.remove,
278
BzrDirFormatTest1.get_format_string())
279
bzrdir.BzrProber.formats.register(BzrDirFormatTest2.get_format_string(),
281
self.addCleanup(bzrdir.BzrProber.formats.remove,
282
BzrDirFormatTest2.get_format_string())
283
t = self.get_transport()
284
self.build_tree(["foo/", "bar/"], transport=t)
285
def check_format(format, url):
286
format.initialize(url)
287
t = _mod_transport.get_transport(url)
288
found_format = bzrdir.BzrDirFormat.find_format(t)
289
self.assertIsInstance(found_format, format.__class__)
290
check_format(BzrDirFormatTest1(), "foo")
291
check_format(BzrDirFormatTest2(), "bar")
293
def test_find_format_nothing_there(self):
    # No control directory is created by this test, so probing the
    # transport for '.' is expected to raise NotBranchError.
    find_format = bzrdir.BzrDirFormat.find_format
    here = _mod_transport.get_transport('.')
    self.assertRaises(NotBranchError, find_format, here)
298
def test_find_format_unknown_format(self):
299
t = self.get_transport()
301
t.put_bytes('.bzr/branch-format', '')
302
self.assertRaises(UnknownFormatError,
303
bzrdir.BzrDirFormat.find_format,
304
_mod_transport.get_transport('.'))
306
def test_register_unregister_format(self):
307
format = SampleBzrDirFormat()
310
format.initialize(url)
311
# register a format for it.
312
bzrdir.BzrProber.formats.register(format.get_format_string(), format)
313
# which bzrdir.Open will refuse (not supported)
314
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
315
# which bzrdir.open_containing will refuse (not supported)
316
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
317
# but open_downlevel will work
318
t = _mod_transport.get_transport(url)
319
self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
320
# unregister the format
321
bzrdir.BzrProber.formats.remove(format.get_format_string())
322
# now open_downlevel should fail too.
323
self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
325
def test_create_branch_and_repo_uses_default(self):
326
format = SampleBzrDirFormat()
327
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
329
self.assertTrue(isinstance(branch, SampleBranch))
331
def test_create_branch_and_repo_under_shared(self):
332
# creating a branch and repo in a shared repo uses the
334
format = bzrdir.format_registry.make_bzrdir('knit')
335
self.make_repository('.', shared=True, format=format)
336
branch = bzrdir.BzrDir.create_branch_and_repo(
337
self.get_url('child'), format=format)
338
self.assertRaises(errors.NoRepositoryPresent,
339
branch.bzrdir.open_repository)
341
def test_create_branch_and_repo_under_shared_force_new(self):
342
# creating a branch and repo in a shared repo can be forced to
344
format = bzrdir.format_registry.make_bzrdir('knit')
345
self.make_repository('.', shared=True, format=format)
346
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
349
branch.bzrdir.open_repository()
351
def test_create_standalone_working_tree(self):
352
format = SampleBzrDirFormat()
353
# note this is deliberately readonly, as this failure should
354
# occur before any writes.
355
self.assertRaises(errors.NotLocalUrl,
356
bzrdir.BzrDir.create_standalone_workingtree,
357
self.get_readonly_url(), format=format)
358
tree = bzrdir.BzrDir.create_standalone_workingtree('.',
360
self.assertEqual('A tree', tree)
362
def test_create_standalone_working_tree_under_shared_repo(self):
363
# create standalone working tree always makes a repo.
364
format = bzrdir.format_registry.make_bzrdir('knit')
365
self.make_repository('.', shared=True, format=format)
366
# note this is deliberately readonly, as this failure should
367
# occur before any writes.
368
self.assertRaises(errors.NotLocalUrl,
369
bzrdir.BzrDir.create_standalone_workingtree,
370
self.get_readonly_url('child'), format=format)
371
tree = bzrdir.BzrDir.create_standalone_workingtree('child',
373
tree.bzrdir.open_repository()
375
def test_create_branch_convenience(self):
    # Outside a repository the convenience call is expected to produce
    # a repository, a branch and a working tree.
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    new_branch = bzrdir.BzrDir.create_branch_convenience(
        '.', format=knit_format)
    control = new_branch.bzrdir
    # Both opens must succeed without raising.
    control.open_workingtree()
    control.open_repository()
382
def test_create_branch_convenience_possible_transports(self):
    """Check that the optional 'possible_transports' is accepted."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    transports = [self.get_transport()]
    new_branch = bzrdir.BzrDir.create_branch_convenience(
        '.', format=knit_format, possible_transports=transports)
    control = new_branch.bzrdir
    # Both opens must succeed without raising.
    control.open_workingtree()
    control.open_repository()
391
def test_create_branch_convenience_root(self):
392
"""Creating a branch at the root of a fs should work."""
393
self.vfs_transport_factory = memory.MemoryServer
394
# outside a repo the default convenience output is a repo+branch_tree
395
format = bzrdir.format_registry.make_bzrdir('knit')
396
branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
398
self.assertRaises(errors.NoWorkingTree,
399
branch.bzrdir.open_workingtree)
400
branch.bzrdir.open_repository()
402
def test_create_branch_convenience_under_shared_repo(self):
403
# inside a repo the default convenience output is a branch+ follow the
405
format = bzrdir.format_registry.make_bzrdir('knit')
406
self.make_repository('.', shared=True, format=format)
407
branch = bzrdir.BzrDir.create_branch_convenience('child',
409
branch.bzrdir.open_workingtree()
410
self.assertRaises(errors.NoRepositoryPresent,
411
branch.bzrdir.open_repository)
413
def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
    # Inside a shared repository the convenience call follows the
    # repository's tree policy, but force_new_tree=False overrides it.
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    child = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_tree=False, format=knit_format)
    control = child.bzrdir
    # Neither a working tree nor a repository should exist in 'child'.
    self.assertRaises(errors.NoWorkingTree, control.open_workingtree)
    self.assertRaises(errors.NoRepositoryPresent, control.open_repository)
425
def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
426
# inside a repo the default convenience output is a branch+ follow the
428
format = bzrdir.format_registry.make_bzrdir('knit')
429
repo = self.make_repository('.', shared=True, format=format)
430
repo.set_make_working_trees(False)
431
branch = bzrdir.BzrDir.create_branch_convenience('child',
433
self.assertRaises(errors.NoWorkingTree,
434
branch.bzrdir.open_workingtree)
435
self.assertRaises(errors.NoRepositoryPresent,
436
branch.bzrdir.open_repository)
438
def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
    # The shared repository is told not to make working trees, but
    # force_new_tree=True overrides that policy.
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    shared_repo = self.make_repository('.', shared=True, format=knit_format)
    shared_repo.set_make_working_trees(False)
    child = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_tree=True, format=knit_format)
    control = child.bzrdir
    # The tree must open; the child must not hold its own repository.
    control.open_workingtree()
    self.assertRaises(errors.NoRepositoryPresent, control.open_repository)
450
def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
451
# inside a repo the default convenience output is overridable to give
453
format = bzrdir.format_registry.make_bzrdir('knit')
454
self.make_repository('.', shared=True, format=format)
455
branch = bzrdir.BzrDir.create_branch_convenience('child',
456
force_new_repo=True, format=format)
457
branch.bzrdir.open_repository()
458
branch.bzrdir.open_workingtree()
461
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
463
def test_acquire_repository_standalone(self):
    """The default acquisition policy should create a standalone branch."""
    control = self.make_bzrdir('.')
    policy = control.determine_repository_policy()
    # The second element of the result is not inspected by this test.
    repo, _is_new = policy.acquire_repository()
    repo_base = repo.bzrdir.root_transport.base
    self.assertEqual(repo_base, control.root_transport.base)
    self.assertFalse(repo.is_shared())
472
def test_determine_stacking_policy(self):
    # A default stack-on URL configured on the parent control dir must
    # show up in the policy determined for the child.
    stack_url = 'http://example.org'
    parent = self.make_bzrdir('.')
    child = self.make_bzrdir('child')
    parent.get_config().set_default_stack_on(stack_url)
    policy = child.determine_repository_policy()
    self.assertEqual(stack_url, policy._stack_on)
479
def test_determine_stacking_policy_relative(self):
    # A relative stack-on value is kept as given, and the policy records
    # the parent's root transport base as the location it is relative to.
    relative_target = 'child2'
    parent = self.make_bzrdir('.')
    child = self.make_bzrdir('child')
    parent.get_config().set_default_stack_on(relative_target)
    policy = child.determine_repository_policy()
    self.assertEqual(relative_target, policy._stack_on)
    self.assertEqual(parent.root_transport.base, policy._stack_on_pwd)
488
def prepare_default_stacking(self, child_format='1.6'):
    """Set up a parent dir whose default stack-on is a 'child' branch.

    :param child_format: format passed to make_branch for 'child'.
    :return: a tuple of (the 'child' branch, the parent control dir's
        transport cloned to 'child2').
    """
    parent = self.make_bzrdir('.')
    stack_target = self.make_branch('child', format=child_format)
    parent.get_config().set_default_stack_on(stack_target.base)
    return stack_target, parent.transport.clone('child2')
495
def test_clone_on_transport_obeys_stacking_policy(self):
    # Cloning onto the prepared transport must yield a branch stacked on
    # the source branch's base.
    source, target_transport = self.prepare_default_stacking()
    cloned = source.bzrdir.clone_on_transport(target_transport)
    expected_url = source.base
    actual_url = cloned.open_branch().get_stacked_on_url()
    self.assertEqual(expected_url, actual_url)
501
def test_default_stacking_with_stackable_branch_unstackable_repo(self):
502
# Make stackable source branch with an unstackable repo format.
503
source_bzrdir = self.make_bzrdir('source')
504
knitpack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
505
source_branch = bzrlib.branch.BzrBranchFormat7().initialize(
507
# Make a directory with a default stacking policy
508
parent_bzrdir = self.make_bzrdir('parent')
509
stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
510
parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
511
# Clone source into directory
512
target = source_bzrdir.clone(self.get_url('parent/target'))
514
def test_sprout_obeys_stacking_policy(self):
    # Sprouting to the prepared location must yield a branch stacked on
    # the source branch's base.
    source, target_transport = self.prepare_default_stacking()
    sprouted = source.bzrdir.sprout(target_transport.base)
    expected_url = source.base
    actual_url = sprouted.open_branch().get_stacked_on_url()
    self.assertEqual(expected_url, actual_url)
520
def test_clone_ignores_policy_for_unsupported_formats(self):
    # With a 'pack-0.92' source, asking the clone's branch for its
    # stacked-on URL is expected to raise UnstackableBranchFormat.
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    cloned = source.bzrdir.clone_on_transport(target_transport)
    cloned_branch = cloned.open_branch()
    self.assertRaises(errors.UnstackableBranchFormat,
                      cloned_branch.get_stacked_on_url)
527
def test_sprout_ignores_policy_for_unsupported_formats(self):
    # With a 'pack-0.92' source, asking the sprout's branch for its
    # stacked-on URL is expected to raise UnstackableBranchFormat.
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    sprouted = source.bzrdir.sprout(target_transport.base)
    sprouted_branch = sprouted.open_branch()
    self.assertRaises(errors.UnstackableBranchFormat,
                      sprouted_branch.get_stacked_on_url)
534
def test_sprout_upgrades_format_if_stacked_specified(self):
535
child_branch, new_child_transport = self.prepare_default_stacking(
536
child_format='pack-0.92')
537
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
539
self.assertEqual(child_branch.bzrdir.root_transport.base,
540
new_child.open_branch().get_stacked_on_url())
541
repo = new_child.open_repository()
542
self.assertTrue(repo._format.supports_external_lookups)
543
self.assertFalse(repo.supports_rich_root())
545
def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
    # An explicit stacked_on is given for a 'pack-0.92' source; the
    # clone must be stacked on it, and its repository format must
    # support external lookups without being rich-root.
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    source_control = source.bzrdir
    cloned = source_control.clone_on_transport(
        target_transport,
        stacked_on=source_control.root_transport.base)
    self.assertEqual(source_control.root_transport.base,
                     cloned.open_branch().get_stacked_on_url())
    cloned_repo = cloned.open_repository()
    self.assertTrue(cloned_repo._format.supports_external_lookups)
    self.assertFalse(cloned_repo.supports_rich_root())
556
def test_sprout_upgrades_to_rich_root_format_if_needed(self):
557
child_branch, new_child_transport = self.prepare_default_stacking(
558
child_format='rich-root-pack')
559
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
561
repo = new_child.open_repository()
562
self.assertTrue(repo._format.supports_external_lookups)
563
self.assertTrue(repo.supports_rich_root())
565
def test_add_fallback_repo_handles_absolute_urls(self):
    # The policy is built with the stack-on branch's base as its
    # stack-on location; _add_fallback must complete without raising.
    fallback_branch = self.make_branch('stack_on', format='1.6')
    target_repo = self.make_repository('repo', format='1.6')
    policy = bzrdir.UseExistingRepository(target_repo, fallback_branch.base)
    policy._add_fallback(target_repo)
571
def test_add_fallback_repo_handles_relative_urls(self):
    # The policy is built with '.' followed by the stack-on branch's
    # base; _add_fallback must complete without raising.
    fallback_branch = self.make_branch('stack_on', format='1.6')
    target_repo = self.make_repository('repo', format='1.6')
    policy = bzrdir.UseExistingRepository(
        target_repo, '.', fallback_branch.base)
    policy._add_fallback(target_repo)
577
def test_configure_relative_branch_stacking_url(self):
578
stack_on = self.make_branch('stack_on', format='1.6')
579
stacked = self.make_branch('stack_on/stacked', format='1.6')
580
policy = bzrdir.UseExistingRepository(stacked.repository,
582
policy.configure_branch(stacked)
583
self.assertEqual('..', stacked.get_stacked_on_url())
585
def test_relative_branch_stacking_to_absolute(self):
    # After configure_branch, the stacked branch must report the
    # readonly URL of 'stack_on' as its stacked-on URL.
    self.make_branch('stack_on', format='1.6')
    stacked_branch = self.make_branch('stack_on/stacked', format='1.6')
    policy = bzrdir.UseExistingRepository(
        stacked_branch.repository, '.',
        self.get_readonly_url('stack_on'))
    policy.configure_branch(stacked_branch)
    expected_url = self.get_readonly_url('stack_on')
    self.assertEqual(expected_url, stacked_branch.get_stacked_on_url())
595
class ChrootedTests(TestCaseWithTransport):
596
"""A support class that provides readonly urls outside the local namespace.
598
This is done by checking if self.vfs_transport_factory is a MemoryServer. If it
599
is then we are chrooted already, if it is not then an HttpServer is used
604
super(ChrootedTests, self).setUp()
605
if not self.vfs_transport_factory == memory.MemoryServer:
606
self.transport_readonly_server = http_server.HttpServer
608
def local_branch_path(self, branch):
    """Return the real local filesystem path for ``branch.base``."""
    local_path = urlutils.local_path_from_url(branch.base)
    return os.path.realpath(local_path)
611
def test_open_containing(self):
    # Before a control dir exists, both the root and a nested readonly
    # URL must raise NotBranchError; afterwards both must open, and the
    # returned relpath must be the part of the URL below the root.
    open_containing = bzrdir.BzrDir.open_containing
    probe_paths = ('', 'g/p/q')
    for probe in probe_paths:
        self.assertRaises(NotBranchError, open_containing,
                          self.get_readonly_url(probe))
    bzrdir.BzrDir.create(self.get_url())
    for probe in probe_paths:
        _opened, relpath = open_containing(self.get_readonly_url(probe))
        self.assertEqual(probe, relpath)
622
def test_open_containing_tree_branch_or_repository_empty(self):
    # Nothing has been created, so opening from the readonly root is
    # expected to raise NotBranchError.
    opener = bzrdir.BzrDir.open_containing_tree_branch_or_repository
    readonly_root = self.get_readonly_url('')
    self.assertRaises(errors.NotBranchError, opener, readonly_root)
627
def test_open_containing_tree_branch_or_repository_all(self):
628
self.make_branch_and_tree('topdir')
629
tree, branch, repo, relpath = \
630
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
632
self.assertEqual(os.path.realpath('topdir'),
633
os.path.realpath(tree.basedir))
634
self.assertEqual(os.path.realpath('topdir'),
635
self.local_branch_path(branch))
637
osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
638
repo.bzrdir.transport.local_abspath('repository'))
639
self.assertEqual(relpath, 'foo')
641
def test_open_containing_tree_branch_or_repository_no_tree(self):
642
self.make_branch('branch')
643
tree, branch, repo, relpath = \
644
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
646
self.assertEqual(tree, None)
647
self.assertEqual(os.path.realpath('branch'),
648
self.local_branch_path(branch))
650
osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
651
repo.bzrdir.transport.local_abspath('repository'))
652
self.assertEqual(relpath, 'foo')
654
def test_open_containing_tree_branch_or_repository_repo(self):
655
self.make_repository('repo')
656
tree, branch, repo, relpath = \
657
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
659
self.assertEqual(tree, None)
660
self.assertEqual(branch, None)
662
osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
663
repo.bzrdir.transport.local_abspath('repository'))
664
self.assertEqual(relpath, '')
666
def test_open_containing_tree_branch_or_repository_shared_repo(self):
667
self.make_repository('shared', shared=True)
668
bzrdir.BzrDir.create_branch_convenience('shared/branch',
669
force_new_tree=False)
670
tree, branch, repo, relpath = \
671
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
673
self.assertEqual(tree, None)
674
self.assertEqual(os.path.realpath('shared/branch'),
675
self.local_branch_path(branch))
677
osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
678
repo.bzrdir.transport.local_abspath('repository'))
679
self.assertEqual(relpath, '')
681
def test_open_containing_tree_branch_or_repository_branch_subdir(self):
682
self.make_branch_and_tree('foo')
683
self.build_tree(['foo/bar/'])
684
tree, branch, repo, relpath = \
685
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
687
self.assertEqual(os.path.realpath('foo'),
688
os.path.realpath(tree.basedir))
689
self.assertEqual(os.path.realpath('foo'),
690
self.local_branch_path(branch))
692
osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
693
repo.bzrdir.transport.local_abspath('repository'))
694
self.assertEqual(relpath, 'bar')
696
def test_open_containing_tree_branch_or_repository_repo_subdir(self):
697
self.make_repository('bar')
698
self.build_tree(['bar/baz/'])
699
tree, branch, repo, relpath = \
700
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
702
self.assertEqual(tree, None)
703
self.assertEqual(branch, None)
705
osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
706
repo.bzrdir.transport.local_abspath('repository'))
707
self.assertEqual(relpath, 'baz')
709
def test_open_containing_from_transport(self):
    # Same checks as opening by URL, but a transport is handed in:
    # NotBranchError before a control dir exists, and the expected
    # relpath once it has been created.
    open_from = bzrdir.BzrDir.open_containing_from_transport
    probe_paths = ('', 'g/p/q')
    for probe in probe_paths:
        self.assertRaises(
            NotBranchError, open_from,
            _mod_transport.get_transport(self.get_readonly_url(probe)))
    bzrdir.BzrDir.create(self.get_url())
    for probe in probe_paths:
        readonly_transport = _mod_transport.get_transport(
            self.get_readonly_url(probe))
        _opened, relpath = open_from(readonly_transport)
        self.assertEqual(probe, relpath)
724
def test_open_containing_tree_or_branch(self):
725
self.make_branch_and_tree('topdir')
726
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
728
self.assertEqual(os.path.realpath('topdir'),
729
os.path.realpath(tree.basedir))
730
self.assertEqual(os.path.realpath('topdir'),
731
self.local_branch_path(branch))
732
self.assertIs(tree.bzrdir, branch.bzrdir)
733
self.assertEqual('foo', relpath)
734
# opening from non-local should not return the tree
735
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
736
self.get_readonly_url('topdir/foo'))
737
self.assertEqual(None, tree)
738
self.assertEqual('foo', relpath)
740
self.make_branch('topdir/foo')
741
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
743
self.assertIs(tree, None)
744
self.assertEqual(os.path.realpath('topdir/foo'),
745
self.local_branch_path(branch))
746
self.assertEqual('', relpath)
748
def test_open_tree_or_branch(self):
749
self.make_branch_and_tree('topdir')
750
tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
751
self.assertEqual(os.path.realpath('topdir'),
752
os.path.realpath(tree.basedir))
753
self.assertEqual(os.path.realpath('topdir'),
754
self.local_branch_path(branch))
755
self.assertIs(tree.bzrdir, branch.bzrdir)
756
# opening from non-local should not return the tree
757
tree, branch = bzrdir.BzrDir.open_tree_or_branch(
758
self.get_readonly_url('topdir'))
759
self.assertEqual(None, tree)
761
self.make_branch('topdir/foo')
762
tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
763
self.assertIs(tree, None)
764
self.assertEqual(os.path.realpath('topdir/foo'),
765
self.local_branch_path(branch))
767
def test_open_from_transport(self):
    # A transport pointing at a control dir must give back a BzrDir
    # whose root transport has the same base as the given transport.
    bzrdir.BzrDir.create(self.get_url())
    transport = self.get_transport()
    opened = bzrdir.BzrDir.open_from_transport(transport)
    self.assertEqual(transport.base, opened.root_transport.base)
    self.assertIsInstance(opened, bzrdir.BzrDir)
776
def test_open_from_transport_no_bzrdir(self):
    # No control dir has been created, so opening must raise
    # NotBranchError.
    empty_transport = self.get_transport()
    opener = bzrdir.BzrDir.open_from_transport
    self.assertRaises(NotBranchError, opener, empty_transport)
780
def test_open_from_transport_bzrdir_in_parent(self):
781
control = bzrdir.BzrDir.create(self.get_url())
782
t = self.get_transport()
784
t = t.clone('subdir')
785
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
787
def test_sprout_recursive(self):
788
tree = self.make_branch_and_tree('tree1',
789
format='dirstate-with-subtree')
790
sub_tree = self.make_branch_and_tree('tree1/subtree',
791
format='dirstate-with-subtree')
792
sub_tree.set_root_id('subtree-root')
793
tree.add_reference(sub_tree)
794
self.build_tree(['tree1/subtree/file'])
796
tree.commit('Initial commit')
797
tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
799
self.addCleanup(tree2.unlock)
800
self.assertPathExists('tree2/subtree/file')
801
self.assertEqual('tree-reference', tree2.kind('subtree-root'))
803
def test_cloning_metadir(self):
    """Ensure that the cloning metadir is suitable."""
    # Local names avoid shadowing the bzrdir module and the format builtin.
    control = self.make_bzrdir('bzrdir')
    # Only checks that the call completes; the result is not inspected.
    control.cloning_metadir()
    knit_branch = self.make_branch('branch', format='knit')
    cloning_format = knit_branch.bzrdir.cloning_metadir()
    self.assertIsInstance(cloning_format.workingtree_format,
                          workingtree.WorkingTreeFormat3)
812
def test_sprout_recursive_treeless(self):
813
tree = self.make_branch_and_tree('tree1',
814
format='dirstate-with-subtree')
815
sub_tree = self.make_branch_and_tree('tree1/subtree',
816
format='dirstate-with-subtree')
817
tree.add_reference(sub_tree)
818
self.build_tree(['tree1/subtree/file'])
820
tree.commit('Initial commit')
821
# The following line forces the orphaning to reveal bug #634470
822
tree.branch.get_config().set_user_option(
823
'bzr.transform.orphan_policy', 'move')
824
tree.bzrdir.destroy_workingtree()
825
# FIXME: subtree/.bzr is left here which allows the test to pass (or
826
# fail :-( ) -- vila 20100909
827
repo = self.make_repository('repo', shared=True,
828
format='dirstate-with-subtree')
829
repo.set_make_working_trees(False)
830
# FIXME: we just deleted the workingtree and now we want to use it ????
831
# At a minimum, we should use tree.branch below (but this fails too
832
# currently) or stop calling this test 'treeless'. Specifically, I've
833
# turned the line below into an assertRaises when 'subtree/.bzr' is
834
# orphaned and sprout tries to access the branch there (which is left
835
# by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
836
# [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
837
# #634470. -- vila 20100909
838
self.assertRaises(errors.NotBranchError,
839
tree.bzrdir.sprout, 'repo/tree2')
840
# self.assertPathExists('repo/tree2/subtree')
841
# self.assertPathDoesNotExist('repo/tree2/subtree/file')
843
def make_foo_bar_baz(self):
    """Create branches at 'foo', 'foo/bar' and 'baz'.

    :return: a ``(foo, bar, baz)`` tuple holding the ``bzrdir`` attribute
        of each created branch, in that order.
    """
    foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
    bar = self.make_branch('foo/bar').bzrdir
    baz = self.make_branch('baz').bzrdir
    # Callers unpack three values, so the three bzrdirs must be returned.
    return foo, bar, baz
849
def test_find_bzrdirs(self):
    """find_bzrdirs() on the test transport yields baz, foo, then bar."""
    foo, bar, baz = self.make_foo_bar_baz()
    t = self.get_transport()
    self.assertEqualBzrdirs([baz, foo, bar], bzrdir.BzrDir.find_bzrdirs(t))
854
def make_fake_permission_denied_transport(self, transport, paths):
    """Create a transport that raises PermissionDenied for some paths.

    :param transport: the transport handed to the ``PathFilteringServer``.
    :param paths: container of paths (tested with ``in``) for which
        ``errors.PermissionDenied`` is raised.
    :return: a ``(path_filter_server, path_filter_transport)`` tuple. The
        server is started here and its ``stop_server`` is registered as a
        cleanup.
    """
    # NOTE(review): the header of this inner function and its final
    # ``return path`` were reconstructed from the surviving ``raise`` line
    # -- confirm against upstream.
    def deny_listed_paths(path):
        if path in paths:
            raise errors.PermissionDenied(path)
        return path
    path_filter_server = pathfilter.PathFilteringServer(
        transport, deny_listed_paths)
    path_filter_server.start_server()
    self.addCleanup(path_filter_server.stop_server)
    path_filter_transport = pathfilter.PathFilteringTransport(
        path_filter_server, '.')
    return (path_filter_server, path_filter_transport)
867
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
    """Check that each branch url ends with the given suffix.

    :param expect_url_suffix: suffix every ``user_url`` must end with.
    :param actual_bzrdirs: iterable of objects exposing ``user_url``; an
        empty iterable results in no assertion being made.
    """
    for actual_bzrdir in actual_bzrdirs:
        self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
872
def test_find_bzrdirs_permission_denied(self):
    """find_bzrdirs() only reports '/baz/' when 'foo' is permission-denied.

    The check is made twice: directly through the path-filtering
    transport, and through a smart server backed by the same
    path-filtering server.
    """
    foo, bar, baz = self.make_foo_bar_baz()
    t = self.get_transport()
    path_filter_server, path_filter_transport = \
        self.make_fake_permission_denied_transport(t, ['foo'])
    # Directly over the path-filtering transport.
    self.assertBranchUrlsEndWith('/baz/',
        bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
    # Through a smart server sitting on top of the filtering server.
    smart_transport = self.make_smart_server('.',
        backing_server=path_filter_server)
    self.assertBranchUrlsEndWith('/baz/',
        bzrdir.BzrDir.find_bzrdirs(smart_transport))
886
def test_find_bzrdirs_list_current(self):
    """find_bzrdirs() uses the supplied ``list_current`` callable.

    The callable used here lists the transport's entries but leaves out
    'baz'.
    """
    def list_current(transport):
        return [s for s in transport.list_dir('') if s != 'baz']

    foo, bar, baz = self.make_foo_bar_baz()
    t = self.get_transport()
    # NOTE(review): the expected list was lost from the damaged source; it
    # is reconstructed as [foo, bar] because list_current hides 'baz' --
    # confirm against upstream.
    self.assertEqualBzrdirs(
        [foo, bar],
        bzrdir.BzrDir.find_bzrdirs(t, list_current=list_current))
896
def test_find_bzrdirs_evaluate(self):
    """find_bzrdirs() yields the values produced by ``evaluate``.

    ``evaluate`` returns a ``(flag, base_url)`` pair: the flag is True when
    opening the repository raises ``NoRepositoryPresent`` and False
    otherwise. The test expects the base urls of baz and foo, in that
    order.
    """
    # The parameter is not called ``bzrdir`` so that it does not hide the
    # ``bzrdir`` module inside the helper.
    def evaluate(a_bzrdir):
        try:
            a_bzrdir.open_repository()
        except NoRepositoryPresent:
            return True, a_bzrdir.root_transport.base
        else:
            return False, a_bzrdir.root_transport.base

    foo, bar, baz = self.make_foo_bar_baz()
    t = self.get_transport()
    self.assertEqual([baz.root_transport.base, foo.root_transport.base],
        list(bzrdir.BzrDir.find_bzrdirs(t, evaluate=evaluate)))
910
def assertEqualBzrdirs(self, first, second):
    """Assert two iterables of bzrdirs match pairwise by root url.

    :param first: iterable of objects exposing ``root_transport.base``.
    :param second: iterable of objects exposing ``root_transport.base``.

    The lengths are compared first, then each pair's
    ``root_transport.base``.
    """
    # Materialise both sides so that ``len()`` works for iterators too.
    first = list(first)
    second = list(second)
    self.assertEqual(len(first), len(second))
    for x, y in zip(first, second):
        self.assertEqual(x.root_transport.base, y.root_transport.base)
917
def test_find_branches(self):
    """find_branches() returns baz, foo, bar and works below the root.

    A shared repository is made at the root, plus a bzrdir at 'foo/qux'
    made with ``make_bzrdir``. The search is run from the root and then
    from 'foo'.
    """
    # Only the side effects of these two calls are needed.
    self.make_repository('', shared=True)
    foo, bar, baz = self.make_foo_bar_baz()
    self.make_bzrdir('foo/qux')
    t = self.get_transport()
    branches = bzrdir.BzrDir.find_branches(t)
    self.assertEqual(baz.root_transport.base, branches[0].base)
    self.assertEqual(foo.root_transport.base, branches[1].base)
    self.assertEqual(bar.root_transport.base, branches[2].base)

    # ensure this works without a top-level repo
    branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
    self.assertEqual(foo.root_transport.base, branches[0].base)
    self.assertEqual(bar.root_transport.base, branches[1].base)
933
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """find_branches() with a branch whose shared repository was deleted."""

    def test_find_bzrdirs_missing_repo(self):
        """Every branch reported by find_branches() must be '/baz/'.

        'arepo/abranch' is created inside a shared repository whose
        '.bzr' is then deleted, so opening that branch raises
        ``NoRepositoryPresent``.
        """
        t = self.get_transport()
        arepo = self.make_repository('arepo', shared=True)
        abranch_url = arepo.user_url + '/abranch'
        bzrdir.BzrDir.create(abranch_url).create_branch()
        t.delete_tree('arepo/.bzr')
        self.assertRaises(errors.NoRepositoryPresent,
            branch.Branch.open, abranch_url)
        self.make_branch('baz')
        for found_branch in bzrdir.BzrDir.find_branches(t):
            self.assertEndsWith(found_branch.user_url, '/baz/')
948
class TestMeta1DirFormat(TestCaseWithTransport):
    """Tests specific to the meta1 dir format."""

    def test_right_base_dirs(self):
        """Branch, repository and checkout transports use the expected
        sub-directories ('branch', 'repository', 'checkout'), both when
        asked with ``None`` and with a concrete format.
        """
        meta_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # NOTE(review): ``t`` was reconstructed as the bzrdir's own
        # transport (its definition is missing from the damaged source) --
        # confirm against upstream.
        t = meta_dir.transport
        branch_base = t.clone('branch').base
        self.assertEqual(branch_base,
            meta_dir.get_branch_transport(None).base)
        self.assertEqual(branch_base,
            meta_dir.get_branch_transport(
                bzrlib.branch.BzrBranchFormat5()).base)
        repository_base = t.clone('repository').base
        self.assertEqual(repository_base,
            meta_dir.get_repository_transport(None).base)
        repository_format = repository.format_registry.get_default()
        self.assertEqual(repository_base,
            meta_dir.get_repository_transport(repository_format).base)
        checkout_base = t.clone('checkout').base
        self.assertEqual(checkout_base,
            meta_dir.get_workingtree_transport(None).base)
        self.assertEqual(checkout_base,
            meta_dir.get_workingtree_transport(
                workingtree.WorkingTreeFormat3()).base)

    def test_meta1dir_uses_lockdir(self):
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
        meta_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # NOTE(review): reconstructed like in test_right_base_dirs.
        t = meta_dir.transport
        self.assertIsDirectory('branch-lock', t)

    def test_comparison(self):
        """Equality and inequality behave properly.

        Metadirs should compare equal iff they have the same repo, branch and
        tree formats.
        """
        mydir = bzrdir.format_registry.make_bzrdir('knit')
        self.assertEqual(mydir, mydir)
        self.assertFalse(mydir != mydir)
        otherdir = bzrdir.format_registry.make_bzrdir('knit')
        self.assertEqual(otherdir, mydir)
        self.assertFalse(otherdir != mydir)
        otherdir2 = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
        self.assertNotEqual(otherdir2, mydir)
        self.assertFalse(otherdir2 == mydir)

    def test_needs_conversion_different_working_tree(self):
        """A 'knit' tree needs conversion to reach the 'dirstate' format."""
        # meta1dirs need a conversion if any element is not the default.
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
        tree = self.make_branch_and_tree('tree', format='knit')
        self.assertTrue(tree.bzrdir.needs_format_conversion(
            new_format))

    def test_initialize_on_format_uses_smart_transport(self):
        """Initializing over the smart server yields a RemoteBzrDir in 2 RPCs."""
        self.setup_smart_server_with_call_log()
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
        transport = self.get_transport('target')
        transport.ensure_base()
        self.reset_smart_call_log()
        instance = new_format.initialize_on_transport(transport)
        self.assertIsInstance(instance, remote.RemoteBzrDir)
        rpc_count = len(self.hpss_calls)
        # This figure represents the amount of work to perform this use
        # case. It is entirely ok to reduce this number if a test fails due
        # to rpc_count being too low. If rpc_count increases, more network
        # roundtrips have become necessary for this use case. Please do not
        # adjust this number upwards without agreement from bzr's network
        # support maintainers.
        self.assertEqual(2, rpc_count)
1014
class NonLocalTests(TestCaseWithTransport):
    """Tests for bzrdir static behaviour on non local paths."""

    def setUp(self):
        """Switch the VFS transport factory to ``memory.MemoryServer``."""
        super(NonLocalTests, self).setUp()
        self.vfs_transport_factory = memory.MemoryServer

    def test_create_branch_convenience(self):
        """The created bzrdir has a repository but no working tree."""
        # outside a repo the default convenience output is a repo+branch_tree
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience(
            self.get_url('foo'), format=knit_format)
        self.assertRaises(errors.NoWorkingTree,
            branch.bzrdir.open_workingtree)
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_force_tree_not_local_fails(self):
        """force_new_tree=True raises NotLocalUrl and leaves 'foo' absent."""
        # outside a repo the default convenience output is a repo+branch_tree
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        self.assertRaises(errors.NotLocalUrl,
            bzrdir.BzrDir.create_branch_convenience,
            self.get_url('foo'),
            force_new_tree=True,
            format=knit_format)
        t = self.get_transport()
        self.assertFalse(t.has('foo'))

    def test_clone(self):
        """Cloning to the test url gives a branch and repository, no tree."""
        # clone into a nonlocal path works
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience('local',
            format=knit_format)
        branch.bzrdir.open_workingtree()
        result = branch.bzrdir.clone(self.get_url('remote'))
        self.assertRaises(errors.NoWorkingTree,
            result.open_workingtree)
        result.open_branch()
        result.open_repository()

    def test_checkout_metadir(self):
        """checkout_metadir() reports WorkingTreeFormat3 for this branch."""
        # checkout_metadir has reasonable working tree format even when no
        # working tree is present
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
        checkout_format = my_bzrdir.checkout_metadir()
        self.assertIsInstance(checkout_format.workingtree_format,
            workingtree.WorkingTreeFormat3)
1063
class TestHTTPRedirections(object):
    """Test redirection between two http servers.

    This MUST be used by daughter classes that also inherit from
    TestCaseWithTwoWebservers.

    We can't inherit directly from TestCaseWithTwoWebservers or the
    test framework will try to create an instance which cannot
    run, its implementation being incomplete.

    Daughter classes are expected to provide ``_transport`` and
    ``_qualified_url(host, port)``, both of which are used by the tests
    below.
    """

    def create_transport_readonly_server(self):
        """Return the redirecting http server used as the readonly server."""
        # We don't set the http protocol version, relying on the default
        return http_utils.HTTPServerRedirecting()

    def create_transport_secondary_server(self):
        """Return the redirecting http server used as the secondary server."""
        # We don't set the http protocol version, relying on the default
        return http_utils.HTTPServerRedirecting()

    def setUp(self):
        """Make the old (secondary) server redirect to the new one."""
        super(TestHTTPRedirections, self).setUp()
        # The redirections will point to the new server
        self.new_server = self.get_readonly_server()
        # The requests to the old server will be redirected
        self.old_server = self.get_secondary_server()
        # Configure the redirections
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)

    def test_loop(self):
        """A redirection loop makes opening fail with NotBranchError."""
        # Both servers redirect to each other creating a loop
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
        # Starting from either server should loop
        old_url = self._qualified_url(self.old_server.host,
                                      self.old_server.port)
        oldt = self._transport(old_url)
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_from_transport, oldt)
        new_url = self._qualified_url(self.new_server.host,
                                      self.new_server.port)
        newt = self._transport(new_url)
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_from_transport, newt)

    def test_qualifier_preserved(self):
        """The bzrdir opened after redirection keeps the transport class."""
        # Only the on-disk branch is needed, not the tree object.
        self.make_branch_and_tree('branch')
        old_url = self._qualified_url(self.old_server.host,
                                      self.old_server.port)
        start = self._transport(old_url).clone('branch')
        bdir = bzrdir.BzrDir.open_from_transport(start)
        # Redirection should preserve the qualifier, hence the transport
        # class of the starting transport.
        self.assertIsInstance(bdir.root_transport, type(start))
1117
class TestHTTPRedirections_urllib(TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for urllib implementation"""

    _transport = HttpTransport_urllib

    def _qualified_url(self, host, port):
        """Build, permit and return the 'http+urllib' url for host:port."""
        result = 'http+urllib://%s:%s' % (host, port)
        self.permit_url(result)
        # The url is passed to ``self._transport`` by the shared tests.
        return result
1130
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
                                  TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for pycurl implementation"""

    def _qualified_url(self, host, port):
        """Build, permit and return the 'http+pycurl' url for host:port."""
        result = 'http+pycurl://%s:%s' % (host, port)
        self.permit_url(result)
        # The url is passed to ``self._transport`` by the shared tests.
        return result
1141
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
                                   http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the nosmart decorator"""

    _transport = NoSmartTransportDecorator

    def _qualified_url(self, host, port):
        """Build, permit and return the 'nosmart+http' url for host:port."""
        result = 'nosmart+http://%s:%s' % (host, port)
        self.permit_url(result)
        # The url is passed to ``self._transport`` by the shared tests.
        return result
1153
class TestHTTPRedirections_readonly(TestHTTPRedirections,
                                    http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for readonly decorator"""

    _transport = ReadonlyTransportDecorator

    def _qualified_url(self, host, port):
        """Build, permit and return the 'readonly+http' url for host:port."""
        result = 'readonly+http://%s:%s' % (host, port)
        self.permit_url(result)
        # The url is passed to ``self._transport`` by the shared tests.
        return result
1165
class TestDotBzrHidden(TestCaseWithTransport):
    """Check that a directory listing shows 'a' but not the control dir."""

    # Command used to list the current directory.
    # NOTE(review): the non-Windows default was reconstructed as ['ls'];
    # only the win32 override survives in the damaged source -- confirm.
    ls = ['ls']
    if sys.platform == 'win32':
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']

    def get_ls(self):
        """Run ``self.ls`` and return its standard output split into lines.

        The test fails if the command exits with a non-zero status.
        """
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        out, err = f.communicate()
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
                         % (self.ls, err))
        return out.splitlines()

    def test_dot_bzr_hidden(self):
        """Only 'a' is listed after creating a bzrdir at '.'."""
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')
        bzrdir.BzrDir.create('.')
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())

    def test_dot_bzr_hidden_with_url(self):
        """Same as test_dot_bzr_hidden, creating the bzrdir from a url."""
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')
        bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())
1194
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
    """Test BzrDirFormat implementation for TestBzrDirSprout."""

    def _open(self, transport):
        """Return a ``_TestBzrDir`` for ``transport`` using this format."""
        return _TestBzrDir(transport, self)
1201
class _TestBzrDir(bzrdir.BzrDirMeta1):
    """Test BzrDir implementation for TestBzrDirSprout.

    When created a _TestBzrDir already has repository and a branch. The branch
    is a test double as well.
    """

    def __init__(self, *args, **kwargs):
        """Create the bzrdir, a ``_TestBranch`` and a new repository for it."""
        super(_TestBzrDir, self).__init__(*args, **kwargs)
        self.test_branch = _TestBranch()
        self.test_branch.repository = self.create_repository()

    def open_branch(self, unsupported=False):
        """Return the instrumented branch; ``unsupported`` is ignored."""
        return self.test_branch

    def cloning_metadir(self, require_stacking=False):
        """Return a new ``_TestBzrDirFormat``; the argument is ignored."""
        return _TestBzrDirFormat()
1220
class _TestBranchFormat(bzrlib.branch.BranchFormat):
    """Test Branch format for TestBzrDirSprout."""
1224
class _TestBranch(bzrlib.branch.Branch):
    """Test Branch implementation for TestBzrDirSprout.

    The names of the instrumented methods that get called are appended to
    ``self.calls``.
    """

    def __init__(self, *args, **kwargs):
        """Set the test format, then initialise the call log and parent."""
        self._format = _TestBranchFormat()
        super(_TestBranch, self).__init__(*args, **kwargs)
        # NOTE(review): these two initialisations were reconstructed from
        # how ``calls`` and ``_parent`` are used below -- confirm.
        self.calls = []
        self._parent = None

    def sprout(self, *args, **kwargs):
        """Record the call and return a fresh ``_TestBranch``."""
        self.calls.append('sprout')
        return _TestBranch()

    def copy_content_into(self, destination, revision_id=None):
        """Record the call; nothing is copied."""
        self.calls.append('copy_content_into')

    def last_revision(self):
        """Return ``NULL_REVISION``."""
        return _mod_revision.NULL_REVISION

    def get_parent(self):
        """Return the value stored by ``set_parent`` (initially None)."""
        # NOTE(review): body reconstructed to mirror set_parent -- confirm.
        return self._parent

    def set_parent(self, parent):
        """Remember ``parent`` on the instance."""
        self._parent = parent

    def lock_read(self):
        """Return a ``LogicalLockResult`` bound to ``self.unlock``."""
        return lock.LogicalLockResult(self.unlock)

    def unlock(self):
        """Do nothing.

        NOTE(review): this method was reconstructed because lock_read
        refers to ``self.unlock`` -- confirm against upstream.
        """
        return
1256
class TestBzrDirSprout(TestCaseWithMemoryTransport):
    """Tests for BzrDir.sprout using the instrumented test doubles."""

    def test_sprout_uses_branch_sprout(self):
        """BzrDir.sprout calls Branch.sprout.

        Usually, BzrDir.sprout should delegate to the branch's sprout method
        for part of the work.  This allows the source branch to control the
        choice of format for the new branch.

        There are exceptions, but this test avoids them:
          - if there's no branch in the source bzrdir,
          - or if the stacking has been requested and the format needs to be
            overridden to satisfy that.
        """
        # Make an instrumented bzrdir.
        t = self.get_transport('source')
        # NOTE(review): reconstructed statement (one line is missing here
        # in the damaged source) -- confirm against upstream.
        t.ensure_base()
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
        # The instrumented bzrdir has a test_branch attribute that logs calls
        # made to the branch contained in that bzrdir.  Initially the test
        # branch exists but no calls have been made to it.
        self.assertEqual([], source_bzrdir.test_branch.calls)

        # Sprout the bzrdir; only the logged calls matter, not the result.
        target_url = self.get_url('target')
        source_bzrdir.sprout(target_url, recurse='no')

        # The bzrdir called the branch's sprout method.
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)

    def test_sprout_parent(self):
        """A branch sprouted from 'parent' records '/parent/' as its parent."""
        grandparent_tree = self.make_branch('grandparent')
        parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
        branch_tree = parent.bzrdir.sprout('branch').open_branch()
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1293
class TestBzrDirHooks(TestCaseWithMemoryTransport):
    """Tests for the 'pre_open' and 'post_repo_init' BzrDir hooks."""

    def test_pre_open_called(self):
        """The pre_open hook receives the transport being opened."""
        calls = []
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
        transport = self.get_transport('foo')
        url = transport.base
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
        self.assertEqual([transport.base], [t.base for t in calls])

    def test_pre_open_actual_exceptions_raised(self):
        """An error raised by a pre_open hook reaches the caller of open()."""
        # NOTE(review): the counter logic was reconstructed from the hook's
        # name (fail_once); only its ``raise`` survives in the damaged
        # source -- confirm against upstream.
        count = [0]
        def fail_once(transport):
            count[0] += 1
            if count[0] == 1:
                raise errors.BzrError("fail")
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
        transport = self.get_transport('foo')
        url = transport.base
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
        self.assertEqual('fail', err._preformatted_string)

    def test_post_repo_init(self):
        """post_repo_init fires once with a RepoInitHookParams argument."""
        from bzrlib.bzrdir import RepoInitHookParams
        calls = []
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
            calls.append, None)
        self.make_repository('foo')
        self.assertLength(1, calls)
        params = calls[0]
        self.assertIsInstance(params, RepoInitHookParams)
        self.assertTrue(hasattr(params, 'bzrdir'))
        self.assertTrue(hasattr(params, 'repository'))

    def test_post_repo_init_hook_repr(self):
        """repr() of the hook params starts with '<RepoInitHookParams for '."""
        param_reprs = []
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
            lambda params: param_reprs.append(repr(params)), None)
        self.make_repository('foo')
        self.assertLength(1, param_reprs)
        param_repr = param_reprs[0]
        self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1337
class TestGenerateBackupName(TestCaseWithMemoryTransport):
    """Tests for backup name generation on a BzrDir."""
    # FIXME: This may need to be unified with test_osutils.TestBackupNames or
    # moved to per_bzrdir or per_transport for better coverage ?

    def setUp(self):
        """Create a bzrdir on the test transport and reopen it."""
        super(TestGenerateBackupName, self).setUp()
        self._transport = self.get_transport()
        bzrdir.BzrDir.create(self.get_url(),
            possible_transports=[self._transport])
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)

    def test_deprecated_generate_backup_name(self):
        """generate_backup_name() is deprecated since 2.3.0."""
        # Only the deprecation check matters; the result is not inspected.
        self.applyDeprecated(
            symbol_versioning.deprecated_in((2, 3, 0)),
            self._bzrdir.generate_backup_name, 'whatever')

    # NOTE(review): the header of this test is missing from the damaged
    # source; the name ``test_new`` is a reconstruction -- confirm.
    def test_new(self):
        """With no existing backup the first name is 'a.~1~'."""
        self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))

    def test_exiting(self):
        """With 'a.~1~' already present the next name is 'a.~2~'."""
        self._transport.put_bytes("a.~1~", "some content")
        self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))