1
# Copyright (C) 2006-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the BzrDir facility and any format specific tests.
19
For interface contract tests, see tests/per_bzr_dir.
34
revision as _mod_revision,
38
transport as _mod_transport,
45
from bzrlib.errors import (
47
NoColocatedBranchSupport,
49
UnsupportedFormatError,
51
from bzrlib.tests import (
53
TestCaseWithMemoryTransport,
54
TestCaseWithTransport,
57
from bzrlib.tests import(
61
from bzrlib.tests.test_http import TestWithTransport_pycurl
62
from bzrlib.transport import (
66
from bzrlib.transport.http._urllib import HttpTransport_urllib
67
from bzrlib.transport.nosmart import NoSmartTransportDecorator
68
from bzrlib.transport.readonly import ReadonlyTransportDecorator
69
from bzrlib.repofmt import knitrepo, knitpack_repo
72
class TestDefaultFormat(TestCase):
    """Check that the default BzrDir format can be swapped and restored."""

    def test_get_set_default_format(self):
        """Install SampleBzrDirFormat as the default, then put the old one back."""
        old_format = bzrdir.BzrDirFormat.get_default_format()
        # default is BzrDirMetaFormat1
        self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
        controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
        # creating a bzr dir should now create an instrumented dir.
        try:
            result = bzrdir.BzrDir.create('memory:///')
            self.assertIsInstance(result, SampleBzrDir)
        finally:
            # Always reinstate the real default so that a failure above
            # cannot leak the sample format into later tests.
            controldir.ControlDirFormat._set_default_format(old_format)
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
88
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
    """A deprecated bzr dir format."""
92
class TestFormatRegistry(TestCase):
94
def make_format_registry(self):
    """Return a private ControlDirFormatRegistry filled with sample formats.

    The registry holds eager, lazy, metadir and hidden entries and uses
    'knit' as its default.

    NOTE(review): several closing arguments of the register calls were not
    visible; ``deprecated=True``, the ``'branch6'`` key and
    ``experimental=True`` are taken from what test_format_registry and
    test_help_topic assert. The ``'hidden'`` key is a guess -- confirm.
    """
    registry = controldir.ControlDirFormatRegistry()
    registry.register('deprecated', DeprecatedBzrDirFormat,
        'Some format. Slower and unawesome and deprecated.',
        deprecated=True)
    registry.register_lazy('lazy', 'bzrlib.tests.test_bzrdir',
        'DeprecatedBzrDirFormat', 'Format registered lazily',
        deprecated=True)
    bzrdir.register_metadir(registry, 'knit',
        'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
        'Format using knits',
        )
    registry.set_default('knit')
    bzrdir.register_metadir(registry,
        'branch6',
        'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
        'Experimental successor to knit. Use at your own risk.',
        branch_format='bzrlib.branch.BzrBranchFormat6',
        experimental=True)
    bzrdir.register_metadir(registry,
        'hidden',
        'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
        'Experimental successor to knit. Use at your own risk.',
        branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
    registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
        'Old format. Slower and does not support things. ', hidden=True)
    registry.register_lazy('hiddenlazy', 'bzrlib.tests.test_bzrdir',
        'DeprecatedBzrDirFormat', 'Format registered lazily',
        deprecated=True, hidden=True)
    return registry
125
def test_format_registry(self):
    """Each registered key builds a format object of the expected kind."""
    registry = self.make_format_registry()
    # The lazy and the eager registration both resolve to the same class.
    for key in ('lazy', 'deprecated'):
        self.assertIsInstance(registry.make_bzrdir(key),
                              DeprecatedBzrDirFormat)
    # 'default' must behave exactly like the key it was pointed at.
    for key in ('default', 'knit'):
        made = registry.make_bzrdir(key)
        self.assertIsInstance(made.repository_format,
                              knitrepo.RepositoryFormatKnit1)
    branch6 = registry.make_bzrdir('branch6')
    self.assertIsInstance(branch6.get_branch_format(),
                          bzrlib.branch.BzrBranchFormat6)
141
def test_get_help(self):
    """get_help() returns the help text each key was registered with."""
    registry = self.make_format_registry()
    expected_help = [
        ('lazy', 'Format registered lazily'),
        ('knit', 'Format using knits'),
        ('default', 'Format using knits'),
        ('deprecated', 'Some format. Slower and unawesome and deprecated.'),
    ]
    for key, help_text in expected_help:
        self.assertEqual(help_text, registry.get_help(key))
152
# Registers the sample registry's help_topic under two topic names and checks
# which formats show up in which section of the generated help text.
def test_help_topic(self):
153
topics = help_topics.HelpTopicRegistry()
154
registry = self.make_format_registry()
155
# NOTE(review): both register() calls below are left unclosed here; their
# final argument(s) are not visible in this copy -- restore from upstream.
topics.register('current-formats', registry.help_topic,
157
topics.register('other-formats', registry.help_topic,
159
new = topics.get_detail('current-formats')
160
rest = topics.get_detail('other-formats')
161
# Everything before the 'Deprecated formats' heading is treated as the
# experimental section, everything after it as the deprecated section.
experimental, deprecated = rest.split('Deprecated formats')
162
self.assertContainsRe(new, 'formats-help')
163
self.assertContainsRe(new,
164
':knit:\n \(native\) \(default\) Format using knits\n')
165
self.assertContainsRe(experimental,
166
':branch6:\n \(native\) Experimental successor to knit')
167
self.assertContainsRe(deprecated,
168
':lazy:\n \(native\) Format registered lazily\n')
169
# Nothing whose text contains 'hidden' may appear in the current formats.
self.assertNotContainsRe(new, 'hidden')
171
def test_set_default_repository(self):
    """set_default_repository() repoints both 'default' registrations."""
    default_factory = bzrdir.format_registry.get('default')
    # Find the real key behind 'default' so it can be reinstated afterwards.
    old_default = [key for key, factory in bzrdir.format_registry.iteritems()
                   if factory == default_factory and key != 'default'][0]
    bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
    try:
        self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
                      bzrdir.format_registry.get('default'))
        self.assertIs(
            repository.format_registry.get_default().__class__,
            knitrepo.RepositoryFormatKnit3)
    finally:
        # The registry is global: restore it even when an assertion fails.
        bzrdir.format_registry.set_default_repository(old_default)
185
def test_aliases(self):
    """aliases() reports only the keys registered with alias=True."""
    registry = controldir.ControlDirFormatRegistry()
    help_text = 'Old format. Slower and does not support stuff'
    # NOTE(review): the closing ``deprecated=True`` of the first call was
    # not visible; it mirrors the second registration -- confirm.
    registry.register('deprecated', DeprecatedBzrDirFormat, help_text,
                      deprecated=True)
    registry.register('deprecatedalias', DeprecatedBzrDirFormat, help_text,
                      deprecated=True, alias=True)
    self.assertEqual(frozenset(['deprecatedalias']), registry.aliases())
196
class SampleBranch(bzrlib.branch.Branch):
    """A dummy branch for guess what, dummy use."""

    def __init__(self, dir):
        # NOTE(review): the constructor body was not visible; remembering
        # the owning control directory as ``bzrdir`` is an assumption based
        # on how branches are used elsewhere in this module -- confirm.
        self.bzrdir = dir
203
class SampleRepository(bzrlib.repository.Repository):
    """A dummy repository returned by SampleBzrDir.open_repository."""

    def __init__(self, dir):
        # NOTE(review): the constructor body was not visible; remembering
        # the owning control directory as ``bzrdir`` is an assumption --
        # confirm against upstream.
        self.bzrdir = dir
210
class SampleBzrDir(bzrdir.BzrDir):
    """A sample BzrDir implementation to allow testing static methods."""

    def create_repository(self, shared=False):
        """See BzrDir.create_repository."""
        return "A repository"

    def open_repository(self):
        """See BzrDir.open_repository."""
        return SampleRepository(self)

    def create_branch(self, name=None):
        """See BzrDir.create_branch."""
        # Only a named (colocated) branch is unsupported; raising
        # unconditionally would make the return below unreachable.
        if name is not None:
            raise NoColocatedBranchSupport(self)
        return SampleBranch(self)

    def create_workingtree(self):
        """See BzrDir.create_workingtree."""
        # test_create_standalone_working_tree expects exactly this marker.
        return "A tree"
232
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
    """A sample format

    this format is initializable, unsupported to aid in testing the
    open and open_downlevel routines.
    """

    def get_format_string(self):
        """See BzrDirFormat.get_format_string()."""
        return "Sample .bzr dir format."

    def initialize_on_transport(self, t):
        """Create a bzr dir."""
        # The control directory has to exist before the format marker can
        # be written into it.
        t.mkdir('.bzr')
        t.put_bytes('.bzr/branch-format', self.get_format_string())
        return SampleBzrDir(t, self)

    def is_supported(self):
        # Deliberately unsupported: test_register_unregister_format relies
        # on BzrDir.open refusing this format.
        return False

    def open(self, transport, _found=None):
        return "opened branch."
256
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
    """Metadir format variant identified by the string 'Test format 1'."""

    # Called on the class itself (see test_find_format) and declared without
    # a receiver, so it must be a static method.
    @staticmethod
    def get_format_string():
        return "Test format 1"
263
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
    """Metadir format variant identified by the string 'Test format 2'."""

    # Called on the class itself (see test_find_format) and declared without
    # a receiver, so it must be a static method.
    @staticmethod
    def get_format_string():
        return "Test format 2"
270
class TestBzrDirFormat(TestCaseWithTransport):
271
"""Tests for the BzrDirFormat facility."""
273
def test_find_format(self):
    """find_format() returns the registered format matching each directory."""
    # is the right format object found for a branch?
    # create a branch with a few known format objects.
    # NOTE(review): the second argument of register() was not visible; a
    # format instance is assumed, as in test_register_unregister_format.
    for format_class in (BzrDirFormatTest1, BzrDirFormatTest2):
        format_string = format_class.get_format_string()
        bzrdir.BzrProber.formats.register(format_string, format_class())
        self.addCleanup(bzrdir.BzrProber.formats.remove, format_string)
    t = self.get_transport()
    self.build_tree(["foo/", "bar/"], transport=t)

    def check_format(format, url):
        format.initialize(url)
        t = _mod_transport.get_transport(url)
        found_format = bzrdir.BzrDirFormat.find_format(t)
        self.assertIsInstance(found_format, format.__class__)

    check_format(BzrDirFormatTest1(), "foo")
    check_format(BzrDirFormatTest2(), "bar")
294
def test_find_format_nothing_there(self):
    """find_format() raises NotBranchError where no control dir exists."""
    here = _mod_transport.get_transport('.')
    self.assertRaises(NotBranchError,
                      bzrdir.BzrDirFormat.find_format, here)
299
def test_find_format_unknown_format(self):
    """An empty branch-format file is reported as an unknown format."""
    t = self.get_transport()
    # The control directory must exist before a file can be written in it.
    t.mkdir('.bzr')
    t.put_bytes('.bzr/branch-format', '')
    self.assertRaises(UnknownFormatError,
                      bzrdir.BzrDirFormat.find_format,
                      _mod_transport.get_transport('.'))
307
def test_register_unregister_format(self):
    """Registering and removing a format changes what BzrDir can open."""
    sample_format = SampleBzrDirFormat()
    format_string = sample_format.get_format_string()
    # NOTE(review): the definition of ``url`` was not visible; the test
    # directory url is assumed, as in the sibling tests.
    url = self.get_url()
    # make a bzrdir
    sample_format.initialize(url)
    # register a format for it.
    bzrdir.BzrProber.formats.register(format_string, sample_format)
    # which bzrdir.Open will refuse (not supported)
    self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
    # which bzrdir.open_containing will refuse (not supported)
    self.assertRaises(UnsupportedFormatError,
                      bzrdir.BzrDir.open_containing, url)
    # but open_downlevel will work
    t = _mod_transport.get_transport(url)
    self.assertEqual(sample_format.open(t),
                     bzrdir.BzrDir.open_unsupported(url))
    # unregister the format
    bzrdir.BzrProber.formats.remove(format_string)
    # now open_downlevel should fail too.
    self.assertRaises(UnknownFormatError,
                      bzrdir.BzrDir.open_unsupported, url)
326
def test_create_branch_and_repo_uses_default(self):
    """create_branch_and_repo() builds its branch with the given format."""
    sample_format = SampleBzrDirFormat()
    # NOTE(review): the trailing ``format=`` argument was not visible; it is
    # assumed because the format object is otherwise unused -- confirm.
    branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
                                                  format=sample_format)
    self.assertIsInstance(branch, SampleBranch)
332
def test_create_branch_and_repo_under_shared(self):
    """A branch made beneath a shared repository gets no repository itself."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    child_url = self.get_url('child')
    branch = bzrdir.BzrDir.create_branch_and_repo(child_url,
                                                  format=knit_format)
    # Opening a repository at the branch location must fail.
    self.assertRaises(errors.NoRepositoryPresent,
                      branch.bzrdir.open_repository)
342
def test_create_branch_and_repo_under_shared_force_new(self):
    """A new repository can be forced even beneath a shared repository."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    # NOTE(review): the keyword arguments were not visible; they follow the
    # test name and the sibling force_new_repo test -- confirm.
    branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
                                                  force_new_repo=True,
                                                  format=knit_format)
    # Succeeds only if the child really has its own repository.
    branch.bzrdir.open_repository()
352
def test_create_standalone_working_tree(self):
    """create_standalone_workingtree() needs a local url and uses the format."""
    sample_format = SampleBzrDirFormat()
    create_tree = bzrdir.BzrDir.create_standalone_workingtree
    # note this is deliberately readonly, as this failure should
    # occur before any writes.
    self.assertRaises(errors.NotLocalUrl, create_tree,
                      self.get_readonly_url(), format=sample_format)
    # NOTE(review): the trailing ``format=`` argument of this call was not
    # visible; it is implied by the 'A tree' result asserted below.
    tree = create_tree('.', format=sample_format)
    self.assertEqual('A tree', tree)
363
def test_create_standalone_working_tree_under_shared_repo(self):
    """A standalone tree gets its own repository even in a shared repo."""
    # create standalone working tree always makes a repo.
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    create_tree = bzrdir.BzrDir.create_standalone_workingtree
    # note this is deliberately readonly, as this failure should
    # occur before any writes.
    self.assertRaises(errors.NotLocalUrl, create_tree,
                      self.get_readonly_url('child'), format=knit_format)
    # NOTE(review): the trailing ``format=`` argument of this call was not
    # visible; it mirrors the call above -- confirm.
    tree = create_tree('child', format=knit_format)
    tree.bzrdir.open_repository()
376
def test_create_branch_convenience(self):
    """Outside any repository the result has a tree and a repository."""
    # outside a repo the default convenience output is a repo+branch_tree
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    branch = bzrdir.BzrDir.create_branch_convenience('.', format=knit_format)
    control_dir = branch.bzrdir
    control_dir.open_workingtree()
    control_dir.open_repository()
383
def test_create_branch_convenience_possible_transports(self):
    """Check that the optional 'possible_transports' is recognized"""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    reusable = [self.get_transport()]
    branch = bzrdir.BzrDir.create_branch_convenience(
        '.', format=knit_format, possible_transports=reusable)
    control_dir = branch.bzrdir
    control_dir.open_workingtree()
    control_dir.open_repository()
392
def test_create_branch_convenience_root(self):
    """Creating a branch at the root of a fs should work."""
    self.vfs_transport_factory = memory.MemoryServer
    # outside a repo the default convenience output is a repo+branch_tree
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    # NOTE(review): the trailing ``format=`` argument was not visible; it
    # follows the sibling convenience tests -- confirm.
    branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
                                                     format=knit_format)
    self.assertRaises(errors.NoWorkingTree,
                      branch.bzrdir.open_workingtree)
    branch.bzrdir.open_repository()
403
def test_create_branch_convenience_under_shared_repo(self):
    """Inside a shared repo the result has a tree but no own repository."""
    # inside a repo the default convenience output is a branch that
    # follows the repository's tree policy.
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    # NOTE(review): the trailing ``format=`` argument was not visible; it
    # follows the sibling convenience tests -- confirm.
    branch = bzrdir.BzrDir.create_branch_convenience('child',
                                                     format=knit_format)
    branch.bzrdir.open_workingtree()
    self.assertRaises(errors.NoRepositoryPresent,
                      branch.bzrdir.open_repository)
414
def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
    """force_new_tree=False suppresses the working tree in a shared repo."""
    # inside a repo the default convenience output is a branch+ follow the
    # repo tree policy but we can override that
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    branch = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_tree=False, format=knit_format)
    control_dir = branch.bzrdir
    self.assertRaises(errors.NoWorkingTree, control_dir.open_workingtree)
    self.assertRaises(errors.NoRepositoryPresent,
                      control_dir.open_repository)
426
def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
    """A shared repo that forbids working trees yields a treeless branch."""
    # inside a repo the default convenience output is a branch that
    # follows the repository's tree policy.
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    repo = self.make_repository('.', shared=True, format=knit_format)
    repo.set_make_working_trees(False)
    # NOTE(review): the trailing ``format=`` argument was not visible; it
    # follows the sibling convenience tests -- confirm.
    branch = bzrdir.BzrDir.create_branch_convenience('child',
                                                     format=knit_format)
    self.assertRaises(errors.NoWorkingTree,
                      branch.bzrdir.open_workingtree)
    self.assertRaises(errors.NoRepositoryPresent,
                      branch.bzrdir.open_repository)
439
def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
    """force_new_tree=True overrides a no-working-trees repository policy."""
    # inside a repo the default convenience output is a branch+ follow the
    # repo tree policy but we can override that
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    shared_repo = self.make_repository('.', shared=True, format=knit_format)
    shared_repo.set_make_working_trees(False)
    branch = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_tree=True, format=knit_format)
    control_dir = branch.bzrdir
    control_dir.open_workingtree()
    self.assertRaises(errors.NoRepositoryPresent,
                      control_dir.open_repository)
451
def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
    """force_new_repo=True gives the child its own repository and tree."""
    # inside a repo the default convenience output is overridable to give
    # a standalone repository as well.
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    branch = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_repo=True, format=knit_format)
    control_dir = branch.bzrdir
    control_dir.open_repository()
    control_dir.open_workingtree()
462
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
464
def test_acquire_repository_standalone(self):
    """The default acquisition policy should create a standalone branch."""
    control_dir = self.make_bzrdir('.')
    policy = control_dir.determine_repository_policy()
    repo, _is_new = policy.acquire_repository()
    # The repository lives in the same control directory and is unshared.
    self.assertEqual(repo.bzrdir.root_transport.base,
                     control_dir.root_transport.base)
    self.assertFalse(repo.is_shared())
473
def test_determine_stacking_policy(self):
    """A child picks up the parent's default stack-on url."""
    parent = self.make_bzrdir('.')
    child = self.make_bzrdir('child')
    parent.get_config().set_default_stack_on('http://example.org')
    policy = child.determine_repository_policy()
    self.assertEqual('http://example.org', policy._stack_on)
480
def test_determine_stacking_policy_relative(self):
    """A relative stack-on url is kept together with the parent's base."""
    parent = self.make_bzrdir('.')
    child = self.make_bzrdir('child')
    parent.get_config().set_default_stack_on('child2')
    policy = child.determine_repository_policy()
    self.assertEqual('child2', policy._stack_on)
    self.assertEqual(parent.root_transport.base, policy._stack_on_pwd)
489
def prepare_default_stacking(self, child_format='1.6'):
    """Set up a parent dir whose default stack-on target is a child branch.

    :param child_format: format name used to create the 'child' branch.
    :return: (child_branch, new_child_transport), where the transport is
        the parent's control transport cloned to 'child2'.
    """
    parent = self.make_bzrdir('.')
    child = self.make_branch('child', format=child_format)
    parent.get_config().set_default_stack_on(child.base)
    return child, parent.transport.clone('child2')
496
def test_clone_on_transport_obeys_stacking_policy(self):
    """clone_on_transport() stacks the clone on the default location."""
    source, target_transport = self.prepare_default_stacking()
    cloned = source.bzrdir.clone_on_transport(target_transport)
    self.assertEqual(source.base,
                     cloned.open_branch().get_stacked_on_url())
502
def test_default_stacking_with_stackable_branch_unstackable_repo(self):
    """Cloning into a stacking directory copes with an unstackable repo."""
    # Make stackable source branch with an unstackable repo format.
    source_bzrdir = self.make_bzrdir('source')
    knitpack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
    # NOTE(review): the argument of this initialize() call was not visible;
    # the source control dir is assumed, as for the repository above.
    bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
    # Make a directory with a default stacking policy
    parent_bzrdir = self.make_bzrdir('parent')
    stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
    parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
    # Clone source into directory; the test passes if this does not raise.
    source_bzrdir.clone(self.get_url('parent/target'))
515
def test_sprout_obeys_stacking_policy(self):
    """sprout() stacks the new branch on the default location."""
    source, target_transport = self.prepare_default_stacking()
    sprouted = source.bzrdir.sprout(target_transport.base)
    self.assertEqual(source.base,
                     sprouted.open_branch().get_stacked_on_url())
521
def test_clone_ignores_policy_for_unsupported_formats(self):
    """A clone in an unstackable format stays unstacked."""
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    cloned = source.bzrdir.clone_on_transport(target_transport)
    cloned_branch = cloned.open_branch()
    self.assertRaises(errors.UnstackableBranchFormat,
                      cloned_branch.get_stacked_on_url)
528
def test_sprout_ignores_policy_for_unsupported_formats(self):
    """A sprout in an unstackable format stays unstacked."""
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    sprouted = source.bzrdir.sprout(target_transport.base)
    sprouted_branch = sprouted.open_branch()
    self.assertRaises(errors.UnstackableBranchFormat,
                      sprouted_branch.get_stacked_on_url)
535
def test_sprout_upgrades_format_if_stacked_specified(self):
    """An explicitly stacked sprout is upgraded to a stackable format."""
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    # NOTE(review): the trailing keyword argument was not visible;
    # ``stacked=True`` is assumed from the test name and the assertions.
    sprouted = source.bzrdir.sprout(target_transport.base, stacked=True)
    self.assertEqual(source.bzrdir.root_transport.base,
                     sprouted.open_branch().get_stacked_on_url())
    repo = sprouted.open_repository()
    self.assertTrue(repo._format.supports_external_lookups)
    self.assertFalse(repo.supports_rich_root())
546
def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
    """A clone given stacked_on is upgraded to a stackable format."""
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    stack_target = source.bzrdir.root_transport.base
    cloned = source.bzrdir.clone_on_transport(target_transport,
                                              stacked_on=stack_target)
    self.assertEqual(source.bzrdir.root_transport.base,
                     cloned.open_branch().get_stacked_on_url())
    repo = cloned.open_repository()
    self.assertTrue(repo._format.supports_external_lookups)
    self.assertFalse(repo.supports_rich_root())
557
def test_sprout_upgrades_to_rich_root_format_if_needed(self):
    """A stacked sprout of a rich-root source stays rich-root."""
    source, target_transport = self.prepare_default_stacking(
        child_format='rich-root-pack')
    # NOTE(review): the trailing keyword argument was not visible;
    # ``stacked=True`` is assumed by analogy with the sibling sprout test.
    sprouted = source.bzrdir.sprout(target_transport.base, stacked=True)
    repo = sprouted.open_repository()
    self.assertTrue(repo._format.supports_external_lookups)
    self.assertTrue(repo.supports_rich_root())
566
def test_add_fallback_repo_handles_absolute_urls(self):
    """_add_fallback() accepts an absolute stack-on url."""
    target = self.make_branch('stack_on', format='1.6')
    repo = self.make_repository('repo', format='1.6')
    policy = bzrdir.UseExistingRepository(repo, target.base)
    # Passes as long as this call does not raise.
    policy._add_fallback(repo)
572
def test_add_fallback_repo_handles_relative_urls(self):
    """_add_fallback() accepts a relative url plus the base it refers to."""
    target = self.make_branch('stack_on', format='1.6')
    repo = self.make_repository('repo', format='1.6')
    policy = bzrdir.UseExistingRepository(repo, '.', target.base)
    # Passes as long as this call does not raise.
    policy._add_fallback(repo)
578
def test_configure_relative_branch_stacking_url(self):
    """configure_branch() records a relative stacked-on url."""
    stack_on = self.make_branch('stack_on', format='1.6')
    stacked = self.make_branch('stack_on/stacked', format='1.6')
    # NOTE(review): the trailing arguments were not visible; ``'.'`` and
    # the stack_on base are assumed from the sibling relative-url test and
    # from the '..' result asserted below -- confirm.
    policy = bzrdir.UseExistingRepository(stacked.repository,
                                          '.', stack_on.base)
    policy.configure_branch(stacked)
    self.assertEqual('..', stacked.get_stacked_on_url())
586
def test_relative_branch_stacking_to_absolute(self):
    """A stack-on base on another transport is stored as an absolute url."""
    self.make_branch('stack_on', format='1.6')
    stacked = self.make_branch('stack_on/stacked', format='1.6')
    readonly_target = self.get_readonly_url('stack_on')
    policy = bzrdir.UseExistingRepository(stacked.repository,
                                          '.', readonly_target)
    policy.configure_branch(stacked)
    self.assertEqual(self.get_readonly_url('stack_on'),
                     stacked.get_stacked_on_url())
596
class ChrootedTests(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.transport_server is a MemoryServer. if it
    is then we are chrooted already, if it is not then an HttpServer is used
    for readonly urls.
    """

    def setUp(self):
        super(ChrootedTests, self).setUp()
        # A memory server is already chrooted; anything else gets an http
        # server for its readonly urls.
        if self.vfs_transport_factory != memory.MemoryServer:
            self.transport_readonly_server = http_server.HttpServer
609
def local_branch_path(self, branch):
    """Return the resolved local filesystem path of branch.base."""
    local_path = urlutils.local_path_from_url(branch.base)
    return os.path.realpath(local_path)
612
def test_open_containing(self):
    """open_containing() finds the control dir and the relative path."""
    open_containing = bzrdir.BzrDir.open_containing
    # Nothing has been created yet, so every lookup fails.
    for rel in ('', 'g/p/q'):
        self.assertRaises(NotBranchError, open_containing,
                          self.get_readonly_url(rel))
    bzrdir.BzrDir.create(self.get_url())
    # Now the root is found and the remainder is handed back unchanged.
    for rel in ('', 'g/p/q'):
        branch, relpath = open_containing(self.get_readonly_url(rel))
        self.assertEqual(rel, relpath)
623
def test_open_containing_tree_branch_or_repository_empty(self):
    """With nothing on disk the lookup raises NotBranchError."""
    opener = bzrdir.BzrDir.open_containing_tree_branch_or_repository
    self.assertRaises(errors.NotBranchError, opener,
                      self.get_readonly_url(''))
628
def test_open_containing_tree_branch_or_repository_all(self):
    """A location with tree, branch and repository returns all three."""
    self.make_branch_and_tree('topdir')
    # NOTE(review): the path argument was not visible; 'topdir/foo' is
    # implied by the 'foo' relpath asserted at the end.
    tree, branch, repo, relpath = \
        bzrdir.BzrDir.open_containing_tree_branch_or_repository(
            'topdir/foo')
    self.assertEqual(os.path.realpath('topdir'),
                     os.path.realpath(tree.basedir))
    self.assertEqual(os.path.realpath('topdir'),
                     self.local_branch_path(branch))
    self.assertEqual(
        osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
        repo.bzrdir.transport.local_abspath('repository'))
    self.assertEqual(relpath, 'foo')
642
def test_open_containing_tree_branch_or_repository_no_tree(self):
    """A treeless branch returns None for the tree."""
    self.make_branch('branch')
    # NOTE(review): the path argument was not visible; 'branch/foo' is
    # implied by the 'foo' relpath asserted at the end.
    tree, branch, repo, relpath = \
        bzrdir.BzrDir.open_containing_tree_branch_or_repository(
            'branch/foo')
    self.assertEqual(tree, None)
    self.assertEqual(os.path.realpath('branch'),
                     self.local_branch_path(branch))
    self.assertEqual(
        osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
        repo.bzrdir.transport.local_abspath('repository'))
    self.assertEqual(relpath, 'foo')
655
def test_open_containing_tree_branch_or_repository_repo(self):
    """A bare repository returns None for both tree and branch."""
    self.make_repository('repo')
    # NOTE(review): the path argument was not visible; 'repo' is implied
    # by the empty relpath asserted at the end.
    tree, branch, repo, relpath = \
        bzrdir.BzrDir.open_containing_tree_branch_or_repository(
            'repo')
    self.assertEqual(tree, None)
    self.assertEqual(branch, None)
    self.assertEqual(
        osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
        repo.bzrdir.transport.local_abspath('repository'))
    self.assertEqual(relpath, '')
667
def test_open_containing_tree_branch_or_repository_shared_repo(self):
    """A branch in a shared repo reports the shared repository."""
    self.make_repository('shared', shared=True)
    bzrdir.BzrDir.create_branch_convenience('shared/branch',
                                            force_new_tree=False)
    # NOTE(review): the path argument was not visible; 'shared/branch' is
    # implied by the branch path and empty relpath asserted below.
    tree, branch, repo, relpath = \
        bzrdir.BzrDir.open_containing_tree_branch_or_repository(
            'shared/branch')
    self.assertEqual(tree, None)
    self.assertEqual(os.path.realpath('shared/branch'),
                     self.local_branch_path(branch))
    self.assertEqual(
        osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
        repo.bzrdir.transport.local_abspath('repository'))
    self.assertEqual(relpath, '')
682
def test_open_containing_tree_branch_or_repository_branch_subdir(self):
    """Opening a subdirectory of a tree finds the enclosing tree."""
    self.make_branch_and_tree('foo')
    self.build_tree(['foo/bar/'])
    # NOTE(review): the path argument was not visible; 'foo/bar' is
    # implied by the 'bar' relpath asserted at the end.
    tree, branch, repo, relpath = \
        bzrdir.BzrDir.open_containing_tree_branch_or_repository(
            'foo/bar')
    self.assertEqual(os.path.realpath('foo'),
                     os.path.realpath(tree.basedir))
    self.assertEqual(os.path.realpath('foo'),
                     self.local_branch_path(branch))
    self.assertEqual(
        osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
        repo.bzrdir.transport.local_abspath('repository'))
    self.assertEqual(relpath, 'bar')
697
def test_open_containing_tree_branch_or_repository_repo_subdir(self):
    """Opening a subdirectory of a bare repository finds that repository."""
    self.make_repository('bar')
    self.build_tree(['bar/baz/'])
    # NOTE(review): the path argument was not visible; 'bar/baz' is
    # implied by the 'baz' relpath asserted at the end.
    tree, branch, repo, relpath = \
        bzrdir.BzrDir.open_containing_tree_branch_or_repository(
            'bar/baz')
    self.assertEqual(tree, None)
    self.assertEqual(branch, None)
    self.assertEqual(
        osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
        repo.bzrdir.transport.local_abspath('repository'))
    self.assertEqual(relpath, 'baz')
710
def test_open_containing_from_transport(self):
    """open_containing_from_transport() mirrors open_containing()."""
    opener = bzrdir.BzrDir.open_containing_from_transport

    def readonly_transport(rel):
        return _mod_transport.get_transport(self.get_readonly_url(rel))

    # Nothing has been created yet, so every lookup fails.
    for rel in ('', 'g/p/q'):
        self.assertRaises(NotBranchError, opener, readonly_transport(rel))
    bzrdir.BzrDir.create(self.get_url())
    for rel in ('', 'g/p/q'):
        branch, relpath = opener(readonly_transport(rel))
        self.assertEqual(rel, relpath)
725
def test_open_containing_tree_or_branch(self):
    """The tree is returned only for local locations that have one."""
    opener = bzrdir.BzrDir.open_containing_tree_or_branch
    self.make_branch_and_tree('topdir')
    # NOTE(review): the path arguments of the first and last calls were not
    # visible; 'topdir/foo' is implied by the relpath and branch-path
    # assertions that follow each call.
    tree, branch, relpath = opener('topdir/foo')
    self.assertEqual(os.path.realpath('topdir'),
                     os.path.realpath(tree.basedir))
    self.assertEqual(os.path.realpath('topdir'),
                     self.local_branch_path(branch))
    self.assertIs(tree.bzrdir, branch.bzrdir)
    self.assertEqual('foo', relpath)
    # opening from non-local should not return the tree
    tree, branch, relpath = opener(self.get_readonly_url('topdir/foo'))
    self.assertEqual(None, tree)
    self.assertEqual('foo', relpath)
    # a nested branch without a working tree
    self.make_branch('topdir/foo')
    tree, branch, relpath = opener('topdir/foo')
    self.assertIs(tree, None)
    self.assertEqual(os.path.realpath('topdir/foo'),
                     self.local_branch_path(branch))
    self.assertEqual('', relpath)
749
def test_open_tree_or_branch(self):
    """open_tree_or_branch() returns a tree only when one is reachable."""
    opener = bzrdir.BzrDir.open_tree_or_branch
    self.make_branch_and_tree('topdir')
    tree, branch = opener('topdir')
    topdir_path = os.path.realpath('topdir')
    self.assertEqual(topdir_path, os.path.realpath(tree.basedir))
    self.assertEqual(os.path.realpath('topdir'),
                     self.local_branch_path(branch))
    self.assertIs(tree.bzrdir, branch.bzrdir)
    # opening from non-local should not return the tree
    tree, branch = opener(self.get_readonly_url('topdir'))
    self.assertEqual(None, tree)
    # a nested branch without a working tree
    self.make_branch('topdir/foo')
    tree, branch = opener('topdir/foo')
    self.assertIs(tree, None)
    self.assertEqual(os.path.realpath('topdir/foo'),
                     self.local_branch_path(branch))
768
def test_open_from_transport(self):
    """open_from_transport() keeps the given transport as the root."""
    # transport pointing at bzrdir should give a bzrdir with root transport
    # set to the given transport
    bzrdir.BzrDir.create(self.get_url())
    t = self.get_transport()
    opened = bzrdir.BzrDir.open_from_transport(t)
    self.assertEqual(t.base, opened.root_transport.base)
    self.assertIsInstance(opened, bzrdir.BzrDir)
777
def test_open_from_transport_no_bzrdir(self):
    """open_from_transport() raises NotBranchError on an empty location."""
    empty = self.get_transport()
    self.assertRaises(NotBranchError,
                      bzrdir.BzrDir.open_from_transport, empty)
781
def test_open_from_transport_bzrdir_in_parent(self):
    """open_from_transport() does not search parent directories."""
    bzrdir.BzrDir.create(self.get_url())
    # The control dir sits one level above the transport being opened.
    sub_transport = self.get_transport().clone('subdir')
    self.assertRaises(NotBranchError,
                      bzrdir.BzrDir.open_from_transport, sub_transport)
788
# Sprouts a tree that references a nested subtree and checks that the copy
# contains the subtree's file and records the subtree as a tree-reference.
def test_sprout_recursive(self):
789
tree = self.make_branch_and_tree('tree1',
790
format='dirstate-with-subtree')
791
sub_tree = self.make_branch_and_tree('tree1/subtree',
792
format='dirstate-with-subtree')
793
# Fixed root id so the reference can be looked up by id at the end.
sub_tree.set_root_id('subtree-root')
794
tree.add_reference(sub_tree)
795
self.build_tree(['tree1/subtree/file'])
# NOTE(review): a line appears to be missing here in this copy (nothing
# visible adds 'file' to the subtree before the commit) -- confirm upstream.
797
tree.commit('Initial commit')
798
tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
# NOTE(review): unlock is registered below but no matching lock call is
# visible in this copy -- presumably a lock_read() line is missing; confirm.
800
self.addCleanup(tree2.unlock)
801
self.assertPathExists('tree2/subtree/file')
802
self.assertEqual('tree-reference', tree2.kind('subtree-root'))
804
def test_cloning_metadir(self):
    """Ensure that cloning metadir is suitable"""
    # Named so as not to shadow the module-level ``bzrdir`` import.
    control_dir = self.make_bzrdir('bzrdir')
    control_dir.cloning_metadir()
    knit_branch = self.make_branch('branch', format='knit')
    cloned_format = knit_branch.bzrdir.cloning_metadir()
    self.assertIsInstance(cloned_format.workingtree_format,
                          workingtree_4.WorkingTreeFormat6)
813
# Destroys the working tree of a tree with a nested subtree reference, then
# asserts that sprouting it into a shared, tree-less repository raises
# NotBranchError (the behaviour tracked as bug #634470).
def test_sprout_recursive_treeless(self):
814
tree = self.make_branch_and_tree('tree1',
815
format='dirstate-with-subtree')
816
sub_tree = self.make_branch_and_tree('tree1/subtree',
817
format='dirstate-with-subtree')
818
tree.add_reference(sub_tree)
819
self.build_tree(['tree1/subtree/file'])
# NOTE(review): a line appears to be missing here in this copy (nothing
# visible adds 'file' to the subtree before the commit) -- confirm upstream.
821
tree.commit('Initial commit')
822
# The following line forces the orphaning to reveal bug #634470
823
tree.branch.get_config().set_user_option(
824
'bzr.transform.orphan_policy', 'move')
825
tree.bzrdir.destroy_workingtree()
826
# FIXME: subtree/.bzr is left here which allows the test to pass (or
827
# fail :-( ) -- vila 20100909
828
repo = self.make_repository('repo', shared=True,
829
format='dirstate-with-subtree')
830
repo.set_make_working_trees(False)
831
# FIXME: we just deleted the workingtree and now we want to use it ????
832
# At a minimum, we should use tree.branch below (but this fails too
833
# currently) or stop calling this test 'treeless'. Specifically, I've
834
# turn the line below into an assertRaises when 'subtree/.bzr' is
835
# orphaned and sprout tries to access the branch there (which is left
836
# by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
837
# [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
838
# #634470. -- vila 20100909
839
self.assertRaises(errors.NotBranchError,
840
tree.bzrdir.sprout, 'repo/tree2')
841
# self.assertPathExists('repo/tree2/subtree')
842
# self.assertPathDoesNotExist('repo/tree2/subtree/file')
844
def make_foo_bar_baz(self):
    """Create three branches and return their control directories.

    'foo' is created with BzrDir.create_branch_convenience, while
    'foo/bar' (nested inside foo) and 'baz' are created with make_branch.

    :return: a ``(foo, bar, baz)`` tuple of the branches' control dirs.
    """
    foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
    bar = self.make_branch('foo/bar').bzrdir
    baz = self.make_branch('baz').bzrdir
    # Every caller unpacks three values from this helper, so the control
    # directories must actually be handed back.
    return foo, bar, baz
850
def test_find_bzrdirs(self):
    """find_bzrdirs reports baz, foo and the nested foo/bar, in that order."""
    foo, bar, baz = self.make_foo_bar_baz()
    found = bzrdir.BzrDir.find_bzrdirs(self.get_transport())
    self.assertEqualBzrdirs([baz, foo, bar], found)
855
def make_fake_permission_denied_transport(self, transport, paths):
    """Create a transport that raises PermissionDenied for some paths.

    :param transport: the backing transport to wrap.
    :param paths: container of paths for which access is refused.
    :return: a ``(server, transport)`` pair: the started
        PathFilteringServer (stopped automatically at cleanup) and a
        PathFilteringTransport rooted at '.' on that server.
    """
    # NOTE(review): the filter function's header was missing; it is
    # restored from its visible body and from the name passed to
    # PathFilteringServer.  Named so it does not shadow the builtin filter().
    def deny_listed_paths(path):
        if path in paths:
            raise errors.PermissionDenied(path)
        return path

    path_filter_server = pathfilter.PathFilteringServer(
        transport, deny_listed_paths)
    path_filter_server.start_server()
    self.addCleanup(path_filter_server.stop_server)
    path_filter_transport = pathfilter.PathFilteringTransport(
        path_filter_server, '.')
    return (path_filter_server, path_filter_transport)
868
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
    """Check that each branch url ends with the given suffix.

    An empty ``actual_bzrdirs`` passes vacuously: nothing is asserted.
    """
    for url in (found.user_url for found in actual_bzrdirs):
        self.assertEndsWith(url, expect_url_suffix)
873
def test_find_bzrdirs_permission_denied(self):
    """With 'foo' refused, every control dir that is found ends in /baz/."""
    foo, bar, baz = self.make_foo_bar_baz()
    denied_server, denied_transport = (
        self.make_fake_permission_denied_transport(
            self.get_transport(), ['foo']))
    # First directly through the path-filtering transport...
    self.assertBranchUrlsEndWith(
        '/baz/', bzrdir.BzrDir.find_bzrdirs(denied_transport))
    # ...then through a smart server backed by the same filtering server.
    smart_transport = self.make_smart_server(
        '.', backing_server=denied_server)
    self.assertBranchUrlsEndWith(
        '/baz/', bzrdir.BzrDir.find_bzrdirs(smart_transport))
887
def test_find_bzrdirs_list_current(self):
    """A list_current callback decides which children are searched."""
    def list_current(transport):
        # Hide 'baz' from the search.
        return [s for s in transport.list_dir('') if s != 'baz']

    foo, bar, baz = self.make_foo_bar_baz()
    t = self.get_transport()
    # assertEqualBzrdirs needs the expected list as its first argument:
    # with 'baz' hidden, only foo and the nested foo/bar remain.
    self.assertEqualBzrdirs(
        [foo, bar],
        bzrdir.BzrDir.find_bzrdirs(t, list_current=list_current))
897
def test_find_bzrdirs_evaluate(self):
    """An evaluate callback supplies the values that find_bzrdirs yields.

    The callback returns a ``(flag, value)`` pair; the expected result
    below contains the values for baz and foo but not for foo/bar.
    """
    def evaluate(bzrdir):
        # Only the presence of a repository matters, so the opened
        # repository object itself is discarded.
        try:
            bzrdir.open_repository()
        except NoRepositoryPresent:
            return True, bzrdir.root_transport.base
        else:
            return False, bzrdir.root_transport.base

    foo, bar, baz = self.make_foo_bar_baz()
    t = self.get_transport()
    self.assertEqual([baz.root_transport.base, foo.root_transport.base],
                     list(bzrdir.BzrDir.find_bzrdirs(t, evaluate=evaluate)))
911
def assertEqualBzrdirs(self, first, second):
    """Assert that two series of control dirs have the same root URLs.

    :param first: iterable of expected control directories.
    :param second: iterable of actual control directories.

    Both arguments may be any iterable (including generators); they are
    compared pairwise, in order, by ``root_transport.base`` after checking
    that they have the same length.
    """
    # Materialise both sides so len() works and generators are accepted
    # for either argument.
    first = list(first)
    second = list(second)
    self.assertEqual(len(first), len(second))
    for x, y in zip(first, second):
        self.assertEqual(x.root_transport.base, y.root_transport.base)
918
def test_find_branches(self):
    """find_branches lists baz, foo, foo/bar with or without a root repo."""
    self.make_repository('', shared=True)
    foo, bar, baz = self.make_foo_bar_baz()
    # foo/qux is created with make_bzrdir only; no branch is made in it here.
    self.make_bzrdir('foo/qux')
    t = self.get_transport()
    branches = bzrdir.BzrDir.find_branches(t)
    for index, expected in enumerate((baz, foo, bar)):
        self.assertEqual(expected.root_transport.base, branches[index].base)

    # ensure this works without a top-level repo
    branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
    for index, expected in enumerate((foo, bar)):
        self.assertEqual(expected.root_transport.base, branches[index].base)
934
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """find_branches with a branch whose shared repository was deleted."""

    def test_find_bzrdirs_missing_repo(self):
        """Only the healthy 'baz' branch is reported by find_branches."""
        t = self.get_transport()
        shared_repo = self.make_repository('arepo', shared=True)
        orphan_url = shared_repo.user_url + '/abranch'
        bzrdir.BzrDir.create(orphan_url).create_branch()
        # Delete the shared repository's control directory, then check that
        # the branch can no longer be opened.
        t.delete_tree('arepo/.bzr')
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.Branch.open, orphan_url)
        self.make_branch('baz')
        for found in bzrdir.BzrDir.find_branches(t):
            self.assertEndsWith(found.user_url, '/baz/')
949
class TestMeta1DirFormat(TestCaseWithTransport):
    """Tests specific to the meta1 dir format."""

    def test_right_base_dirs(self):
        """Branch, repository and checkout transports use their own subdirs.

        Each accessor is checked both with ``None`` and with an explicit
        format object.
        """
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # NOTE(review): `t` was used below without being bound; restored as
        # the control directory's own transport -- confirm.
        t = control.transport
        branch_base = t.clone('branch').base
        self.assertEqual(branch_base, control.get_branch_transport(None).base)
        self.assertEqual(
            branch_base,
            control.get_branch_transport(
                bzrlib.branch.BzrBranchFormat5()).base)
        repository_base = t.clone('repository').base
        self.assertEqual(repository_base,
                         control.get_repository_transport(None).base)
        repository_format = repository.format_registry.get_default()
        self.assertEqual(
            repository_base,
            control.get_repository_transport(repository_format).base)
        checkout_base = t.clone('checkout').base
        self.assertEqual(checkout_base,
                         control.get_workingtree_transport(None).base)
        self.assertEqual(
            checkout_base,
            control.get_workingtree_transport(
                workingtree_3.WorkingTreeFormat3()).base)

    def test_meta1dir_uses_lockdir(self):
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # NOTE(review): `t` was unbound here as well; restored as the control
        # directory's transport -- confirm.
        t = control.transport
        self.assertIsDirectory('branch-lock', t)

    def test_comparison(self):
        """Equality and inequality behave properly.

        Metadirs should compare equal iff they have the same repo, branch and
        working tree formats.
        """
        mydir = bzrdir.format_registry.make_bzrdir('knit')
        self.assertEqual(mydir, mydir)
        self.assertFalse(mydir != mydir)
        otherdir = bzrdir.format_registry.make_bzrdir('knit')
        self.assertEqual(otherdir, mydir)
        self.assertFalse(otherdir != mydir)
        otherdir2 = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
        self.assertNotEqual(otherdir2, mydir)
        self.assertFalse(otherdir2 == mydir)

    def test_needs_conversion_different_working_tree(self):
        # meta1dirs need a conversion if any element is not the default.
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
        tree = self.make_branch_and_tree('tree', format='knit')
        # The target format is the argument of needs_format_conversion.
        self.assertTrue(tree.bzrdir.needs_format_conversion(new_format))

    def test_initialize_on_format_uses_smart_transport(self):
        """Initializing over a smart transport yields a RemoteBzrDir cheaply."""
        self.setup_smart_server_with_call_log()
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
        transport = self.get_transport('target')
        transport.ensure_base()
        # Only count the calls made by the initialization itself.
        self.reset_smart_call_log()
        instance = new_format.initialize_on_transport(transport)
        self.assertIsInstance(instance, remote.RemoteBzrDir)
        rpc_count = len(self.hpss_calls)
        # This figure represents the amount of work to perform this use case.
        # It is entirely ok to reduce this number if a test fails due to
        # rpc_count being too low. If rpc_count increases, more network
        # roundtrips have become necessary for this use case. Please do not
        # adjust this number upwards without agreement from bzr's network
        # support maintainers.
        self.assertEqual(2, rpc_count)
1015
class NonLocalTests(TestCaseWithTransport):
    """Tests for bzrdir static behaviour on non local paths."""

    def setUp(self):
        """Use a memory server so that self.get_url() is not a local path."""
        super(NonLocalTests, self).setUp()
        self.vfs_transport_factory = memory.MemoryServer

    def test_create_branch_convenience(self):
        """On a non-local URL a branch and repository exist, but no tree."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience(
            self.get_url('foo'), format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_force_tree_not_local_fails(self):
        """Forcing a working tree on a non-local URL raises NotLocalUrl."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_branch_convenience,
                          self.get_url('foo'),
                          force_new_tree=True,
                          format=format)
        # The failed attempt must not leave anything behind at 'foo'.
        t = self.get_transport()
        self.assertFalse(t.has('foo'))

    def test_clone(self):
        # clone into a nonlocal path works
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience('local',
                                                         format=format)
        # The local source has a working tree...
        branch.bzrdir.open_workingtree()
        result = branch.bzrdir.clone(self.get_url('remote'))
        # ...but the non-local clone has only a branch and a repository.
        self.assertRaises(errors.NoWorkingTree,
                          result.open_workingtree)
        result.open_branch()
        result.open_repository()

    def test_checkout_metadir(self):
        # checkout_metadir has reasonable working tree format even when no
        # working tree is present
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
        checkout_format = my_bzrdir.checkout_metadir()
        self.assertIsInstance(checkout_format.workingtree_format,
                              workingtree_4.WorkingTreeFormat4)
1064
class TestHTTPRedirections(object):
    """Test redirection between two http servers.

    This MUST be used by daughter classes that also inherit from
    TestCaseWithTwoWebservers.

    We can't inherit directly from TestCaseWithTwoWebservers or the
    test framework will try to create an instance which cannot
    run, its implementation being incomplete.

    The tests rely on two names that this class does not define itself:
    ``_transport`` (called with a URL to build a transport) and
    ``_qualified_url(host, port)`` (returns the URL to use).
    """

    def create_transport_readonly_server(self):
        # We don't set the http protocol version, relying on the default
        return http_utils.HTTPServerRedirecting()

    def create_transport_secondary_server(self):
        # We don't set the http protocol version, relying on the default
        return http_utils.HTTPServerRedirecting()

    def setUp(self):
        """Make the old (secondary) server redirect to the new one."""
        super(TestHTTPRedirections, self).setUp()
        # The redirections will point to the new server
        self.new_server = self.get_readonly_server()
        # The requests to the old server will be redirected
        self.old_server = self.get_secondary_server()
        # Configure the redirections
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)

    def test_loop(self):
        # Both servers redirect to each other creating a loop
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
        # Starting from either server should loop
        old_url = self._qualified_url(self.old_server.host,
                                      self.old_server.port)
        oldt = self._transport(old_url)
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_from_transport, oldt)
        new_url = self._qualified_url(self.new_server.host,
                                      self.new_server.port)
        newt = self._transport(new_url)
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_from_transport, newt)

    def test_qualifier_preserved(self):
        # The tree object itself is not needed, only the branch it creates.
        self.make_branch_and_tree('branch')
        old_url = self._qualified_url(self.old_server.host,
                                      self.old_server.port)
        start = self._transport(old_url).clone('branch')
        bdir = bzrdir.BzrDir.open_from_transport(start)
        # Redirection should preserve the qualifier, hence the transport class
        # used
        self.assertIsInstance(bdir.root_transport, type(start))
1118
class TestHTTPRedirections_urllib(TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for urllib implementation"""

    _transport = HttpTransport_urllib

    def _qualified_url(self, host, port):
        """Return the permitted 'http+urllib' URL for host:port."""
        result = 'http+urllib://%s:%s' % (host, port)
        self.permit_url(result)
        # Callers build their transports from the returned URL.
        return result
1131
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
                                  TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for pycurl implementation"""

    # No _transport is set here; presumably it is provided by
    # TestWithTransport_pycurl -- verify against that class.

    def _qualified_url(self, host, port):
        """Return the permitted 'http+pycurl' URL for host:port."""
        result = 'http+pycurl://%s:%s' % (host, port)
        self.permit_url(result)
        # Callers build their transports from the returned URL.
        return result
1142
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
                                   http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the nosmart decorator"""

    _transport = NoSmartTransportDecorator

    def _qualified_url(self, host, port):
        """Return the permitted 'nosmart+http' URL for host:port."""
        result = 'nosmart+http://%s:%s' % (host, port)
        self.permit_url(result)
        # Callers build their transports from the returned URL.
        return result
1154
class TestHTTPRedirections_readonly(TestHTTPRedirections,
                                    http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for readonly decorator"""

    _transport = ReadonlyTransportDecorator

    def _qualified_url(self, host, port):
        """Return the permitted 'readonly+http' URL for host:port."""
        result = 'readonly+http://%s:%s' % (host, port)
        self.permit_url(result)
        # Callers build their transports from the returned URL.
        return result
1166
class TestDotBzrHidden(TestCaseWithTransport):
    """A plain directory listing must not show the control directory."""

    # Command used to list the current directory.
    # NOTE(review): the non-Windows default was missing although get_ls()
    # always reads self.ls; plain `ls` is assumed -- confirm.
    ls = ['ls']
    if sys.platform == 'win32':
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']

    def get_ls(self):
        """Run the listing command and return its output lines.

        Fails the test if the command exits with a non-zero status.
        """
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        out, err = f.communicate()
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
                         % (self.ls, err))
        return out.splitlines()

    def test_dot_bzr_hidden(self):
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')
        bzrdir.BzrDir.create('.')
        self.build_tree(['a'])
        # Only the file just built may be listed.
        self.assertEqual(['a'], self.get_ls())

    def test_dot_bzr_hidden_with_url(self):
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')
        bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
        self.build_tree(['a'])
        # Only the file just built may be listed.
        self.assertEqual(['a'], self.get_ls())
1195
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
    """Test BzrDirFormat implementation for TestBzrDirSprout."""

    def _open(self, transport):
        """Open ``transport`` as an instrumented _TestBzrDir of this format."""
        opened = _TestBzrDir(transport, self)
        return opened
1202
class _TestBzrDir(bzrdir.BzrDirMeta1):
    """Test BzrDir implementation for TestBzrDirSprout.

    When created a _TestBzrDir already has repository and a branch.  The branch
    is a test double as well.
    """

    def __init__(self, *args, **kwargs):
        super(_TestBzrDir, self).__init__(*args, **kwargs)
        # The double branch is built first and then given a freshly created
        # repository.
        double = _TestBranch()
        self.test_branch = double
        double.repository = self.create_repository()

    def open_branch(self, unsupported=False):
        """Always hand back the instrumented branch double."""
        return self.test_branch

    def cloning_metadir(self, require_stacking=False):
        """Return a fresh _TestBzrDirFormat, ignoring require_stacking."""
        return _TestBzrDirFormat()
1221
class _TestBranchFormat(bzrlib.branch.BranchFormat):
    """Branch format stand-in used by the TestBzrDirSprout doubles."""
1225
class _TestBranch(bzrlib.branch.Branch):
    """Test Branch implementation for TestBzrDirSprout.

    Calls to ``sprout`` and ``copy_content_into`` are recorded, by name, in
    the ``calls`` list.
    """

    def __init__(self, *args, **kwargs):
        self._format = _TestBranchFormat()
        super(_TestBranch, self).__init__(*args, **kwargs)
        # sprout() and copy_content_into() append to this log, and the test
        # expects it to start out empty.
        self.calls = []
        # Read by get_parent() before set_parent() has ever been called.
        self._parent = None

    def sprout(self, *args, **kwargs):
        """Record the call and return a new, independent branch double."""
        self.calls.append('sprout')
        return _TestBranch()

    def copy_content_into(self, destination, revision_id=None):
        """Record the call; nothing is actually copied."""
        self.calls.append('copy_content_into')

    def last_revision(self):
        return _mod_revision.NULL_REVISION

    def get_parent(self):
        """Return whatever set_parent() last stored (None initially)."""
        return self._parent

    def set_parent(self, parent):
        self._parent = parent

    def lock_read(self):
        """Return a lock result bound to unlock(); no lock is taken here."""
        return lock.LogicalLockResult(self.unlock)

    def unlock(self):
        # NOTE(review): lock_read() hands out self.unlock but no unlock was
        # defined on this double; restored as a no-op -- confirm.
        return
1257
class TestBzrDirSprout(TestCaseWithMemoryTransport):
    """Tests for how BzrDir.sprout treats the source branch."""

    def test_sprout_uses_branch_sprout(self):
        """BzrDir.sprout calls Branch.sprout.

        Usually, BzrDir.sprout should delegate to the branch's sprout method
        for part of the work.  This allows the source branch to control the
        choice of format for the new branch.

        There are exceptions, but this tests avoids them:
          - if there's no branch in the source bzrdir,
          - or if the stacking has been requested and the format needs to be
            overridden to satisfy that.
        """
        # Make an instrumented bzrdir.
        t = self.get_transport('source')
        # Make sure 'source' exists before a control directory is written
        # into it, as is done before the other initialize_on_transport call
        # in this file.
        t.ensure_base()
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
        # The instrumented bzrdir has a test_branch attribute that logs calls
        # made to the branch contained in that bzrdir.  Initially the test
        # branch exists but no calls have been made to it.
        self.assertEqual([], source_bzrdir.test_branch.calls)

        # Sprout the bzrdir; only the calls it makes are of interest.
        target_url = self.get_url('target')
        source_bzrdir.sprout(target_url, recurse='no')

        # The bzrdir called the branch's sprout method.
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)

    def test_sprout_parent(self):
        """A sprouted branch records the branch it came from as its parent."""
        grandparent = self.make_branch('grandparent')
        parent = grandparent.bzrdir.sprout('parent').open_branch()
        child = parent.bzrdir.sprout('branch').open_branch()
        self.assertContainsRe(child.get_parent(), '/parent/$')
1294
class TestBzrDirHooks(TestCaseWithMemoryTransport):
    """Tests for the 'pre_open' and 'post_repo_init' BzrDir hooks."""

    def test_pre_open_called(self):
        """The pre_open hook receives the transport that is being opened."""
        calls = []
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
        transport = self.get_transport('foo')
        url = transport.base
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
        self.assertEqual([transport.base], [t.base for t in calls])

    def test_pre_open_actual_exceptions_raised(self):
        """An exception raised by a pre_open hook reaches the caller as is."""
        # NOTE(review): only the `raise` of fail_once was present; the
        # counter making it fail on the first call only is inferred from
        # the function's name -- confirm.
        count = [0]

        def fail_once(transport):
            count[0] += 1
            if count[0] == 1:
                raise errors.BzrError("fail")

        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
        transport = self.get_transport('foo')
        url = transport.base
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
        self.assertEqual('fail', err._preformatted_string)

    def test_post_repo_init(self):
        """post_repo_init fires once, with a RepoInitHookParams object."""
        from bzrlib.bzrdir import RepoInitHookParams
        calls = []
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
                                               calls.append, None)
        self.make_repository('foo')
        self.assertLength(1, calls)
        params = calls[0]
        self.assertIsInstance(params, RepoInitHookParams)
        self.assertTrue(hasattr(params, 'bzrdir'))
        self.assertTrue(hasattr(params, 'repository'))

    def test_post_repo_init_hook_repr(self):
        """The repr of the hook params starts with a recognisable prefix."""
        param_reprs = []
        bzrdir.BzrDir.hooks.install_named_hook(
            'post_repo_init',
            lambda params: param_reprs.append(repr(params)), None)
        self.make_repository('foo')
        self.assertLength(1, param_reprs)
        param_repr = param_reprs[0]
        self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1338
class TestGenerateBackupName(TestCaseWithMemoryTransport):
    """Tests for how a control directory picks backup names."""
    # FIXME: This may need to be unified with test_osutils.TestBackupNames or
    # moved to per_bzrdir or per_transport for better coverage ?

    def setUp(self):
        """Create a control directory and reopen it on the same transport."""
        super(TestGenerateBackupName, self).setUp()
        self._transport = self.get_transport()
        bzrdir.BzrDir.create(self.get_url(),
                             possible_transports=[self._transport])
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)

    def test_deprecated_generate_backup_name(self):
        """generate_backup_name is still callable, but deprecated in 2.3.0."""
        # Only the deprecation is checked; the returned name is not inspected.
        self.applyDeprecated(
            symbol_versioning.deprecated_in((2, 3, 0)),
            self._bzrdir.generate_backup_name, 'whatever')

    # NOTE(review): the header of this method was missing; the name
    # `test_new` is assumed -- confirm.
    def test_new(self):
        """With no backup present the first available name is 'a.~1~'."""
        self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))

    def test_exiting(self):
        """An existing 'a.~1~' pushes the next available name to 'a.~2~'."""
        self._transport.put_bytes("a.~1~", "some content")
        self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))