~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smart.py

  • Committer: Vincent Ladeuil
  • Date: 2010-01-25 15:55:48 UTC
  • mto: (4985.1.4 add-attr-cleanup)
  • mto: This revision was merged to the branch mainline in revision 4988.
  • Revision ID: v.ladeuil+lp@free.fr-20100125155548-0l352pujvt5bzl5e
Deploy addAttrCleanup on the whole test suite.

Several use case worth mentioning:

- setting a module or any other object attribute is the majority
by far. In some cases the setting itself is deferred but most of
the time we want to set at the same time we add the cleanup.

- there multiple occurrences of protecting hooks or ui factory
which are now useless (the test framework takes care of that now),

- there was some lambda uses that can now be avoided.

That first cleanup already simplifies things a lot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
36
36
    smart,
37
37
    tests,
38
38
    urlutils,
 
39
    versionedfile,
39
40
    )
40
41
from bzrlib.branch import Branch, BranchReferenceFormat
41
42
import bzrlib.smart.branch
42
43
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
43
44
import bzrlib.smart.packrepository
44
45
import bzrlib.smart.repository
 
46
import bzrlib.smart.vfs
45
47
from bzrlib.smart.request import (
46
48
    FailedSmartServerResponse,
47
49
    SmartServerRequest,
51
53
from bzrlib.tests import (
52
54
    split_suite_by_re,
53
55
    )
54
 
from bzrlib.transport import chroot, get_transport
 
56
from bzrlib.transport import chroot, get_transport, local, memory
55
57
 
56
58
 
57
59
def load_tests(standard_tests, module, loader):
80
82
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
81
83
 
82
84
    def setUp(self):
 
85
        self.vfs_transport_factory = memory.MemoryServer
83
86
        tests.TestCaseWithTransport.setUp(self)
84
87
        self._chroot_server = None
85
88
 
87
90
        if self._chroot_server is None:
88
91
            backing_transport = tests.TestCaseWithTransport.get_transport(self)
89
92
            self._chroot_server = chroot.ChrootServer(backing_transport)
90
 
            self._chroot_server.setUp()
91
 
            self.addCleanup(self._chroot_server.tearDown)
 
93
            self.start_server(self._chroot_server)
92
94
        t = get_transport(self._chroot_server.get_url())
93
95
        if relpath is not None:
94
96
            t = t.clone(relpath)
95
97
        return t
96
98
 
97
99
 
98
 
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
 
100
class TestCaseWithSmartMedium(tests.TestCaseWithMemoryTransport):
99
101
 
100
102
    def setUp(self):
101
103
        super(TestCaseWithSmartMedium, self).setUp()
113
115
        return self.get_transport().get_smart_medium()
114
116
 
115
117
 
 
118
class TestByteStreamToStream(tests.TestCase):
 
119
 
 
120
    def test_repeated_substreams_same_kind_are_one_stream(self):
 
121
        # Make a stream - an iterable of bytestrings.
 
122
        stream = [('text', [versionedfile.FulltextContentFactory(('k1',), None,
 
123
            None, 'foo')]),('text', [
 
124
            versionedfile.FulltextContentFactory(('k2',), None, None, 'bar')])]
 
125
        fmt = bzrdir.format_registry.get('pack-0.92')().repository_format
 
126
        bytes = smart.repository._stream_to_byte_stream(stream, fmt)
 
127
        streams = []
 
128
        # Iterate the resulting iterable; checking that we get only one stream
 
129
        # out.
 
130
        fmt, stream = smart.repository._byte_stream_to_stream(bytes)
 
131
        for kind, substream in stream:
 
132
            streams.append((kind, list(substream)))
 
133
        self.assertLength(1, streams)
 
134
        self.assertLength(2, streams[0][1])
 
135
 
 
136
 
116
137
class TestSmartServerResponse(tests.TestCase):
117
138
 
118
139
    def test__eq__(self):
150
171
        self.assertRaises(
151
172
            errors.PathNotChild, request.translate_client_path, 'bar/')
152
173
        self.assertEqual('./baz', request.translate_client_path('foo/baz'))
 
174
        e_acute = u'\N{LATIN SMALL LETTER E WITH ACUTE}'.encode('utf-8')
 
175
        self.assertEqual('./' + urlutils.escape(e_acute),
 
176
                         request.translate_client_path('foo/' + e_acute))
 
177
 
 
178
    def test_translate_client_path_vfs(self):
 
179
        """VfsRequests receive escaped paths rather than raw UTF-8."""
 
180
        transport = self.get_transport()
 
181
        request = smart.vfs.VfsRequest(transport, 'foo/')
 
182
        e_acute = u'\N{LATIN SMALL LETTER E WITH ACUTE}'.encode('utf-8')
 
183
        escaped = urlutils.escape('foo/' + e_acute)
 
184
        self.assertEqual('./' + urlutils.escape(e_acute),
 
185
                         request.translate_client_path(escaped))
153
186
 
154
187
    def test_transport_from_client_path(self):
155
188
        transport = self.get_transport()
202
235
        self.make_bzrdir('.')
203
236
        request_class = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository
204
237
        request = request_class(backing)
205
 
        reference_bzrdir_format = bzrdir.format_registry.get('default')()
 
238
        reference_bzrdir_format = bzrdir.format_registry.get('pack-0.92')()
206
239
        reference_format = reference_bzrdir_format.repository_format
207
240
        network_name = reference_format.network_name()
208
241
        expected = SuccessfulSmartServerResponse(
247
280
            subtrees = 'yes'
248
281
        else:
249
282
            subtrees = 'no'
 
283
        if repo._format.supports_external_lookups:
 
284
            external = 'yes'
 
285
        else:
 
286
            external = 'no'
250
287
        if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
251
288
            self._request_class):
252
289
            return SuccessfulSmartServerResponse(
253
 
                ('ok', '', rich_root, subtrees, 'no',
 
290
                ('ok', '', rich_root, subtrees, external,
254
291
                 repo._format.network_name()))
255
292
        elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
256
293
            self._request_class):
257
294
            # All tests so far are on formats, and for non-external
258
295
            # repositories.
259
296
            return SuccessfulSmartServerResponse(
260
 
                ('ok', '', rich_root, subtrees, 'no'))
 
297
                ('ok', '', rich_root, subtrees, external))
261
298
        else:
262
299
            return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
263
300
 
390
427
            'False', 'False', 'False', '', '', '', '', 'False')
391
428
 
392
429
 
 
430
class TestSmartServerRequestOpenBzrDir(tests.TestCaseWithMemoryTransport):
 
431
    
 
432
    def test_no_directory(self):
 
433
        backing = self.get_transport()
 
434
        request = smart.bzrdir.SmartServerRequestOpenBzrDir(backing)
 
435
        self.assertEqual(SmartServerResponse(('no', )),
 
436
            request.execute('does-not-exist'))
 
437
 
 
438
    def test_empty_directory(self):
 
439
        backing = self.get_transport()
 
440
        backing.mkdir('empty')
 
441
        request = smart.bzrdir.SmartServerRequestOpenBzrDir(backing)
 
442
        self.assertEqual(SmartServerResponse(('no', )),
 
443
            request.execute('empty'))
 
444
 
 
445
    def test_outside_root_client_path(self):
 
446
        backing = self.get_transport()
 
447
        request = smart.bzrdir.SmartServerRequestOpenBzrDir(backing,
 
448
            root_client_path='root')
 
449
        self.assertEqual(SmartServerResponse(('no', )),
 
450
            request.execute('not-root'))
 
451
 
 
452
    
 
453
class TestSmartServerRequestOpenBzrDir_2_1(tests.TestCaseWithMemoryTransport):
 
454
    
 
455
    def test_no_directory(self):
 
456
        backing = self.get_transport()
 
457
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
 
458
        self.assertEqual(SmartServerResponse(('no', )),
 
459
            request.execute('does-not-exist'))
 
460
 
 
461
    def test_empty_directory(self):
 
462
        backing = self.get_transport()
 
463
        backing.mkdir('empty')
 
464
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
 
465
        self.assertEqual(SmartServerResponse(('no', )),
 
466
            request.execute('empty'))
 
467
 
 
468
    def test_present_without_workingtree(self):
 
469
        backing = self.get_transport()
 
470
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
 
471
        self.make_bzrdir('.')
 
472
        self.assertEqual(SmartServerResponse(('yes', 'no')),
 
473
            request.execute(''))
 
474
 
 
475
    def test_outside_root_client_path(self):
 
476
        backing = self.get_transport()
 
477
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing,
 
478
            root_client_path='root')
 
479
        self.assertEqual(SmartServerResponse(('no',)),
 
480
            request.execute('not-root'))
 
481
 
 
482
    
 
483
class TestSmartServerRequestOpenBzrDir_2_1_disk(TestCaseWithChrootedTransport):
 
484
 
 
485
    def test_present_with_workingtree(self):
 
486
        self.vfs_transport_factory = local.LocalURLServer
 
487
        backing = self.get_transport()
 
488
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
 
489
        bd = self.make_bzrdir('.')
 
490
        bd.create_repository()
 
491
        bd.create_branch()
 
492
        bd.create_workingtree()
 
493
        self.assertEqual(SmartServerResponse(('yes', 'yes')),
 
494
            request.execute(''))
 
495
 
 
496
 
393
497
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
394
498
 
395
499
    def test_no_branch(self):
410
514
 
411
515
    def test_branch_reference(self):
412
516
        """When there is a branch reference, the reference URL is returned."""
 
517
        self.vfs_transport_factory = local.LocalURLServer
413
518
        backing = self.get_transport()
414
519
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
415
520
        branch = self.make_branch('branch')
419
524
        self.assertEqual(SmartServerResponse(('ok', reference_url)),
420
525
            request.execute('reference'))
421
526
 
 
527
    def test_notification_on_branch_from_repository(self):
 
528
        """When there is a repository, the error should return details."""
 
529
        backing = self.get_transport()
 
530
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
 
531
        repo = self.make_repository('.')
 
532
        self.assertEqual(SmartServerResponse(('nobranch',)),
 
533
            request.execute(''))
 
534
 
422
535
 
423
536
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
424
537
 
440
553
 
441
554
    def test_branch_reference(self):
442
555
        """When there is a branch reference, the reference URL is returned."""
443
 
        backing = self.get_transport()
444
 
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
445
 
        branch = self.make_branch('branch')
446
 
        checkout = branch.create_checkout('reference',lightweight=True)
447
 
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
448
 
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
449
 
        self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
450
 
            request.execute('reference'))
451
 
 
452
 
    def test_stacked_branch(self):
453
 
        """Opening a stacked branch does not open the stacked-on branch."""
454
 
        trunk = self.make_branch('trunk')
455
 
        feature = self.make_branch('feature', format='1.9')
456
 
        feature.set_stacked_on_url(trunk.base)
457
 
        opened_branches = []
458
 
        Branch.hooks.install_named_hook('open', opened_branches.append, None)
459
 
        backing = self.get_transport()
460
 
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
461
 
        request.setup_jail()
462
 
        try:
463
 
            response = request.execute('feature')
464
 
        finally:
465
 
            request.teardown_jail()
466
 
        expected_format = feature._format.network_name()
467
 
        self.assertEqual(
468
 
            SuccessfulSmartServerResponse(('branch', expected_format)),
469
 
            response)
470
 
        self.assertLength(1, opened_branches)
 
556
        self.vfs_transport_factory = local.LocalURLServer
 
557
        backing = self.get_transport()
 
558
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
 
559
        branch = self.make_branch('branch')
 
560
        checkout = branch.create_checkout('reference',lightweight=True)
 
561
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
 
562
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
 
563
        self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
 
564
            request.execute('reference'))
 
565
 
 
566
    def test_stacked_branch(self):
 
567
        """Opening a stacked branch does not open the stacked-on branch."""
 
568
        trunk = self.make_branch('trunk')
 
569
        feature = self.make_branch('feature')
 
570
        feature.set_stacked_on_url(trunk.base)
 
571
        opened_branches = []
 
572
        Branch.hooks.install_named_hook('open', opened_branches.append, None)
 
573
        backing = self.get_transport()
 
574
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
 
575
        request.setup_jail()
 
576
        try:
 
577
            response = request.execute('feature')
 
578
        finally:
 
579
            request.teardown_jail()
 
580
        expected_format = feature._format.network_name()
 
581
        self.assertEqual(
 
582
            SuccessfulSmartServerResponse(('branch', expected_format)),
 
583
            response)
 
584
        self.assertLength(1, opened_branches)
 
585
 
 
586
    def test_notification_on_branch_from_repository(self):
 
587
        """When there is a repository, the error should return details."""
 
588
        backing = self.get_transport()
 
589
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
 
590
        repo = self.make_repository('.')
 
591
        self.assertEqual(SmartServerResponse(('nobranch',)),
 
592
            request.execute(''))
 
593
 
 
594
 
 
595
class TestSmartServerRequestOpenBranchV3(TestCaseWithChrootedTransport):
 
596
 
 
597
    def test_no_branch(self):
 
598
        """When there is no branch, ('nobranch', ) is returned."""
 
599
        backing = self.get_transport()
 
600
        self.make_bzrdir('.')
 
601
        request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
 
602
        self.assertEqual(SmartServerResponse(('nobranch',)),
 
603
            request.execute(''))
 
604
 
 
605
    def test_branch(self):
 
606
        """When there is a branch, 'ok' is returned."""
 
607
        backing = self.get_transport()
 
608
        expected = self.make_branch('.')._format.network_name()
 
609
        request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
 
610
        self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
 
611
            request.execute(''))
 
612
 
 
613
    def test_branch_reference(self):
 
614
        """When there is a branch reference, the reference URL is returned."""
 
615
        self.vfs_transport_factory = local.LocalURLServer
 
616
        backing = self.get_transport()
 
617
        request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
 
618
        branch = self.make_branch('branch')
 
619
        checkout = branch.create_checkout('reference',lightweight=True)
 
620
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
 
621
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
 
622
        self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
 
623
            request.execute('reference'))
 
624
 
 
625
    def test_stacked_branch(self):
 
626
        """Opening a stacked branch does not open the stacked-on branch."""
 
627
        trunk = self.make_branch('trunk')
 
628
        feature = self.make_branch('feature')
 
629
        feature.set_stacked_on_url(trunk.base)
 
630
        opened_branches = []
 
631
        Branch.hooks.install_named_hook('open', opened_branches.append, None)
 
632
        backing = self.get_transport()
 
633
        request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
 
634
        request.setup_jail()
 
635
        try:
 
636
            response = request.execute('feature')
 
637
        finally:
 
638
            request.teardown_jail()
 
639
        expected_format = feature._format.network_name()
 
640
        self.assertEqual(
 
641
            SuccessfulSmartServerResponse(('branch', expected_format)),
 
642
            response)
 
643
        self.assertLength(1, opened_branches)
 
644
 
 
645
    def test_notification_on_branch_from_repository(self):
 
646
        """When there is a repository, the error should return details."""
 
647
        backing = self.get_transport()
 
648
        request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
 
649
        repo = self.make_repository('.')
 
650
        self.assertEqual(
 
651
            SmartServerResponse(('nobranch', 'location is a repository')),
 
652
            request.execute(''))
471
653
 
472
654
 
473
655
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
603
785
        branch.unlock()
604
786
 
605
787
 
 
788
class TestSmartServerBranchRequestSetTagsBytes(TestLockedBranch):
 
789
    # Only called when the branch format and tags match [yay factory
 
790
    # methods] so only need to test straight forward cases.
 
791
 
 
792
    def test_set_bytes(self):
 
793
        base_branch = self.make_branch('base')
 
794
        tag_bytes = base_branch._get_tags_bytes()
 
795
        # get_lock_tokens takes out a lock.
 
796
        branch_token, repo_token = self.get_lock_tokens(base_branch)
 
797
        request = smart.branch.SmartServerBranchSetTagsBytes(
 
798
            self.get_transport())
 
799
        response = request.execute('base', branch_token, repo_token)
 
800
        self.assertEqual(None, response)
 
801
        response = request.do_chunk(tag_bytes)
 
802
        self.assertEqual(None, response)
 
803
        response = request.do_end()
 
804
        self.assertEquals(
 
805
            SuccessfulSmartServerResponse(()), response)
 
806
        base_branch.unlock()
 
807
 
 
808
    def test_lock_failed(self):
 
809
        base_branch = self.make_branch('base')
 
810
        base_branch.lock_write()
 
811
        tag_bytes = base_branch._get_tags_bytes()
 
812
        request = smart.branch.SmartServerBranchSetTagsBytes(
 
813
            self.get_transport())
 
814
        self.assertRaises(errors.TokenMismatch, request.execute,
 
815
            'base', 'wrong token', 'wrong token')
 
816
        # The request handler will keep processing the message parts, so even
 
817
        # if the request fails immediately do_chunk and do_end are still
 
818
        # called.
 
819
        request.do_chunk(tag_bytes)
 
820
        request.do_end()
 
821
        base_branch.unlock()
 
822
 
 
823
 
 
824
 
606
825
class SetLastRevisionTestBase(TestLockedBranch):
607
826
    """Base test case for verbs that implement set_last_revision."""
608
827
 
872
1091
 
873
1092
 
874
1093
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
875
 
# Only called when the branch format and tags match [yay factory
876
 
# methods] so only need to test straight forward cases.
 
1094
    # Only called when the branch format and tags match [yay factory
 
1095
    # methods] so only need to test straight forward cases.
877
1096
 
878
1097
    def test_get_bytes(self):
879
1098
        base_branch = self.make_branch('base')
1205
1424
            SmartServerResponse(('history-incomplete', 2, r2)),
1206
1425
            request.execute('stacked', 1, (3, r3)))
1207
1426
 
 
1427
 
1208
1428
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
1209
1429
 
1210
1430
    def make_two_commit_repo(self):
1559
1779
        self.assertEqual(SmartServerResponse(('ok',)), response)
1560
1780
 
1561
1781
 
 
1782
class TestSmartServerVfsGet(tests.TestCaseWithMemoryTransport):
 
1783
 
 
1784
    def test_unicode_path(self):
 
1785
        """VFS requests expect unicode paths to be escaped."""
 
1786
        filename = u'foo\N{INTERROBANG}'
 
1787
        filename_escaped = urlutils.escape(filename)
 
1788
        backing = self.get_transport()
 
1789
        request = smart.vfs.GetRequest(backing)
 
1790
        backing.put_bytes_non_atomic(filename_escaped, 'contents')
 
1791
        self.assertEqual(SmartServerResponse(('ok', ), 'contents'),
 
1792
            request.execute(filename_escaped))
 
1793
 
 
1794
 
1562
1795
class TestHandlers(tests.TestCase):
1563
1796
    """Tests for the request.request_handlers object."""
1564
1797
 
1614
1847
            smart.bzrdir.SmartServerRequestOpenBranch)
1615
1848
        self.assertHandlerEqual('BzrDir.open_branchV2',
1616
1849
            smart.bzrdir.SmartServerRequestOpenBranchV2)
 
1850
        self.assertHandlerEqual('BzrDir.open_branchV3',
 
1851
            smart.bzrdir.SmartServerRequestOpenBranchV3)
1617
1852
        self.assertHandlerEqual('PackRepository.autopack',
1618
1853
            smart.packrepository.SmartServerPackRepositoryAutopack)
1619
1854
        self.assertHandlerEqual('Repository.gather_stats',