1
# Copyright (C) 2006-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the BzrDir facility and any format specific tests.
19
For interface contract tests, see tests/per_bzr_dir.
39
from bzrlib.errors import (NotBranchError,
40
NoColocatedBranchSupport,
42
UnsupportedFormatError,
44
from bzrlib.tests import (
46
TestCaseWithMemoryTransport,
47
TestCaseWithTransport,
50
from bzrlib.tests import(
54
from bzrlib.tests.test_http import TestWithTransport_pycurl
55
from bzrlib.transport import (
60
from bzrlib.transport.http._urllib import HttpTransport_urllib
61
from bzrlib.transport.nosmart import NoSmartTransportDecorator
62
from bzrlib.transport.readonly import ReadonlyTransportDecorator
63
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
66
class TestDefaultFormat(TestCase):
68
def test_get_set_default_format(self):
69
old_format = bzrdir.BzrDirFormat.get_default_format()
70
# default is BzrDirFormat6
71
self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
72
bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
73
# creating a bzr dir should now create an instrumented dir.
75
result = bzrdir.BzrDir.create('memory:///')
76
self.failUnless(isinstance(result, SampleBzrDir))
78
bzrdir.BzrDirFormat._set_default_format(old_format)
79
self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
82
class TestFormatRegistry(TestCase):
84
def make_format_registry(self):
85
my_format_registry = bzrdir.BzrDirFormatRegistry()
86
my_format_registry.register('weave', bzrdir.BzrDirFormat6,
87
'Pre-0.8 format. Slower and does not support checkouts or shared'
88
' repositories', deprecated=True)
89
my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
90
'BzrDirFormat6', 'Format registered lazily', deprecated=True)
91
my_format_registry.register_metadir('knit',
92
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
95
my_format_registry.set_default('knit')
96
my_format_registry.register_metadir(
98
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
99
'Experimental successor to knit. Use at your own risk.',
100
branch_format='bzrlib.branch.BzrBranchFormat6',
102
my_format_registry.register_metadir(
104
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
105
'Experimental successor to knit. Use at your own risk.',
106
branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
107
my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
108
'Pre-0.8 format. Slower and does not support checkouts or shared'
109
' repositories', hidden=True)
110
my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
111
'BzrDirFormat6', 'Format registered lazily', deprecated=True,
113
return my_format_registry
115
def test_format_registry(self):
116
my_format_registry = self.make_format_registry()
117
my_bzrdir = my_format_registry.make_bzrdir('lazy')
118
self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
119
my_bzrdir = my_format_registry.make_bzrdir('weave')
120
self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
121
my_bzrdir = my_format_registry.make_bzrdir('default')
122
self.assertIsInstance(my_bzrdir.repository_format,
123
knitrepo.RepositoryFormatKnit1)
124
my_bzrdir = my_format_registry.make_bzrdir('knit')
125
self.assertIsInstance(my_bzrdir.repository_format,
126
knitrepo.RepositoryFormatKnit1)
127
my_bzrdir = my_format_registry.make_bzrdir('branch6')
128
self.assertIsInstance(my_bzrdir.get_branch_format(),
129
bzrlib.branch.BzrBranchFormat6)
131
def test_get_help(self):
132
my_format_registry = self.make_format_registry()
133
self.assertEqual('Format registered lazily',
134
my_format_registry.get_help('lazy'))
135
self.assertEqual('Format using knits',
136
my_format_registry.get_help('knit'))
137
self.assertEqual('Format using knits',
138
my_format_registry.get_help('default'))
139
self.assertEqual('Pre-0.8 format. Slower and does not support'
140
' checkouts or shared repositories',
141
my_format_registry.get_help('weave'))
143
def test_help_topic(self):
144
topics = help_topics.HelpTopicRegistry()
145
registry = self.make_format_registry()
146
topics.register('current-formats', registry.help_topic,
148
topics.register('other-formats', registry.help_topic,
150
new = topics.get_detail('current-formats')
151
rest = topics.get_detail('other-formats')
152
experimental, deprecated = rest.split('Deprecated formats')
153
self.assertContainsRe(new, 'formats-help')
154
self.assertContainsRe(new,
155
':knit:\n \(native\) \(default\) Format using knits\n')
156
self.assertContainsRe(experimental,
157
':branch6:\n \(native\) Experimental successor to knit')
158
self.assertContainsRe(deprecated,
159
':lazy:\n \(native\) Format registered lazily\n')
160
self.assertNotContainsRe(new, 'hidden')
162
def test_set_default_repository(self):
163
default_factory = bzrdir.format_registry.get('default')
164
old_default = [k for k, v in bzrdir.format_registry.iteritems()
165
if v == default_factory and k != 'default'][0]
166
bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
168
self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
169
bzrdir.format_registry.get('default'))
171
repository.RepositoryFormat.get_default_format().__class__,
172
knitrepo.RepositoryFormatKnit3)
174
bzrdir.format_registry.set_default_repository(old_default)
176
def test_aliases(self):
177
a_registry = bzrdir.BzrDirFormatRegistry()
178
a_registry.register('weave', bzrdir.BzrDirFormat6,
179
'Pre-0.8 format. Slower and does not support checkouts or shared'
180
' repositories', deprecated=True)
181
a_registry.register('weavealias', bzrdir.BzrDirFormat6,
182
'Pre-0.8 format. Slower and does not support checkouts or shared'
183
' repositories', deprecated=True, alias=True)
184
self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
187
class SampleBranch(bzrlib.branch.Branch):
188
"""A dummy branch for guess what, dummy use."""
190
def __init__(self, dir):
194
class SampleRepository(bzrlib.repository.Repository):
197
def __init__(self, dir):
201
class SampleBzrDir(bzrdir.BzrDir):
202
"""A sample BzrDir implementation to allow testing static methods."""
204
def create_repository(self, shared=False):
205
"""See BzrDir.create_repository."""
206
return "A repository"
208
def open_repository(self):
209
"""See BzrDir.open_repository."""
210
return SampleRepository(self)
212
def create_branch(self, name=None):
213
"""See BzrDir.create_branch."""
215
raise NoColocatedBranchSupport(self)
216
return SampleBranch(self)
218
def create_workingtree(self):
219
"""See BzrDir.create_workingtree."""
223
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
226
this format is initializable, unsupported to aid in testing the
227
open and open_downlevel routines.
230
def get_format_string(self):
231
"""See BzrDirFormat.get_format_string()."""
232
return "Sample .bzr dir format."
234
def initialize_on_transport(self, t):
235
"""Create a bzr dir."""
237
t.put_bytes('.bzr/branch-format', self.get_format_string())
238
return SampleBzrDir(t, self)
240
def is_supported(self):
243
def open(self, transport, _found=None):
244
return "opened branch."
247
class TestBzrDirFormat(TestCaseWithTransport):
248
"""Tests for the BzrDirFormat facility."""
250
def test_find_format(self):
251
# is the right format object found for a branch?
252
# create a branch with a few known format objects.
253
# this is not quite the same as
254
t = get_transport(self.get_url())
255
self.build_tree(["foo/", "bar/"], transport=t)
256
def check_format(format, url):
257
format.initialize(url)
258
t = get_transport(url)
259
found_format = bzrdir.BzrDirFormat.find_format(t)
260
self.failUnless(isinstance(found_format, format.__class__))
261
check_format(bzrdir.BzrDirFormat5(), "foo")
262
check_format(bzrdir.BzrDirFormat6(), "bar")
264
def test_find_format_nothing_there(self):
265
self.assertRaises(NotBranchError,
266
bzrdir.BzrDirFormat.find_format,
269
def test_find_format_unknown_format(self):
270
t = get_transport(self.get_url())
272
t.put_bytes('.bzr/branch-format', '')
273
self.assertRaises(UnknownFormatError,
274
bzrdir.BzrDirFormat.find_format,
277
def test_register_unregister_format(self):
278
format = SampleBzrDirFormat()
281
format.initialize(url)
282
# register a format for it.
283
bzrdir.BzrDirFormat.register_format(format)
284
# which bzrdir.Open will refuse (not supported)
285
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
286
# which bzrdir.open_containing will refuse (not supported)
287
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
288
# but open_downlevel will work
289
t = get_transport(url)
290
self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
291
# unregister the format
292
bzrdir.BzrDirFormat.unregister_format(format)
293
# now open_downlevel should fail too.
294
self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
296
def test_create_branch_and_repo_uses_default(self):
297
format = SampleBzrDirFormat()
298
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
300
self.assertTrue(isinstance(branch, SampleBranch))
302
def test_create_branch_and_repo_under_shared(self):
303
# creating a branch and repo in a shared repo uses the
305
format = bzrdir.format_registry.make_bzrdir('knit')
306
self.make_repository('.', shared=True, format=format)
307
branch = bzrdir.BzrDir.create_branch_and_repo(
308
self.get_url('child'), format=format)
309
self.assertRaises(errors.NoRepositoryPresent,
310
branch.bzrdir.open_repository)
312
def test_create_branch_and_repo_under_shared_force_new(self):
313
# creating a branch and repo in a shared repo can be forced to
315
format = bzrdir.format_registry.make_bzrdir('knit')
316
self.make_repository('.', shared=True, format=format)
317
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
320
branch.bzrdir.open_repository()
322
def test_create_standalone_working_tree(self):
323
format = SampleBzrDirFormat()
324
# note this is deliberately readonly, as this failure should
325
# occur before any writes.
326
self.assertRaises(errors.NotLocalUrl,
327
bzrdir.BzrDir.create_standalone_workingtree,
328
self.get_readonly_url(), format=format)
329
tree = bzrdir.BzrDir.create_standalone_workingtree('.',
331
self.assertEqual('A tree', tree)
333
def test_create_standalone_working_tree_under_shared_repo(self):
334
# create standalone working tree always makes a repo.
335
format = bzrdir.format_registry.make_bzrdir('knit')
336
self.make_repository('.', shared=True, format=format)
337
# note this is deliberately readonly, as this failure should
338
# occur before any writes.
339
self.assertRaises(errors.NotLocalUrl,
340
bzrdir.BzrDir.create_standalone_workingtree,
341
self.get_readonly_url('child'), format=format)
342
tree = bzrdir.BzrDir.create_standalone_workingtree('child',
344
tree.bzrdir.open_repository()
346
def test_create_branch_convenience(self):
347
# outside a repo the default convenience output is a repo+branch_tree
348
format = bzrdir.format_registry.make_bzrdir('knit')
349
branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
350
branch.bzrdir.open_workingtree()
351
branch.bzrdir.open_repository()
353
def test_create_branch_convenience_possible_transports(self):
354
"""Check that the optional 'possible_transports' is recognized"""
355
format = bzrdir.format_registry.make_bzrdir('knit')
356
t = self.get_transport()
357
branch = bzrdir.BzrDir.create_branch_convenience(
358
'.', format=format, possible_transports=[t])
359
branch.bzrdir.open_workingtree()
360
branch.bzrdir.open_repository()
362
def test_create_branch_convenience_root(self):
363
"""Creating a branch at the root of a fs should work."""
364
self.vfs_transport_factory = memory.MemoryServer
365
# outside a repo the default convenience output is a repo+branch_tree
366
format = bzrdir.format_registry.make_bzrdir('knit')
367
branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
369
self.assertRaises(errors.NoWorkingTree,
370
branch.bzrdir.open_workingtree)
371
branch.bzrdir.open_repository()
373
def test_create_branch_convenience_under_shared_repo(self):
374
# inside a repo the default convenience output is a branch+ follow the
376
format = bzrdir.format_registry.make_bzrdir('knit')
377
self.make_repository('.', shared=True, format=format)
378
branch = bzrdir.BzrDir.create_branch_convenience('child',
380
branch.bzrdir.open_workingtree()
381
self.assertRaises(errors.NoRepositoryPresent,
382
branch.bzrdir.open_repository)
384
def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
385
# inside a repo the default convenience output is a branch+ follow the
386
# repo tree policy but we can override that
387
format = bzrdir.format_registry.make_bzrdir('knit')
388
self.make_repository('.', shared=True, format=format)
389
branch = bzrdir.BzrDir.create_branch_convenience('child',
390
force_new_tree=False, format=format)
391
self.assertRaises(errors.NoWorkingTree,
392
branch.bzrdir.open_workingtree)
393
self.assertRaises(errors.NoRepositoryPresent,
394
branch.bzrdir.open_repository)
396
def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
397
# inside a repo the default convenience output is a branch+ follow the
399
format = bzrdir.format_registry.make_bzrdir('knit')
400
repo = self.make_repository('.', shared=True, format=format)
401
repo.set_make_working_trees(False)
402
branch = bzrdir.BzrDir.create_branch_convenience('child',
404
self.assertRaises(errors.NoWorkingTree,
405
branch.bzrdir.open_workingtree)
406
self.assertRaises(errors.NoRepositoryPresent,
407
branch.bzrdir.open_repository)
409
def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
410
# inside a repo the default convenience output is a branch+ follow the
411
# repo tree policy but we can override that
412
format = bzrdir.format_registry.make_bzrdir('knit')
413
repo = self.make_repository('.', shared=True, format=format)
414
repo.set_make_working_trees(False)
415
branch = bzrdir.BzrDir.create_branch_convenience('child',
416
force_new_tree=True, format=format)
417
branch.bzrdir.open_workingtree()
418
self.assertRaises(errors.NoRepositoryPresent,
419
branch.bzrdir.open_repository)
421
def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
422
# inside a repo the default convenience output is overridable to give
424
format = bzrdir.format_registry.make_bzrdir('knit')
425
self.make_repository('.', shared=True, format=format)
426
branch = bzrdir.BzrDir.create_branch_convenience('child',
427
force_new_repo=True, format=format)
428
branch.bzrdir.open_repository()
429
branch.bzrdir.open_workingtree()
432
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
434
def test_acquire_repository_standalone(self):
435
"""The default acquisition policy should create a standalone branch."""
436
my_bzrdir = self.make_bzrdir('.')
437
repo_policy = my_bzrdir.determine_repository_policy()
438
repo, is_new = repo_policy.acquire_repository()
439
self.assertEqual(repo.bzrdir.root_transport.base,
440
my_bzrdir.root_transport.base)
441
self.assertFalse(repo.is_shared())
443
def test_determine_stacking_policy(self):
444
parent_bzrdir = self.make_bzrdir('.')
445
child_bzrdir = self.make_bzrdir('child')
446
parent_bzrdir.get_config().set_default_stack_on('http://example.org')
447
repo_policy = child_bzrdir.determine_repository_policy()
448
self.assertEqual('http://example.org', repo_policy._stack_on)
450
def test_determine_stacking_policy_relative(self):
451
parent_bzrdir = self.make_bzrdir('.')
452
child_bzrdir = self.make_bzrdir('child')
453
parent_bzrdir.get_config().set_default_stack_on('child2')
454
repo_policy = child_bzrdir.determine_repository_policy()
455
self.assertEqual('child2', repo_policy._stack_on)
456
self.assertEqual(parent_bzrdir.root_transport.base,
457
repo_policy._stack_on_pwd)
459
def prepare_default_stacking(self, child_format='1.6'):
460
parent_bzrdir = self.make_bzrdir('.')
461
child_branch = self.make_branch('child', format=child_format)
462
parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
463
new_child_transport = parent_bzrdir.transport.clone('child2')
464
return child_branch, new_child_transport
466
def test_clone_on_transport_obeys_stacking_policy(self):
467
child_branch, new_child_transport = self.prepare_default_stacking()
468
new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
469
self.assertEqual(child_branch.base,
470
new_child.open_branch().get_stacked_on_url())
472
def test_default_stacking_with_stackable_branch_unstackable_repo(self):
473
# Make stackable source branch with an unstackable repo format.
474
source_bzrdir = self.make_bzrdir('source')
475
pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
476
source_branch = bzrlib.branch.BzrBranchFormat7().initialize(
478
# Make a directory with a default stacking policy
479
parent_bzrdir = self.make_bzrdir('parent')
480
stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
481
parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
482
# Clone source into directory
483
target = source_bzrdir.clone(self.get_url('parent/target'))
485
def test_sprout_obeys_stacking_policy(self):
486
child_branch, new_child_transport = self.prepare_default_stacking()
487
new_child = child_branch.bzrdir.sprout(new_child_transport.base)
488
self.assertEqual(child_branch.base,
489
new_child.open_branch().get_stacked_on_url())
491
def test_clone_ignores_policy_for_unsupported_formats(self):
492
child_branch, new_child_transport = self.prepare_default_stacking(
493
child_format='pack-0.92')
494
new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
495
self.assertRaises(errors.UnstackableBranchFormat,
496
new_child.open_branch().get_stacked_on_url)
498
def test_sprout_ignores_policy_for_unsupported_formats(self):
499
child_branch, new_child_transport = self.prepare_default_stacking(
500
child_format='pack-0.92')
501
new_child = child_branch.bzrdir.sprout(new_child_transport.base)
502
self.assertRaises(errors.UnstackableBranchFormat,
503
new_child.open_branch().get_stacked_on_url)
505
def test_sprout_upgrades_format_if_stacked_specified(self):
506
child_branch, new_child_transport = self.prepare_default_stacking(
507
child_format='pack-0.92')
508
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
510
self.assertEqual(child_branch.bzrdir.root_transport.base,
511
new_child.open_branch().get_stacked_on_url())
512
repo = new_child.open_repository()
513
self.assertTrue(repo._format.supports_external_lookups)
514
self.assertFalse(repo.supports_rich_root())
516
def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
517
child_branch, new_child_transport = self.prepare_default_stacking(
518
child_format='pack-0.92')
519
new_child = child_branch.bzrdir.clone_on_transport(new_child_transport,
520
stacked_on=child_branch.bzrdir.root_transport.base)
521
self.assertEqual(child_branch.bzrdir.root_transport.base,
522
new_child.open_branch().get_stacked_on_url())
523
repo = new_child.open_repository()
524
self.assertTrue(repo._format.supports_external_lookups)
525
self.assertFalse(repo.supports_rich_root())
527
def test_sprout_upgrades_to_rich_root_format_if_needed(self):
528
child_branch, new_child_transport = self.prepare_default_stacking(
529
child_format='rich-root-pack')
530
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
532
repo = new_child.open_repository()
533
self.assertTrue(repo._format.supports_external_lookups)
534
self.assertTrue(repo.supports_rich_root())
536
def test_add_fallback_repo_handles_absolute_urls(self):
537
stack_on = self.make_branch('stack_on', format='1.6')
538
repo = self.make_repository('repo', format='1.6')
539
policy = bzrdir.UseExistingRepository(repo, stack_on.base)
540
policy._add_fallback(repo)
542
def test_add_fallback_repo_handles_relative_urls(self):
543
stack_on = self.make_branch('stack_on', format='1.6')
544
repo = self.make_repository('repo', format='1.6')
545
policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
546
policy._add_fallback(repo)
548
def test_configure_relative_branch_stacking_url(self):
549
stack_on = self.make_branch('stack_on', format='1.6')
550
stacked = self.make_branch('stack_on/stacked', format='1.6')
551
policy = bzrdir.UseExistingRepository(stacked.repository,
553
policy.configure_branch(stacked)
554
self.assertEqual('..', stacked.get_stacked_on_url())
556
def test_relative_branch_stacking_to_absolute(self):
557
stack_on = self.make_branch('stack_on', format='1.6')
558
stacked = self.make_branch('stack_on/stacked', format='1.6')
559
policy = bzrdir.UseExistingRepository(stacked.repository,
560
'.', self.get_readonly_url('stack_on'))
561
policy.configure_branch(stacked)
562
self.assertEqual(self.get_readonly_url('stack_on'),
563
stacked.get_stacked_on_url())
566
class ChrootedTests(TestCaseWithTransport):
567
"""A support class that provides readonly urls outside the local namespace.
569
This is done by checking if self.transport_server is a MemoryServer. if it
570
is then we are chrooted already, if it is not then an HttpServer is used
575
super(ChrootedTests, self).setUp()
576
if not self.vfs_transport_factory == memory.MemoryServer:
577
self.transport_readonly_server = http_server.HttpServer
579
def local_branch_path(self, branch):
580
return os.path.realpath(urlutils.local_path_from_url(branch.base))
582
def test_open_containing(self):
583
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
584
self.get_readonly_url(''))
585
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
586
self.get_readonly_url('g/p/q'))
587
control = bzrdir.BzrDir.create(self.get_url())
588
branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url(''))
589
self.assertEqual('', relpath)
590
branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url('g/p/q'))
591
self.assertEqual('g/p/q', relpath)
593
def test_open_containing_tree_branch_or_repository_empty(self):
594
self.assertRaises(errors.NotBranchError,
595
bzrdir.BzrDir.open_containing_tree_branch_or_repository,
596
self.get_readonly_url(''))
598
def test_open_containing_tree_branch_or_repository_all(self):
599
self.make_branch_and_tree('topdir')
600
tree, branch, repo, relpath = \
601
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
603
self.assertEqual(os.path.realpath('topdir'),
604
os.path.realpath(tree.basedir))
605
self.assertEqual(os.path.realpath('topdir'),
606
self.local_branch_path(branch))
608
osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
609
repo.bzrdir.transport.local_abspath('repository'))
610
self.assertEqual(relpath, 'foo')
612
def test_open_containing_tree_branch_or_repository_no_tree(self):
613
self.make_branch('branch')
614
tree, branch, repo, relpath = \
615
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
617
self.assertEqual(tree, None)
618
self.assertEqual(os.path.realpath('branch'),
619
self.local_branch_path(branch))
621
osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
622
repo.bzrdir.transport.local_abspath('repository'))
623
self.assertEqual(relpath, 'foo')
625
def test_open_containing_tree_branch_or_repository_repo(self):
626
self.make_repository('repo')
627
tree, branch, repo, relpath = \
628
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
630
self.assertEqual(tree, None)
631
self.assertEqual(branch, None)
633
osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
634
repo.bzrdir.transport.local_abspath('repository'))
635
self.assertEqual(relpath, '')
637
def test_open_containing_tree_branch_or_repository_shared_repo(self):
638
self.make_repository('shared', shared=True)
639
bzrdir.BzrDir.create_branch_convenience('shared/branch',
640
force_new_tree=False)
641
tree, branch, repo, relpath = \
642
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
644
self.assertEqual(tree, None)
645
self.assertEqual(os.path.realpath('shared/branch'),
646
self.local_branch_path(branch))
648
osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
649
repo.bzrdir.transport.local_abspath('repository'))
650
self.assertEqual(relpath, '')
652
def test_open_containing_tree_branch_or_repository_branch_subdir(self):
653
self.make_branch_and_tree('foo')
654
self.build_tree(['foo/bar/'])
655
tree, branch, repo, relpath = \
656
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
658
self.assertEqual(os.path.realpath('foo'),
659
os.path.realpath(tree.basedir))
660
self.assertEqual(os.path.realpath('foo'),
661
self.local_branch_path(branch))
663
osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
664
repo.bzrdir.transport.local_abspath('repository'))
665
self.assertEqual(relpath, 'bar')
667
def test_open_containing_tree_branch_or_repository_repo_subdir(self):
668
self.make_repository('bar')
669
self.build_tree(['bar/baz/'])
670
tree, branch, repo, relpath = \
671
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
673
self.assertEqual(tree, None)
674
self.assertEqual(branch, None)
676
osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
677
repo.bzrdir.transport.local_abspath('repository'))
678
self.assertEqual(relpath, 'baz')
680
def test_open_containing_from_transport(self):
681
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
682
get_transport(self.get_readonly_url('')))
683
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
684
get_transport(self.get_readonly_url('g/p/q')))
685
control = bzrdir.BzrDir.create(self.get_url())
686
branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
687
get_transport(self.get_readonly_url('')))
688
self.assertEqual('', relpath)
689
branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
690
get_transport(self.get_readonly_url('g/p/q')))
691
self.assertEqual('g/p/q', relpath)
693
def test_open_containing_tree_or_branch(self):
694
self.make_branch_and_tree('topdir')
695
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
697
self.assertEqual(os.path.realpath('topdir'),
698
os.path.realpath(tree.basedir))
699
self.assertEqual(os.path.realpath('topdir'),
700
self.local_branch_path(branch))
701
self.assertIs(tree.bzrdir, branch.bzrdir)
702
self.assertEqual('foo', relpath)
703
# opening from non-local should not return the tree
704
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
705
self.get_readonly_url('topdir/foo'))
706
self.assertEqual(None, tree)
707
self.assertEqual('foo', relpath)
709
self.make_branch('topdir/foo')
710
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
712
self.assertIs(tree, None)
713
self.assertEqual(os.path.realpath('topdir/foo'),
714
self.local_branch_path(branch))
715
self.assertEqual('', relpath)
717
def test_open_tree_or_branch(self):
718
self.make_branch_and_tree('topdir')
719
tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
720
self.assertEqual(os.path.realpath('topdir'),
721
os.path.realpath(tree.basedir))
722
self.assertEqual(os.path.realpath('topdir'),
723
self.local_branch_path(branch))
724
self.assertIs(tree.bzrdir, branch.bzrdir)
725
# opening from non-local should not return the tree
726
tree, branch = bzrdir.BzrDir.open_tree_or_branch(
727
self.get_readonly_url('topdir'))
728
self.assertEqual(None, tree)
730
self.make_branch('topdir/foo')
731
tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
732
self.assertIs(tree, None)
733
self.assertEqual(os.path.realpath('topdir/foo'),
734
self.local_branch_path(branch))
736
def test_open_from_transport(self):
737
# transport pointing at bzrdir should give a bzrdir with root transport
738
# set to the given transport
739
control = bzrdir.BzrDir.create(self.get_url())
740
transport = get_transport(self.get_url())
741
opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
742
self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
743
self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
745
def test_open_from_transport_no_bzrdir(self):
746
transport = get_transport(self.get_url())
747
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
750
def test_open_from_transport_bzrdir_in_parent(self):
751
control = bzrdir.BzrDir.create(self.get_url())
752
transport = get_transport(self.get_url())
753
transport.mkdir('subdir')
754
transport = transport.clone('subdir')
755
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
758
def test_sprout_recursive(self):
759
tree = self.make_branch_and_tree('tree1',
760
format='dirstate-with-subtree')
761
sub_tree = self.make_branch_and_tree('tree1/subtree',
762
format='dirstate-with-subtree')
763
sub_tree.set_root_id('subtree-root')
764
tree.add_reference(sub_tree)
765
self.build_tree(['tree1/subtree/file'])
767
tree.commit('Initial commit')
768
tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
770
self.addCleanup(tree2.unlock)
771
self.failUnlessExists('tree2/subtree/file')
772
self.assertEqual('tree-reference', tree2.kind('subtree-root'))
774
def test_cloning_metadir(self):
775
"""Ensure that cloning metadir is suitable"""
776
bzrdir = self.make_bzrdir('bzrdir')
777
bzrdir.cloning_metadir()
778
branch = self.make_branch('branch', format='knit')
779
format = branch.bzrdir.cloning_metadir()
780
self.assertIsInstance(format.workingtree_format,
781
workingtree.WorkingTreeFormat3)
783
def test_sprout_recursive_treeless(self):
784
tree = self.make_branch_and_tree('tree1',
785
format='dirstate-with-subtree')
786
sub_tree = self.make_branch_and_tree('tree1/subtree',
787
format='dirstate-with-subtree')
788
tree.add_reference(sub_tree)
789
self.build_tree(['tree1/subtree/file'])
791
tree.commit('Initial commit')
792
tree.bzrdir.destroy_workingtree()
793
repo = self.make_repository('repo', shared=True,
794
format='dirstate-with-subtree')
795
repo.set_make_working_trees(False)
796
tree.bzrdir.sprout('repo/tree2')
797
self.failUnlessExists('repo/tree2/subtree')
798
self.failIfExists('repo/tree2/subtree/file')
800
def make_foo_bar_baz(self):
801
foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
802
bar = self.make_branch('foo/bar').bzrdir
803
baz = self.make_branch('baz').bzrdir
806
def test_find_bzrdirs(self):
807
foo, bar, baz = self.make_foo_bar_baz()
808
transport = get_transport(self.get_url())
809
self.assertEqualBzrdirs([baz, foo, bar],
810
bzrdir.BzrDir.find_bzrdirs(transport))
812
def make_fake_permission_denied_transport(self, transport, paths):
813
"""Create a transport that raises PermissionDenied for some paths."""
816
raise errors.PermissionDenied(path)
818
path_filter_server = pathfilter.PathFilteringServer(transport, filter)
819
path_filter_server.start_server()
820
self.addCleanup(path_filter_server.stop_server)
821
path_filter_transport = pathfilter.PathFilteringTransport(
822
path_filter_server, '.')
823
return (path_filter_server, path_filter_transport)
825
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
826
"""Check that each branch url ends with the given suffix."""
827
for actual_bzrdir in actual_bzrdirs:
828
self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
830
def test_find_bzrdirs_permission_denied(self):
831
foo, bar, baz = self.make_foo_bar_baz()
832
transport = get_transport(self.get_url())
833
path_filter_server, path_filter_transport = \
834
self.make_fake_permission_denied_transport(transport, ['foo'])
836
self.assertBranchUrlsEndWith('/baz/',
837
bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
839
smart_transport = self.make_smart_server('.',
840
backing_server=path_filter_server)
841
self.assertBranchUrlsEndWith('/baz/',
842
bzrdir.BzrDir.find_bzrdirs(smart_transport))
844
def test_find_bzrdirs_list_current(self):
845
def list_current(transport):
846
return [s for s in transport.list_dir('') if s != 'baz']
848
foo, bar, baz = self.make_foo_bar_baz()
849
transport = get_transport(self.get_url())
850
self.assertEqualBzrdirs([foo, bar],
851
bzrdir.BzrDir.find_bzrdirs(transport,
852
list_current=list_current))
854
def test_find_bzrdirs_evaluate(self):
855
def evaluate(bzrdir):
857
repo = bzrdir.open_repository()
858
except NoRepositoryPresent:
859
return True, bzrdir.root_transport.base
861
return False, bzrdir.root_transport.base
863
foo, bar, baz = self.make_foo_bar_baz()
864
transport = get_transport(self.get_url())
865
self.assertEqual([baz.root_transport.base, foo.root_transport.base],
866
list(bzrdir.BzrDir.find_bzrdirs(transport,
869
def assertEqualBzrdirs(self, first, second):
871
second = list(second)
872
self.assertEqual(len(first), len(second))
873
for x, y in zip(first, second):
874
self.assertEqual(x.root_transport.base, y.root_transport.base)
876
def test_find_branches(self):
877
root = self.make_repository('', shared=True)
878
foo, bar, baz = self.make_foo_bar_baz()
879
qux = self.make_bzrdir('foo/qux')
880
transport = get_transport(self.get_url())
881
branches = bzrdir.BzrDir.find_branches(transport)
882
self.assertEqual(baz.root_transport.base, branches[0].base)
883
self.assertEqual(foo.root_transport.base, branches[1].base)
884
self.assertEqual(bar.root_transport.base, branches[2].base)
886
# ensure this works without a top-level repo
887
branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
888
self.assertEqual(foo.root_transport.base, branches[0].base)
889
self.assertEqual(bar.root_transport.base, branches[1].base)
892
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
894
def test_find_bzrdirs_missing_repo(self):
895
transport = get_transport(self.get_url())
896
arepo = self.make_repository('arepo', shared=True)
897
abranch_url = arepo.user_url + '/abranch'
898
abranch = bzrdir.BzrDir.create(abranch_url).create_branch()
899
transport.delete_tree('arepo/.bzr')
900
self.assertRaises(errors.NoRepositoryPresent,
901
branch.Branch.open, abranch_url)
902
self.make_branch('baz')
903
for actual_bzrdir in bzrdir.BzrDir.find_branches(transport):
904
self.assertEndsWith(actual_bzrdir.user_url, '/baz/')
907
class TestMeta1DirFormat(TestCaseWithTransport):
908
"""Tests specific to the meta1 dir format."""
910
def test_right_base_dirs(self):
911
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
913
branch_base = t.clone('branch').base
914
self.assertEqual(branch_base, dir.get_branch_transport(None).base)
915
self.assertEqual(branch_base,
916
dir.get_branch_transport(bzrlib.branch.BzrBranchFormat5()).base)
917
repository_base = t.clone('repository').base
918
self.assertEqual(repository_base, dir.get_repository_transport(None).base)
919
self.assertEqual(repository_base,
920
dir.get_repository_transport(weaverepo.RepositoryFormat7()).base)
921
checkout_base = t.clone('checkout').base
922
self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
923
self.assertEqual(checkout_base,
924
dir.get_workingtree_transport(workingtree.WorkingTreeFormat3()).base)
926
def test_meta1dir_uses_lockdir(self):
927
"""Meta1 format uses a LockDir to guard the whole directory, not a file."""
928
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
930
self.assertIsDirectory('branch-lock', t)
932
def test_comparison(self):
933
"""Equality and inequality behave properly.
935
Metadirs should compare equal iff they have the same repo, branch and
938
mydir = bzrdir.format_registry.make_bzrdir('knit')
939
self.assertEqual(mydir, mydir)
940
self.assertFalse(mydir != mydir)
941
otherdir = bzrdir.format_registry.make_bzrdir('knit')
942
self.assertEqual(otherdir, mydir)
943
self.assertFalse(otherdir != mydir)
944
otherdir2 = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
945
self.assertNotEqual(otherdir2, mydir)
946
self.assertFalse(otherdir2 == mydir)
948
def test_needs_conversion_different_working_tree(self):
949
# meta1dirs need an conversion if any element is not the default.
950
new_format = bzrdir.format_registry.make_bzrdir('dirstate')
951
tree = self.make_branch_and_tree('tree', format='knit')
952
self.assertTrue(tree.bzrdir.needs_format_conversion(
955
def test_initialize_on_format_uses_smart_transport(self):
956
self.setup_smart_server_with_call_log()
957
new_format = bzrdir.format_registry.make_bzrdir('dirstate')
958
transport = self.get_transport('target')
959
transport.ensure_base()
960
self.reset_smart_call_log()
961
instance = new_format.initialize_on_transport(transport)
962
self.assertIsInstance(instance, remote.RemoteBzrDir)
963
rpc_count = len(self.hpss_calls)
964
# This figure represent the amount of work to perform this use case. It
965
# is entirely ok to reduce this number if a test fails due to rpc_count
966
# being too low. If rpc_count increases, more network roundtrips have
967
# become necessary for this use case. Please do not adjust this number
968
# upwards without agreement from bzr's network support maintainers.
969
self.assertEqual(2, rpc_count)
972
class TestFormat5(TestCaseWithTransport):
973
"""Tests specific to the version 5 bzrdir format."""
975
def test_same_lockfiles_between_tree_repo_branch(self):
976
# this checks that only a single lockfiles instance is created
977
# for format 5 objects
978
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
979
def check_dir_components_use_same_lock(dir):
980
ctrl_1 = dir.open_repository().control_files
981
ctrl_2 = dir.open_branch().control_files
982
ctrl_3 = dir.open_workingtree()._control_files
983
self.assertTrue(ctrl_1 is ctrl_2)
984
self.assertTrue(ctrl_2 is ctrl_3)
985
check_dir_components_use_same_lock(dir)
986
# and if we open it normally.
987
dir = bzrdir.BzrDir.open(self.get_url())
988
check_dir_components_use_same_lock(dir)
990
def test_can_convert(self):
991
# format 5 dirs are convertable
992
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
993
self.assertTrue(dir.can_convert_format())
995
def test_needs_conversion(self):
996
# format 5 dirs need a conversion if they are not the default,
998
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
999
# don't need to convert it to itself
1000
self.assertFalse(dir.needs_format_conversion(bzrdir.BzrDirFormat5()))
1001
# do need to convert it to the current default
1002
self.assertTrue(dir.needs_format_conversion(
1003
bzrdir.BzrDirFormat.get_default_format()))
1006
class TestFormat6(TestCaseWithTransport):
1007
"""Tests specific to the version 6 bzrdir format."""
1009
def test_same_lockfiles_between_tree_repo_branch(self):
1010
# this checks that only a single lockfiles instance is created
1011
# for format 6 objects
1012
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
1013
def check_dir_components_use_same_lock(dir):
1014
ctrl_1 = dir.open_repository().control_files
1015
ctrl_2 = dir.open_branch().control_files
1016
ctrl_3 = dir.open_workingtree()._control_files
1017
self.assertTrue(ctrl_1 is ctrl_2)
1018
self.assertTrue(ctrl_2 is ctrl_3)
1019
check_dir_components_use_same_lock(dir)
1020
# and if we open it normally.
1021
dir = bzrdir.BzrDir.open(self.get_url())
1022
check_dir_components_use_same_lock(dir)
1024
def test_can_convert(self):
1025
# format 6 dirs are convertable
1026
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
1027
self.assertTrue(dir.can_convert_format())
1029
def test_needs_conversion(self):
1030
# format 6 dirs need an conversion if they are not the default.
1031
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
1032
self.assertTrue(dir.needs_format_conversion(
1033
bzrdir.BzrDirFormat.get_default_format()))
1036
class NotBzrDir(bzrlib.bzrdir.BzrDir):
1037
"""A non .bzr based control directory."""
1039
def __init__(self, transport, format):
1040
self._format = format
1041
self.root_transport = transport
1042
self.transport = transport.clone('.not')
1045
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
1046
"""A test class representing any non-.bzr based disk format."""
1048
def initialize_on_transport(self, transport):
1049
"""Initialize a new .not dir in the base directory of a Transport."""
1050
transport.mkdir('.not')
1051
return self.open(transport)
1053
def open(self, transport):
1054
"""Open this directory."""
1055
return NotBzrDir(transport, self)
1058
def _known_formats(self):
1059
return set([NotBzrDirFormat()])
1062
def probe_transport(self, transport):
1063
"""Our format is present if the transport ends in '.not/'."""
1064
if transport.has('.not'):
1065
return NotBzrDirFormat()
1068
class TestNotBzrDir(TestCaseWithTransport):
1069
"""Tests for using the bzrdir api with a non .bzr based disk format.
1071
If/when one of these is in the core, we can let the implementation tests
1075
def test_create_and_find_format(self):
1076
# create a .notbzr dir
1077
format = NotBzrDirFormat()
1078
dir = format.initialize(self.get_url())
1079
self.assertIsInstance(dir, NotBzrDir)
1081
bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
1083
found = bzrlib.bzrdir.BzrDirFormat.find_format(
1084
get_transport(self.get_url()))
1085
self.assertIsInstance(found, NotBzrDirFormat)
1087
bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
1089
def test_included_in_known_formats(self):
1090
bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
1092
formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1093
for format in formats:
1094
if isinstance(format, NotBzrDirFormat):
1096
self.fail("No NotBzrDirFormat in %s" % formats)
1098
bzrlib.bzrdir.BzrDirFormat.unregister_control_format(NotBzrDirFormat)
1101
class NonLocalTests(TestCaseWithTransport):
1102
"""Tests for bzrdir static behaviour on non local paths."""
1105
super(NonLocalTests, self).setUp()
1106
self.vfs_transport_factory = memory.MemoryServer
1108
def test_create_branch_convenience(self):
1109
# outside a repo the default convenience output is a repo+branch_tree
1110
format = bzrdir.format_registry.make_bzrdir('knit')
1111
branch = bzrdir.BzrDir.create_branch_convenience(
1112
self.get_url('foo'), format=format)
1113
self.assertRaises(errors.NoWorkingTree,
1114
branch.bzrdir.open_workingtree)
1115
branch.bzrdir.open_repository()
1117
def test_create_branch_convenience_force_tree_not_local_fails(self):
1118
# outside a repo the default convenience output is a repo+branch_tree
1119
format = bzrdir.format_registry.make_bzrdir('knit')
1120
self.assertRaises(errors.NotLocalUrl,
1121
bzrdir.BzrDir.create_branch_convenience,
1122
self.get_url('foo'),
1123
force_new_tree=True,
1125
t = get_transport(self.get_url('.'))
1126
self.assertFalse(t.has('foo'))
1128
def test_clone(self):
1129
# clone into a nonlocal path works
1130
format = bzrdir.format_registry.make_bzrdir('knit')
1131
branch = bzrdir.BzrDir.create_branch_convenience('local',
1133
branch.bzrdir.open_workingtree()
1134
result = branch.bzrdir.clone(self.get_url('remote'))
1135
self.assertRaises(errors.NoWorkingTree,
1136
result.open_workingtree)
1137
result.open_branch()
1138
result.open_repository()
1140
def test_checkout_metadir(self):
1141
# checkout_metadir has reasonable working tree format even when no
1142
# working tree is present
1143
self.make_branch('branch-knit2', format='dirstate-with-subtree')
1144
my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
1145
checkout_format = my_bzrdir.checkout_metadir()
1146
self.assertIsInstance(checkout_format.workingtree_format,
1147
workingtree.WorkingTreeFormat3)
1150
class TestHTTPRedirections(object):
1151
"""Test redirection between two http servers.
1153
This MUST be used by daughter classes that also inherit from
1154
TestCaseWithTwoWebservers.
1156
We can't inherit directly from TestCaseWithTwoWebservers or the
1157
test framework will try to create an instance which cannot
1158
run, its implementation being incomplete.
1161
def create_transport_readonly_server(self):
1162
# We don't set the http protocol version, relying on the default
1163
return http_utils.HTTPServerRedirecting()
1165
def create_transport_secondary_server(self):
1166
# We don't set the http protocol version, relying on the default
1167
return http_utils.HTTPServerRedirecting()
1170
super(TestHTTPRedirections, self).setUp()
1171
# The redirections will point to the new server
1172
self.new_server = self.get_readonly_server()
1173
# The requests to the old server will be redirected
1174
self.old_server = self.get_secondary_server()
1175
# Configure the redirections
1176
self.old_server.redirect_to(self.new_server.host, self.new_server.port)
1178
def test_loop(self):
1179
# Both servers redirect to each other creating a loop
1180
self.new_server.redirect_to(self.old_server.host, self.old_server.port)
1181
# Starting from either server should loop
1182
old_url = self._qualified_url(self.old_server.host,
1183
self.old_server.port)
1184
oldt = self._transport(old_url)
1185
self.assertRaises(errors.NotBranchError,
1186
bzrdir.BzrDir.open_from_transport, oldt)
1187
new_url = self._qualified_url(self.new_server.host,
1188
self.new_server.port)
1189
newt = self._transport(new_url)
1190
self.assertRaises(errors.NotBranchError,
1191
bzrdir.BzrDir.open_from_transport, newt)
1193
def test_qualifier_preserved(self):
1194
wt = self.make_branch_and_tree('branch')
1195
old_url = self._qualified_url(self.old_server.host,
1196
self.old_server.port)
1197
start = self._transport(old_url).clone('branch')
1198
bdir = bzrdir.BzrDir.open_from_transport(start)
1199
# Redirection should preserve the qualifier, hence the transport class
1201
self.assertIsInstance(bdir.root_transport, type(start))
1204
class TestHTTPRedirections_urllib(TestHTTPRedirections,
1205
http_utils.TestCaseWithTwoWebservers):
1206
"""Tests redirections for urllib implementation"""
1208
_transport = HttpTransport_urllib
1210
def _qualified_url(self, host, port):
1211
result = 'http+urllib://%s:%s' % (host, port)
1212
self.permit_url(result)
1217
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
1218
TestHTTPRedirections,
1219
http_utils.TestCaseWithTwoWebservers):
1220
"""Tests redirections for pycurl implementation"""
1222
def _qualified_url(self, host, port):
1223
result = 'http+pycurl://%s:%s' % (host, port)
1224
self.permit_url(result)
1228
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
1229
http_utils.TestCaseWithTwoWebservers):
1230
"""Tests redirections for the nosmart decorator"""
1232
_transport = NoSmartTransportDecorator
1234
def _qualified_url(self, host, port):
1235
result = 'nosmart+http://%s:%s' % (host, port)
1236
self.permit_url(result)
1240
class TestHTTPRedirections_readonly(TestHTTPRedirections,
1241
http_utils.TestCaseWithTwoWebservers):
1242
"""Tests redirections for readonly decoratror"""
1244
_transport = ReadonlyTransportDecorator
1246
def _qualified_url(self, host, port):
1247
result = 'readonly+http://%s:%s' % (host, port)
1248
self.permit_url(result)
1252
class TestDotBzrHidden(TestCaseWithTransport):
1255
if sys.platform == 'win32':
1256
ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
1259
f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
1260
stderr=subprocess.PIPE)
1261
out, err = f.communicate()
1262
self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
1264
return out.splitlines()
1266
def test_dot_bzr_hidden(self):
1267
if sys.platform == 'win32' and not win32utils.has_win32file:
1268
raise TestSkipped('unable to make file hidden without pywin32 library')
1269
b = bzrdir.BzrDir.create('.')
1270
self.build_tree(['a'])
1271
self.assertEquals(['a'], self.get_ls())
1273
def test_dot_bzr_hidden_with_url(self):
1274
if sys.platform == 'win32' and not win32utils.has_win32file:
1275
raise TestSkipped('unable to make file hidden without pywin32 library')
1276
b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
1277
self.build_tree(['a'])
1278
self.assertEquals(['a'], self.get_ls())
1281
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
1282
"""Test BzrDirFormat implementation for TestBzrDirSprout."""
1284
def _open(self, transport):
1285
return _TestBzrDir(transport, self)
1288
class _TestBzrDir(bzrdir.BzrDirMeta1):
1289
"""Test BzrDir implementation for TestBzrDirSprout.
1291
When created a _TestBzrDir already has repository and a branch. The branch
1292
is a test double as well.
1295
def __init__(self, *args, **kwargs):
1296
super(_TestBzrDir, self).__init__(*args, **kwargs)
1297
self.test_branch = _TestBranch()
1298
self.test_branch.repository = self.create_repository()
1300
def open_branch(self, unsupported=False):
1301
return self.test_branch
1303
def cloning_metadir(self, require_stacking=False):
1304
return _TestBzrDirFormat()
1307
class _TestBranchFormat(bzrlib.branch.BranchFormat):
1308
"""Test Branch format for TestBzrDirSprout."""
1311
class _TestBranch(bzrlib.branch.Branch):
1312
"""Test Branch implementation for TestBzrDirSprout."""
1314
def __init__(self, *args, **kwargs):
1315
self._format = _TestBranchFormat()
1316
super(_TestBranch, self).__init__(*args, **kwargs)
1320
def sprout(self, *args, **kwargs):
1321
self.calls.append('sprout')
1322
return _TestBranch()
1324
def copy_content_into(self, destination, revision_id=None):
1325
self.calls.append('copy_content_into')
1327
def get_parent(self):
1330
def set_parent(self, parent):
1331
self._parent = parent
1334
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1336
def test_sprout_uses_branch_sprout(self):
1337
"""BzrDir.sprout calls Branch.sprout.
1339
Usually, BzrDir.sprout should delegate to the branch's sprout method
1340
for part of the work. This allows the source branch to control the
1341
choice of format for the new branch.
1343
There are exceptions, but this tests avoids them:
1344
- if there's no branch in the source bzrdir,
1345
- or if the stacking has been requested and the format needs to be
1346
overridden to satisfy that.
1348
# Make an instrumented bzrdir.
1349
t = self.get_transport('source')
1351
source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
1352
# The instrumented bzrdir has a test_branch attribute that logs calls
1353
# made to the branch contained in that bzrdir. Initially the test
1354
# branch exists but no calls have been made to it.
1355
self.assertEqual([], source_bzrdir.test_branch.calls)
1358
target_url = self.get_url('target')
1359
result = source_bzrdir.sprout(target_url, recurse='no')
1361
# The bzrdir called the branch's sprout method.
1362
self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
1364
def test_sprout_parent(self):
1365
grandparent_tree = self.make_branch('grandparent')
1366
parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
1367
branch_tree = parent.bzrdir.sprout('branch').open_branch()
1368
self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1371
class TestBzrDirHooks(TestCaseWithMemoryTransport):
1373
def test_pre_open_called(self):
1375
bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
1376
transport = self.get_transport('foo')
1377
url = transport.base
1378
self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
1379
self.assertEqual([transport.base], [t.base for t in calls])
1381
def test_pre_open_actual_exceptions_raised(self):
1383
def fail_once(transport):
1386
raise errors.BzrError("fail")
1387
bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
1388
transport = self.get_transport('foo')
1389
url = transport.base
1390
err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1391
self.assertEqual('fail', err._preformatted_string)
1393
def test_post_repo_init(self):
1394
from bzrlib.bzrdir import RepoInitHookParams
1396
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1398
self.make_repository('foo')
1399
self.assertLength(1, calls)
1401
self.assertIsInstance(params, RepoInitHookParams)
1402
self.assertTrue(hasattr(params, 'bzrdir'))
1403
self.assertTrue(hasattr(params, 'repository'))