62
57
from bzrlib.tests.test_http import TestWithTransport_pycurl
63
58
from bzrlib.transport import (
67
63
from bzrlib.transport.http._urllib import HttpTransport_urllib
68
64
from bzrlib.transport.nosmart import NoSmartTransportDecorator
69
65
from bzrlib.transport.readonly import ReadonlyTransportDecorator
70
from bzrlib.repofmt import knitrepo, knitpack_repo
66
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
73
69
class TestDefaultFormat(TestCase):
75
71
def test_get_set_default_format(self):
76
72
old_format = bzrdir.BzrDirFormat.get_default_format()
77
# default is BzrDirMetaFormat1
78
self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
73
# default is BzrDirFormat6
74
self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
79
75
controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
80
76
# creating a bzr dir should now create an instrumented dir.
82
78
result = bzrdir.BzrDir.create('memory:///')
83
self.assertIsInstance(result, SampleBzrDir)
79
self.failUnless(isinstance(result, SampleBzrDir))
85
81
controldir.ControlDirFormat._set_default_format(old_format)
86
82
self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
89
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
90
"""A deprecated bzr dir format."""
93
85
class TestFormatRegistry(TestCase):
95
87
def make_format_registry(self):
96
88
my_format_registry = controldir.ControlDirFormatRegistry()
97
my_format_registry.register('deprecated', DeprecatedBzrDirFormat,
98
'Some format. Slower and unawesome and deprecated.',
100
my_format_registry.register_lazy('lazy', 'bzrlib.tests.test_bzrdir',
101
'DeprecatedBzrDirFormat', 'Format registered lazily',
89
my_format_registry.register('weave', bzrdir.BzrDirFormat6,
90
'Pre-0.8 format. Slower and does not support checkouts or shared'
91
' repositories', deprecated=True)
92
my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
93
'BzrDirFormat6', 'Format registered lazily', deprecated=True)
103
94
bzrdir.register_metadir(my_format_registry, 'knit',
104
95
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
105
96
'Format using knits',
116
107
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
117
108
'Experimental successor to knit. Use at your own risk.',
118
109
branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
119
my_format_registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
120
'Old format. Slower and does not support things. ', hidden=True)
121
my_format_registry.register_lazy('hiddenlazy', 'bzrlib.tests.test_bzrdir',
122
'DeprecatedBzrDirFormat', 'Format registered lazily',
123
deprecated=True, hidden=True)
110
my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
111
'Pre-0.8 format. Slower and does not support checkouts or shared'
112
' repositories', hidden=True)
113
my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
114
'BzrDirFormat6', 'Format registered lazily', deprecated=True,
124
116
return my_format_registry
126
118
def test_format_registry(self):
127
119
my_format_registry = self.make_format_registry()
128
120
my_bzrdir = my_format_registry.make_bzrdir('lazy')
129
self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
130
my_bzrdir = my_format_registry.make_bzrdir('deprecated')
131
self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
121
self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
122
my_bzrdir = my_format_registry.make_bzrdir('weave')
123
self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
132
124
my_bzrdir = my_format_registry.make_bzrdir('default')
133
125
self.assertIsInstance(my_bzrdir.repository_format,
134
126
knitrepo.RepositoryFormatKnit1)
178
171
self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
179
172
bzrdir.format_registry.get('default'))
181
repository.format_registry.get_default().__class__,
174
repository.RepositoryFormat.get_default_format().__class__,
182
175
knitrepo.RepositoryFormatKnit3)
184
177
bzrdir.format_registry.set_default_repository(old_default)
186
179
def test_aliases(self):
187
180
a_registry = controldir.ControlDirFormatRegistry()
188
a_registry.register('deprecated', DeprecatedBzrDirFormat,
189
'Old format. Slower and does not support stuff',
191
a_registry.register('deprecatedalias', DeprecatedBzrDirFormat,
192
'Old format. Slower and does not support stuff',
193
deprecated=True, alias=True)
194
self.assertEqual(frozenset(['deprecatedalias']), a_registry.aliases())
181
a_registry.register('weave', bzrdir.BzrDirFormat6,
182
'Pre-0.8 format. Slower and does not support checkouts or shared'
183
' repositories', deprecated=True)
184
a_registry.register('weavealias', bzrdir.BzrDirFormat6,
185
'Pre-0.8 format. Slower and does not support checkouts or shared'
186
' repositories', deprecated=True, alias=True)
187
self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
197
190
class SampleBranch(bzrlib.branch.Branch):
254
247
return "opened branch."
257
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
260
def get_format_string():
261
return "Test format 1"
264
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
267
def get_format_string():
268
return "Test format 2"
271
250
class TestBzrDirFormat(TestCaseWithTransport):
272
251
"""Tests for the BzrDirFormat facility."""
274
253
def test_find_format(self):
275
254
# is the right format object found for a branch?
276
255
# create a branch with a few known format objects.
277
bzrdir.BzrProber.formats.register(BzrDirFormatTest1.get_format_string(),
279
self.addCleanup(bzrdir.BzrProber.formats.remove,
280
BzrDirFormatTest1.get_format_string())
281
bzrdir.BzrProber.formats.register(BzrDirFormatTest2.get_format_string(),
283
self.addCleanup(bzrdir.BzrProber.formats.remove,
284
BzrDirFormatTest2.get_format_string())
285
t = self.get_transport()
256
# this is not quite the same as
257
t = get_transport(self.get_url())
286
258
self.build_tree(["foo/", "bar/"], transport=t)
287
259
def check_format(format, url):
288
260
format.initialize(url)
289
t = _mod_transport.get_transport_from_path(url)
261
t = get_transport(url)
290
262
found_format = bzrdir.BzrDirFormat.find_format(t)
291
self.assertIsInstance(found_format, format.__class__)
292
check_format(BzrDirFormatTest1(), "foo")
293
check_format(BzrDirFormatTest2(), "bar")
263
self.failUnless(isinstance(found_format, format.__class__))
264
check_format(bzrdir.BzrDirFormat5(), "foo")
265
check_format(bzrdir.BzrDirFormat6(), "bar")
295
267
def test_find_format_nothing_there(self):
296
268
self.assertRaises(NotBranchError,
297
269
bzrdir.BzrDirFormat.find_format,
298
_mod_transport.get_transport_from_path('.'))
300
272
def test_find_format_unknown_format(self):
301
t = self.get_transport()
273
t = get_transport(self.get_url())
303
275
t.put_bytes('.bzr/branch-format', '')
304
276
self.assertRaises(UnknownFormatError,
305
277
bzrdir.BzrDirFormat.find_format,
306
_mod_transport.get_transport_from_path('.'))
308
280
def test_register_unregister_format(self):
309
281
format = SampleBzrDirFormat()
312
284
format.initialize(url)
313
285
# register a format for it.
314
bzrdir.BzrProber.formats.register(format.get_format_string(), format)
286
bzrdir.BzrDirFormat.register_format(format)
315
287
# which bzrdir.Open will refuse (not supported)
316
288
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
317
289
# which bzrdir.open_containing will refuse (not supported)
318
290
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
319
291
# but open_downlevel will work
320
t = _mod_transport.get_transport_from_url(url)
292
t = get_transport(url)
321
293
self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
322
294
# unregister the format
323
bzrdir.BzrProber.formats.remove(format.get_format_string())
295
bzrdir.BzrDirFormat.unregister_format(format)
324
296
# now open_downlevel should fail too.
325
297
self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
709
681
self.assertEqual(relpath, 'baz')
711
683
def test_open_containing_from_transport(self):
712
self.assertRaises(NotBranchError,
713
bzrdir.BzrDir.open_containing_from_transport,
714
_mod_transport.get_transport_from_url(self.get_readonly_url('')))
715
self.assertRaises(NotBranchError,
716
bzrdir.BzrDir.open_containing_from_transport,
717
_mod_transport.get_transport_from_url(
718
self.get_readonly_url('g/p/q')))
684
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
685
get_transport(self.get_readonly_url('')))
686
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
687
get_transport(self.get_readonly_url('g/p/q')))
719
688
control = bzrdir.BzrDir.create(self.get_url())
720
689
branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
721
_mod_transport.get_transport_from_url(
722
self.get_readonly_url('')))
690
get_transport(self.get_readonly_url('')))
723
691
self.assertEqual('', relpath)
724
692
branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
725
_mod_transport.get_transport_from_url(
726
self.get_readonly_url('g/p/q')))
693
get_transport(self.get_readonly_url('g/p/q')))
727
694
self.assertEqual('g/p/q', relpath)
729
696
def test_open_containing_tree_or_branch(self):
773
740
# transport pointing at bzrdir should give a bzrdir with root transport
774
741
# set to the given transport
775
742
control = bzrdir.BzrDir.create(self.get_url())
776
t = self.get_transport()
777
opened_bzrdir = bzrdir.BzrDir.open_from_transport(t)
778
self.assertEqual(t.base, opened_bzrdir.root_transport.base)
743
transport = get_transport(self.get_url())
744
opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
745
self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
779
746
self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
781
748
def test_open_from_transport_no_bzrdir(self):
782
t = self.get_transport()
783
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
749
transport = get_transport(self.get_url())
750
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
785
753
def test_open_from_transport_bzrdir_in_parent(self):
786
754
control = bzrdir.BzrDir.create(self.get_url())
787
t = self.get_transport()
789
t = t.clone('subdir')
790
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
755
transport = get_transport(self.get_url())
756
transport.mkdir('subdir')
757
transport = transport.clone('subdir')
758
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
792
761
def test_sprout_recursive(self):
793
762
tree = self.make_branch_and_tree('tree1',
893
863
return [s for s in transport.list_dir('') if s != 'baz']
895
865
foo, bar, baz = self.make_foo_bar_baz()
896
t = self.get_transport()
897
self.assertEqualBzrdirs(
899
bzrdir.BzrDir.find_bzrdirs(t, list_current=list_current))
866
transport = get_transport(self.get_url())
867
self.assertEqualBzrdirs([foo, bar],
868
bzrdir.BzrDir.find_bzrdirs(transport,
869
list_current=list_current))
901
871
def test_find_bzrdirs_evaluate(self):
902
872
def evaluate(bzrdir):
904
874
repo = bzrdir.open_repository()
905
except errors.NoRepositoryPresent:
875
except NoRepositoryPresent:
906
876
return True, bzrdir.root_transport.base
908
878
return False, bzrdir.root_transport.base
910
880
foo, bar, baz = self.make_foo_bar_baz()
911
t = self.get_transport()
881
transport = get_transport(self.get_url())
912
882
self.assertEqual([baz.root_transport.base, foo.root_transport.base],
913
list(bzrdir.BzrDir.find_bzrdirs(t, evaluate=evaluate)))
883
list(bzrdir.BzrDir.find_bzrdirs(transport,
915
886
def assertEqualBzrdirs(self, first, second):
916
887
first = list(first)
923
894
root = self.make_repository('', shared=True)
924
895
foo, bar, baz = self.make_foo_bar_baz()
925
896
qux = self.make_bzrdir('foo/qux')
926
t = self.get_transport()
927
branches = bzrdir.BzrDir.find_branches(t)
897
transport = get_transport(self.get_url())
898
branches = bzrdir.BzrDir.find_branches(transport)
928
899
self.assertEqual(baz.root_transport.base, branches[0].base)
929
900
self.assertEqual(foo.root_transport.base, branches[1].base)
930
901
self.assertEqual(bar.root_transport.base, branches[2].base)
932
903
# ensure this works without a top-level repo
933
branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
904
branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
934
905
self.assertEqual(foo.root_transport.base, branches[0].base)
935
906
self.assertEqual(bar.root_transport.base, branches[1].base)
962
933
dir.get_branch_transport(bzrlib.branch.BzrBranchFormat5()).base)
963
934
repository_base = t.clone('repository').base
964
935
self.assertEqual(repository_base, dir.get_repository_transport(None).base)
965
repository_format = repository.format_registry.get_default()
966
936
self.assertEqual(repository_base,
967
dir.get_repository_transport(repository_format).base)
937
dir.get_repository_transport(weaverepo.RepositoryFormat7()).base)
968
938
checkout_base = t.clone('checkout').base
969
939
self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
970
940
self.assertEqual(checkout_base,
971
dir.get_workingtree_transport(workingtree_3.WorkingTreeFormat3()).base)
941
dir.get_workingtree_transport(workingtree.WorkingTreeFormat3()).base)
973
943
def test_meta1dir_uses_lockdir(self):
974
944
"""Meta1 format uses a LockDir to guard the whole directory, not a file."""
1016
986
self.assertEqual(2, rpc_count)
989
class TestFormat5(TestCaseWithTransport):
990
"""Tests specific to the version 5 bzrdir format."""
992
def test_same_lockfiles_between_tree_repo_branch(self):
993
# this checks that only a single lockfiles instance is created
994
# for format 5 objects
995
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
996
def check_dir_components_use_same_lock(dir):
997
ctrl_1 = dir.open_repository().control_files
998
ctrl_2 = dir.open_branch().control_files
999
ctrl_3 = dir.open_workingtree()._control_files
1000
self.assertTrue(ctrl_1 is ctrl_2)
1001
self.assertTrue(ctrl_2 is ctrl_3)
1002
check_dir_components_use_same_lock(dir)
1003
# and if we open it normally.
1004
dir = bzrdir.BzrDir.open(self.get_url())
1005
check_dir_components_use_same_lock(dir)
1007
def test_can_convert(self):
1008
# format 5 dirs are convertable
1009
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
1010
self.assertTrue(dir.can_convert_format())
1012
def test_needs_conversion(self):
1013
# format 5 dirs need a conversion if they are not the default,
1015
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
1016
# don't need to convert it to itself
1017
self.assertFalse(dir.needs_format_conversion(bzrdir.BzrDirFormat5()))
1018
# do need to convert it to the current default
1019
self.assertTrue(dir.needs_format_conversion(
1020
bzrdir.BzrDirFormat.get_default_format()))
1023
class TestFormat6(TestCaseWithTransport):
1024
"""Tests specific to the version 6 bzrdir format."""
1026
def test_same_lockfiles_between_tree_repo_branch(self):
1027
# this checks that only a single lockfiles instance is created
1028
# for format 6 objects
1029
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
1030
def check_dir_components_use_same_lock(dir):
1031
ctrl_1 = dir.open_repository().control_files
1032
ctrl_2 = dir.open_branch().control_files
1033
ctrl_3 = dir.open_workingtree()._control_files
1034
self.assertTrue(ctrl_1 is ctrl_2)
1035
self.assertTrue(ctrl_2 is ctrl_3)
1036
check_dir_components_use_same_lock(dir)
1037
# and if we open it normally.
1038
dir = bzrdir.BzrDir.open(self.get_url())
1039
check_dir_components_use_same_lock(dir)
1041
def test_can_convert(self):
1042
# format 6 dirs are convertable
1043
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
1044
self.assertTrue(dir.can_convert_format())
1046
def test_needs_conversion(self):
1047
# format 6 dirs need an conversion if they are not the default.
1048
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
1049
self.assertTrue(dir.needs_format_conversion(
1050
bzrdir.BzrDirFormat.get_default_format()))
1053
class NotBzrDir(bzrlib.bzrdir.BzrDir):
1054
"""A non .bzr based control directory."""
1056
def __init__(self, transport, format):
1057
self._format = format
1058
self.root_transport = transport
1059
self.transport = transport.clone('.not')
1062
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
1063
"""A test class representing any non-.bzr based disk format."""
1065
def initialize_on_transport(self, transport):
1066
"""Initialize a new .not dir in the base directory of a Transport."""
1067
transport.mkdir('.not')
1068
return self.open(transport)
1070
def open(self, transport):
1071
"""Open this directory."""
1072
return NotBzrDir(transport, self)
1075
def _known_formats(self):
1076
return set([NotBzrDirFormat()])
1079
class NotBzrDirProber(controldir.Prober):
1081
def probe_transport(self, transport):
1082
"""Our format is present if the transport ends in '.not/'."""
1083
if transport.has('.not'):
1084
return NotBzrDirFormat()
1087
class TestNotBzrDir(TestCaseWithTransport):
1088
"""Tests for using the bzrdir api with a non .bzr based disk format.
1090
If/when one of these is in the core, we can let the implementation tests
1094
def test_create_and_find_format(self):
1095
# create a .notbzr dir
1096
format = NotBzrDirFormat()
1097
dir = format.initialize(self.get_url())
1098
self.assertIsInstance(dir, NotBzrDir)
1100
controldir.ControlDirFormat.register_prober(NotBzrDirProber)
1102
found = bzrlib.bzrdir.BzrDirFormat.find_format(
1103
get_transport(self.get_url()))
1104
self.assertIsInstance(found, NotBzrDirFormat)
1106
controldir.ControlDirFormat.unregister_prober(NotBzrDirProber)
1108
def test_included_in_known_formats(self):
1109
not_format = NotBzrDirFormat()
1110
bzrlib.controldir.ControlDirFormat.register_format(not_format)
1112
formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1113
for format in formats:
1114
if isinstance(format, NotBzrDirFormat):
1116
self.fail("No NotBzrDirFormat in %s" % formats)
1118
bzrlib.controldir.ControlDirFormat.unregister_format(not_format)
1019
1121
class NonLocalTests(TestCaseWithTransport):
1020
1122
"""Tests for bzrdir static behaviour on non local paths."""
1229
1331
class _TestBranch(bzrlib.branch.Branch):
1230
1332
"""Test Branch implementation for TestBzrDirSprout."""
1232
def __init__(self, transport, *args, **kwargs):
1334
def __init__(self, *args, **kwargs):
1233
1335
self._format = _TestBranchFormat()
1234
self._transport = transport
1235
self.base = transport.base
1236
1336
super(_TestBranch, self).__init__(*args, **kwargs)
1237
1337
self.calls = []
1238
1338
self._parent = None
1240
1340
def sprout(self, *args, **kwargs):
1241
1341
self.calls.append('sprout')
1242
return _TestBranch(self._transport)
1342
return _TestBranch()
1244
1344
def copy_content_into(self, destination, revision_id=None):
1245
1345
self.calls.append('copy_content_into')
1247
def last_revision(self):
1248
return _mod_revision.NULL_REVISION
1250
1347
def get_parent(self):
1251
1348
return self._parent
1253
def _get_config(self):
1254
return config.TransportConfig(self._transport, 'branch.conf')
1256
1350
def set_parent(self, parent):
1257
1351
self._parent = parent