1
# Copyright (C) 2006-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the BzrDir facility and any format specific tests.
19
For interface contract tests, see tests/per_bzr_dir.
40
from bzrlib.errors import (NotBranchError,
41
NoColocatedBranchSupport,
43
UnsupportedFormatError,
45
from bzrlib.tests import (
47
TestCaseWithMemoryTransport,
48
TestCaseWithTransport,
51
from bzrlib.tests import(
55
from bzrlib.tests.test_http import TestWithTransport_pycurl
56
from bzrlib.transport import (
61
from bzrlib.transport.http._urllib import HttpTransport_urllib
62
from bzrlib.transport.nosmart import NoSmartTransportDecorator
63
from bzrlib.transport.readonly import ReadonlyTransportDecorator
64
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
67
class TestDefaultFormat(TestCase):
    """Check that the process-wide default control-dir format can be swapped."""

    def test_get_set_default_format(self):
        """Install a sample default format, create with it, then restore."""
        old_format = bzrdir.BzrDirFormat.get_default_format()
        # The stock default is expected to be the metadir format.
        self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
        controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
        # creating a bzr dir should now create an instrumented dir.
        try:
            result = bzrdir.BzrDir.create('memory:///')
            self.assertIsInstance(result, SampleBzrDir)
        finally:
            # The default is global state: put it back even if the
            # assertions above fail, so later tests are unaffected.
            controldir.ControlDirFormat._set_default_format(old_format)
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
83
class TestFormatRegistry(TestCase):
85
def make_format_registry(self):
86
my_format_registry = controldir.ControlDirFormatRegistry()
87
my_format_registry.register('weave', bzrdir.BzrDirFormat6,
88
'Pre-0.8 format. Slower and does not support checkouts or shared'
89
' repositories', deprecated=True)
90
my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
91
'BzrDirFormat6', 'Format registered lazily', deprecated=True)
92
bzrdir.register_metadir(my_format_registry, 'knit',
93
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
96
my_format_registry.set_default('knit')
97
bzrdir.register_metadir(my_format_registry,
99
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
100
'Experimental successor to knit. Use at your own risk.',
101
branch_format='bzrlib.branch.BzrBranchFormat6',
103
bzrdir.register_metadir(my_format_registry,
105
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
106
'Experimental successor to knit. Use at your own risk.',
107
branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
108
my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
109
'Pre-0.8 format. Slower and does not support checkouts or shared'
110
' repositories', hidden=True)
111
my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
112
'BzrDirFormat6', 'Format registered lazily', deprecated=True,
114
return my_format_registry
116
def test_format_registry(self):
    """Each key of the sample registry builds a format of the expected type."""
    registry = self.make_format_registry()
    # Both the lazily and the eagerly registered entry give BzrDirFormat6.
    lazy_format = registry.make_bzrdir('lazy')
    self.assertIsInstance(lazy_format, bzrdir.BzrDirFormat6)
    weave_format = registry.make_bzrdir('weave')
    self.assertIsInstance(weave_format, bzrdir.BzrDirFormat6)
    # 'default' and 'knit' both carry the Knit1 repository format.
    default_format = registry.make_bzrdir('default')
    self.assertIsInstance(default_format.repository_format,
                          knitrepo.RepositoryFormatKnit1)
    knit_format = registry.make_bzrdir('knit')
    self.assertIsInstance(knit_format.repository_format,
                          knitrepo.RepositoryFormatKnit1)
    branch6_format = registry.make_bzrdir('branch6')
    self.assertIsInstance(branch6_format.get_branch_format(),
                          bzrlib.branch.BzrBranchFormat6)
132
def test_get_help(self):
    """get_help() returns the help text registered for each key."""
    registry = self.make_format_registry()
    lazy_help = registry.get_help('lazy')
    self.assertEqual('Format registered lazily', lazy_help)
    # 'default' is expected to report the same text as 'knit'.
    knit_help = registry.get_help('knit')
    self.assertEqual('Format using knits', knit_help)
    default_help = registry.get_help('default')
    self.assertEqual('Format using knits', default_help)
    weave_help = registry.get_help('weave')
    self.assertEqual('Pre-0.8 format. Slower and does not support'
                     ' checkouts or shared repositories',
                     weave_help)
144
def test_help_topic(self):
145
topics = help_topics.HelpTopicRegistry()
146
registry = self.make_format_registry()
147
topics.register('current-formats', registry.help_topic,
149
topics.register('other-formats', registry.help_topic,
151
new = topics.get_detail('current-formats')
152
rest = topics.get_detail('other-formats')
153
experimental, deprecated = rest.split('Deprecated formats')
154
self.assertContainsRe(new, 'formats-help')
155
self.assertContainsRe(new,
156
':knit:\n \(native\) \(default\) Format using knits\n')
157
self.assertContainsRe(experimental,
158
':branch6:\n \(native\) Experimental successor to knit')
159
self.assertContainsRe(deprecated,
160
':lazy:\n \(native\) Format registered lazily\n')
161
self.assertNotContainsRe(new, 'hidden')
163
def test_set_default_repository(self):
    """set_default_repository() re-points 'default' at the named format.

    The global registry is modified, so the previous default is looked up
    first and restored in a ``finally`` clause.
    """
    default_factory = bzrdir.format_registry.get('default')
    # Find the real key that 'default' currently aliases.
    old_default = [k for k, v in bzrdir.format_registry.iteritems()
                   if v == default_factory and k != 'default'][0]
    bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
    try:
        self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
                      bzrdir.format_registry.get('default'))
        self.assertIs(
            repository.RepositoryFormat.get_default_format().__class__,
            knitrepo.RepositoryFormatKnit3)
    finally:
        bzrdir.format_registry.set_default_repository(old_default)
177
def test_aliases(self):
    """aliases() reports only the keys registered with alias=True."""
    registry = controldir.ControlDirFormatRegistry()
    registry.register(
        'weave', bzrdir.BzrDirFormat6,
        'Pre-0.8 format. Slower and does not support checkouts or shared'
        ' repositories',
        deprecated=True)
    registry.register(
        'weavealias', bzrdir.BzrDirFormat6,
        'Pre-0.8 format. Slower and does not support checkouts or shared'
        ' repositories',
        deprecated=True, alias=True)
    expected = frozenset(['weavealias'])
    self.assertEqual(expected, registry.aliases())
188
class SampleBranch(bzrlib.branch.Branch):
189
"""A dummy branch for guess what, dummy use."""
191
def __init__(self, dir):
195
class SampleRepository(bzrlib.repository.Repository):
198
def __init__(self, dir):
202
class SampleBzrDir(bzrdir.BzrDir):
    """A sample BzrDir implementation to allow testing static methods."""

    def create_repository(self, shared=False):
        """See BzrDir.create_repository."""
        return "A repository"

    def open_repository(self):
        """See BzrDir.open_repository."""
        return SampleRepository(self)

    def create_branch(self, name=None):
        """See BzrDir.create_branch."""
        # Only a named (colocated) branch is refused; the unnamed case must
        # still return a SampleBranch, which the create_branch_and_repo test
        # in this file checks for.
        if name is not None:
            raise NoColocatedBranchSupport(self)
        return SampleBranch(self)

    def create_workingtree(self):
        """See BzrDir.create_workingtree."""
        # The standalone-working-tree test in this file expects this marker.
        return "A tree"
224
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
227
this format is initializable, unsupported to aid in testing the
228
open and open_downlevel routines.
231
def get_format_string(self):
232
"""See BzrDirFormat.get_format_string()."""
233
return "Sample .bzr dir format."
235
def initialize_on_transport(self, t):
236
"""Create a bzr dir."""
238
t.put_bytes('.bzr/branch-format', self.get_format_string())
239
return SampleBzrDir(t, self)
241
def is_supported(self):
244
def open(self, transport, _found=None):
245
return "opened branch."
248
class TestBzrDirFormat(TestCaseWithTransport):
249
"""Tests for the BzrDirFormat facility."""
251
def test_find_format(self):
    """find_format() reports the format each control dir was created with."""
    root = get_transport(self.get_url())
    self.build_tree(["foo/", "bar/"], transport=root)

    def check_format(format, url):
        # Initialise a control dir at url, then probe it through a fresh
        # transport and compare the class of what comes back.
        format.initialize(url)
        probe = get_transport(url)
        detected = bzrdir.BzrDirFormat.find_format(probe)
        self.failUnless(isinstance(detected, format.__class__))

    check_format(bzrdir.BzrDirFormat5(), "foo")
    check_format(bzrdir.BzrDirFormat6(), "bar")
265
def test_find_format_nothing_there(self):
266
self.assertRaises(NotBranchError,
267
bzrdir.BzrDirFormat.find_format,
270
def test_find_format_unknown_format(self):
271
t = get_transport(self.get_url())
273
t.put_bytes('.bzr/branch-format', '')
274
self.assertRaises(UnknownFormatError,
275
bzrdir.BzrDirFormat.find_format,
278
def test_register_unregister_format(self):
279
format = SampleBzrDirFormat()
282
format.initialize(url)
283
# register a format for it.
284
bzrdir.BzrDirFormat.register_format(format)
285
# which bzrdir.Open will refuse (not supported)
286
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
287
# which bzrdir.open_containing will refuse (not supported)
288
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
289
# but open_downlevel will work
290
t = get_transport(url)
291
self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
292
# unregister the format
293
bzrdir.BzrDirFormat.unregister_format(format)
294
# now open_downlevel should fail too.
295
self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
297
def test_create_branch_and_repo_uses_default(self):
    """create_branch_and_repo() builds its branch through the given format."""
    format = SampleBzrDirFormat()
    branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
                                                  format=format)
    # SampleBzrDir.create_branch hands back a SampleBranch.
    self.assertTrue(isinstance(branch, SampleBranch))
303
def test_create_branch_and_repo_under_shared(self):
    """A branch made under a shared repo has no repository in its own dir."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    child_branch = bzrdir.BzrDir.create_branch_and_repo(
        self.get_url('child'), format=knit_format)
    # Opening a repository from the child's control dir must fail.
    self.assertRaises(errors.NoRepositoryPresent,
                      child_branch.bzrdir.open_repository)
313
def test_create_branch_and_repo_under_shared_force_new(self):
    """A new repository can be forced even underneath a shared repo."""
    format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=format)
    branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
                                                  force_new_repo=True,
                                                  format=format)
    # Must not raise: the child control dir holds its own repository.
    branch.bzrdir.open_repository()
323
def test_create_standalone_working_tree(self):
    """create_standalone_workingtree() rejects non-local URLs up front."""
    format = SampleBzrDirFormat()
    # note this is deliberately readonly, as this failure should
    # occur before any writes.
    self.assertRaises(errors.NotLocalUrl,
                      bzrdir.BzrDir.create_standalone_workingtree,
                      self.get_readonly_url(), format=format)
    tree = bzrdir.BzrDir.create_standalone_workingtree('.',
                                                       format=format)
    self.assertEqual('A tree', tree)
334
def test_create_standalone_working_tree_under_shared_repo(self):
    """A standalone working tree gets its own repo even under a shared one."""
    # create standalone working tree always makes a repo.
    format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=format)
    # note this is deliberately readonly, as this failure should
    # occur before any writes.
    self.assertRaises(errors.NotLocalUrl,
                      bzrdir.BzrDir.create_standalone_workingtree,
                      self.get_readonly_url('child'), format=format)
    tree = bzrdir.BzrDir.create_standalone_workingtree('child',
                                                       format=format)
    # Must not raise: the repository lives in the child's control dir.
    tree.bzrdir.open_repository()
347
def test_create_branch_convenience(self):
    """Outside a repo the helper produces a repository, branch and tree."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    new_branch = bzrdir.BzrDir.create_branch_convenience(
        '.', format=knit_format)
    # Neither open call may raise.
    new_branch.bzrdir.open_workingtree()
    new_branch.bzrdir.open_repository()
354
def test_create_branch_convenience_possible_transports(self):
    """Check that the optional 'possible_transports' is recognized"""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    reusable = [self.get_transport()]
    new_branch = bzrdir.BzrDir.create_branch_convenience(
        '.', format=knit_format, possible_transports=reusable)
    # Neither open call may raise.
    new_branch.bzrdir.open_workingtree()
    new_branch.bzrdir.open_repository()
363
def test_create_branch_convenience_root(self):
    """Creating a branch at the root of a fs should work."""
    self.vfs_transport_factory = memory.MemoryServer
    # outside a repo the default convenience output is a repo+branch_tree
    format = bzrdir.format_registry.make_bzrdir('knit')
    branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
                                                     format=format)
    # This test expects no working tree at that URL, but a repository.
    self.assertRaises(errors.NoWorkingTree,
                      branch.bzrdir.open_workingtree)
    branch.bzrdir.open_repository()
374
def test_create_branch_convenience_under_shared_repo(self):
    """Under a shared repo the helper makes a branch and tree, but no repo."""
    format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=format)
    branch = bzrdir.BzrDir.create_branch_convenience('child',
                                                     format=format)
    branch.bzrdir.open_workingtree()
    self.assertRaises(errors.NoRepositoryPresent,
                      branch.bzrdir.open_repository)
385
def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
    """force_new_tree=False suppresses the working tree under a shared repo."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    child = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_tree=False, format=knit_format)
    # The child control dir has neither a tree nor a repository.
    for missing, opener in (
            (errors.NoWorkingTree, child.bzrdir.open_workingtree),
            (errors.NoRepositoryPresent, child.bzrdir.open_repository)):
        self.assertRaises(missing, opener)
397
def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
    """A shared repo with working trees disabled yields a treeless branch."""
    format = bzrdir.format_registry.make_bzrdir('knit')
    repo = self.make_repository('.', shared=True, format=format)
    repo.set_make_working_trees(False)
    branch = bzrdir.BzrDir.create_branch_convenience('child',
                                                     format=format)
    self.assertRaises(errors.NoWorkingTree,
                      branch.bzrdir.open_workingtree)
    self.assertRaises(errors.NoRepositoryPresent,
                      branch.bzrdir.open_repository)
410
def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
    """force_new_tree=True overrides a shared repo's no-working-trees setting."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    shared_repo = self.make_repository('.', shared=True, format=knit_format)
    shared_repo.set_make_working_trees(False)
    child = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_tree=True, format=knit_format)
    # A tree exists, but the repository still lives in the parent.
    child.bzrdir.open_workingtree()
    self.assertRaises(errors.NoRepositoryPresent,
                      child.bzrdir.open_repository)
422
def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
    """force_new_repo=True gives the child its own repository and tree."""
    knit_format = bzrdir.format_registry.make_bzrdir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    child = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_repo=True, format=knit_format)
    # Neither open call may raise.
    child.bzrdir.open_repository()
    child.bzrdir.open_workingtree()
433
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
435
def test_acquire_repository_standalone(self):
    """The default acquisition policy should create a standalone branch."""
    control = self.make_bzrdir('.')
    policy = control.determine_repository_policy()
    repo, is_new = policy.acquire_repository()
    # The repository sits at the control dir's own location and is unshared.
    self.assertEqual(repo.bzrdir.root_transport.base,
                     control.root_transport.base)
    self.assertFalse(repo.is_shared())
444
def test_determine_stacking_policy(self):
    """A child control dir picks up the parent's default stack-on URL."""
    parent = self.make_bzrdir('.')
    child = self.make_bzrdir('child')
    parent.get_config().set_default_stack_on('http://example.org')
    policy = child.determine_repository_policy()
    self.assertEqual('http://example.org', policy._stack_on)
451
def test_determine_stacking_policy_relative(self):
    """A relative stack-on value is kept, with the parent's base recorded."""
    parent = self.make_bzrdir('.')
    child = self.make_bzrdir('child')
    parent.get_config().set_default_stack_on('child2')
    policy = child.determine_repository_policy()
    self.assertEqual('child2', policy._stack_on)
    # _stack_on_pwd is the location the relative value is taken against.
    self.assertEqual(parent.root_transport.base, policy._stack_on_pwd)
460
def prepare_default_stacking(self, child_format='1.6'):
    """Set up a parent dir whose default stack-on target is branch 'child'.

    :param child_format: format name used to create the 'child' branch.
    :return: (the 'child' branch, a transport for the sibling 'child2').
    """
    parent = self.make_bzrdir('.')
    stack_target = self.make_branch('child', format=child_format)
    parent.get_config().set_default_stack_on(stack_target.base)
    sibling_transport = parent.transport.clone('child2')
    return stack_target, sibling_transport
467
def test_clone_on_transport_obeys_stacking_policy(self):
    """clone_on_transport() stacks the clone on the configured default."""
    source, target_transport = self.prepare_default_stacking()
    cloned = source.bzrdir.clone_on_transport(target_transport)
    stacked_on = cloned.open_branch().get_stacked_on_url()
    self.assertEqual(source.base, stacked_on)
473
def test_default_stacking_with_stackable_branch_unstackable_repo(self):
474
# Make stackable source branch with an unstackable repo format.
475
source_bzrdir = self.make_bzrdir('source')
476
pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
477
source_branch = bzrlib.branch.BzrBranchFormat7().initialize(
479
# Make a directory with a default stacking policy
480
parent_bzrdir = self.make_bzrdir('parent')
481
stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
482
parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
483
# Clone source into directory
484
target = source_bzrdir.clone(self.get_url('parent/target'))
486
def test_sprout_obeys_stacking_policy(self):
    """sprout() stacks the new branch on the configured default."""
    source, target_transport = self.prepare_default_stacking()
    sprouted = source.bzrdir.sprout(target_transport.base)
    stacked_on = sprouted.open_branch().get_stacked_on_url()
    self.assertEqual(source.base, stacked_on)
492
def test_clone_ignores_policy_for_unsupported_formats(self):
    """Cloning a 'pack-0.92' branch does not apply the stacking default."""
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    cloned = source.bzrdir.clone_on_transport(target_transport)
    cloned_branch = cloned.open_branch()
    # Asking for the stacked-on URL fails for this branch format.
    self.assertRaises(errors.UnstackableBranchFormat,
                      cloned_branch.get_stacked_on_url)
499
def test_sprout_ignores_policy_for_unsupported_formats(self):
    """Sprouting a 'pack-0.92' branch does not apply the stacking default."""
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    sprouted = source.bzrdir.sprout(target_transport.base)
    sprouted_branch = sprouted.open_branch()
    # Asking for the stacked-on URL fails for this branch format.
    self.assertRaises(errors.UnstackableBranchFormat,
                      sprouted_branch.get_stacked_on_url)
506
def test_sprout_upgrades_format_if_stacked_specified(self):
507
child_branch, new_child_transport = self.prepare_default_stacking(
508
child_format='pack-0.92')
509
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
511
self.assertEqual(child_branch.bzrdir.root_transport.base,
512
new_child.open_branch().get_stacked_on_url())
513
repo = new_child.open_repository()
514
self.assertTrue(repo._format.supports_external_lookups)
515
self.assertFalse(repo.supports_rich_root())
517
def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
    """An explicit stacked_on makes the clone use a stackable repo format."""
    source, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    cloned = source.bzrdir.clone_on_transport(
        target_transport,
        stacked_on=source.bzrdir.root_transport.base)
    self.assertEqual(source.bzrdir.root_transport.base,
                     cloned.open_branch().get_stacked_on_url())
    cloned_repo = cloned.open_repository()
    # External lookups are supported, but rich roots are not.
    self.assertTrue(cloned_repo._format.supports_external_lookups)
    self.assertFalse(cloned_repo.supports_rich_root())
528
def test_sprout_upgrades_to_rich_root_format_if_needed(self):
529
child_branch, new_child_transport = self.prepare_default_stacking(
530
child_format='rich-root-pack')
531
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
533
repo = new_child.open_repository()
534
self.assertTrue(repo._format.supports_external_lookups)
535
self.assertTrue(repo.supports_rich_root())
537
def test_add_fallback_repo_handles_absolute_urls(self):
    """_add_fallback() runs cleanly when stack_on is the branch's base URL."""
    target = self.make_branch('stack_on', format='1.6')
    stacked_repo = self.make_repository('repo', format='1.6')
    policy = bzrdir.UseExistingRepository(stacked_repo, target.base)
    # Passing means this call does not raise.
    policy._add_fallback(stacked_repo)
543
def test_add_fallback_repo_handles_relative_urls(self):
    """_add_fallback() runs cleanly when stack_on is '.' plus a base URL."""
    target = self.make_branch('stack_on', format='1.6')
    stacked_repo = self.make_repository('repo', format='1.6')
    policy = bzrdir.UseExistingRepository(stacked_repo, '.', target.base)
    # Passing means this call does not raise.
    policy._add_fallback(stacked_repo)
549
def test_configure_relative_branch_stacking_url(self):
550
stack_on = self.make_branch('stack_on', format='1.6')
551
stacked = self.make_branch('stack_on/stacked', format='1.6')
552
policy = bzrdir.UseExistingRepository(stacked.repository,
554
policy.configure_branch(stacked)
555
self.assertEqual('..', stacked.get_stacked_on_url())
557
def test_relative_branch_stacking_to_absolute(self):
    """A '.' stack-on against a readonly URL is stored as that full URL."""
    stack_on = self.make_branch('stack_on', format='1.6')
    stacked = self.make_branch('stack_on/stacked', format='1.6')
    policy = bzrdir.UseExistingRepository(
        stacked.repository, '.', self.get_readonly_url('stack_on'))
    policy.configure_branch(stacked)
    recorded = stacked.get_stacked_on_url()
    self.assertEqual(self.get_readonly_url('stack_on'), recorded)
567
class ChrootedTests(TestCaseWithTransport):
568
"""A support class that provides readonly urls outside the local namespace.
570
This is done by checking if self.transport_server is a MemoryServer. If it
571
is then we are chrooted already, if it is not then an HttpServer is used
576
super(ChrootedTests, self).setUp()
577
if not self.vfs_transport_factory == memory.MemoryServer:
578
self.transport_readonly_server = http_server.HttpServer
580
def local_branch_path(self, branch):
    """Return the resolved local filesystem path of branch.base."""
    local_path = urlutils.local_path_from_url(branch.base)
    return os.path.realpath(local_path)
583
def test_open_containing(self):
    """open_containing() finds the enclosing control dir and the rest path."""
    # Nothing exists yet, so both lookups fail.
    self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
                      self.get_readonly_url(''))
    self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
                      self.get_readonly_url('g/p/q'))
    control = bzrdir.BzrDir.create(self.get_url())
    root_url = self.get_readonly_url('')
    branch, relpath = bzrdir.BzrDir.open_containing(root_url)
    self.assertEqual('', relpath)
    nested_url = self.get_readonly_url('g/p/q')
    branch, relpath = bzrdir.BzrDir.open_containing(nested_url)
    self.assertEqual('g/p/q', relpath)
594
def test_open_containing_tree_branch_or_repository_empty(self):
    """With nothing created, the combined opener raises NotBranchError."""
    opener = bzrdir.BzrDir.open_containing_tree_branch_or_repository
    self.assertRaises(errors.NotBranchError, opener,
                      self.get_readonly_url(''))
599
def test_open_containing_tree_branch_or_repository_all(self):
600
self.make_branch_and_tree('topdir')
601
tree, branch, repo, relpath = \
602
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
604
self.assertEqual(os.path.realpath('topdir'),
605
os.path.realpath(tree.basedir))
606
self.assertEqual(os.path.realpath('topdir'),
607
self.local_branch_path(branch))
609
osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
610
repo.bzrdir.transport.local_abspath('repository'))
611
self.assertEqual(relpath, 'foo')
613
def test_open_containing_tree_branch_or_repository_no_tree(self):
614
self.make_branch('branch')
615
tree, branch, repo, relpath = \
616
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
618
self.assertEqual(tree, None)
619
self.assertEqual(os.path.realpath('branch'),
620
self.local_branch_path(branch))
622
osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
623
repo.bzrdir.transport.local_abspath('repository'))
624
self.assertEqual(relpath, 'foo')
626
def test_open_containing_tree_branch_or_repository_repo(self):
627
self.make_repository('repo')
628
tree, branch, repo, relpath = \
629
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
631
self.assertEqual(tree, None)
632
self.assertEqual(branch, None)
634
osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
635
repo.bzrdir.transport.local_abspath('repository'))
636
self.assertEqual(relpath, '')
638
def test_open_containing_tree_branch_or_repository_shared_repo(self):
639
self.make_repository('shared', shared=True)
640
bzrdir.BzrDir.create_branch_convenience('shared/branch',
641
force_new_tree=False)
642
tree, branch, repo, relpath = \
643
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
645
self.assertEqual(tree, None)
646
self.assertEqual(os.path.realpath('shared/branch'),
647
self.local_branch_path(branch))
649
osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
650
repo.bzrdir.transport.local_abspath('repository'))
651
self.assertEqual(relpath, '')
653
def test_open_containing_tree_branch_or_repository_branch_subdir(self):
654
self.make_branch_and_tree('foo')
655
self.build_tree(['foo/bar/'])
656
tree, branch, repo, relpath = \
657
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
659
self.assertEqual(os.path.realpath('foo'),
660
os.path.realpath(tree.basedir))
661
self.assertEqual(os.path.realpath('foo'),
662
self.local_branch_path(branch))
664
osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
665
repo.bzrdir.transport.local_abspath('repository'))
666
self.assertEqual(relpath, 'bar')
668
def test_open_containing_tree_branch_or_repository_repo_subdir(self):
669
self.make_repository('bar')
670
self.build_tree(['bar/baz/'])
671
tree, branch, repo, relpath = \
672
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
674
self.assertEqual(tree, None)
675
self.assertEqual(branch, None)
677
osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
678
repo.bzrdir.transport.local_abspath('repository'))
679
self.assertEqual(relpath, 'baz')
681
def test_open_containing_from_transport(self):
    """open_containing_from_transport() mirrors open_containing()."""
    opener = bzrdir.BzrDir.open_containing_from_transport
    # Nothing exists yet, so both lookups fail.
    self.assertRaises(NotBranchError, opener,
                      get_transport(self.get_readonly_url('')))
    self.assertRaises(NotBranchError, opener,
                      get_transport(self.get_readonly_url('g/p/q')))
    control = bzrdir.BzrDir.create(self.get_url())
    branch, relpath = opener(get_transport(self.get_readonly_url('')))
    self.assertEqual('', relpath)
    branch, relpath = opener(get_transport(self.get_readonly_url('g/p/q')))
    self.assertEqual('g/p/q', relpath)
694
def test_open_containing_tree_or_branch(self):
695
self.make_branch_and_tree('topdir')
696
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
698
self.assertEqual(os.path.realpath('topdir'),
699
os.path.realpath(tree.basedir))
700
self.assertEqual(os.path.realpath('topdir'),
701
self.local_branch_path(branch))
702
self.assertIs(tree.bzrdir, branch.bzrdir)
703
self.assertEqual('foo', relpath)
704
# opening from non-local should not return the tree
705
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
706
self.get_readonly_url('topdir/foo'))
707
self.assertEqual(None, tree)
708
self.assertEqual('foo', relpath)
710
self.make_branch('topdir/foo')
711
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
713
self.assertIs(tree, None)
714
self.assertEqual(os.path.realpath('topdir/foo'),
715
self.local_branch_path(branch))
716
self.assertEqual('', relpath)
718
def test_open_tree_or_branch(self):
    """open_tree_or_branch() gives a tree only for local dirs that have one."""
    self.make_branch_and_tree('topdir')
    tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
    self.assertEqual(os.path.realpath('topdir'),
                     os.path.realpath(tree.basedir))
    self.assertEqual(os.path.realpath('topdir'),
                     self.local_branch_path(branch))
    self.assertIs(tree.bzrdir, branch.bzrdir)
    # opening from non-local should not return the tree
    readonly_url = self.get_readonly_url('topdir')
    tree, branch = bzrdir.BzrDir.open_tree_or_branch(readonly_url)
    self.assertEqual(None, tree)
    # A branch created with make_branch has no tree to return.
    self.make_branch('topdir/foo')
    tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
    self.assertIs(tree, None)
    self.assertEqual(os.path.realpath('topdir/foo'),
                     self.local_branch_path(branch))
737
def test_open_from_transport(self):
    """open_from_transport() roots the BzrDir at the supplied transport."""
    # transport pointing at bzrdir should give a bzrdir with root transport
    # set to the given transport
    control = bzrdir.BzrDir.create(self.get_url())
    here = get_transport(self.get_url())
    opened = bzrdir.BzrDir.open_from_transport(here)
    self.assertEqual(here.base, opened.root_transport.base)
    self.assertIsInstance(opened, bzrdir.BzrDir)
746
def test_open_from_transport_no_bzrdir(self):
    """open_from_transport() raises NotBranchError where nothing exists."""
    transport = get_transport(self.get_url())
    self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
                      transport)
751
def test_open_from_transport_bzrdir_in_parent(self):
    """open_from_transport() does not search parent directories."""
    control = bzrdir.BzrDir.create(self.get_url())
    transport = get_transport(self.get_url())
    transport.mkdir('subdir')
    # Point at the subdirectory; the control dir exists only in its parent.
    transport = transport.clone('subdir')
    self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
                      transport)
759
def test_sprout_recursive(self):
760
tree = self.make_branch_and_tree('tree1',
761
format='dirstate-with-subtree')
762
sub_tree = self.make_branch_and_tree('tree1/subtree',
763
format='dirstate-with-subtree')
764
sub_tree.set_root_id('subtree-root')
765
tree.add_reference(sub_tree)
766
self.build_tree(['tree1/subtree/file'])
768
tree.commit('Initial commit')
769
tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
771
self.addCleanup(tree2.unlock)
772
self.failUnlessExists('tree2/subtree/file')
773
self.assertEqual('tree-reference', tree2.kind('subtree-root'))
775
def test_cloning_metadir(self):
    """Ensure that cloning metadir is suitable"""
    # A bare control dir must answer without raising.
    control = self.make_bzrdir('bzrdir')
    control.cloning_metadir()
    knit_branch = self.make_branch('branch', format='knit')
    cloning_format = knit_branch.bzrdir.cloning_metadir()
    self.assertIsInstance(cloning_format.workingtree_format,
                          workingtree.WorkingTreeFormat3)
784
def test_sprout_recursive_treeless(self):
785
tree = self.make_branch_and_tree('tree1',
786
format='dirstate-with-subtree')
787
sub_tree = self.make_branch_and_tree('tree1/subtree',
788
format='dirstate-with-subtree')
789
tree.add_reference(sub_tree)
790
self.build_tree(['tree1/subtree/file'])
792
tree.commit('Initial commit')
793
tree.bzrdir.destroy_workingtree()
794
repo = self.make_repository('repo', shared=True,
795
format='dirstate-with-subtree')
796
repo.set_make_working_trees(False)
797
tree.bzrdir.sprout('repo/tree2')
798
self.failUnlessExists('repo/tree2/subtree')
799
self.failIfExists('repo/tree2/subtree/file')
801
def make_foo_bar_baz(self):
    """Create control dirs 'foo', 'foo/bar' and 'baz'.

    :return: the three control dirs as a (foo, bar, baz) tuple, which is
        how every caller in this file unpacks the result.
    """
    foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
    bar = self.make_branch('foo/bar').bzrdir
    baz = self.make_branch('baz').bzrdir
    return foo, bar, baz
807
def test_find_bzrdirs(self):
    """find_bzrdirs() yields baz, foo and foo/bar in that order."""
    foo, bar, baz = self.make_foo_bar_baz()
    root = get_transport(self.get_url())
    found = bzrdir.BzrDir.find_bzrdirs(root)
    self.assertEqualBzrdirs([baz, foo, bar], found)
813
def make_fake_permission_denied_transport(self, transport, paths):
814
"""Create a transport that raises PermissionDenied for some paths."""
817
raise errors.PermissionDenied(path)
819
path_filter_server = pathfilter.PathFilteringServer(transport, filter)
820
path_filter_server.start_server()
821
self.addCleanup(path_filter_server.stop_server)
822
path_filter_transport = pathfilter.PathFilteringTransport(
823
path_filter_server, '.')
824
return (path_filter_server, path_filter_transport)
826
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
    """Check that each branch url ends with the given suffix."""
    urls = (control.user_url for control in actual_bzrdirs)
    for url in urls:
        self.assertEndsWith(url, expect_url_suffix)
831
def test_find_bzrdirs_permission_denied(self):
    """With 'foo' denied, only results ending in '/baz/' are accepted."""
    foo, bar, baz = self.make_foo_bar_baz()
    backing = get_transport(self.get_url())
    denied = self.make_fake_permission_denied_transport(backing, ['foo'])
    path_filter_server, path_filter_transport = denied

    # Directly through the path-filtering transport.
    found_direct = bzrdir.BzrDir.find_bzrdirs(path_filter_transport)
    self.assertBranchUrlsEndWith('/baz/', found_direct)

    # Through a smart server backed by the same filtering server.
    smart_transport = self.make_smart_server(
        '.', backing_server=path_filter_server)
    found_smart = bzrdir.BzrDir.find_bzrdirs(smart_transport)
    self.assertBranchUrlsEndWith('/baz/', found_smart)
845
def test_find_bzrdirs_list_current(self):
    """A custom list_current that hides 'baz' keeps it out of the result."""
    def list_current(transport):
        visible = []
        for name in transport.list_dir(''):
            if name != 'baz':
                visible.append(name)
        return visible

    foo, bar, baz = self.make_foo_bar_baz()
    root = get_transport(self.get_url())
    found = bzrdir.BzrDir.find_bzrdirs(root, list_current=list_current)
    self.assertEqualBzrdirs([foo, bar], found)
855
def test_find_bzrdirs_evaluate(self):
    """find_bzrdirs yields the values produced by a custom evaluate hook.

    The expected result contains baz and foo but not the nested foo/bar.
    """
    def evaluate(candidate):
        # Returns a (flag, value) pair. The flag is True only when no
        # repository is present; presumably it tells find_bzrdirs whether
        # to keep recursing below this directory - confirm against the API.
        try:
            candidate.open_repository()
        except NoRepositoryPresent:
            return True, candidate.root_transport.base
        else:
            return False, candidate.root_transport.base

    foo, bar, baz = self.make_foo_bar_baz()
    transport = get_transport(self.get_url())
    self.assertEqual([baz.root_transport.base, foo.root_transport.base],
                     list(bzrdir.BzrDir.find_bzrdirs(transport,
                                                     evaluate=evaluate)))
870
def assertEqualBzrdirs(self, first, second):
    """Assert two bzrdir sequences refer to the same locations, in order.

    :param first: iterable of objects exposing ``root_transport.base``.
    :param second: iterable of objects exposing ``root_transport.base``.

    Both arguments are materialised first, so generators are accepted on
    either side and the length comparison is always meaningful.
    """
    first = list(first)
    second = list(second)
    self.assertEqual(len(first), len(second))
    for x, y in zip(first, second):
        self.assertEqual(x.root_transport.base, y.root_transport.base)
877
def test_find_branches(self):
    """find_branches lists branches with and without a top-level repo."""
    self.make_repository('', shared=True)
    foo, bar, baz = self.make_foo_bar_baz()
    # A control directory without a branch is created next to bar.
    self.make_bzrdir('foo/qux')
    transport = get_transport(self.get_url())

    found = bzrdir.BzrDir.find_branches(transport)
    for index, expected in enumerate((baz, foo, bar)):
        self.assertEqual(expected.root_transport.base, found[index].base)

    # ensure this works without a top-level repo
    found = bzrdir.BzrDir.find_branches(transport.clone('foo'))
    for index, expected in enumerate((foo, bar)):
        self.assertEqual(expected.root_transport.base, found[index].base)
893
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """find_branches copes with a branch whose repository was removed."""

    def test_find_bzrdirs_missing_repo(self):
        """Only results ending in '/baz/' are accepted from find_branches."""
        transport = get_transport(self.get_url())
        shared_repo = self.make_repository('arepo', shared=True)
        orphan_url = shared_repo.user_url + '/abranch'
        bzrdir.BzrDir.create(orphan_url).create_branch()
        # Delete the shared repository's control directory, then check the
        # branch can no longer be opened.
        transport.delete_tree('arepo/.bzr')
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.Branch.open, orphan_url)
        self.make_branch('baz')
        for found in bzrdir.BzrDir.find_branches(transport):
            self.assertEndsWith(found.user_url, '/baz/')
908
class TestMeta1DirFormat(TestCaseWithTransport):
    """Tests specific to the meta1 dir format."""

    def test_right_base_dirs(self):
        """Component transports are 'branch', 'repository' and 'checkout'."""
        control_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # NOTE(review): the assignment of ``t`` was absent from the source;
        # the control directory's own transport is the presumed base.
        t = control_dir.transport
        branch_base = t.clone('branch').base
        self.assertEqual(branch_base,
                         control_dir.get_branch_transport(None).base)
        self.assertEqual(
            branch_base,
            control_dir.get_branch_transport(
                bzrlib.branch.BzrBranchFormat5()).base)
        repository_base = t.clone('repository').base
        self.assertEqual(repository_base,
                         control_dir.get_repository_transport(None).base)
        self.assertEqual(
            repository_base,
            control_dir.get_repository_transport(
                weaverepo.RepositoryFormat7()).base)
        checkout_base = t.clone('checkout').base
        self.assertEqual(checkout_base,
                         control_dir.get_workingtree_transport(None).base)
        self.assertEqual(
            checkout_base,
            control_dir.get_workingtree_transport(
                workingtree.WorkingTreeFormat3()).base)

    def test_meta1dir_uses_lockdir(self):
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
        control_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # NOTE(review): ``t`` was undefined in the source; presumed to be
        # the control directory transport - confirm.
        t = control_dir.transport
        self.assertIsDirectory('branch-lock', t)

    def test_comparison(self):
        """Equality and inequality behave properly.

        Two 'knit' formats compare equal; a 'dirstate-with-subtree' format
        compares unequal to a 'knit' one.
        """
        mydir = bzrdir.format_registry.make_bzrdir('knit')
        self.assertEqual(mydir, mydir)
        self.assertFalse(mydir != mydir)
        otherdir = bzrdir.format_registry.make_bzrdir('knit')
        self.assertEqual(otherdir, mydir)
        self.assertFalse(otherdir != mydir)
        otherdir2 = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
        self.assertNotEqual(otherdir2, mydir)
        self.assertFalse(otherdir2 == mydir)

    def test_needs_conversion_different_working_tree(self):
        # meta1dirs need a conversion if any element is not the default.
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
        tree = self.make_branch_and_tree('tree', format='knit')
        self.assertTrue(tree.bzrdir.needs_format_conversion(new_format))

    def test_initialize_on_format_uses_smart_transport(self):
        """Initializing over the smart server yields a RemoteBzrDir in 2 RPCs."""
        self.setup_smart_server_with_call_log()
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
        transport = self.get_transport('target')
        transport.ensure_base()
        self.reset_smart_call_log()
        instance = new_format.initialize_on_transport(transport)
        self.assertIsInstance(instance, remote.RemoteBzrDir)
        rpc_count = len(self.hpss_calls)
        # This figure represents the amount of work to perform this use case.
        # It is entirely ok to reduce this number if a test fails due to
        # rpc_count being too low. If rpc_count increases, more network
        # roundtrips have become necessary for this use case. Please do not
        # adjust this number upwards without agreement from bzr's network
        # support maintainers.
        self.assertEqual(2, rpc_count)
973
class TestFormat5(TestCaseWithTransport):
    """Tests specific to the version 5 bzrdir format."""

    def test_same_lockfiles_between_tree_repo_branch(self):
        # Only a single lockfiles instance may be created for format 5
        # objects: repository, branch and working tree must share it.
        def check_dir_components_use_same_lock(a_dir):
            repo_files = a_dir.open_repository().control_files
            branch_files = a_dir.open_branch().control_files
            tree_files = a_dir.open_workingtree()._control_files
            self.assertTrue(repo_files is branch_files)
            self.assertTrue(branch_files is tree_files)

        created = bzrdir.BzrDirFormat5().initialize(self.get_url())
        check_dir_components_use_same_lock(created)
        # The same must hold when the directory is opened normally.
        reopened = bzrdir.BzrDir.open(self.get_url())
        check_dir_components_use_same_lock(reopened)

    def test_can_convert(self):
        # format 5 dirs are convertable
        control_dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
        self.assertTrue(control_dir.can_convert_format())

    def test_needs_conversion(self):
        # format 5 dirs need a conversion if they are not the default format
        control_dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
        same_format = bzrdir.BzrDirFormat5()
        # don't need to convert it to itself
        self.assertFalse(control_dir.needs_format_conversion(same_format))
        # do need to convert it to the current default
        default_format = bzrdir.BzrDirFormat.get_default_format()
        self.assertTrue(control_dir.needs_format_conversion(default_format))
1007
class TestFormat6(TestCaseWithTransport):
    """Tests specific to the version 6 bzrdir format."""

    def test_same_lockfiles_between_tree_repo_branch(self):
        # Only a single lockfiles instance may be created for format 6
        # objects: repository, branch and working tree must share it.
        def check_dir_components_use_same_lock(a_dir):
            repo_files = a_dir.open_repository().control_files
            branch_files = a_dir.open_branch().control_files
            tree_files = a_dir.open_workingtree()._control_files
            self.assertTrue(repo_files is branch_files)
            self.assertTrue(branch_files is tree_files)

        created = bzrdir.BzrDirFormat6().initialize(self.get_url())
        check_dir_components_use_same_lock(created)
        # The same must hold when the directory is opened normally.
        reopened = bzrdir.BzrDir.open(self.get_url())
        check_dir_components_use_same_lock(reopened)

    def test_can_convert(self):
        # format 6 dirs are convertable
        control_dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
        self.assertTrue(control_dir.can_convert_format())

    def test_needs_conversion(self):
        # format 6 dirs need a conversion if they are not the default.
        control_dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
        default_format = bzrdir.BzrDirFormat.get_default_format()
        self.assertTrue(control_dir.needs_format_conversion(default_format))
1037
class NotBzrDir(bzrlib.bzrdir.BzrDir):
    """Control directory double that lives in '.not' instead of '.bzr'."""

    def __init__(self, transport, format):
        # The base-class initialiser is not invoked; only the format, the
        # root transport and the '.not' control transport are recorded.
        self._format = format
        self.root_transport = transport
        control_transport = transport.clone('.not')
        self.transport = control_transport
1046
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
    """Format double standing in for any non-.bzr based disk format."""

    def initialize_on_transport(self, transport):
        """Create a '.not' directory at the transport base and open it."""
        transport.mkdir('.not')
        opened = self.open(transport)
        return opened

    def open(self, transport):
        """Return a NotBzrDir bound to the transport and this format."""
        return NotBzrDir(transport, self)

    # NOTE(review): two source lines preceding this method were missing from
    # the chunk; a decorator may have been lost - verify against history.
    def _known_formats(self):
        known = set([NotBzrDirFormat()])
        return known
1063
class NotBzrDirProber(controldir.Prober):
    """Prober recognising the '.not' control directory."""

    def probe_transport(self, transport):
        """Return a NotBzrDirFormat if the transport has a '.not' entry.

        Returns None when no such entry is present.
        """
        if not transport.has('.not'):
            return None
        return NotBzrDirFormat()
1071
class TestNotBzrDir(TestCaseWithTransport):
    """Tests for using the bzrdir api with a non .bzr based disk format.

    If/when one of these is in the core, the implementation tests can
    take over this coverage.
    """

    def test_create_and_find_format(self):
        """A registered prober lets find_format detect the '.not' dir."""
        # create a .not dir
        format = NotBzrDirFormat()
        created = format.initialize(self.get_url())
        self.assertIsInstance(created, NotBzrDir)
        # now probe for it; the registration is global state, so it is
        # undone even when an assertion fails.
        controldir.ControlDirFormat.register_prober(NotBzrDirProber)
        try:
            found = bzrlib.bzrdir.BzrDirFormat.find_format(
                get_transport(self.get_url()))
            self.assertIsInstance(found, NotBzrDirFormat)
        finally:
            controldir.ControlDirFormat.unregister_prober(NotBzrDirProber)

    def test_included_in_known_formats(self):
        """A registered NotBzrDirFormat shows up in known_formats()."""
        not_format = NotBzrDirFormat()
        bzrlib.controldir.ControlDirFormat.register_format(not_format)
        try:
            formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
            for format in formats:
                if isinstance(format, NotBzrDirFormat):
                    # Found it: the test passes.
                    return
            self.fail("No NotBzrDirFormat in %s" % formats)
        finally:
            bzrlib.controldir.ControlDirFormat.unregister_format(not_format)
1105
class NonLocalTests(TestCaseWithTransport):
    """Tests for bzrdir static behaviour on non local paths."""

    def setUp(self):
        super(NonLocalTests, self).setUp()
        # Back the test urls with a memory server rather than local disk.
        self.vfs_transport_factory = memory.MemoryServer

    def test_create_branch_convenience(self):
        # outside a repo the default convenience output is a repo+branch_tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience(
            self.get_url('foo'), format=format)
        # On a non-local url no working tree is created.
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_force_tree_not_local_fails(self):
        # outside a repo the default convenience output is a repo+branch_tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_branch_convenience,
                          self.get_url('foo'),
                          force_new_tree=True,
                          format=format)
        # The failed attempt must not leave 'foo' behind.
        t = get_transport(self.get_url('.'))
        self.assertFalse(t.has('foo'))

    def test_clone(self):
        # clone into a nonlocal path works
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience('local',
                                                         format=format)
        branch.bzrdir.open_workingtree()
        result = branch.bzrdir.clone(self.get_url('remote'))
        self.assertRaises(errors.NoWorkingTree,
                          result.open_workingtree)
        result.open_branch()
        result.open_repository()

    def test_checkout_metadir(self):
        # checkout_metadir has reasonable working tree format even when no
        # working tree is present
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
        checkout_format = my_bzrdir.checkout_metadir()
        self.assertIsInstance(checkout_format.workingtree_format,
                              workingtree.WorkingTreeFormat3)
1154
class TestHTTPRedirections(object):
    """Test redirection between two http servers.

    This MUST be used by daughter classes that also inherit from
    TestCaseWithTwoWebservers.

    We can't inherit directly from TestCaseWithTwoWebservers or the
    test framework will try to create an instance which cannot
    run, its implementation being incomplete.

    Daughter classes supply ``_transport`` (a transport class) and
    ``_qualified_url(host, port)``.
    """

    def create_transport_readonly_server(self):
        # We don't set the http protocol version, relying on the default
        return http_utils.HTTPServerRedirecting()

    def create_transport_secondary_server(self):
        # We don't set the http protocol version, relying on the default
        return http_utils.HTTPServerRedirecting()

    def setUp(self):
        super(TestHTTPRedirections, self).setUp()
        # The redirections will point to the new server
        self.new_server = self.get_readonly_server()
        # The requests to the old server will be redirected
        self.old_server = self.get_secondary_server()
        # Configure the redirections
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)

    def test_loop(self):
        """Opening from either end of a redirection loop fails cleanly."""
        # Both servers redirect to each other creating a loop
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
        # Starting from either server should loop
        for server in (self.old_server, self.new_server):
            url = self._qualified_url(server.host, server.port)
            start = self._transport(url)
            self.assertRaises(errors.NotBranchError,
                              bzrdir.BzrDir.open_from_transport, start)

    def test_qualifier_preserved(self):
        """The redirected bzrdir keeps the transport class of the start."""
        self.make_branch_and_tree('branch')
        old_url = self._qualified_url(self.old_server.host,
                                      self.old_server.port)
        start = self._transport(old_url).clone('branch')
        bdir = bzrdir.BzrDir.open_from_transport(start)
        # Redirection should preserve the qualifier, hence the transport class
        self.assertIsInstance(bdir.root_transport, type(start))
1208
class TestHTTPRedirections_urllib(TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for urllib implementation"""

    _transport = HttpTransport_urllib

    def _qualified_url(self, host, port):
        """Return the permitted 'http+urllib' url for host and port."""
        result = 'http+urllib://%s:%s' % (host, port)
        self.permit_url(result)
        # Callers feed the value straight into self._transport().
        return result
1221
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
                                  TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for pycurl implementation"""

    def _qualified_url(self, host, port):
        """Return the permitted 'http+pycurl' url for host and port."""
        result = 'http+pycurl://%s:%s' % (host, port)
        self.permit_url(result)
        # Callers feed the value straight into self._transport().
        return result
1232
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
                                   http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the nosmart decorator"""

    _transport = NoSmartTransportDecorator

    def _qualified_url(self, host, port):
        """Return the permitted 'nosmart+http' url for host and port."""
        result = 'nosmart+http://%s:%s' % (host, port)
        self.permit_url(result)
        # Callers feed the value straight into self._transport().
        return result
1244
class TestHTTPRedirections_readonly(TestHTTPRedirections,
                                    http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the readonly decorator"""

    _transport = ReadonlyTransportDecorator

    def _qualified_url(self, host, port):
        """Return the permitted 'readonly+http' url for host and port."""
        result = 'readonly+http://%s:%s' % (host, port)
        self.permit_url(result)
        # Callers feed the value straight into self._transport().
        return result
1256
class TestDotBzrHidden(TestCaseWithTransport):
    """The control directory must not show up in a plain listing."""

    # Command used to list the current directory.
    # NOTE(review): the non-Windows default was missing from the source;
    # a plain 'ls' is presumed - confirm.
    ls = ['ls']
    if sys.platform == 'win32':
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']

    def get_ls(self):
        """Run the listing command and return its output split into lines."""
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        out, err = f.communicate()
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
                         % (self.ls, err))
        return out.splitlines()

    def test_dot_bzr_hidden(self):
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')
        bzrdir.BzrDir.create('.')
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())

    def test_dot_bzr_hidden_with_url(self):
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')
        bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())
1285
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
    """BzrDirFormat double used by TestBzrDirSprout."""

    def _open(self, transport):
        # Hand back the instrumented bzrdir instead of a plain meta1 one.
        opened = _TestBzrDir(transport, self)
        return opened
1292
class _TestBzrDir(bzrdir.BzrDirMeta1):
    """BzrDir double used by TestBzrDirSprout.

    On construction it creates a repository and attaches it to a
    _TestBranch double, which open_branch() then always returns.
    """

    def __init__(self, *args, **kwargs):
        super(_TestBzrDir, self).__init__(*args, **kwargs)
        double = _TestBranch()
        self.test_branch = double
        double.repository = self.create_repository()

    def open_branch(self, unsupported=False):
        return self.test_branch

    def cloning_metadir(self, require_stacking=False):
        # Always sprout into another instrumented format.
        return _TestBzrDirFormat()
1311
class _TestBranchFormat(bzrlib.branch.BranchFormat):
    """Branch format double used by TestBzrDirSprout; adds no behaviour."""
1315
class _TestBranch(bzrlib.branch.Branch):
    """Test Branch implementation for TestBzrDirSprout.

    Records the names of selected method calls in ``self.calls`` and keeps
    the parent location in memory.
    """

    def __init__(self, *args, **kwargs):
        self._format = _TestBranchFormat()
        super(_TestBranch, self).__init__(*args, **kwargs)
        # Log of instrumented calls; the tests expect it to start empty.
        self.calls = []
        # Parent location as stored by set_parent().
        self._parent = None

    def sprout(self, *args, **kwargs):
        """Record the call and return a fresh _TestBranch."""
        self.calls.append('sprout')
        return _TestBranch()

    def copy_content_into(self, destination, revision_id=None):
        """Record the call; nothing is copied."""
        self.calls.append('copy_content_into')

    def get_parent(self):
        """Return the value last given to set_parent (None initially)."""
        return self._parent

    def set_parent(self, parent):
        self._parent = parent
1338
class TestBzrDirSprout(TestCaseWithMemoryTransport):
    """Tests for how BzrDir.sprout builds the new branch."""

    def test_sprout_uses_branch_sprout(self):
        """BzrDir.sprout calls Branch.sprout.

        Usually, BzrDir.sprout should delegate to the branch's sprout method
        for part of the work.  This allows the source branch to control the
        choice of format for the new branch.

        There are exceptions, but this tests avoids them:
          - if there's no branch in the source bzrdir,
          - or if the stacking has been requested and the format needs to be
            overridden to satisfy that.
        """
        # Make an instrumented bzrdir.
        t = self.get_transport('source')
        # Create the base directory before initialising a control dir in it.
        t.ensure_base()
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
        # The instrumented bzrdir has a test_branch attribute that logs calls
        # made to the branch contained in that bzrdir.  Initially the test
        # branch exists but no calls have been made to it.
        self.assertEqual([], source_bzrdir.test_branch.calls)

        # Sprout the bzrdir
        target_url = self.get_url('target')
        source_bzrdir.sprout(target_url, recurse='no')

        # The bzrdir called the branch's sprout method.
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)

    def test_sprout_parent(self):
        """A branch sprouted from 'parent' records it as its parent."""
        grandparent_tree = self.make_branch('grandparent')
        parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
        branch_tree = parent.bzrdir.sprout('branch').open_branch()
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1375
class TestBzrDirHooks(TestCaseWithMemoryTransport):
    """Tests for the 'pre_open' and 'post_repo_init' BzrDir hooks."""

    def test_pre_open_called(self):
        """The pre_open hook receives the transport being opened."""
        calls = []
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
        transport = self.get_transport('foo')
        url = transport.base
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
        self.assertEqual([transport.base], [t.base for t in calls])

    def test_pre_open_actual_exceptions_raised(self):
        """An exception raised by a pre_open hook reaches the caller."""
        # A one-element list lets the nested function mutate the counter.
        count = [0]

        def fail_once(transport):
            count[0] += 1
            if count[0] == 1:
                raise errors.BzrError("fail")

        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
        transport = self.get_transport('foo')
        url = transport.base
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
        self.assertEqual('fail', err._preformatted_string)

    def test_post_repo_init(self):
        """post_repo_init fires once with a RepoInitHookParams object."""
        from bzrlib.bzrdir import RepoInitHookParams
        calls = []
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
                                               calls.append, None)
        self.make_repository('foo')
        self.assertLength(1, calls)
        params = calls[0]
        self.assertIsInstance(params, RepoInitHookParams)
        self.assertTrue(hasattr(params, 'bzrdir'))
        self.assertTrue(hasattr(params, 'repository'))

    def test_post_repo_init_hook_repr(self):
        """The hook params have a repr starting '<RepoInitHookParams for '."""
        param_reprs = []
        bzrdir.BzrDir.hooks.install_named_hook(
            'post_repo_init',
            lambda params: param_reprs.append(repr(params)), None)
        self.make_repository('foo')
        self.assertLength(1, param_reprs)
        param_repr = param_reprs[0]
        self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1419
class TestGenerateBackupName(TestCaseWithMemoryTransport):
    """Tests for BzrDir.generate_backup_name."""

    def setUp(self):
        super(TestGenerateBackupName, self).setUp()
        self._transport = get_transport(self.get_url())
        bzrdir.BzrDir.create(self.get_url(),
                             possible_transports=[self._transport])
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)

    # NOTE(review): the header of this test was missing from the source;
    # the method name is a reconstruction - confirm against history.
    def test_new(self):
        """With no existing backup the first name ends in '.~1~'."""
        self.assertEqual("a.~1~", self._bzrdir.generate_backup_name("a"))

    def test_exiting(self):
        """An existing 'a.~1~' pushes the generated name to 'a.~2~'."""
        self._transport.put_bytes("a.~1~", "some content")
        self.assertEqual("a.~2~", self._bzrdir.generate_backup_name("a"))