52
56
from bzrlib.tests.test_http import TestWithTransport_pycurl
53
from bzrlib.transport import get_transport
57
from bzrlib.transport import (
54
62
from bzrlib.transport.http._urllib import HttpTransport_urllib
55
from bzrlib.transport.memory import MemoryServer
56
63
from bzrlib.transport.nosmart import NoSmartTransportDecorator
57
64
from bzrlib.transport.readonly import ReadonlyTransportDecorator
58
65
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
64
71
old_format = bzrdir.BzrDirFormat.get_default_format()
65
72
# default is BzrDirFormat6
66
73
self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
67
bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
74
controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
68
75
# creating a bzr dir should now create an instrumented dir.
70
77
result = bzrdir.BzrDir.create('memory:///')
71
78
self.failUnless(isinstance(result, SampleBzrDir))
73
bzrdir.BzrDirFormat._set_default_format(old_format)
80
controldir.ControlDirFormat._set_default_format(old_format)
74
81
self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
77
84
class TestFormatRegistry(TestCase):
79
86
def make_format_registry(self):
80
my_format_registry = bzrdir.BzrDirFormatRegistry()
87
my_format_registry = controldir.ControlDirFormatRegistry()
81
88
my_format_registry.register('weave', bzrdir.BzrDirFormat6,
82
89
'Pre-0.8 format. Slower and does not support checkouts or shared'
83
90
' repositories', deprecated=True)
84
91
my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
85
92
'BzrDirFormat6', 'Format registered lazily', deprecated=True)
86
my_format_registry.register_metadir('knit',
93
bzrdir.register_metadir(my_format_registry, 'knit',
87
94
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
88
95
'Format using knits',
90
97
my_format_registry.set_default('knit')
91
my_format_registry.register_metadir(
98
bzrdir.register_metadir(my_format_registry,
93
100
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
94
101
'Experimental successor to knit. Use at your own risk.',
95
102
branch_format='bzrlib.branch.BzrBranchFormat6',
96
103
experimental=True)
97
my_format_registry.register_metadir(
104
bzrdir.register_metadir(my_format_registry,
99
106
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
100
107
'Experimental successor to knit. Use at your own risk.',
169
176
bzrdir.format_registry.set_default_repository(old_default)
171
178
def test_aliases(self):
172
a_registry = bzrdir.BzrDirFormatRegistry()
179
a_registry = controldir.ControlDirFormatRegistry()
173
180
a_registry.register('weave', bzrdir.BzrDirFormat6,
174
181
'Pre-0.8 format. Slower and does not support checkouts or shared'
175
182
' repositories', deprecated=True)
204
211
"""See BzrDir.open_repository."""
205
212
return SampleRepository(self)
207
def create_branch(self):
214
def create_branch(self, name=None):
208
215
"""See BzrDir.create_branch."""
217
raise NoColocatedBranchSupport(self)
209
218
return SampleBranch(self)
211
220
def create_workingtree(self):
355
364
def test_create_branch_convenience_root(self):
356
365
"""Creating a branch at the root of a fs should work."""
357
self.vfs_transport_factory = MemoryServer
366
self.vfs_transport_factory = memory.MemoryServer
358
367
# outside a repo the default convenience output is a repo+branch_tree
359
368
format = bzrdir.format_registry.make_bzrdir('knit')
360
369
branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
466
475
# Make stackable source branch with an unstackable repo format.
467
476
source_bzrdir = self.make_bzrdir('source')
468
477
pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
469
source_branch = bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
478
source_branch = bzrlib.branch.BzrBranchFormat7().initialize(
470
480
# Make a directory with a default stacking policy
471
481
parent_bzrdir = self.make_bzrdir('parent')
472
482
stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
567
577
super(ChrootedTests, self).setUp()
568
if not self.vfs_transport_factory == MemoryServer:
578
if not self.vfs_transport_factory == memory.MemoryServer:
569
579
self.transport_readonly_server = http_server.HttpServer
571
581
def local_branch_path(self, branch):
781
791
self.build_tree(['tree1/subtree/file'])
782
792
sub_tree.add('file')
783
793
tree.commit('Initial commit')
794
# The following line force the orhaning to reveal bug #634470
795
tree.branch.get_config().set_user_option(
796
'bzr.transform.orphan_policy', 'move')
784
797
tree.bzrdir.destroy_workingtree()
798
# FIXME: subtree/.bzr is left here which allows the test to pass (or
799
# fail :-( ) -- vila 20100909
785
800
repo = self.make_repository('repo', shared=True,
786
801
format='dirstate-with-subtree')
787
802
repo.set_make_working_trees(False)
788
tree.bzrdir.sprout('repo/tree2')
789
self.failUnlessExists('repo/tree2/subtree')
790
self.failIfExists('repo/tree2/subtree/file')
803
# FIXME: we just deleted the workingtree and now we want to use it ????
804
# At a minimum, we should use tree.branch below (but this fails too
805
# currently) or stop calling this test 'treeless'. Specifically, I've
806
# turn the line below into an assertRaises when 'subtree/.bzr' is
807
# orphaned and sprout tries to access the branch there (which is left
808
# by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
809
# [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
810
# #634470. -- vila 20100909
811
self.assertRaises(errors.NotBranchError,
812
tree.bzrdir.sprout, 'repo/tree2')
813
# self.failUnlessExists('repo/tree2/subtree')
814
# self.failIfExists('repo/tree2/subtree/file')
792
816
def make_foo_bar_baz(self):
793
817
foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
801
825
self.assertEqualBzrdirs([baz, foo, bar],
802
826
bzrdir.BzrDir.find_bzrdirs(transport))
828
def make_fake_permission_denied_transport(self, transport, paths):
829
"""Create a transport that raises PermissionDenied for some paths."""
832
raise errors.PermissionDenied(path)
834
path_filter_server = pathfilter.PathFilteringServer(transport, filter)
835
path_filter_server.start_server()
836
self.addCleanup(path_filter_server.stop_server)
837
path_filter_transport = pathfilter.PathFilteringTransport(
838
path_filter_server, '.')
839
return (path_filter_server, path_filter_transport)
841
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
842
"""Check that each branch url ends with the given suffix."""
843
for actual_bzrdir in actual_bzrdirs:
844
self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
846
def test_find_bzrdirs_permission_denied(self):
847
foo, bar, baz = self.make_foo_bar_baz()
848
transport = get_transport(self.get_url())
849
path_filter_server, path_filter_transport = \
850
self.make_fake_permission_denied_transport(transport, ['foo'])
852
self.assertBranchUrlsEndWith('/baz/',
853
bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
855
smart_transport = self.make_smart_server('.',
856
backing_server=path_filter_server)
857
self.assertBranchUrlsEndWith('/baz/',
858
bzrdir.BzrDir.find_bzrdirs(smart_transport))
804
860
def test_find_bzrdirs_list_current(self):
805
861
def list_current(transport):
806
862
return [s for s in transport.list_dir('') if s != 'baz']
850
905
self.assertEqual(bar.root_transport.base, branches[1].base)
908
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
910
def test_find_bzrdirs_missing_repo(self):
911
transport = get_transport(self.get_url())
912
arepo = self.make_repository('arepo', shared=True)
913
abranch_url = arepo.user_url + '/abranch'
914
abranch = bzrdir.BzrDir.create(abranch_url).create_branch()
915
transport.delete_tree('arepo/.bzr')
916
self.assertRaises(errors.NoRepositoryPresent,
917
branch.Branch.open, abranch_url)
918
self.make_branch('baz')
919
for actual_bzrdir in bzrdir.BzrDir.find_branches(transport):
920
self.assertEndsWith(actual_bzrdir.user_url, '/baz/')
853
923
class TestMeta1DirFormat(TestCaseWithTransport):
854
924
"""Tests specific to the meta1 dir format."""
1004
1074
def _known_formats(self):
1005
1075
return set([NotBzrDirFormat()])
1078
class NotBzrDirProber(controldir.Prober):
1008
1080
def probe_transport(self, transport):
1009
1081
"""Our format is present if the transport ends in '.not/'."""
1010
1082
if transport.has('.not'):
1024
1096
dir = format.initialize(self.get_url())
1025
1097
self.assertIsInstance(dir, NotBzrDir)
1026
1098
# now probe for it.
1027
bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
1099
controldir.ControlDirFormat.register_prober(NotBzrDirProber)
1029
1101
found = bzrlib.bzrdir.BzrDirFormat.find_format(
1030
1102
get_transport(self.get_url()))
1031
1103
self.assertIsInstance(found, NotBzrDirFormat)
1033
bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
1105
controldir.ControlDirFormat.unregister_prober(NotBzrDirProber)
1035
1107
def test_included_in_known_formats(self):
1036
bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
1108
not_format = NotBzrDirFormat()
1109
bzrlib.controldir.ControlDirFormat.register_format(not_format)
1038
1111
formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1039
1112
for format in formats:
1050
1123
def setUp(self):
1051
1124
super(NonLocalTests, self).setUp()
1052
self.vfs_transport_factory = MemoryServer
1125
self.vfs_transport_factory = memory.MemoryServer
1054
1127
def test_create_branch_convenience(self):
1055
1128
# outside a repo the default convenience output is a repo+branch_tree
1107
1180
def create_transport_readonly_server(self):
1181
# We don't set the http protocol version, relying on the default
1108
1182
return http_utils.HTTPServerRedirecting()
1110
1184
def create_transport_secondary_server(self):
1185
# We don't set the http protocol version, relying on the default
1111
1186
return http_utils.HTTPServerRedirecting()
1113
1188
def setUp(self):
1333
1408
url = transport.base
1334
1409
err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1335
1410
self.assertEqual('fail', err._preformatted_string)
1412
def test_post_repo_init(self):
1413
from bzrlib.bzrdir import RepoInitHookParams
1415
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1417
self.make_repository('foo')
1418
self.assertLength(1, calls)
1420
self.assertIsInstance(params, RepoInitHookParams)
1421
self.assertTrue(hasattr(params, 'bzrdir'))
1422
self.assertTrue(hasattr(params, 'repository'))
1424
def test_post_repo_init_hook_repr(self):
1426
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1427
lambda params: param_reprs.append(repr(params)), None)
1428
self.make_repository('foo')
1429
self.assertLength(1, param_reprs)
1430
param_repr = param_reprs[0]
1431
self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1434
class TestGenerateBackupName(TestCaseWithMemoryTransport):
1435
# FIXME: This may need to be unified with test_osutils.TestBackupNames or
1436
# moved to per_bzrdir or per_transport for better coverage ?
1440
super(TestGenerateBackupName, self).setUp()
1441
self._transport = get_transport(self.get_url())
1442
bzrdir.BzrDir.create(self.get_url(),
1443
possible_transports=[self._transport])
1444
self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
1446
def test_deprecated_generate_backup_name(self):
1447
res = self.applyDeprecated(
1448
symbol_versioning.deprecated_in((2, 3, 0)),
1449
self._bzrdir.generate_backup_name, 'whatever')
1452
self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))
1454
def test_exiting(self):
1455
self._transport.put_bytes("a.~1~", "some content")
1456
self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))