55
52
from bzrlib.tests.test_http import TestWithTransport_pycurl
56
from bzrlib.transport import (
53
from bzrlib.transport import get_transport
61
54
from bzrlib.transport.http._urllib import HttpTransport_urllib
55
from bzrlib.transport.memory import MemoryServer
62
56
from bzrlib.transport.nosmart import NoSmartTransportDecorator
63
57
from bzrlib.transport.readonly import ReadonlyTransportDecorator
64
58
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
70
64
old_format = bzrdir.BzrDirFormat.get_default_format()
71
65
# default is BzrDirMetaFormat1
72
66
self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
73
controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
67
bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
74
68
# creating a bzr dir should now create an instrumented dir.
76
70
result = bzrdir.BzrDir.create('memory:///')
77
71
self.failUnless(isinstance(result, SampleBzrDir))
79
controldir.ControlDirFormat._set_default_format(old_format)
73
bzrdir.BzrDirFormat._set_default_format(old_format)
80
74
self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
83
77
class TestFormatRegistry(TestCase):
85
79
def make_format_registry(self):
86
my_format_registry = controldir.ControlDirFormatRegistry()
80
my_format_registry = bzrdir.BzrDirFormatRegistry()
87
81
my_format_registry.register('weave', bzrdir.BzrDirFormat6,
88
82
'Pre-0.8 format. Slower and does not support checkouts or shared'
89
83
' repositories', deprecated=True)
90
84
my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
91
85
'BzrDirFormat6', 'Format registered lazily', deprecated=True)
92
bzrdir.register_metadir(my_format_registry, 'knit',
86
my_format_registry.register_metadir('knit',
93
87
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
94
88
'Format using knits',
96
90
my_format_registry.set_default('knit')
97
bzrdir.register_metadir(my_format_registry,
91
my_format_registry.register_metadir(
99
93
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
100
94
'Experimental successor to knit. Use at your own risk.',
101
95
branch_format='bzrlib.branch.BzrBranchFormat6',
102
96
experimental=True)
103
bzrdir.register_metadir(my_format_registry,
97
my_format_registry.register_metadir(
105
99
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
106
100
'Experimental successor to knit. Use at your own risk.',
151
145
new = topics.get_detail('current-formats')
152
146
rest = topics.get_detail('other-formats')
153
147
experimental, deprecated = rest.split('Deprecated formats')
154
self.assertContainsRe(new, 'formats-help')
148
self.assertContainsRe(new, 'bzr help formats')
155
149
self.assertContainsRe(new,
156
150
':knit:\n \(native\) \(default\) Format using knits\n')
157
151
self.assertContainsRe(experimental,
175
169
bzrdir.format_registry.set_default_repository(old_default)
177
171
def test_aliases(self):
178
a_registry = controldir.ControlDirFormatRegistry()
172
a_registry = bzrdir.BzrDirFormatRegistry()
179
173
a_registry.register('weave', bzrdir.BzrDirFormat6,
180
174
'Pre-0.8 format. Slower and does not support checkouts or shared'
181
175
' repositories', deprecated=True)
210
204
"""See BzrDir.open_repository."""
211
205
return SampleRepository(self)
213
def create_branch(self, name=None):
207
def create_branch(self):
214
208
"""See BzrDir.create_branch."""
216
raise NoColocatedBranchSupport(self)
217
209
return SampleBranch(self)
219
211
def create_workingtree(self):
363
355
def test_create_branch_convenience_root(self):
364
356
"""Creating a branch at the root of a fs should work."""
365
self.vfs_transport_factory = memory.MemoryServer
357
self.vfs_transport_factory = MemoryServer
366
358
# outside a repo the default convenience output is a repo+branch_tree
367
359
format = bzrdir.format_registry.make_bzrdir('knit')
368
360
branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
474
466
# Make stackable source branch with an unstackable repo format.
475
467
source_bzrdir = self.make_bzrdir('source')
476
468
pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
477
source_branch = bzrlib.branch.BzrBranchFormat7().initialize(
469
source_branch = bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
479
470
# Make a directory with a default stacking policy
480
471
parent_bzrdir = self.make_bzrdir('parent')
481
472
stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
576
567
super(ChrootedTests, self).setUp()
577
if not self.vfs_transport_factory == memory.MemoryServer:
568
if not self.vfs_transport_factory == MemoryServer:
578
569
self.transport_readonly_server = http_server.HttpServer
580
571
def local_branch_path(self, branch):
810
801
self.assertEqualBzrdirs([baz, foo, bar],
811
802
bzrdir.BzrDir.find_bzrdirs(transport))
813
def make_fake_permission_denied_transport(self, transport, paths):
814
"""Create a transport that raises PermissionDenied for some paths."""
817
raise errors.PermissionDenied(path)
819
path_filter_server = pathfilter.PathFilteringServer(transport, filter)
820
path_filter_server.start_server()
821
self.addCleanup(path_filter_server.stop_server)
822
path_filter_transport = pathfilter.PathFilteringTransport(
823
path_filter_server, '.')
824
return (path_filter_server, path_filter_transport)
826
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
    """Assert that the user_url of every given object ends with a suffix.

    :param expect_url_suffix: The suffix each ``user_url`` must end with.
    :param actual_bzrdirs: An iterable of objects exposing ``user_url``;
        it is consumed one item at a time, in order.
    """
    for found in actual_bzrdirs:
        url = found.user_url
        self.assertEndsWith(url, expect_url_suffix)
831
def test_find_bzrdirs_permission_denied(self):
    """Exercise find_bzrdirs over a transport that denies access to 'foo'.

    The search is run twice: once directly on the path-filtering
    transport, and once through a smart server backed by the same
    path-filtering server.  In both cases every control dir that is
    reported must have a user_url ending in '/baz/'.
    """
    foo, bar, baz = self.make_foo_bar_baz()
    base_transport = get_transport(self.get_url())
    denied = self.make_fake_permission_denied_transport(
        base_transport, ['foo'])
    filter_server, filter_transport = denied

    # First pass: search straight through the filtering transport.
    found_directly = bzrdir.BzrDir.find_bzrdirs(filter_transport)
    self.assertBranchUrlsEndWith('/baz/', found_directly)

    # Second pass: same search, but via a smart server.
    smart_transport = self.make_smart_server(
        '.', backing_server=filter_server)
    found_via_smart = bzrdir.BzrDir.find_bzrdirs(smart_transport)
    self.assertBranchUrlsEndWith('/baz/', found_via_smart)
845
804
def test_find_bzrdirs_list_current(self):
846
805
def list_current(transport):
847
806
return [s for s in transport.list_dir('') if s != 'baz']
890
850
self.assertEqual(bar.root_transport.base, branches[1].base)
893
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """Check find_branches when a branch's shared repository is gone."""

    def test_find_bzrdirs_missing_repo(self):
        """Every branch reported by find_branches must live under '/baz/'.

        A branch is created inside the shared repository 'arepo', then the
        repository's '.bzr' tree is deleted so that opening that branch
        raises NoRepositoryPresent.  A second, intact branch 'baz' is then
        created and find_branches is run over the whole transport.
        """
        transport = get_transport(self.get_url())
        shared_repo = self.make_repository('arepo', shared=True)
        orphan_url = shared_repo.user_url + '/abranch'
        orphan_dir = bzrdir.BzrDir.create(orphan_url)
        orphan_dir.create_branch()

        # Remove the repository control files out from under the branch.
        transport.delete_tree('arepo/.bzr')
        self.assertRaises(
            errors.NoRepositoryPresent, branch.Branch.open, orphan_url)

        self.make_branch('baz')
        for found in bzrdir.BzrDir.find_branches(transport):
            self.assertEndsWith(found.user_url, '/baz/')
908
853
class TestMeta1DirFormat(TestCaseWithTransport):
909
854
"""Tests specific to the meta1 dir format."""
1059
1004
def _known_formats(self):
1060
1005
return set([NotBzrDirFormat()])
1063
class NotBzrDirProber(controldir.Prober):
1065
1008
def probe_transport(self, transport):
1066
1009
"""Our format is present if the transport ends in '.not/'."""
1067
1010
if transport.has('.not'):
1081
1024
dir = format.initialize(self.get_url())
1082
1025
self.assertIsInstance(dir, NotBzrDir)
1083
1026
# now probe for it.
1084
controldir.ControlDirFormat.register_prober(NotBzrDirProber)
1027
bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
1086
1029
found = bzrlib.bzrdir.BzrDirFormat.find_format(
1087
1030
get_transport(self.get_url()))
1088
1031
self.assertIsInstance(found, NotBzrDirFormat)
1090
controldir.ControlDirFormat.unregister_prober(NotBzrDirProber)
1033
bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
1092
1035
def test_included_in_known_formats(self):
1093
not_format = NotBzrDirFormat()
1094
bzrlib.controldir.ControlDirFormat.register_format(not_format)
1036
bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
1096
1038
formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1097
1039
for format in formats:
1108
1050
def setUp(self):
1109
1051
super(NonLocalTests, self).setUp()
1110
self.vfs_transport_factory = memory.MemoryServer
1052
self.vfs_transport_factory = MemoryServer
1112
1054
def test_create_branch_convenience(self):
1113
1055
# outside a repo the default convenience output is a repo+branch_tree
1165
1107
def create_transport_readonly_server(self):
1166
# We don't set the http protocol version, relying on the default
1167
1108
return http_utils.HTTPServerRedirecting()
1169
1110
def create_transport_secondary_server(self):
1170
# We don't set the http protocol version, relying on the default
1171
1111
return http_utils.HTTPServerRedirecting()
1173
1113
def setUp(self):
1393
1333
url = transport.base
1394
1334
err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1395
1335
self.assertEqual('fail', err._preformatted_string)
1397
def test_post_repo_init(self):
1398
from bzrlib.bzrdir import RepoInitHookParams
1400
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1402
self.make_repository('foo')
1403
self.assertLength(1, calls)
1405
self.assertIsInstance(params, RepoInitHookParams)
1406
self.assertTrue(hasattr(params, 'bzrdir'))
1407
self.assertTrue(hasattr(params, 'repository'))
1409
def test_post_repo_init_hook_repr(self):
1411
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1412
lambda params: param_reprs.append(repr(params)), None)
1413
self.make_repository('foo')
1414
self.assertLength(1, param_reprs)
1415
param_repr = param_reprs[0]
1416
self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1419
class TestGenerateBackupName(TestCaseWithMemoryTransport):
1422
super(TestGenerateBackupName, self).setUp()
1423
self._transport = get_transport(self.get_url())
1424
bzrdir.BzrDir.create(self.get_url(),
1425
possible_transports=[self._transport])
1426
self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
1429
self.assertEqual("a.~1~", self._bzrdir.generate_backup_name("a"))
1431
def test_exiting(self):
    """With "a.~1~" already present, the generated backup name is "a.~2~"."""
    # Occupy the first backup slot so the generator has to move past it.
    self._transport.put_bytes("a.~1~", "some content")
    backup_name = self._bzrdir.generate_backup_name("a")
    self.assertEqual("a.~2~", backup_name)