67
70
old_format = bzrdir.BzrDirFormat.get_default_format()
68
71
# default is BzrDirMetaFormat1
69
72
self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
70
bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
73
controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
71
74
# creating a bzr dir should now create an instrumented dir.
73
76
result = bzrdir.BzrDir.create('memory:///')
74
77
self.failUnless(isinstance(result, SampleBzrDir))
76
bzrdir.BzrDirFormat._set_default_format(old_format)
79
controldir.ControlDirFormat._set_default_format(old_format)
77
80
self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
80
83
class TestFormatRegistry(TestCase):
82
85
def make_format_registry(self):
83
my_format_registry = bzrdir.BzrDirFormatRegistry()
86
my_format_registry = controldir.ControlDirFormatRegistry()
84
87
my_format_registry.register('weave', bzrdir.BzrDirFormat6,
85
88
'Pre-0.8 format. Slower and does not support checkouts or shared'
86
89
' repositories', deprecated=True)
87
90
my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
88
91
'BzrDirFormat6', 'Format registered lazily', deprecated=True)
89
my_format_registry.register_metadir('knit',
92
bzrdir.register_metadir(my_format_registry, 'knit',
90
93
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
91
94
'Format using knits',
93
96
my_format_registry.set_default('knit')
94
my_format_registry.register_metadir(
97
bzrdir.register_metadir(my_format_registry,
96
99
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
97
100
'Experimental successor to knit. Use at your own risk.',
98
101
branch_format='bzrlib.branch.BzrBranchFormat6',
99
102
experimental=True)
100
my_format_registry.register_metadir(
103
bzrdir.register_metadir(my_format_registry,
102
105
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
103
106
'Experimental successor to knit. Use at your own risk.',
172
175
bzrdir.format_registry.set_default_repository(old_default)
174
177
def test_aliases(self):
175
a_registry = bzrdir.BzrDirFormatRegistry()
178
a_registry = controldir.ControlDirFormatRegistry()
176
179
a_registry.register('weave', bzrdir.BzrDirFormat6,
177
180
'Pre-0.8 format. Slower and does not support checkouts or shared'
178
181
' repositories', deprecated=True)
807
810
self.assertEqualBzrdirs([baz, foo, bar],
808
811
bzrdir.BzrDir.find_bzrdirs(transport))
813
def make_fake_permission_denied_transport(self, transport, paths):
814
"""Create a transport that raises PermissionDenied for some paths."""
817
raise errors.PermissionDenied(path)
819
path_filter_server = pathfilter.PathFilteringServer(transport, filter)
820
path_filter_server.start_server()
821
self.addCleanup(path_filter_server.stop_server)
822
path_filter_transport = pathfilter.PathFilteringTransport(
823
path_filter_server, '.')
824
return (path_filter_server, path_filter_transport)
826
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
    """Assert that every item's ``user_url`` ends with the given suffix.

    :param expect_url_suffix: The suffix each ``user_url`` must end with.
    :param actual_bzrdirs: An iterable of objects exposing ``user_url``.
        It is consumed once, in order; an empty iterable passes trivially.
    """
    for control_dir in actual_bzrdirs:
        url = control_dir.user_url
        self.assertEndsWith(url, expect_url_suffix)
831
def test_find_bzrdirs_permission_denied(self):
    """find_bzrdirs results all end in '/baz/' when 'foo' is denied.

    The check is made twice: first directly over the path-filtering
    transport, then over a smart server backed by the same filtering
    server.
    """
    foo, bar, baz = self.make_foo_bar_baz()
    base_transport = get_transport(self.get_url())
    denied = self.make_fake_permission_denied_transport(
        base_transport, ['foo'])
    path_filter_server, path_filter_transport = denied

    # Directly through the filtering transport.
    found_directly = bzrdir.BzrDir.find_bzrdirs(path_filter_transport)
    self.assertBranchUrlsEndWith('/baz/', found_directly)

    # Through a smart server that sits on top of the filtering server.
    smart_transport = self.make_smart_server(
        '.', backing_server=path_filter_server)
    found_via_smart = bzrdir.BzrDir.find_bzrdirs(smart_transport)
    self.assertBranchUrlsEndWith('/baz/', found_via_smart)
810
845
def test_find_bzrdirs_list_current(self):
811
846
def list_current(transport):
812
847
return [s for s in transport.list_dir('') if s != 'baz']
856
890
self.assertEqual(bar.root_transport.base, branches[1].base)
893
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """find_branches behaviour when a branch's repository has been removed."""

    def test_find_bzrdirs_missing_repo(self):
        """Only the branch at 'baz' is reported once 'arepo/.bzr' is gone."""
        transport = get_transport(self.get_url())

        # Create a branch under a shared repository, then delete the
        # repository's control directory out from under it.
        shared_repo = self.make_repository('arepo', shared=True)
        abranch_url = shared_repo.user_url + '/abranch'
        branch_dir = bzrdir.BzrDir.create(abranch_url)
        orphaned_branch = branch_dir.create_branch()
        transport.delete_tree('arepo/.bzr')

        # Opening that branch directly must now fail.
        self.assertRaises(
            errors.NoRepositoryPresent, branch.Branch.open, abranch_url)

        # A second branch is created after the deletion; every result of
        # find_branches must be located at '/baz/'.
        self.make_branch('baz')
        for found in bzrdir.BzrDir.find_branches(transport):
            self.assertEndsWith(found.user_url, '/baz/')
859
908
class TestMeta1DirFormat(TestCaseWithTransport):
860
909
"""Tests specific to the meta1 dir format."""
1010
1059
def _known_formats(self):
    """Return a set containing one freshly created NotBzrDirFormat."""
    only_format = NotBzrDirFormat()
    return set([only_format])
1063
class NotBzrDirProber(controldir.Prober):
1014
1065
def probe_transport(self, transport):
1015
1066
"""Our format is present if the transport ends in '.not/'."""
1016
1067
if transport.has('.not'):
1030
1081
dir = format.initialize(self.get_url())
1031
1082
self.assertIsInstance(dir, NotBzrDir)
1032
1083
# now probe for it.
1033
bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
1084
controldir.ControlDirFormat.register_prober(NotBzrDirProber)
1035
1086
found = bzrlib.bzrdir.BzrDirFormat.find_format(
1036
1087
get_transport(self.get_url()))
1037
1088
self.assertIsInstance(found, NotBzrDirFormat)
1039
bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
1090
controldir.ControlDirFormat.unregister_prober(NotBzrDirProber)
1041
1092
def test_included_in_known_formats(self):
1042
bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
1093
not_format = NotBzrDirFormat()
1094
bzrlib.controldir.ControlDirFormat.register_format(not_format)
1044
1096
formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1045
1097
for format in formats:
1113
1165
def create_transport_readonly_server(self):
    """Return a new ``http_utils.HTTPServerRedirecting`` instance."""
    # No http protocol version is passed here; the default is relied on.
    redirecting_server = http_utils.HTTPServerRedirecting()
    return redirecting_server
1116
1169
def create_transport_secondary_server(self):
    """Return a new ``http_utils.HTTPServerRedirecting`` instance."""
    # No http protocol version is passed here; the default is relied on.
    redirecting_server = http_utils.HTTPServerRedirecting()
    return redirecting_server
1119
1173
def setUp(self):
1351
1405
self.assertIsInstance(params, RepoInitHookParams)
1352
1406
self.assertTrue(hasattr(params, 'bzrdir'))
1353
1407
self.assertTrue(hasattr(params, 'repository'))
1409
def test_post_repo_init_hook_repr(self):
1411
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1412
lambda params: param_reprs.append(repr(params)), None)
1413
self.make_repository('foo')
1414
self.assertLength(1, param_reprs)
1415
param_repr = param_reprs[0]
1416
self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1419
class TestGenerateBackupName(TestCaseWithMemoryTransport):
1422
super(TestGenerateBackupName, self).setUp()
1423
self._transport = get_transport(self.get_url())
1424
bzrdir.BzrDir.create(self.get_url(),
1425
possible_transports=[self._transport])
1426
self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
1429
self.assertEqual("a.~1~", self._bzrdir.generate_backup_name("a"))
1431
def test_exiting(self):
    """With "a.~1~" already present, the generated backup name is "a.~2~"."""
    # NOTE(review): the method name looks like a typo for "test_existing";
    # it is left unchanged because the name is the test's public id.
    self._transport.put_bytes("a.~1~", "some content")
    backup_name = self._bzrdir.generate_backup_name("a")
    self.assertEqual("a.~2~", backup_name)