113
115
return self.get_transport().get_smart_medium()
118
class TestByteStreamToStream(tests.TestCase):
120
def test_repeated_substreams_same_kind_are_one_stream(self):
121
# Make a stream - an iterable of bytestrings.
122
stream = [('text', [versionedfile.FulltextContentFactory(('k1',), None,
123
None, 'foo')]),('text', [
124
versionedfile.FulltextContentFactory(('k2',), None, None, 'bar')])]
125
fmt = bzrdir.format_registry.get('pack-0.92')().repository_format
126
bytes = smart.repository._stream_to_byte_stream(stream, fmt)
128
# Iterate the resulting iterable; checking that we get only one stream
130
fmt, stream = smart.repository._byte_stream_to_stream(bytes)
131
for kind, substream in stream:
132
streams.append((kind, list(substream)))
133
self.assertLength(1, streams)
134
self.assertLength(2, streams[0][1])
116
137
class TestSmartServerResponse(tests.TestCase):
118
139
def test__eq__(self):
283
if repo._format.supports_external_lookups:
250
287
if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
251
288
self._request_class):
252
289
return SuccessfulSmartServerResponse(
253
('ok', '', rich_root, subtrees, 'no',
290
('ok', '', rich_root, subtrees, external,
254
291
repo._format.network_name()))
255
292
elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
256
293
self._request_class):
257
294
# All tests so far are on formats, and for non-external
259
296
return SuccessfulSmartServerResponse(
260
('ok', '', rich_root, subtrees, 'no'))
297
('ok', '', rich_root, subtrees, external))
262
299
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
390
427
'False', 'False', 'False', '', '', '', '', 'False')
430
class TestSmartServerRequestOpenBzrDir(tests.TestCaseWithMemoryTransport):
    """Tests for SmartServerRequestOpenBzrDir.

    Each case in this class expects the request to answer ('no',).
    """

    def test_no_directory(self):
        """Executing against a path that was never created answers 'no'."""
        transport = self.get_transport()
        open_request = smart.bzrdir.SmartServerRequestOpenBzrDir(transport)
        expected = SmartServerResponse(('no', ))
        self.assertEqual(expected, open_request.execute('does-not-exist'))

    def test_empty_directory(self):
        """Executing against a directory made by plain mkdir answers 'no'."""
        transport = self.get_transport()
        transport.mkdir('empty')
        open_request = smart.bzrdir.SmartServerRequestOpenBzrDir(transport)
        expected = SmartServerResponse(('no', ))
        self.assertEqual(expected, open_request.execute('empty'))

    def test_outside_root_client_path(self):
        """With root_client_path='root', executing 'not-root' answers 'no'."""
        transport = self.get_transport()
        open_request = smart.bzrdir.SmartServerRequestOpenBzrDir(
            transport, root_client_path='root')
        expected = SmartServerResponse(('no', ))
        self.assertEqual(expected, open_request.execute('not-root'))
453
class TestSmartServerRequestOpenBzrDir_2_1(tests.TestCaseWithMemoryTransport):
455
def test_no_directory(self):
    """Executing the 2.1 verb on a path never created answers ('no',)."""
    transport = self.get_transport()
    open_request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(transport)
    expected = SmartServerResponse(('no', ))
    self.assertEqual(expected, open_request.execute('does-not-exist'))
461
def test_empty_directory(self):
    """Executing the 2.1 verb on a plain mkdir'd directory answers ('no',)."""
    transport = self.get_transport()
    transport.mkdir('empty')
    open_request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(transport)
    expected = SmartServerResponse(('no', ))
    self.assertEqual(expected, open_request.execute('empty'))
468
def test_present_without_workingtree(self):
469
backing = self.get_transport()
470
request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
471
self.make_bzrdir('.')
472
self.assertEqual(SmartServerResponse(('yes', 'no')),
475
def test_outside_root_client_path(self):
    """With root_client_path='root', executing 'not-root' answers ('no',)."""
    transport = self.get_transport()
    open_request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(
        transport, root_client_path='root')
    expected = SmartServerResponse(('no',))
    self.assertEqual(expected, open_request.execute('not-root'))
483
class TestSmartServerRequestOpenBzrDir_2_1_disk(TestCaseWithChrootedTransport):
485
def test_present_with_workingtree(self):
486
self.vfs_transport_factory = local.LocalURLServer
487
backing = self.get_transport()
488
request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
489
bd = self.make_bzrdir('.')
490
bd.create_repository()
492
bd.create_workingtree()
493
self.assertEqual(SmartServerResponse(('yes', 'yes')),
393
497
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
395
499
def test_no_branch(self):
441
554
def test_branch_reference(self):
442
555
"""When there is a branch reference, the reference URL is returned."""
443
backing = self.get_transport()
444
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
445
branch = self.make_branch('branch')
446
checkout = branch.create_checkout('reference',lightweight=True)
447
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
448
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
449
self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
450
request.execute('reference'))
452
def test_stacked_branch(self):
453
"""Opening a stacked branch does not open the stacked-on branch."""
454
trunk = self.make_branch('trunk')
455
feature = self.make_branch('feature', format='1.9')
456
feature.set_stacked_on_url(trunk.base)
458
Branch.hooks.install_named_hook('open', opened_branches.append, None)
459
backing = self.get_transport()
460
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
463
response = request.execute('feature')
465
request.teardown_jail()
466
expected_format = feature._format.network_name()
468
SuccessfulSmartServerResponse(('branch', expected_format)),
470
self.assertLength(1, opened_branches)
556
self.vfs_transport_factory = local.LocalURLServer
557
backing = self.get_transport()
558
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
559
branch = self.make_branch('branch')
560
checkout = branch.create_checkout('reference',lightweight=True)
561
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
562
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
563
self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
564
request.execute('reference'))
566
def test_stacked_branch(self):
567
"""Opening a stacked branch does not open the stacked-on branch."""
568
trunk = self.make_branch('trunk')
569
feature = self.make_branch('feature')
570
feature.set_stacked_on_url(trunk.base)
572
Branch.hooks.install_named_hook('open', opened_branches.append, None)
573
backing = self.get_transport()
574
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
577
response = request.execute('feature')
579
request.teardown_jail()
580
expected_format = feature._format.network_name()
582
SuccessfulSmartServerResponse(('branch', expected_format)),
584
self.assertLength(1, opened_branches)
586
def test_notification_on_branch_from_repository(self):
587
"""When there is a repository, the error should return details."""
588
backing = self.get_transport()
589
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
590
repo = self.make_repository('.')
591
self.assertEqual(SmartServerResponse(('nobranch',)),
595
class TestSmartServerRequestOpenBranchV3(TestCaseWithChrootedTransport):
597
def test_no_branch(self):
598
"""When there is no branch, ('nobranch', ) is returned."""
599
backing = self.get_transport()
600
self.make_bzrdir('.')
601
request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
602
self.assertEqual(SmartServerResponse(('nobranch',)),
605
def test_branch(self):
606
"""When there is a branch, 'ok' is returned."""
607
backing = self.get_transport()
608
expected = self.make_branch('.')._format.network_name()
609
request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
610
self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
613
def test_branch_reference(self):
    """Executing on a lightweight checkout answers ('ref', reference_url)."""
    # NOTE(review): presumably the local URL server is selected because the
    # checkout must live on a real filesystem -- confirm.
    self.vfs_transport_factory = local.LocalURLServer
    transport = self.get_transport()
    open_request = smart.bzrdir.SmartServerRequestOpenBranchV3(transport)
    source_branch = self.make_branch('branch')
    lightweight_checkout = source_branch.create_checkout(
        'reference', lightweight=True)
    reference_url = BranchReferenceFormat().get_reference(
        lightweight_checkout.bzrdir)
    # The URL read back from the checkout must match what is stored on disk.
    self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
    expected = SuccessfulSmartServerResponse(('ref', reference_url))
    self.assertEqual(expected, open_request.execute('reference'))
625
def test_stacked_branch(self):
626
"""Opening a stacked branch does not open the stacked-on branch."""
627
trunk = self.make_branch('trunk')
628
feature = self.make_branch('feature')
629
feature.set_stacked_on_url(trunk.base)
631
Branch.hooks.install_named_hook('open', opened_branches.append, None)
632
backing = self.get_transport()
633
request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
636
response = request.execute('feature')
638
request.teardown_jail()
639
expected_format = feature._format.network_name()
641
SuccessfulSmartServerResponse(('branch', expected_format)),
643
self.assertLength(1, opened_branches)
645
def test_notification_on_branch_from_repository(self):
646
"""When there is a repository, the error should return details."""
647
backing = self.get_transport()
648
request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
649
repo = self.make_repository('.')
651
SmartServerResponse(('nobranch', 'location is a repository')),
473
655
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
788
class TestSmartServerBranchRequestSetTagsBytes(TestLockedBranch):
789
# Only called when the branch format and tags match [yay factory
790
# methods] so only need to test straightforward cases.
792
def test_set_bytes(self):
793
base_branch = self.make_branch('base')
794
tag_bytes = base_branch._get_tags_bytes()
795
# get_lock_tokens takes out a lock.
796
branch_token, repo_token = self.get_lock_tokens(base_branch)
797
request = smart.branch.SmartServerBranchSetTagsBytes(
798
self.get_transport())
799
response = request.execute('base', branch_token, repo_token)
800
self.assertEqual(None, response)
801
response = request.do_chunk(tag_bytes)
802
self.assertEqual(None, response)
803
response = request.do_end()
805
SuccessfulSmartServerResponse(()), response)
808
def test_lock_failed(self):
809
base_branch = self.make_branch('base')
810
base_branch.lock_write()
811
tag_bytes = base_branch._get_tags_bytes()
812
request = smart.branch.SmartServerBranchSetTagsBytes(
813
self.get_transport())
814
self.assertRaises(errors.TokenMismatch, request.execute,
815
'base', 'wrong token', 'wrong token')
816
# The request handler will keep processing the message parts, so even
817
# if the request fails immediately do_chunk and do_end are still
819
request.do_chunk(tag_bytes)
606
825
class SetLastRevisionTestBase(TestLockedBranch):
607
826
"""Base test case for verbs that implement set_last_revision."""