13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Tests for the BzrDir facility and any format specific tests.
19
For interface contract tests, see tests/bzr_dir_implementations.
19
For interface contract tests, see tests/per_bzr_dir.
24
from StringIO import StringIO
28
26
from bzrlib import (
35
revision as _mod_revision,
39
transport as _mod_transport,
39
45
import bzrlib.branch
40
from bzrlib.errors import (NotBranchError,
42
UnsupportedFormatError,
46
from bzrlib.errors import (
48
NoColocatedBranchSupport,
50
UnsupportedFormatError,
44
52
from bzrlib.tests import (
46
54
TestCaseWithMemoryTransport,
47
55
TestCaseWithTransport,
51
from bzrlib.tests.http_server import HttpServer
52
from bzrlib.tests.http_utils import (
53
TestCaseWithTwoWebservers,
54
HTTPServerRedirecting,
58
from bzrlib.tests import(
56
62
from bzrlib.tests.test_http import TestWithTransport_pycurl
57
from bzrlib.transport import get_transport
63
from bzrlib.transport import (
58
67
from bzrlib.transport.http._urllib import HttpTransport_urllib
59
from bzrlib.transport.memory import MemoryServer
60
from bzrlib.repofmt import knitrepo, weaverepo
68
from bzrlib.transport.nosmart import NoSmartTransportDecorator
69
from bzrlib.transport.readonly import ReadonlyTransportDecorator
70
from bzrlib.repofmt import knitrepo, knitpack_repo
63
73
class TestDefaultFormat(TestCase):
65
75
def test_get_set_default_format(self):
66
76
old_format = bzrdir.BzrDirFormat.get_default_format()
67
# default is BzrDirMetaFormat1
68
self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
69
bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
77
# default is BzrDirMetaFormat1
78
self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
79
controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
70
80
# creating a bzr dir should now create an instrumented dir.
72
82
result = bzrdir.BzrDir.create('memory:///')
73
self.failUnless(isinstance(result, SampleBzrDir))
83
self.assertIsInstance(result, SampleBzrDir)
75
bzrdir.BzrDirFormat._set_default_format(old_format)
85
controldir.ControlDirFormat._set_default_format(old_format)
76
86
self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
89
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
90
"""A deprecated bzr dir format."""
79
93
class TestFormatRegistry(TestCase):
81
95
def make_format_registry(self):
82
my_format_registry = bzrdir.BzrDirFormatRegistry()
83
my_format_registry.register('weave', bzrdir.BzrDirFormat6,
84
'Pre-0.8 format. Slower and does not support checkouts or shared'
85
' repositories', deprecated=True)
86
my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
87
'BzrDirFormat6', 'Format registered lazily', deprecated=True)
88
my_format_registry.register_metadir('knit',
96
my_format_registry = controldir.ControlDirFormatRegistry()
97
my_format_registry.register('deprecated', DeprecatedBzrDirFormat,
98
'Some format. Slower and unawesome and deprecated.',
100
my_format_registry.register_lazy('lazy', 'bzrlib.tests.test_bzrdir',
101
'DeprecatedBzrDirFormat', 'Format registered lazily',
103
bzrdir.register_metadir(my_format_registry, 'knit',
89
104
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
90
105
'Format using knits',
92
107
my_format_registry.set_default('knit')
93
my_format_registry.register_metadir(
108
bzrdir.register_metadir(my_format_registry,
95
110
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
96
111
'Experimental successor to knit. Use at your own risk.',
97
112
branch_format='bzrlib.branch.BzrBranchFormat6',
98
113
experimental=True)
99
my_format_registry.register_metadir(
114
bzrdir.register_metadir(my_format_registry,
101
116
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
102
117
'Experimental successor to knit. Use at your own risk.',
103
118
branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
104
my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
105
'Pre-0.8 format. Slower and does not support checkouts or shared'
106
' repositories', hidden=True)
107
my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
108
'BzrDirFormat6', 'Format registered lazily', deprecated=True,
119
my_format_registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
120
'Old format. Slower and does not support things. ', hidden=True)
121
my_format_registry.register_lazy('hiddenlazy', 'bzrlib.tests.test_bzrdir',
122
'DeprecatedBzrDirFormat', 'Format registered lazily',
123
deprecated=True, hidden=True)
110
124
return my_format_registry
112
126
def test_format_registry(self):
113
127
my_format_registry = self.make_format_registry()
114
128
my_bzrdir = my_format_registry.make_bzrdir('lazy')
115
self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
116
my_bzrdir = my_format_registry.make_bzrdir('weave')
117
self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
129
self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
130
my_bzrdir = my_format_registry.make_bzrdir('deprecated')
131
self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
118
132
my_bzrdir = my_format_registry.make_bzrdir('default')
119
self.assertIsInstance(my_bzrdir.repository_format,
133
self.assertIsInstance(my_bzrdir.repository_format,
120
134
knitrepo.RepositoryFormatKnit1)
121
135
my_bzrdir = my_format_registry.make_bzrdir('knit')
122
self.assertIsInstance(my_bzrdir.repository_format,
136
self.assertIsInstance(my_bzrdir.repository_format,
123
137
knitrepo.RepositoryFormatKnit1)
124
138
my_bzrdir = my_format_registry.make_bzrdir('branch6')
125
139
self.assertIsInstance(my_bzrdir.get_branch_format(),
129
143
my_format_registry = self.make_format_registry()
130
144
self.assertEqual('Format registered lazily',
131
145
my_format_registry.get_help('lazy'))
132
self.assertEqual('Format using knits',
146
self.assertEqual('Format using knits',
133
147
my_format_registry.get_help('knit'))
134
self.assertEqual('Format using knits',
148
self.assertEqual('Format using knits',
135
149
my_format_registry.get_help('default'))
136
self.assertEqual('Pre-0.8 format. Slower and does not support'
137
' checkouts or shared repositories',
138
my_format_registry.get_help('weave'))
150
self.assertEqual('Some format. Slower and unawesome and deprecated.',
151
my_format_registry.get_help('deprecated'))
140
153
def test_help_topic(self):
141
154
topics = help_topics.HelpTopicRegistry()
142
topics.register('formats', self.make_format_registry().help_topic,
144
topic = topics.get_detail('formats')
145
new, rest = topic.split('Experimental formats')
155
registry = self.make_format_registry()
156
topics.register('current-formats', registry.help_topic,
158
topics.register('other-formats', registry.help_topic,
160
new = topics.get_detail('current-formats')
161
rest = topics.get_detail('other-formats')
146
162
experimental, deprecated = rest.split('Deprecated formats')
147
self.assertContainsRe(new, 'These formats can be used')
148
self.assertContainsRe(new,
163
self.assertContainsRe(new, 'formats-help')
164
self.assertContainsRe(new,
149
165
':knit:\n \(native\) \(default\) Format using knits\n')
150
self.assertContainsRe(experimental,
166
self.assertContainsRe(experimental,
151
167
':branch6:\n \(native\) Experimental successor to knit')
152
self.assertContainsRe(deprecated,
168
self.assertContainsRe(deprecated,
153
169
':lazy:\n \(native\) Format registered lazily\n')
154
170
self.assertNotContainsRe(new, 'hidden')
162
178
self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
163
179
bzrdir.format_registry.get('default'))
165
repository.RepositoryFormat.get_default_format().__class__,
181
repository.format_registry.get_default().__class__,
166
182
knitrepo.RepositoryFormatKnit3)
168
184
bzrdir.format_registry.set_default_repository(old_default)
170
186
def test_aliases(self):
171
a_registry = bzrdir.BzrDirFormatRegistry()
172
a_registry.register('weave', bzrdir.BzrDirFormat6,
173
'Pre-0.8 format. Slower and does not support checkouts or shared'
174
' repositories', deprecated=True)
175
a_registry.register('weavealias', bzrdir.BzrDirFormat6,
176
'Pre-0.8 format. Slower and does not support checkouts or shared'
177
' repositories', deprecated=True, alias=True)
178
self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
187
a_registry = controldir.ControlDirFormatRegistry()
188
a_registry.register('deprecated', DeprecatedBzrDirFormat,
189
'Old format. Slower and does not support stuff',
191
a_registry.register('deprecatedalias', DeprecatedBzrDirFormat,
192
'Old format. Slower and does not support stuff',
193
deprecated=True, alias=True)
194
self.assertEqual(frozenset(['deprecatedalias']), a_registry.aliases())
181
197
class SampleBranch(bzrlib.branch.Branch):
182
198
"""A dummy branch for guess what, dummy use."""
229
254
return "opened branch."
257
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
260
def get_format_string():
261
return "Test format 1"
264
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
267
def get_format_string():
268
return "Test format 2"
232
271
class TestBzrDirFormat(TestCaseWithTransport):
233
272
"""Tests for the BzrDirFormat facility."""
235
274
def test_find_format(self):
236
275
# is the right format object found for a branch?
237
276
# create a branch with a few known format objects.
238
# this is not quite the same as
239
t = get_transport(self.get_url())
277
bzrdir.BzrProber.formats.register(BzrDirFormatTest1.get_format_string(),
279
self.addCleanup(bzrdir.BzrProber.formats.remove,
280
BzrDirFormatTest1.get_format_string())
281
bzrdir.BzrProber.formats.register(BzrDirFormatTest2.get_format_string(),
283
self.addCleanup(bzrdir.BzrProber.formats.remove,
284
BzrDirFormatTest2.get_format_string())
285
t = self.get_transport()
240
286
self.build_tree(["foo/", "bar/"], transport=t)
241
287
def check_format(format, url):
242
288
format.initialize(url)
243
t = get_transport(url)
289
t = _mod_transport.get_transport(url)
244
290
found_format = bzrdir.BzrDirFormat.find_format(t)
245
self.failUnless(isinstance(found_format, format.__class__))
246
check_format(bzrdir.BzrDirFormat5(), "foo")
247
check_format(bzrdir.BzrDirFormat6(), "bar")
291
self.assertIsInstance(found_format, format.__class__)
292
check_format(BzrDirFormatTest1(), "foo")
293
check_format(BzrDirFormatTest2(), "bar")
249
295
def test_find_format_nothing_there(self):
250
296
self.assertRaises(NotBranchError,
251
297
bzrdir.BzrDirFormat.find_format,
298
_mod_transport.get_transport('.'))
254
300
def test_find_format_unknown_format(self):
255
t = get_transport(self.get_url())
301
t = self.get_transport()
257
303
t.put_bytes('.bzr/branch-format', '')
258
304
self.assertRaises(UnknownFormatError,
259
305
bzrdir.BzrDirFormat.find_format,
306
_mod_transport.get_transport('.'))
262
308
def test_register_unregister_format(self):
263
309
format = SampleBzrDirFormat()
507
565
self.assertTrue(repo.supports_rich_root())
509
567
def test_add_fallback_repo_handles_absolute_urls(self):
510
stack_on = self.make_branch('stack_on', format='development1')
511
repo = self.make_repository('repo', format='development1')
568
stack_on = self.make_branch('stack_on', format='1.6')
569
repo = self.make_repository('repo', format='1.6')
512
570
policy = bzrdir.UseExistingRepository(repo, stack_on.base)
513
571
policy._add_fallback(repo)
515
573
def test_add_fallback_repo_handles_relative_urls(self):
516
stack_on = self.make_branch('stack_on', format='development1')
517
repo = self.make_repository('repo', format='development1')
574
stack_on = self.make_branch('stack_on', format='1.6')
575
repo = self.make_repository('repo', format='1.6')
518
576
policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
519
577
policy._add_fallback(repo)
521
579
def test_configure_relative_branch_stacking_url(self):
522
stack_on = self.make_branch('stack_on', format='development1')
523
stacked = self.make_branch('stack_on/stacked', format='development1')
580
stack_on = self.make_branch('stack_on', format='1.6')
581
stacked = self.make_branch('stack_on/stacked', format='1.6')
524
582
policy = bzrdir.UseExistingRepository(stacked.repository,
525
583
'.', stack_on.base)
526
584
policy.configure_branch(stacked)
527
585
self.assertEqual('..', stacked.get_stacked_on_url())
529
587
def test_relative_branch_stacking_to_absolute(self):
530
stack_on = self.make_branch('stack_on', format='development1')
531
stacked = self.make_branch('stack_on/stacked', format='development1')
588
stack_on = self.make_branch('stack_on', format='1.6')
589
stacked = self.make_branch('stack_on/stacked', format='1.6')
532
590
policy = bzrdir.UseExistingRepository(stacked.repository,
533
591
'.', self.get_readonly_url('stack_on'))
534
592
policy.configure_branch(stacked)
651
709
self.assertEqual(relpath, 'baz')
653
711
def test_open_containing_from_transport(self):
654
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
655
get_transport(self.get_readonly_url('')))
656
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
657
get_transport(self.get_readonly_url('g/p/q')))
712
self.assertRaises(NotBranchError,
713
bzrdir.BzrDir.open_containing_from_transport,
714
_mod_transport.get_transport(self.get_readonly_url('')))
715
self.assertRaises(NotBranchError,
716
bzrdir.BzrDir.open_containing_from_transport,
717
_mod_transport.get_transport(self.get_readonly_url('g/p/q')))
658
718
control = bzrdir.BzrDir.create(self.get_url())
659
719
branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
660
get_transport(self.get_readonly_url('')))
720
_mod_transport.get_transport(self.get_readonly_url('')))
661
721
self.assertEqual('', relpath)
662
722
branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
663
get_transport(self.get_readonly_url('g/p/q')))
723
_mod_transport.get_transport(self.get_readonly_url('g/p/q')))
664
724
self.assertEqual('g/p/q', relpath)
666
726
def test_open_containing_tree_or_branch(self):
710
770
# transport pointing at bzrdir should give a bzrdir with root transport
711
771
# set to the given transport
712
772
control = bzrdir.BzrDir.create(self.get_url())
713
transport = get_transport(self.get_url())
714
opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
715
self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
773
t = self.get_transport()
774
opened_bzrdir = bzrdir.BzrDir.open_from_transport(t)
775
self.assertEqual(t.base, opened_bzrdir.root_transport.base)
716
776
self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
718
778
def test_open_from_transport_no_bzrdir(self):
719
transport = get_transport(self.get_url())
720
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
779
t = self.get_transport()
780
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
723
782
def test_open_from_transport_bzrdir_in_parent(self):
724
783
control = bzrdir.BzrDir.create(self.get_url())
725
transport = get_transport(self.get_url())
726
transport.mkdir('subdir')
727
transport = transport.clone('subdir')
728
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
784
t = self.get_transport()
786
t = t.clone('subdir')
787
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
731
789
def test_sprout_recursive(self):
732
tree = self.make_branch_and_tree('tree1', format='dirstate-with-subtree')
790
tree = self.make_branch_and_tree('tree1',
791
format='dirstate-with-subtree')
733
792
sub_tree = self.make_branch_and_tree('tree1/subtree',
734
793
format='dirstate-with-subtree')
794
sub_tree.set_root_id('subtree-root')
735
795
tree.add_reference(sub_tree)
736
796
self.build_tree(['tree1/subtree/file'])
737
797
sub_tree.add('file')
738
798
tree.commit('Initial commit')
739
tree.bzrdir.sprout('tree2')
740
self.failUnlessExists('tree2/subtree/file')
799
tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
801
self.addCleanup(tree2.unlock)
802
self.assertPathExists('tree2/subtree/file')
803
self.assertEqual('tree-reference', tree2.kind('subtree-root'))
742
805
def test_cloning_metadir(self):
743
806
"""Ensure that cloning metadir is suitable"""
757
820
self.build_tree(['tree1/subtree/file'])
758
821
sub_tree.add('file')
759
822
tree.commit('Initial commit')
823
# The following line forces the orphaning to reveal bug #634470
824
tree.branch.get_config().set_user_option(
825
'bzr.transform.orphan_policy', 'move')
760
826
tree.bzrdir.destroy_workingtree()
827
# FIXME: subtree/.bzr is left here which allows the test to pass (or
828
# fail :-( ) -- vila 20100909
761
829
repo = self.make_repository('repo', shared=True,
762
830
format='dirstate-with-subtree')
763
831
repo.set_make_working_trees(False)
764
tree.bzrdir.sprout('repo/tree2')
765
self.failUnlessExists('repo/tree2/subtree')
766
self.failIfExists('repo/tree2/subtree/file')
832
# FIXME: we just deleted the workingtree and now we want to use it ????
833
# At a minimum, we should use tree.branch below (but this fails too
834
# currently) or stop calling this test 'treeless'. Specifically, I've
835
# turned the line below into an assertRaises when 'subtree/.bzr' is
836
# orphaned and sprout tries to access the branch there (which is left
837
# by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
838
# [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
839
# #634470. -- vila 20100909
840
self.assertRaises(errors.NotBranchError,
841
tree.bzrdir.sprout, 'repo/tree2')
842
# self.assertPathExists('repo/tree2/subtree')
843
# self.assertPathDoesNotExist('repo/tree2/subtree/file')
768
845
def make_foo_bar_baz(self):
769
846
foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
774
851
def test_find_bzrdirs(self):
775
852
foo, bar, baz = self.make_foo_bar_baz()
776
transport = get_transport(self.get_url())
777
self.assertEqualBzrdirs([baz, foo, bar],
778
bzrdir.BzrDir.find_bzrdirs(transport))
853
t = self.get_transport()
854
self.assertEqualBzrdirs([baz, foo, bar], bzrdir.BzrDir.find_bzrdirs(t))
856
def make_fake_permission_denied_transport(self, transport, paths):
857
"""Create a transport that raises PermissionDenied for some paths."""
860
raise errors.PermissionDenied(path)
862
path_filter_server = pathfilter.PathFilteringServer(transport, filter)
863
path_filter_server.start_server()
864
self.addCleanup(path_filter_server.stop_server)
865
path_filter_transport = pathfilter.PathFilteringTransport(
866
path_filter_server, '.')
867
return (path_filter_server, path_filter_transport)
869
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
870
"""Check that each branch url ends with the given suffix."""
871
for actual_bzrdir in actual_bzrdirs:
872
self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
874
def test_find_bzrdirs_permission_denied(self):
875
foo, bar, baz = self.make_foo_bar_baz()
876
t = self.get_transport()
877
path_filter_server, path_filter_transport = \
878
self.make_fake_permission_denied_transport(t, ['foo'])
880
self.assertBranchUrlsEndWith('/baz/',
881
bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
883
smart_transport = self.make_smart_server('.',
884
backing_server=path_filter_server)
885
self.assertBranchUrlsEndWith('/baz/',
886
bzrdir.BzrDir.find_bzrdirs(smart_transport))
780
888
def test_find_bzrdirs_list_current(self):
781
889
def list_current(transport):
782
890
return [s for s in transport.list_dir('') if s != 'baz']
784
892
foo, bar, baz = self.make_foo_bar_baz()
785
transport = get_transport(self.get_url())
786
self.assertEqualBzrdirs([foo, bar],
787
bzrdir.BzrDir.find_bzrdirs(transport,
788
list_current=list_current))
893
t = self.get_transport()
894
self.assertEqualBzrdirs(
896
bzrdir.BzrDir.find_bzrdirs(t, list_current=list_current))
791
898
def test_find_bzrdirs_evaluate(self):
792
899
def evaluate(bzrdir):
814
920
root = self.make_repository('', shared=True)
815
921
foo, bar, baz = self.make_foo_bar_baz()
816
922
qux = self.make_bzrdir('foo/qux')
817
transport = get_transport(self.get_url())
818
branches = bzrdir.BzrDir.find_branches(transport)
923
t = self.get_transport()
924
branches = bzrdir.BzrDir.find_branches(t)
819
925
self.assertEqual(baz.root_transport.base, branches[0].base)
820
926
self.assertEqual(foo.root_transport.base, branches[1].base)
821
927
self.assertEqual(bar.root_transport.base, branches[2].base)
823
929
# ensure this works without a top-level repo
824
branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
930
branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
825
931
self.assertEqual(foo.root_transport.base, branches[0].base)
826
932
self.assertEqual(bar.root_transport.base, branches[1].base)
935
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
937
def test_find_bzrdirs_missing_repo(self):
938
t = self.get_transport()
939
arepo = self.make_repository('arepo', shared=True)
940
abranch_url = arepo.user_url + '/abranch'
941
abranch = bzrdir.BzrDir.create(abranch_url).create_branch()
942
t.delete_tree('arepo/.bzr')
943
self.assertRaises(errors.NoRepositoryPresent,
944
branch.Branch.open, abranch_url)
945
self.make_branch('baz')
946
for actual_bzrdir in bzrdir.BzrDir.find_branches(t):
947
self.assertEndsWith(actual_bzrdir.user_url, '/baz/')
829
950
class TestMeta1DirFormat(TestCaseWithTransport):
830
951
"""Tests specific to the meta1 dir format."""
870
992
def test_needs_conversion_different_working_tree(self):
871
993
# meta1dirs need a conversion if any element is not the default.
872
old_format = bzrdir.BzrDirFormat.get_default_format()
874
new_default = bzrdir.format_registry.make_bzrdir('dirstate')
875
bzrdir.BzrDirFormat._set_default_format(new_default)
877
tree = self.make_branch_and_tree('tree', format='knit')
878
self.assertTrue(tree.bzrdir.needs_format_conversion())
880
bzrdir.BzrDirFormat._set_default_format(old_format)
883
class TestFormat5(TestCaseWithTransport):
884
"""Tests specific to the version 5 bzrdir format."""
886
def test_same_lockfiles_between_tree_repo_branch(self):
887
# this checks that only a single lockfiles instance is created
888
# for format 5 objects
889
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
890
def check_dir_components_use_same_lock(dir):
891
ctrl_1 = dir.open_repository().control_files
892
ctrl_2 = dir.open_branch().control_files
893
ctrl_3 = dir.open_workingtree()._control_files
894
self.assertTrue(ctrl_1 is ctrl_2)
895
self.assertTrue(ctrl_2 is ctrl_3)
896
check_dir_components_use_same_lock(dir)
897
# and if we open it normally.
898
dir = bzrdir.BzrDir.open(self.get_url())
899
check_dir_components_use_same_lock(dir)
901
def test_can_convert(self):
902
# format 5 dirs are convertible
903
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
904
self.assertTrue(dir.can_convert_format())
906
def test_needs_conversion(self):
907
# format 5 dirs need a conversion if they are not the default.
908
# and they start off as not the default.
909
old_format = bzrdir.BzrDirFormat.get_default_format()
910
bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirFormat5())
912
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
913
self.assertFalse(dir.needs_format_conversion())
915
bzrdir.BzrDirFormat._set_default_format(old_format)
916
self.assertTrue(dir.needs_format_conversion())
919
class TestFormat6(TestCaseWithTransport):
920
"""Tests specific to the version 6 bzrdir format."""
922
def test_same_lockfiles_between_tree_repo_branch(self):
923
# this checks that only a single lockfiles instance is created
924
# for format 6 objects
925
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
926
def check_dir_components_use_same_lock(dir):
927
ctrl_1 = dir.open_repository().control_files
928
ctrl_2 = dir.open_branch().control_files
929
ctrl_3 = dir.open_workingtree()._control_files
930
self.assertTrue(ctrl_1 is ctrl_2)
931
self.assertTrue(ctrl_2 is ctrl_3)
932
check_dir_components_use_same_lock(dir)
933
# and if we open it normally.
934
dir = bzrdir.BzrDir.open(self.get_url())
935
check_dir_components_use_same_lock(dir)
937
def test_can_convert(self):
938
# format 6 dirs are convertible
939
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
940
self.assertTrue(dir.can_convert_format())
942
def test_needs_conversion(self):
943
# format 6 dirs need a conversion if they are not the default.
944
old_format = bzrdir.BzrDirFormat.get_default_format()
945
bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirMetaFormat1())
947
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
948
self.assertTrue(dir.needs_format_conversion())
950
bzrdir.BzrDirFormat._set_default_format(old_format)
953
class NotBzrDir(bzrlib.bzrdir.BzrDir):
954
"""A non .bzr based control directory."""
956
def __init__(self, transport, format):
957
self._format = format
958
self.root_transport = transport
959
self.transport = transport.clone('.not')
962
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
963
"""A test class representing any non-.bzr based disk format."""
965
def initialize_on_transport(self, transport):
966
"""Initialize a new .not dir in the base directory of a Transport."""
967
transport.mkdir('.not')
968
return self.open(transport)
970
def open(self, transport):
971
"""Open this directory."""
972
return NotBzrDir(transport, self)
975
def _known_formats(self):
976
return set([NotBzrDirFormat()])
979
def probe_transport(self, transport):
980
"""Our format is present if the transport ends in '.not/'."""
981
if transport.has('.not'):
982
return NotBzrDirFormat()
985
class TestNotBzrDir(TestCaseWithTransport):
986
"""Tests for using the bzrdir api with a non .bzr based disk format.
988
If/when one of these is in the core, we can let the implementation tests
992
def test_create_and_find_format(self):
993
# create a .notbzr dir
994
format = NotBzrDirFormat()
995
dir = format.initialize(self.get_url())
996
self.assertIsInstance(dir, NotBzrDir)
998
bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
1000
found = bzrlib.bzrdir.BzrDirFormat.find_format(
1001
get_transport(self.get_url()))
1002
self.assertIsInstance(found, NotBzrDirFormat)
1004
bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
1006
def test_included_in_known_formats(self):
1007
bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
1009
formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1010
for format in formats:
1011
if isinstance(format, NotBzrDirFormat):
1013
self.fail("No NotBzrDirFormat in %s" % formats)
1015
bzrlib.bzrdir.BzrDirFormat.unregister_control_format(NotBzrDirFormat)
994
new_format = bzrdir.format_registry.make_bzrdir('dirstate')
995
tree = self.make_branch_and_tree('tree', format='knit')
996
self.assertTrue(tree.bzrdir.needs_format_conversion(
999
def test_initialize_on_format_uses_smart_transport(self):
1000
self.setup_smart_server_with_call_log()
1001
new_format = bzrdir.format_registry.make_bzrdir('dirstate')
1002
transport = self.get_transport('target')
1003
transport.ensure_base()
1004
self.reset_smart_call_log()
1005
instance = new_format.initialize_on_transport(transport)
1006
self.assertIsInstance(instance, remote.RemoteBzrDir)
1007
rpc_count = len(self.hpss_calls)
1008
# This figure represents the amount of work to perform this use case. It
1009
# is entirely ok to reduce this number if a test fails due to rpc_count
1010
# being too low. If rpc_count increases, more network roundtrips have
1011
# become necessary for this use case. Please do not adjust this number
1012
# upwards without agreement from bzr's network support maintainers.
1013
self.assertEqual(2, rpc_count)
1018
1016
class NonLocalTests(TestCaseWithTransport):
1061
1059
my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
1062
1060
checkout_format = my_bzrdir.checkout_metadir()
1063
1061
self.assertIsInstance(checkout_format.workingtree_format,
1064
workingtree.WorkingTreeFormat3)
1067
class TestHTTPRedirectionLoop(object):
1068
"""Test redirection loop between two http servers.
1062
workingtree_4.WorkingTreeFormat4)
1065
class TestHTTPRedirections(object):
1066
"""Test redirection between two http servers.
1070
1068
This MUST be used by daughter classes that also inherit from
1071
1069
TestCaseWithTwoWebservers.
1073
1071
We can't inherit directly from TestCaseWithTwoWebservers or the
1074
1072
test framework will try to create an instance which cannot
1075
run, its implementation being incomplete.
1073
run, its implementation being incomplete.
1078
# Should be defined by daughter classes to ensure redirection
1079
# still use the same transport implementation (not currently
1080
# enforced as it's a bit tricky to get right (see the FIXME
1081
# in BzrDir.open_from_transport for the unique use case so
1085
1076
def create_transport_readonly_server(self):
1086
return HTTPServerRedirecting()
1077
# We don't set the http protocol version, relying on the default
1078
return http_utils.HTTPServerRedirecting()
1088
1080
def create_transport_secondary_server(self):
1089
return HTTPServerRedirecting()
1081
# We don't set the http protocol version, relying on the default
1082
return http_utils.HTTPServerRedirecting()
1091
1084
def setUp(self):
1092
# Both servers redirect to each other creating a loop
1093
super(TestHTTPRedirectionLoop, self).setUp()
1085
super(TestHTTPRedirections, self).setUp()
1094
1086
# The redirections will point to the new server
1095
1087
self.new_server = self.get_readonly_server()
1096
1088
# The requests to the old server will be redirected
1097
1089
self.old_server = self.get_secondary_server()
1098
1090
# Configure the redirections
1099
1091
self.old_server.redirect_to(self.new_server.host, self.new_server.port)
1093
def test_loop(self):
1094
# Both servers redirect to each other creating a loop
1100
1095
self.new_server.redirect_to(self.old_server.host, self.old_server.port)
1102
def _qualified_url(self, host, port):
1103
return 'http+%s://%s:%s' % (self._qualifier, host, port)
1105
def test_loop(self):
1106
1096
# Starting from either server should loop
1107
old_url = self._qualified_url(self.old_server.host,
1097
old_url = self._qualified_url(self.old_server.host,
1108
1098
self.old_server.port)
1109
1099
oldt = self._transport(old_url)
1110
1100
self.assertRaises(errors.NotBranchError,
1111
1101
bzrdir.BzrDir.open_from_transport, oldt)
1112
new_url = self._qualified_url(self.new_server.host,
1102
new_url = self._qualified_url(self.new_server.host,
1113
1103
self.new_server.port)
1114
1104
newt = self._transport(new_url)
1115
1105
self.assertRaises(errors.NotBranchError,
1116
1106
bzrdir.BzrDir.open_from_transport, newt)
1119
class TestHTTPRedirections_urllib(TestHTTPRedirectionLoop,
1120
TestCaseWithTwoWebservers):
1108
def test_qualifier_preserved(self):
1109
wt = self.make_branch_and_tree('branch')
1110
old_url = self._qualified_url(self.old_server.host,
1111
self.old_server.port)
1112
start = self._transport(old_url).clone('branch')
1113
bdir = bzrdir.BzrDir.open_from_transport(start)
1114
# Redirection should preserve the qualifier, hence the transport class
1116
self.assertIsInstance(bdir.root_transport, type(start))
1119
class TestHTTPRedirections_urllib(TestHTTPRedirections,
1120
http_utils.TestCaseWithTwoWebservers):
1121
1121
"""Tests redirections for urllib implementation"""
1123
_qualifier = 'urllib'
1124
1123
_transport = HttpTransport_urllib
1125
def _qualified_url(self, host, port):
1126
result = 'http+urllib://%s:%s' % (host, port)
1127
self.permit_url(result)
1128
1132
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
1129
TestHTTPRedirectionLoop,
1130
TestCaseWithTwoWebservers):
1133
TestHTTPRedirections,
1134
http_utils.TestCaseWithTwoWebservers):
1131
1135
"""Tests redirections for pycurl implementation"""
1133
_qualifier = 'pycurl'
1137
def _qualified_url(self, host, port):
1138
result = 'http+pycurl://%s:%s' % (host, port)
1139
self.permit_url(result)
1143
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
                                   http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the nosmart decorator"""

    _transport = NoSmartTransportDecorator

    def _qualified_url(self, host, port):
        """Return the 'nosmart+http' URL for host and port.

        The URL is passed to ``self.permit_url`` before being returned, so
        callers can hand it straight to ``self._transport``.
        """
        result = 'nosmart+http://%s:%s' % (host, port)
        self.permit_url(result)
        return result
1155
class TestHTTPRedirections_readonly(TestHTTPRedirections,
                                    http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for readonly decorator"""

    _transport = ReadonlyTransportDecorator

    def _qualified_url(self, host, port):
        """Return the 'readonly+http' URL for host and port.

        The URL is passed to ``self.permit_url`` before being returned, so
        callers can hand it straight to ``self._transport``.
        """
        result = 'readonly+http://%s:%s' % (host, port)
        self.permit_url(result)
        return result
1136
1167
class TestDotBzrHidden(TestCaseWithTransport):
1188
1219
return _TestBzrDirFormat()
1222
class _TestBranchFormat(bzrlib.branch.BranchFormat):
    """Test Branch format for TestBzrDirSprout."""
1191
1226
class _TestBranch(bzrlib.branch.Branch):
    """Test Branch implementation for TestBzrDirSprout."""

    def __init__(self, transport, *args, **kwargs):
        """Set up the fake branch on top of transport.

        :param transport: object stored as ``self._transport``; its
            ``base`` attribute becomes ``self.base``.

        Remaining positional and keyword arguments are forwarded to the
        base class constructor.
        """
        # These attributes are assigned before the base constructor runs.
        self._format = _TestBranchFormat()
        self._transport = transport
        self.base = transport.base
        super(_TestBranch, self).__init__(*args, **kwargs)
        # Names of the methods invoked on this instance, in call order.
        self.calls = []
        self._parent = None

    def sprout(self, *args, **kwargs):
        """Record the call and return a new _TestBranch on the same transport."""
        self.calls.append('sprout')
        return _TestBranch(self._transport)

    def copy_content_into(self, destination, revision_id=None):
        """Record the call; nothing is actually copied."""
        self.calls.append('copy_content_into')

    def last_revision(self):
        """Return the null revision id."""
        return _mod_revision.NULL_REVISION

    def get_parent(self):
        """Return the value last given to set_parent (None initially)."""
        return self._parent

    def _get_config(self):
        """Return a TransportConfig for 'branch.conf' on this transport."""
        return config.TransportConfig(self._transport, 'branch.conf')

    def set_parent(self, parent):
        """Remember parent so that get_parent can return it."""
        self._parent = parent

    def lock_read(self):
        """Return a LogicalLockResult wrapping self.unlock."""
        return lock.LogicalLockResult(self.unlock)
1213
1263
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1245
1295
parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
1246
1296
branch_tree = parent.bzrdir.sprout('branch').open_branch()
1247
1297
self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1300
class TestBzrDirHooks(TestCaseWithMemoryTransport):
1302
def test_pre_open_called(self):
    """Check that the 'pre_open' hook receives the transport being opened.

    ``calls.append`` is installed as the hook, an open of a location
    that raises NotBranchError is attempted, and the bases of the
    transports recorded by the hook are compared with the base of the
    transport for 'foo'.
    """
    # Collects the transports handed to the hook.
    calls = []
    bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
    transport = self.get_transport('foo')
    url = transport.base
    self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
    self.assertEqual([transport.base], [t.base for t in calls])
1310
def test_pre_open_actual_exceptions_raised(self):
1312
def fail_once(transport):
1315
raise errors.BzrError("fail")
1316
bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
1317
transport = self.get_transport('foo')
1318
url = transport.base
1319
err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1320
self.assertEqual('fail', err._preformatted_string)
1322
def test_post_repo_init(self):
1323
from bzrlib.bzrdir import RepoInitHookParams
1325
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1327
self.make_repository('foo')
1328
self.assertLength(1, calls)
1330
self.assertIsInstance(params, RepoInitHookParams)
1331
self.assertTrue(hasattr(params, 'bzrdir'))
1332
self.assertTrue(hasattr(params, 'repository'))
1334
def test_post_repo_init_hook_repr(self):
    """Check the repr of the params passed to the 'post_repo_init' hook.

    The installed hook stores ``repr(params)``; after one repository has
    been made, exactly one repr must have been recorded and it must start
    with '<RepoInitHookParams for '.
    """
    # Collects repr(params) for every hook invocation.
    param_reprs = []
    bzrdir.BzrDir.hooks.install_named_hook(
        'post_repo_init',
        lambda params: param_reprs.append(repr(params)), None)
    self.make_repository('foo')
    self.assertLength(1, param_reprs)
    param_repr = param_reprs[0]
    self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1344
class TestGenerateBackupName(TestCaseWithMemoryTransport):
1345
# FIXME: This may need to be unified with test_osutils.TestBackupNames or
1346
# moved to per_bzrdir or per_transport for better coverage ?
1350
def setUp(self):
    """Create a BzrDir at the test URL and keep it with its transport.

    After this runs, ``self._transport`` is the test transport and
    ``self._bzrdir`` is the BzrDir opened from it.
    """
    super(TestGenerateBackupName, self).setUp()
    self._transport = self.get_transport()
    # The same transport object is offered to create() and then used to
    # open the control directory.
    bzrdir.BzrDir.create(self.get_url(),
                         possible_transports=[self._transport])
    self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
1356
def test_deprecated_generate_backup_name(self):
    """Check that generate_backup_name is flagged as deprecated in 2.3.0.

    ``applyDeprecated`` invokes ``self._bzrdir.generate_backup_name``
    with 'whatever'; the value it returns is not inspected.
    """
    self.applyDeprecated(
        symbol_versioning.deprecated_in((2, 3, 0)),
        self._bzrdir.generate_backup_name, 'whatever')
1362
self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))
1364
def test_exiting(self):
    """Check that an already present backup name is skipped.

    With 'a.~1~' written to the transport, the available backup name
    for 'a' is expected to be 'a.~2~'.
    """
    self._transport.put_bytes("a.~1~", "some content")
    self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))