127
127
my_format_registry = self.make_format_registry()
128
128
self.assertEqual('Format registered lazily',
129
129
my_format_registry.get_help('lazy'))
130
self.assertEqual('Format using knits',
130
self.assertEqual('Format using knits',
131
131
my_format_registry.get_help('knit'))
132
self.assertEqual('Format using knits',
132
self.assertEqual('Format using knits',
133
133
my_format_registry.get_help('default'))
134
134
self.assertEqual('Pre-0.8 format. Slower and does not support'
135
' checkouts or shared repositories',
135
' checkouts or shared repositories',
136
136
my_format_registry.get_help('weave'))
138
138
def test_help_topic(self):
139
139
topics = help_topics.HelpTopicRegistry()
140
topics.register('formats', self.make_format_registry().help_topic,
142
topic = topics.get_detail('formats')
143
new, rest = topic.split('Experimental formats')
140
registry = self.make_format_registry()
141
topics.register('current-formats', registry.help_topic,
143
topics.register('other-formats', registry.help_topic,
145
new = topics.get_detail('current-formats')
146
rest = topics.get_detail('other-formats')
144
147
experimental, deprecated = rest.split('Deprecated formats')
145
self.assertContainsRe(new, 'These formats can be used')
146
self.assertContainsRe(new,
148
self.assertContainsRe(new, 'bzr help formats')
149
self.assertContainsRe(new,
147
150
':knit:\n \(native\) \(default\) Format using knits\n')
148
self.assertContainsRe(experimental,
151
self.assertContainsRe(experimental,
149
152
':branch6:\n \(native\) Experimental successor to knit')
150
self.assertContainsRe(deprecated,
153
self.assertContainsRe(deprecated,
151
154
':lazy:\n \(native\) Format registered lazily\n')
152
155
self.assertNotContainsRe(new, 'hidden')
418
428
"""The default acquisition policy should create a standalone branch."""
419
429
my_bzrdir = self.make_bzrdir('.')
420
430
repo_policy = my_bzrdir.determine_repository_policy()
421
repo = repo_policy.acquire_repository()
431
repo, is_new = repo_policy.acquire_repository()
422
432
self.assertEqual(repo.bzrdir.root_transport.base,
423
433
my_bzrdir.root_transport.base)
424
434
self.assertFalse(repo.is_shared())
436
def test_determine_stacking_policy(self):
437
parent_bzrdir = self.make_bzrdir('.')
438
child_bzrdir = self.make_bzrdir('child')
439
parent_bzrdir.get_config().set_default_stack_on('http://example.org')
440
repo_policy = child_bzrdir.determine_repository_policy()
441
self.assertEqual('http://example.org', repo_policy._stack_on)
443
def test_determine_stacking_policy_relative(self):
444
parent_bzrdir = self.make_bzrdir('.')
445
child_bzrdir = self.make_bzrdir('child')
446
parent_bzrdir.get_config().set_default_stack_on('child2')
447
repo_policy = child_bzrdir.determine_repository_policy()
448
self.assertEqual('child2', repo_policy._stack_on)
449
self.assertEqual(parent_bzrdir.root_transport.base,
450
repo_policy._stack_on_pwd)
452
def prepare_default_stacking(self, child_format='1.6'):
453
parent_bzrdir = self.make_bzrdir('.')
454
child_branch = self.make_branch('child', format=child_format)
455
parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
456
new_child_transport = parent_bzrdir.transport.clone('child2')
457
return child_branch, new_child_transport
459
def test_clone_on_transport_obeys_stacking_policy(self):
460
child_branch, new_child_transport = self.prepare_default_stacking()
461
new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
462
self.assertEqual(child_branch.base,
463
new_child.open_branch().get_stacked_on_url())
465
def test_default_stacking_with_stackable_branch_unstackable_repo(self):
466
# Make stackable source branch with an unstackable repo format.
467
source_bzrdir = self.make_bzrdir('source')
468
pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
469
source_branch = bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
470
# Make a directory with a default stacking policy
471
parent_bzrdir = self.make_bzrdir('parent')
472
stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
473
parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
474
# Clone source into directory
475
target = source_bzrdir.clone(self.get_url('parent/target'))
477
def test_sprout_obeys_stacking_policy(self):
478
child_branch, new_child_transport = self.prepare_default_stacking()
479
new_child = child_branch.bzrdir.sprout(new_child_transport.base)
480
self.assertEqual(child_branch.base,
481
new_child.open_branch().get_stacked_on_url())
483
def test_clone_ignores_policy_for_unsupported_formats(self):
484
child_branch, new_child_transport = self.prepare_default_stacking(
485
child_format='pack-0.92')
486
new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
487
self.assertRaises(errors.UnstackableBranchFormat,
488
new_child.open_branch().get_stacked_on_url)
490
def test_sprout_ignores_policy_for_unsupported_formats(self):
491
child_branch, new_child_transport = self.prepare_default_stacking(
492
child_format='pack-0.92')
493
new_child = child_branch.bzrdir.sprout(new_child_transport.base)
494
self.assertRaises(errors.UnstackableBranchFormat,
495
new_child.open_branch().get_stacked_on_url)
497
def test_sprout_upgrades_format_if_stacked_specified(self):
498
child_branch, new_child_transport = self.prepare_default_stacking(
499
child_format='pack-0.92')
500
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
502
self.assertEqual(child_branch.bzrdir.root_transport.base,
503
new_child.open_branch().get_stacked_on_url())
504
repo = new_child.open_repository()
505
self.assertTrue(repo._format.supports_external_lookups)
506
self.assertFalse(repo.supports_rich_root())
508
def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
509
child_branch, new_child_transport = self.prepare_default_stacking(
510
child_format='pack-0.92')
511
new_child = child_branch.bzrdir.clone_on_transport(new_child_transport,
512
stacked_on=child_branch.bzrdir.root_transport.base)
513
self.assertEqual(child_branch.bzrdir.root_transport.base,
514
new_child.open_branch().get_stacked_on_url())
515
repo = new_child.open_repository()
516
self.assertTrue(repo._format.supports_external_lookups)
517
self.assertFalse(repo.supports_rich_root())
519
def test_sprout_upgrades_to_rich_root_format_if_needed(self):
520
child_branch, new_child_transport = self.prepare_default_stacking(
521
child_format='rich-root-pack')
522
new_child = child_branch.bzrdir.sprout(new_child_transport.base,
524
repo = new_child.open_repository()
525
self.assertTrue(repo._format.supports_external_lookups)
526
self.assertTrue(repo.supports_rich_root())
528
def test_add_fallback_repo_handles_absolute_urls(self):
529
stack_on = self.make_branch('stack_on', format='1.6')
530
repo = self.make_repository('repo', format='1.6')
531
policy = bzrdir.UseExistingRepository(repo, stack_on.base)
532
policy._add_fallback(repo)
534
def test_add_fallback_repo_handles_relative_urls(self):
535
stack_on = self.make_branch('stack_on', format='1.6')
536
repo = self.make_repository('repo', format='1.6')
537
policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
538
policy._add_fallback(repo)
540
def test_configure_relative_branch_stacking_url(self):
541
stack_on = self.make_branch('stack_on', format='1.6')
542
stacked = self.make_branch('stack_on/stacked', format='1.6')
543
policy = bzrdir.UseExistingRepository(stacked.repository,
545
policy.configure_branch(stacked)
546
self.assertEqual('..', stacked.get_stacked_on_url())
548
def test_relative_branch_stacking_to_absolute(self):
549
stack_on = self.make_branch('stack_on', format='1.6')
550
stacked = self.make_branch('stack_on/stacked', format='1.6')
551
policy = bzrdir.UseExistingRepository(stacked.repository,
552
'.', self.get_readonly_url('stack_on'))
553
policy.configure_branch(stacked)
554
self.assertEqual(self.get_readonly_url('stack_on'),
555
stacked.get_stacked_on_url())
427
558
class ChrootedTests(TestCaseWithTransport):
428
559
"""A support class that provides readonly urls outside the local namespace.
758
894
def test_needs_conversion_different_working_tree(self):
759
895
# meta1dirs need an conversion if any element is not the default.
760
old_format = bzrdir.BzrDirFormat.get_default_format()
762
new_default = bzrdir.format_registry.make_bzrdir('dirstate')
763
bzrdir.BzrDirFormat._set_default_format(new_default)
765
tree = self.make_branch_and_tree('tree', format='knit')
766
self.assertTrue(tree.bzrdir.needs_format_conversion())
768
bzrdir.BzrDirFormat._set_default_format(old_format)
896
new_format = bzrdir.format_registry.make_bzrdir('dirstate')
897
tree = self.make_branch_and_tree('tree', format='knit')
898
self.assertTrue(tree.bzrdir.needs_format_conversion(
901
def test_initialize_on_format_uses_smart_transport(self):
902
self.setup_smart_server_with_call_log()
903
new_format = bzrdir.format_registry.make_bzrdir('dirstate')
904
transport = self.get_transport('target')
905
transport.ensure_base()
906
self.reset_smart_call_log()
907
instance = new_format.initialize_on_transport(transport)
908
self.assertIsInstance(instance, remote.RemoteBzrDir)
909
rpc_count = len(self.hpss_calls)
910
# This figure represent the amount of work to perform this use case. It
911
# is entirely ok to reduce this number if a test fails due to rpc_count
912
# being too low. If rpc_count increases, more network roundtrips have
913
# become necessary for this use case. Please do not adjust this number
914
# upwards without agreement from bzr's network support maintainers.
915
self.assertEqual(2, rpc_count)
771
918
class TestFormat5(TestCaseWithTransport):
772
919
"""Tests specific to the version 5 bzrdir format."""
774
921
def test_same_lockfiles_between_tree_repo_branch(self):
775
# this checks that only a single lockfiles instance is created
922
# this checks that only a single lockfiles instance is created
776
923
# for format 5 objects
777
924
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
778
925
def check_dir_components_use_same_lock(dir):
785
932
# and if we open it normally.
786
933
dir = bzrdir.BzrDir.open(self.get_url())
787
934
check_dir_components_use_same_lock(dir)
789
936
def test_can_convert(self):
790
937
# format 5 dirs are convertable
791
938
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
792
939
self.assertTrue(dir.can_convert_format())
794
941
def test_needs_conversion(self):
795
# format 5 dirs need a conversion if they are not the default.
796
# and they start of not the default.
797
old_format = bzrdir.BzrDirFormat.get_default_format()
798
bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirFormat5())
800
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
801
self.assertFalse(dir.needs_format_conversion())
803
bzrdir.BzrDirFormat._set_default_format(old_format)
804
self.assertTrue(dir.needs_format_conversion())
942
# format 5 dirs need a conversion if they are not the default,
944
dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
945
# don't need to convert it to itself
946
self.assertFalse(dir.needs_format_conversion(bzrdir.BzrDirFormat5()))
947
# do need to convert it to the current default
948
self.assertTrue(dir.needs_format_conversion(
949
bzrdir.BzrDirFormat.get_default_format()))
807
952
class TestFormat6(TestCaseWithTransport):
808
953
"""Tests specific to the version 6 bzrdir format."""
810
955
def test_same_lockfiles_between_tree_repo_branch(self):
811
# this checks that only a single lockfiles instance is created
956
# this checks that only a single lockfiles instance is created
812
957
# for format 6 objects
813
958
dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
814
959
def check_dir_components_use_same_lock(dir):
952
1093
workingtree.WorkingTreeFormat3)
955
class TestHTTPRedirectionLoop(object):
956
"""Test redirection loop between two http servers.
1096
class TestHTTPRedirections(object):
1097
"""Test redirection between two http servers.
958
1099
This MUST be used by daughter classes that also inherit from
959
1100
TestCaseWithTwoWebservers.
961
1102
We can't inherit directly from TestCaseWithTwoWebservers or the
962
1103
test framework will try to create an instance which cannot
963
run, its implementation being incomplete.
1104
run, its implementation being incomplete.
966
# Should be defined by daughter classes to ensure redirection
967
# still use the same transport implementation (not currently
968
# enforced as it's a bit tricky to get right (see the FIXME
969
# in BzrDir.open_from_transport for the unique use case so
973
1107
def create_transport_readonly_server(self):
974
return HTTPServerRedirecting()
1108
return http_utils.HTTPServerRedirecting()
976
1110
def create_transport_secondary_server(self):
977
return HTTPServerRedirecting()
1111
return http_utils.HTTPServerRedirecting()
979
1113
def setUp(self):
980
# Both servers redirect to each server creating a loop
981
super(TestHTTPRedirectionLoop, self).setUp()
1114
super(TestHTTPRedirections, self).setUp()
982
1115
# The redirections will point to the new server
983
1116
self.new_server = self.get_readonly_server()
984
1117
# The requests to the old server will be redirected
985
1118
self.old_server = self.get_secondary_server()
986
1119
# Configure the redirections
987
1120
self.old_server.redirect_to(self.new_server.host, self.new_server.port)
1122
def test_loop(self):
1123
# Both servers redirect to each other creating a loop
988
1124
self.new_server.redirect_to(self.old_server.host, self.old_server.port)
990
def _qualified_url(self, host, port):
991
return 'http+%s://%s:%s' % (self._qualifier, host, port)
994
1125
# Starting from either server should loop
995
old_url = self._qualified_url(self.old_server.host,
1126
old_url = self._qualified_url(self.old_server.host,
996
1127
self.old_server.port)
997
1128
oldt = self._transport(old_url)
998
1129
self.assertRaises(errors.NotBranchError,
999
1130
bzrdir.BzrDir.open_from_transport, oldt)
1000
new_url = self._qualified_url(self.new_server.host,
1131
new_url = self._qualified_url(self.new_server.host,
1001
1132
self.new_server.port)
1002
1133
newt = self._transport(new_url)
1003
1134
self.assertRaises(errors.NotBranchError,
1004
1135
bzrdir.BzrDir.open_from_transport, newt)
1007
class TestHTTPRedirections_urllib(TestHTTPRedirectionLoop,
1008
TestCaseWithTwoWebservers):
1137
def test_qualifier_preserved(self):
1138
wt = self.make_branch_and_tree('branch')
1139
old_url = self._qualified_url(self.old_server.host,
1140
self.old_server.port)
1141
start = self._transport(old_url).clone('branch')
1142
bdir = bzrdir.BzrDir.open_from_transport(start)
1143
# Redirection should preserve the qualifier, hence the transport class
1145
self.assertIsInstance(bdir.root_transport, type(start))
1148
class TestHTTPRedirections_urllib(TestHTTPRedirections,
1149
http_utils.TestCaseWithTwoWebservers):
1009
1150
"""Tests redirections for urllib implementation"""
1011
_qualifier = 'urllib'
1012
1152
_transport = HttpTransport_urllib
1154
def _qualified_url(self, host, port):
1155
return 'http+urllib://%s:%s' % (host, port)
1016
1159
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
1017
TestHTTPRedirectionLoop,
1018
TestCaseWithTwoWebservers):
1160
TestHTTPRedirections,
1161
http_utils.TestCaseWithTwoWebservers):
1019
1162
"""Tests redirections for pycurl implementation"""
1021
_qualifier = 'pycurl'
1164
def _qualified_url(self, host, port):
1165
return 'http+pycurl://%s:%s' % (host, port)
1168
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
1169
http_utils.TestCaseWithTwoWebservers):
1170
"""Tests redirections for the nosmart decorator"""
1172
_transport = NoSmartTransportDecorator
1174
def _qualified_url(self, host, port):
1175
return 'nosmart+http://%s:%s' % (host, port)
1178
class TestHTTPRedirections_readonly(TestHTTPRedirections,
1179
http_utils.TestCaseWithTwoWebservers):
1180
"""Tests redirections for readonly decoratror"""
1182
_transport = ReadonlyTransportDecorator
1184
def _qualified_url(self, host, port):
1185
return 'readonly+http://%s:%s' % (host, port)
1024
1188
class TestDotBzrHidden(TestCaseWithTransport):
1048
1212
b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
1049
1213
self.build_tree(['a'])
1050
1214
self.assertEquals(['a'], self.get_ls())
1217
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
1218
"""Test BzrDirFormat implementation for TestBzrDirSprout."""
1220
def _open(self, transport):
1221
return _TestBzrDir(transport, self)
1224
class _TestBzrDir(bzrdir.BzrDirMeta1):
1225
"""Test BzrDir implementation for TestBzrDirSprout.
1227
When created a _TestBzrDir already has repository and a branch. The branch
1228
is a test double as well.
1231
def __init__(self, *args, **kwargs):
1232
super(_TestBzrDir, self).__init__(*args, **kwargs)
1233
self.test_branch = _TestBranch()
1234
self.test_branch.repository = self.create_repository()
1236
def open_branch(self, unsupported=False):
1237
return self.test_branch
1239
def cloning_metadir(self, require_stacking=False):
1240
return _TestBzrDirFormat()
1243
class _TestBranchFormat(bzrlib.branch.BranchFormat):
1244
"""Test Branch format for TestBzrDirSprout."""
1247
class _TestBranch(bzrlib.branch.Branch):
1248
"""Test Branch implementation for TestBzrDirSprout."""
1250
def __init__(self, *args, **kwargs):
1251
self._format = _TestBranchFormat()
1252
super(_TestBranch, self).__init__(*args, **kwargs)
1256
def sprout(self, *args, **kwargs):
1257
self.calls.append('sprout')
1258
return _TestBranch()
1260
def copy_content_into(self, destination, revision_id=None):
1261
self.calls.append('copy_content_into')
1263
def get_parent(self):
1266
def set_parent(self, parent):
1267
self._parent = parent
1270
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1272
def test_sprout_uses_branch_sprout(self):
1273
"""BzrDir.sprout calls Branch.sprout.
1275
Usually, BzrDir.sprout should delegate to the branch's sprout method
1276
for part of the work. This allows the source branch to control the
1277
choice of format for the new branch.
1279
There are exceptions, but this tests avoids them:
1280
- if there's no branch in the source bzrdir,
1281
- or if the stacking has been requested and the format needs to be
1282
overridden to satisfy that.
1284
# Make an instrumented bzrdir.
1285
t = self.get_transport('source')
1287
source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
1288
# The instrumented bzrdir has a test_branch attribute that logs calls
1289
# made to the branch contained in that bzrdir. Initially the test
1290
# branch exists but no calls have been made to it.
1291
self.assertEqual([], source_bzrdir.test_branch.calls)
1294
target_url = self.get_url('target')
1295
result = source_bzrdir.sprout(target_url, recurse='no')
1297
# The bzrdir called the branch's sprout method.
1298
self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
1300
def test_sprout_parent(self):
1301
grandparent_tree = self.make_branch('grandparent')
1302
parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
1303
branch_tree = parent.bzrdir.sprout('branch').open_branch()
1304
self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1307
class TestBzrDirHooks(TestCaseWithMemoryTransport):
1309
def test_pre_open_called(self):
1311
bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
1312
transport = self.get_transport('foo')
1313
url = transport.base
1314
self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
1315
self.assertEqual([transport.base], [t.base for t in calls])
1317
def test_pre_open_actual_exceptions_raised(self):
1319
def fail_once(transport):
1322
raise errors.BzrError("fail")
1323
bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
1324
transport = self.get_transport('foo')
1325
url = transport.base
1326
err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1327
self.assertEqual('fail', err._preformatted_string)