~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_remote.py

  • Committer: Jonathan Riddell
  • Date: 2011-09-20 11:01:37 UTC
  • mto: This revision was merged to the branch mainline in revision 6153.
  • Revision ID: jriddell@canonical.com-20110920110137-mrps089ulocx1skd
gettext() in plugin.py and hooks.py

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
30
30
    branch,
31
31
    bzrdir,
32
32
    config,
 
33
    controldir,
33
34
    errors,
34
 
    graph,
 
35
    graph as _mod_graph,
35
36
    inventory,
36
37
    inventory_delta,
37
 
    pack,
38
38
    remote,
39
39
    repository,
40
40
    tests,
 
41
    transport,
41
42
    treebuilder,
42
 
    urlutils,
43
43
    versionedfile,
44
44
    )
45
45
from bzrlib.branch import Branch
46
 
from bzrlib.bzrdir import BzrDir, BzrDirFormat
 
46
from bzrlib.bzrdir import (
 
47
    BzrDir,
 
48
    BzrDirFormat,
 
49
    RemoteBzrProber,
 
50
    )
47
51
from bzrlib.remote import (
48
52
    RemoteBranch,
49
53
    RemoteBranchFormat,
52
56
    RemoteRepository,
53
57
    RemoteRepositoryFormat,
54
58
    )
55
 
from bzrlib.repofmt import groupcompress_repo, pack_repo
 
59
from bzrlib.repofmt import groupcompress_repo, knitpack_repo
56
60
from bzrlib.revision import NULL_REVISION
57
 
from bzrlib.smart import medium
 
61
from bzrlib.smart import medium, request
58
62
from bzrlib.smart.client import _SmartClient
59
 
from bzrlib.smart.repository import SmartServerRepositoryGetParentMap
 
63
from bzrlib.smart.repository import (
 
64
    SmartServerRepositoryGetParentMap,
 
65
    SmartServerRepositoryGetStream_1_19,
 
66
    )
60
67
from bzrlib.tests import (
61
 
    condition_isinstance,
62
 
    split_suite_by_condition,
63
 
    multiply_tests,
64
68
    test_server,
65
69
    )
66
 
from bzrlib.transport import get_transport
 
70
from bzrlib.tests.scenarios import load_tests_apply_scenarios
67
71
from bzrlib.transport.memory import MemoryTransport
68
72
from bzrlib.transport.remote import (
69
73
    RemoteTransport,
70
74
    RemoteSSHTransport,
71
75
    RemoteTCPTransport,
72
 
)
73
 
 
74
 
def load_tests(standard_tests, module, loader):
75
 
    to_adapt, result = split_suite_by_condition(
76
 
        standard_tests, condition_isinstance(BasicRemoteObjectTests))
77
 
    smart_server_version_scenarios = [
 
76
    )
 
77
 
 
78
 
 
79
load_tests = load_tests_apply_scenarios
 
80
 
 
81
 
 
82
class BasicRemoteObjectTests(tests.TestCaseWithTransport):
 
83
 
 
84
    scenarios = [
78
85
        ('HPSS-v2',
79
 
         {'transport_server': test_server.SmartTCPServer_for_testing_v2_only}),
 
86
            {'transport_server': test_server.SmartTCPServer_for_testing_v2_only}),
80
87
        ('HPSS-v3',
81
 
         {'transport_server': test_server.SmartTCPServer_for_testing})]
82
 
    return multiply_tests(to_adapt, smart_server_version_scenarios, result)
83
 
 
84
 
 
85
 
class BasicRemoteObjectTests(tests.TestCaseWithTransport):
 
88
            {'transport_server': test_server.SmartTCPServer_for_testing})]
 
89
 
86
90
 
87
91
    def setUp(self):
88
92
        super(BasicRemoteObjectTests, self).setUp()
89
93
        self.transport = self.get_transport()
90
94
        # make a branch that can be opened over the smart transport
91
95
        self.local_wt = BzrDir.create_standalone_workingtree('.')
92
 
 
93
 
    def tearDown(self):
94
 
        self.transport.disconnect()
95
 
        tests.TestCaseWithTransport.tearDown(self)
 
96
        self.addCleanup(self.transport.disconnect)
96
97
 
97
98
    def test_create_remote_bzrdir(self):
98
 
        b = remote.RemoteBzrDir(self.transport, remote.RemoteBzrDirFormat())
 
99
        b = remote.RemoteBzrDir(self.transport, RemoteBzrDirFormat())
99
100
        self.assertIsInstance(b, BzrDir)
100
101
 
101
102
    def test_open_remote_branch(self):
102
103
        # open a standalone branch in the working directory
103
 
        b = remote.RemoteBzrDir(self.transport, remote.RemoteBzrDirFormat())
 
104
        b = remote.RemoteBzrDir(self.transport, RemoteBzrDirFormat())
104
105
        branch = b.open_branch()
105
106
        self.assertIsInstance(branch, Branch)
106
107
 
122
123
    def test_find_correct_format(self):
123
124
        """Should open a RemoteBzrDir over a RemoteTransport"""
124
125
        fmt = BzrDirFormat.find_format(self.transport)
125
 
        self.assertTrue(RemoteBzrDirFormat
126
 
                        in BzrDirFormat._control_server_formats)
127
 
        self.assertIsInstance(fmt, remote.RemoteBzrDirFormat)
 
126
        self.assertTrue(bzrdir.RemoteBzrProber
 
127
                        in controldir.ControlDirFormat._server_probers)
 
128
        self.assertIsInstance(fmt, RemoteBzrDirFormat)
128
129
 
129
130
    def test_open_detected_smart_format(self):
130
131
        fmt = BzrDirFormat.find_format(self.transport)
359
360
        a given client_base and transport_base.
360
361
        """
361
362
        client_medium = medium.SmartClientMedium(client_base)
362
 
        transport = get_transport(transport_base)
363
 
        result = client_medium.remote_path_from_transport(transport)
 
363
        t = transport.get_transport(transport_base)
 
364
        result = client_medium.remote_path_from_transport(t)
364
365
        self.assertEqual(expected, result)
365
366
 
366
367
    def test_remote_path_from_transport(self):
377
378
        a given transport_base and relpath of that transport.  (Note that
378
379
        HttpTransportBase is a subclass of SmartClientMedium)
379
380
        """
380
 
        base_transport = get_transport(transport_base)
 
381
        base_transport = transport.get_transport(transport_base)
381
382
        client_medium = base_transport.get_smart_medium()
382
383
        cloned_transport = base_transport.clone(relpath)
383
384
        result = client_medium.remote_path_from_transport(cloned_transport)
450
451
        client.add_expected_call(
451
452
            'BzrDir.open_branchV3', ('quack/',),
452
453
            'success', ('ref', self.get_url('referenced'))),
453
 
        a_bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
454
        a_bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
454
455
            _client=client)
455
456
        result = a_bzrdir.cloning_metadir()
456
457
        # We should have got a control dir matching the referenced branch.
469
470
        client.add_expected_call(
470
471
            'BzrDir.cloning_metadir', ('quack/', 'False'),
471
472
            'success', (control_name, '', ('branch', ''))),
472
 
        a_bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
473
        a_bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
473
474
            _client=client)
474
475
        result = a_bzrdir.cloning_metadir()
475
476
        # We should have got a reference control dir with default branch and
495
496
        client.add_expected_call(
496
497
            'BzrDir.open_2.1', ('quack/',), 'success', ('no',))
497
498
        self.assertRaises(errors.NotBranchError, RemoteBzrDir, transport,
498
 
                remote.RemoteBzrDirFormat(), _client=client, _force_probe=True)
 
499
                RemoteBzrDirFormat(), _client=client, _force_probe=True)
499
500
        self.assertFinished(client)
500
501
 
501
502
    def test_present_without_workingtree(self):
502
503
        client, transport = self.make_fake_client_and_transport()
503
504
        client.add_expected_call(
504
505
            'BzrDir.open_2.1', ('quack/',), 'success', ('yes', 'no'))
505
 
        bd = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
506
        bd = RemoteBzrDir(transport, RemoteBzrDirFormat(),
506
507
            _client=client, _force_probe=True)
507
508
        self.assertIsInstance(bd, RemoteBzrDir)
508
509
        self.assertFalse(bd.has_workingtree())
513
514
        client, transport = self.make_fake_client_and_transport()
514
515
        client.add_expected_call(
515
516
            'BzrDir.open_2.1', ('quack/',), 'success', ('yes', 'yes'))
516
 
        bd = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
517
        bd = RemoteBzrDir(transport, RemoteBzrDirFormat(),
517
518
            _client=client, _force_probe=True)
518
519
        self.assertIsInstance(bd, RemoteBzrDir)
519
520
        self.assertTrue(bd.has_workingtree())
526
527
            'BzrDir.open_2.1', ('quack/',), 'unknown', ('BzrDir.open_2.1',))
527
528
        client.add_expected_call(
528
529
            'BzrDir.open', ('quack/',), 'success', ('yes',))
529
 
        bd = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
530
        bd = RemoteBzrDir(transport, RemoteBzrDirFormat(),
530
531
            _client=client, _force_probe=True)
531
532
        self.assertIsInstance(bd, RemoteBzrDir)
532
533
        self.assertFinished(client)
548
549
            'BzrDir.open_2.1', ('quack/',), 'unknown', ('BzrDir.open_2.1',))
549
550
        client.add_expected_call(
550
551
            'BzrDir.open', ('quack/',), 'success', ('yes',))
551
 
        bd = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
552
        bd = RemoteBzrDir(transport, RemoteBzrDirFormat(),
552
553
            _client=client, _force_probe=True)
553
554
        self.assertIsInstance(bd, RemoteBzrDir)
554
555
        self.assertFinished(client)
585
586
        client.add_expected_call(
586
587
            'Branch.get_stacked_on_url', ('quack/',),
587
588
            'error', ('NotStacked',))
588
 
        bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
589
        bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
589
590
            _client=client)
590
591
        result = bzrdir.open_branch()
591
592
        self.assertIsInstance(result, RemoteBranch)
598
599
        transport = transport.clone('quack')
599
600
        client = FakeClient(transport.base)
600
601
        client.add_error_response('nobranch')
601
 
        bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
602
        bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
602
603
            _client=client)
603
604
        self.assertRaises(errors.NotBranchError, bzrdir.open_branch)
604
605
        self.assertEqual(
609
610
        # _get_tree_branch is a form of open_branch, but it should only ask for
610
611
        # branch opening, not any other network requests.
611
612
        calls = []
612
 
        def open_branch():
 
613
        def open_branch(name=None):
613
614
            calls.append("Called")
614
615
            return "a-branch"
615
616
        transport = MemoryTransport()
616
617
        # no requests on the network - catches other api calls being made.
617
618
        client = FakeClient(transport.base)
618
 
        bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
619
        bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
619
620
            _client=client)
620
621
        # patch the open_branch call to record that it was called.
621
622
        bzrdir.open_branch = open_branch
640
641
        client.add_expected_call(
641
642
            'Branch.get_stacked_on_url', ('~hello/',),
642
643
            'error', ('NotStacked',))
643
 
        bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
644
        bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
644
645
            _client=client)
645
646
        result = bzrdir.open_branch()
646
647
        self.assertFinished(client)
663
664
        client.add_success_response(
664
665
            'ok', '', rich_response, subtree_response, external_lookup,
665
666
            network_name)
666
 
        bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
667
        bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
667
668
            _client=client)
668
669
        result = bzrdir.open_repository()
669
670
        self.assertEqual(
686
687
        old.
687
688
        """
688
689
        self.assertRaises(errors.NotBranchError,
689
 
            RemoteBzrDirFormat.probe_transport, OldServerTransport())
 
690
            RemoteBzrProber.probe_transport, OldServerTransport())
690
691
 
691
692
 
692
693
class TestBzrDirCreateBranch(TestRemote):
715
716
            'BzrDir.create_branch', ('quack/', network_name),
716
717
            'success', ('ok', network_name, '', 'no', 'no', 'yes',
717
718
            reference_repo_name))
718
 
        a_bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
719
        a_bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
719
720
            _client=client)
720
721
        branch = a_bzrdir.create_branch()
721
722
        # We should have got a remote branch
724
725
        format = branch._format
725
726
        self.assertEqual(network_name, format.network_name())
726
727
 
 
728
    def test_already_open_repo_and_reused_medium(self):
 
729
        """Bug 726584: create_branch(..., repository=repo) should work
 
730
        regardless of what the smart medium's base URL is.
 
731
        """
 
732
        self.transport_server = test_server.SmartTCPServer_for_testing
 
733
        transport = self.get_transport('.')
 
734
        repo = self.make_repository('quack')
 
735
        # Client's medium rooted a transport root (not at the bzrdir)
 
736
        client = FakeClient(transport.base)
 
737
        transport = transport.clone('quack')
 
738
        reference_bzrdir_format = bzrdir.format_registry.get('default')()
 
739
        reference_format = reference_bzrdir_format.get_branch_format()
 
740
        network_name = reference_format.network_name()
 
741
        reference_repo_fmt = reference_bzrdir_format.repository_format
 
742
        reference_repo_name = reference_repo_fmt.network_name()
 
743
        client.add_expected_call(
 
744
            'BzrDir.create_branch', ('extra/quack/', network_name),
 
745
            'success', ('ok', network_name, '', 'no', 'no', 'yes',
 
746
            reference_repo_name))
 
747
        a_bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
 
748
            _client=client)
 
749
        branch = a_bzrdir.create_branch(repository=repo)
 
750
        # We should have got a remote branch
 
751
        self.assertIsInstance(branch, remote.RemoteBranch)
 
752
        # its format should have the settings from the response
 
753
        format = branch._format
 
754
        self.assertEqual(network_name, format.network_name())
 
755
 
727
756
 
728
757
class TestBzrDirCreateRepository(TestRemote):
729
758
 
750
779
                'Bazaar repository format 2a (needs bzr 1.16 or later)\n',
751
780
                'False'),
752
781
            'success', ('ok', 'yes', 'yes', 'yes', network_name))
753
 
        a_bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
782
        a_bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
754
783
            _client=client)
755
784
        repo = a_bzrdir.create_repository()
756
785
        # We should have got a remote repository
785
814
        client.add_success_response('stat', '0', '65535')
786
815
        remote_transport = RemoteTransport(server_url + 'quack/', medium=False,
787
816
            _client=client)
788
 
        bzrdir = RemoteBzrDir(remote_transport, remote.RemoteBzrDirFormat(),
 
817
        bzrdir = RemoteBzrDir(remote_transport, RemoteBzrDirFormat(),
789
818
            _client=client)
790
819
        repo = bzrdir.open_repository()
791
820
        self.assertEqual(
818
847
        client.add_success_response('stat', '0', '65535')
819
848
        remote_transport = RemoteTransport(server_url + 'quack/', medium=False,
820
849
            _client=client)
821
 
        bzrdir = RemoteBzrDir(remote_transport, remote.RemoteBzrDirFormat(),
 
850
        bzrdir = RemoteBzrDir(remote_transport, RemoteBzrDirFormat(),
822
851
            _client=client)
823
852
        repo = bzrdir.open_repository()
824
853
        self.assertEqual(
839
868
        transport = transport.clone('quack')
840
869
        client = FakeClient(transport.base)
841
870
        client.add_success_response('ok', '', 'no', 'no', 'no', network_name)
842
 
        bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
871
        bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
843
872
            _client=client)
844
873
        repo = bzrdir.open_repository()
845
874
        self.assertEqual(
852
881
 
853
882
    def test_success(self):
854
883
        """Simple test for typical successful call."""
855
 
        fmt = bzrdir.RemoteBzrDirFormat()
 
884
        fmt = RemoteBzrDirFormat()
856
885
        default_format_name = BzrDirFormat.get_default_format().network_name()
857
886
        transport = self.get_transport()
858
887
        client = FakeClient(transport.base)
874
903
        """Error responses are translated, e.g. 'PermissionDenied' raises the
875
904
        corresponding error from the client.
876
905
        """
877
 
        fmt = bzrdir.RemoteBzrDirFormat()
 
906
        fmt = RemoteBzrDirFormat()
878
907
        default_format_name = BzrDirFormat.get_default_format().network_name()
879
908
        transport = self.get_transport()
880
909
        client = FakeClient(transport.base)
898
927
        """Integration test for error translation."""
899
928
        transport = self.make_smart_server('foo')
900
929
        transport = transport.clone('no-such-path')
901
 
        fmt = bzrdir.RemoteBzrDirFormat()
 
930
        fmt = RemoteBzrDirFormat()
902
931
        err = self.assertRaises(errors.NoSuchFile,
903
932
            fmt.initialize_on_transport_ex, transport, create_prefix=False)
904
933
 
935
964
 
936
965
    def make_remote_bzrdir(self, transport, client):
937
966
        """Make a RemotebzrDir using 'client' as the _client."""
938
 
        return RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
967
        return RemoteBzrDir(transport, RemoteBzrDirFormat(),
939
968
            _client=client)
940
969
 
941
970
 
1143
1172
            [('set_tags_bytes', 'tags bytes')] * 2, real_branch.calls)
1144
1173
 
1145
1174
 
 
1175
class TestBranchHeadsToFetch(RemoteBranchTestCase):
 
1176
 
 
1177
    def test_uses_last_revision_info_and_tags_by_default(self):
 
1178
        transport = MemoryTransport()
 
1179
        client = FakeClient(transport.base)
 
1180
        client.add_expected_call(
 
1181
            'Branch.get_stacked_on_url', ('quack/',),
 
1182
            'error', ('NotStacked',))
 
1183
        client.add_expected_call(
 
1184
            'Branch.last_revision_info', ('quack/',),
 
1185
            'success', ('ok', '1', 'rev-tip'))
 
1186
        client.add_expected_call(
 
1187
            'Branch.get_config_file', ('quack/',),
 
1188
            'success', ('ok',), '')
 
1189
        transport.mkdir('quack')
 
1190
        transport = transport.clone('quack')
 
1191
        branch = self.make_remote_branch(transport, client)
 
1192
        result = branch.heads_to_fetch()
 
1193
        self.assertFinished(client)
 
1194
        self.assertEqual((set(['rev-tip']), set()), result)
 
1195
 
 
1196
    def test_uses_last_revision_info_and_tags_when_set(self):
 
1197
        transport = MemoryTransport()
 
1198
        client = FakeClient(transport.base)
 
1199
        client.add_expected_call(
 
1200
            'Branch.get_stacked_on_url', ('quack/',),
 
1201
            'error', ('NotStacked',))
 
1202
        client.add_expected_call(
 
1203
            'Branch.last_revision_info', ('quack/',),
 
1204
            'success', ('ok', '1', 'rev-tip'))
 
1205
        client.add_expected_call(
 
1206
            'Branch.get_config_file', ('quack/',),
 
1207
            'success', ('ok',), 'branch.fetch_tags = True')
 
1208
        # XXX: this will break if the default format's serialization of tags
 
1209
        # changes, or if the RPC for fetching tags changes from get_tags_bytes.
 
1210
        client.add_expected_call(
 
1211
            'Branch.get_tags_bytes', ('quack/',),
 
1212
            'success', ('d5:tag-17:rev-foo5:tag-27:rev-bare',))
 
1213
        transport.mkdir('quack')
 
1214
        transport = transport.clone('quack')
 
1215
        branch = self.make_remote_branch(transport, client)
 
1216
        result = branch.heads_to_fetch()
 
1217
        self.assertFinished(client)
 
1218
        self.assertEqual(
 
1219
            (set(['rev-tip']), set(['rev-foo', 'rev-bar'])), result)
 
1220
 
 
1221
    def test_uses_rpc_for_formats_with_non_default_heads_to_fetch(self):
 
1222
        transport = MemoryTransport()
 
1223
        client = FakeClient(transport.base)
 
1224
        client.add_expected_call(
 
1225
            'Branch.get_stacked_on_url', ('quack/',),
 
1226
            'error', ('NotStacked',))
 
1227
        client.add_expected_call(
 
1228
            'Branch.heads_to_fetch', ('quack/',),
 
1229
            'success', (['tip'], ['tagged-1', 'tagged-2']))
 
1230
        transport.mkdir('quack')
 
1231
        transport = transport.clone('quack')
 
1232
        branch = self.make_remote_branch(transport, client)
 
1233
        branch._format._use_default_local_heads_to_fetch = lambda: False
 
1234
        result = branch.heads_to_fetch()
 
1235
        self.assertFinished(client)
 
1236
        self.assertEqual((set(['tip']), set(['tagged-1', 'tagged-2'])), result)
 
1237
 
 
1238
    def make_branch_with_tags(self):
 
1239
        self.setup_smart_server_with_call_log()
 
1240
        # Make a branch with a single revision.
 
1241
        builder = self.make_branch_builder('foo')
 
1242
        builder.start_series()
 
1243
        builder.build_snapshot('tip', None, [
 
1244
            ('add', ('', 'root-id', 'directory', ''))])
 
1245
        builder.finish_series()
 
1246
        branch = builder.get_branch()
 
1247
        # Add two tags to that branch
 
1248
        branch.tags.set_tag('tag-1', 'rev-1')
 
1249
        branch.tags.set_tag('tag-2', 'rev-2')
 
1250
        return branch
 
1251
 
 
1252
    def test_backwards_compatible(self):
 
1253
        branch = self.make_branch_with_tags()
 
1254
        c = branch.get_config()
 
1255
        c.set_user_option('branch.fetch_tags', 'True')
 
1256
        self.addCleanup(branch.lock_read().unlock)
 
1257
        # Disable the heads_to_fetch verb
 
1258
        verb = 'Branch.heads_to_fetch'
 
1259
        self.disable_verb(verb)
 
1260
        self.reset_smart_call_log()
 
1261
        result = branch.heads_to_fetch()
 
1262
        self.assertEqual((set(['tip']), set(['rev-1', 'rev-2'])), result)
 
1263
        self.assertEqual(
 
1264
            ['Branch.last_revision_info', 'Branch.get_config_file',
 
1265
             'Branch.get_tags_bytes'],
 
1266
            [call.call.method for call in self.hpss_calls])
 
1267
 
 
1268
    def test_backwards_compatible_no_tags(self):
 
1269
        branch = self.make_branch_with_tags()
 
1270
        c = branch.get_config()
 
1271
        c.set_user_option('branch.fetch_tags', 'False')
 
1272
        self.addCleanup(branch.lock_read().unlock)
 
1273
        # Disable the heads_to_fetch verb
 
1274
        verb = 'Branch.heads_to_fetch'
 
1275
        self.disable_verb(verb)
 
1276
        self.reset_smart_call_log()
 
1277
        result = branch.heads_to_fetch()
 
1278
        self.assertEqual((set(['tip']), set()), result)
 
1279
        self.assertEqual(
 
1280
            ['Branch.last_revision_info', 'Branch.get_config_file'],
 
1281
            [call.call.method for call in self.hpss_calls])
 
1282
 
 
1283
 
1146
1284
class TestBranchLastRevisionInfo(RemoteBranchTestCase):
1147
1285
 
1148
1286
    def test_empty_branch(self):
1203
1341
        client.add_expected_call(
1204
1342
            'Branch.get_stacked_on_url', ('stacked/',),
1205
1343
            'success', ('ok', vfs_url))
1206
 
        bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
1344
        bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
1207
1345
            _client=client)
1208
1346
        repo_fmt = remote.RemoteRepositoryFormat()
1209
1347
        repo_fmt._custom_format = stacked_branch.repository._format
1236
1374
        # this will also do vfs access, but that goes direct to the transport
1237
1375
        # and isn't seen by the FakeClient.
1238
1376
        bzrdir = RemoteBzrDir(self.get_transport('stacked'),
1239
 
            remote.RemoteBzrDirFormat(), _client=client)
 
1377
            RemoteBzrDirFormat(), _client=client)
1240
1378
        branch = bzrdir.open_branch()
1241
1379
        result = branch.get_stacked_on_url()
1242
1380
        self.assertEqual('../base', result)
1269
1407
            'Branch.get_stacked_on_url', ('stacked/',),
1270
1408
            'success', ('ok', '../base'))
1271
1409
        bzrdir = RemoteBzrDir(self.get_transport('stacked'),
1272
 
            remote.RemoteBzrDirFormat(), _client=client)
 
1410
            RemoteBzrDirFormat(), _client=client)
1273
1411
        branch = bzrdir.open_branch()
1274
1412
        result = branch.get_stacked_on_url()
1275
1413
        self.assertEqual('../base', result)
1283
1421
class TestBranchSetLastRevision(RemoteBranchTestCase):
1284
1422
 
1285
1423
    def test_set_empty(self):
1286
 
        # set_revision_history([]) is translated to calling
 
1424
        # _set_last_revision_info('null:') is translated to calling
1287
1425
        # Branch.set_last_revision(path, '') on the wire.
1288
1426
        transport = MemoryTransport()
1289
1427
        transport.mkdir('branch')
1311
1449
        # unnecessarily invokes _ensure_real upon a call to lock_write.
1312
1450
        branch._ensure_real = lambda: None
1313
1451
        branch.lock_write()
1314
 
        result = branch.set_revision_history([])
 
1452
        result = branch._set_last_revision(NULL_REVISION)
1315
1453
        branch.unlock()
1316
1454
        self.assertEqual(None, result)
1317
1455
        self.assertFinished(client)
1318
1456
 
1319
1457
    def test_set_nonempty(self):
1320
 
        # set_revision_history([rev-id1, ..., rev-idN]) is translated to calling
 
1458
        # set_last_revision_info(N, rev-idN) is translated to calling
1321
1459
        # Branch.set_last_revision(path, rev-idN) on the wire.
1322
1460
        transport = MemoryTransport()
1323
1461
        transport.mkdir('branch')
1349
1487
        branch._ensure_real = lambda: None
1350
1488
        # Lock the branch, reset the record of remote calls.
1351
1489
        branch.lock_write()
1352
 
        result = branch.set_revision_history(['rev-id1', 'rev-id2'])
 
1490
        result = branch._set_last_revision('rev-id2')
1353
1491
        branch.unlock()
1354
1492
        self.assertEqual(None, result)
1355
1493
        self.assertFinished(client)
1385
1523
        branch = self.make_remote_branch(transport, client)
1386
1524
        branch.lock_write()
1387
1525
        self.assertRaises(
1388
 
            errors.NoSuchRevision, branch.set_revision_history, ['rev-id'])
 
1526
            errors.NoSuchRevision, branch._set_last_revision, 'rev-id')
1389
1527
        branch.unlock()
1390
1528
        self.assertFinished(client)
1391
1529
 
1422
1560
        branch._ensure_real = lambda: None
1423
1561
        branch.lock_write()
1424
1562
        # The 'TipChangeRejected' error response triggered by calling
1425
 
        # set_revision_history causes a TipChangeRejected exception.
 
1563
        # set_last_revision_info causes a TipChangeRejected exception.
1426
1564
        err = self.assertRaises(
1427
 
            errors.TipChangeRejected, branch.set_revision_history, ['rev-id'])
 
1565
            errors.TipChangeRejected,
 
1566
            branch._set_last_revision, 'rev-id')
1428
1567
        # The UTF-8 message from the response has been decoded into a unicode
1429
1568
        # object.
1430
1569
        self.assertIsInstance(err.msg, unicode)
1618
1757
    def test_get_multi_line_branch_conf(self):
1619
1758
        # Make sure that multiple-line branch.conf files are supported
1620
1759
        #
1621
 
        # https://bugs.edge.launchpad.net/bzr/+bug/354075
 
1760
        # https://bugs.launchpad.net/bzr/+bug/354075
1622
1761
        client = FakeClient()
1623
1762
        client.add_expected_call(
1624
1763
            'Branch.get_stacked_on_url', ('memory:///',),
1652
1791
        branch.unlock()
1653
1792
        self.assertFinished(client)
1654
1793
 
 
1794
    def test_set_option_with_dict(self):
 
1795
        client = FakeClient()
 
1796
        client.add_expected_call(
 
1797
            'Branch.get_stacked_on_url', ('memory:///',),
 
1798
            'error', ('NotStacked',),)
 
1799
        client.add_expected_call(
 
1800
            'Branch.lock_write', ('memory:///', '', ''),
 
1801
            'success', ('ok', 'branch token', 'repo token'))
 
1802
        encoded_dict_value = 'd5:ascii1:a11:unicode \xe2\x8c\x9a3:\xe2\x80\xbde'
 
1803
        client.add_expected_call(
 
1804
            'Branch.set_config_option_dict', ('memory:///', 'branch token',
 
1805
            'repo token', encoded_dict_value, 'foo', ''),
 
1806
            'success', ())
 
1807
        client.add_expected_call(
 
1808
            'Branch.unlock', ('memory:///', 'branch token', 'repo token'),
 
1809
            'success', ('ok',))
 
1810
        transport = MemoryTransport()
 
1811
        branch = self.make_remote_branch(transport, client)
 
1812
        branch.lock_write()
 
1813
        config = branch._get_config()
 
1814
        config.set_option(
 
1815
            {'ascii': 'a', u'unicode \N{WATCH}': u'\N{INTERROBANG}'},
 
1816
            'foo')
 
1817
        branch.unlock()
 
1818
        self.assertFinished(client)
 
1819
 
1655
1820
    def test_backwards_compat_set_option(self):
1656
1821
        self.setup_smart_server_with_call_log()
1657
1822
        branch = self.make_branch('.')
1664
1829
        self.assertLength(10, self.hpss_calls)
1665
1830
        self.assertEqual('value', branch._get_config().get_option('name'))
1666
1831
 
 
1832
    def test_backwards_compat_set_option_with_dict(self):
 
1833
        self.setup_smart_server_with_call_log()
 
1834
        branch = self.make_branch('.')
 
1835
        verb = 'Branch.set_config_option_dict'
 
1836
        self.disable_verb(verb)
 
1837
        branch.lock_write()
 
1838
        self.addCleanup(branch.unlock)
 
1839
        self.reset_smart_call_log()
 
1840
        config = branch._get_config()
 
1841
        value_dict = {'ascii': 'a', u'unicode \N{WATCH}': u'\N{INTERROBANG}'}
 
1842
        config.set_option(value_dict, 'name')
 
1843
        self.assertLength(10, self.hpss_calls)
 
1844
        self.assertEqual(value_dict, branch._get_config().get_option('name'))
 
1845
 
1667
1846
 
1668
1847
class TestBranchLockWrite(RemoteBranchTestCase):
1669
1848
 
1805
1984
        client = FakeClient(transport.base)
1806
1985
        transport = transport.clone(transport_path)
1807
1986
        # we do not want bzrdir to make any remote calls
1808
 
        bzrdir = RemoteBzrDir(transport, remote.RemoteBzrDirFormat(),
 
1987
        bzrdir = RemoteBzrDir(transport, RemoteBzrDirFormat(),
1809
1988
            _client=False)
1810
1989
        repo = RemoteRepository(bzrdir, None, _client=client)
1811
1990
        return repo, client
1819
1998
 
1820
1999
    def test_get_format_description(self):
1821
2000
        remote_format = RemoteBranchFormat()
1822
 
        real_format = branch.BranchFormat.get_default_format()
 
2001
        real_format = branch.format_registry.get_default()
1823
2002
        remote_format._network_name = real_format.network_name()
1824
2003
        self.assertEqual(remoted_description(real_format),
1825
2004
            remote_format.get_format_description())
1828
2007
class TestRepositoryFormat(TestRemoteRepository):
1829
2008
 
1830
2009
    def test_fast_delta(self):
1831
 
        true_name = groupcompress_repo.RepositoryFormatCHK1().network_name()
 
2010
        true_name = groupcompress_repo.RepositoryFormat2a().network_name()
1832
2011
        true_format = RemoteRepositoryFormat()
1833
2012
        true_format._network_name = true_name
1834
2013
        self.assertEqual(True, true_format.fast_deltas)
1835
 
        false_name = pack_repo.RepositoryFormatKnitPack1().network_name()
 
2014
        false_name = knitpack_repo.RepositoryFormatKnitPack1().network_name()
1836
2015
        false_format = RemoteRepositoryFormat()
1837
2016
        false_format._network_name = false_name
1838
2017
        self.assertEqual(False, false_format.fast_deltas)
1839
2018
 
1840
2019
    def test_get_format_description(self):
1841
2020
        remote_repo_format = RemoteRepositoryFormat()
1842
 
        real_format = repository.RepositoryFormat.get_default_format()
 
2021
        real_format = repository.format_registry.get_default()
1843
2022
        remote_repo_format._network_name = real_format.network_name()
1844
2023
        self.assertEqual(remoted_description(real_format),
1845
2024
            remote_repo_format.get_format_description())
1968
2147
        parents = repo.get_parent_map([rev_id])
1969
2148
        self.assertEqual(
1970
2149
            [('call_with_body_bytes_expecting_body',
1971
 
              'Repository.get_parent_map', ('quack/', 'include-missing:',
1972
 
              rev_id), '\n\n0'),
 
2150
              'Repository.get_parent_map',
 
2151
              ('quack/', 'include-missing:', rev_id), '\n\n0'),
1973
2152
             ('disconnect medium',),
1974
2153
             ('call_expecting_body', 'Repository.get_revision_graph',
1975
2154
              ('quack/', ''))],
2095
2274
        self.assertEqual({}, repo.get_parent_map(['non-existant']))
2096
2275
        self.assertLength(0, self.hpss_calls)
2097
2276
 
 
2277
    def test_exposes_get_cached_parent_map(self):
 
2278
        """RemoteRepository exposes get_cached_parent_map from
 
2279
        _unstacked_provider
 
2280
        """
 
2281
        r1 = u'\u0e33'.encode('utf8')
 
2282
        r2 = u'\u0dab'.encode('utf8')
 
2283
        lines = [' '.join([r2, r1]), r1]
 
2284
        encoded_body = bz2.compress('\n'.join(lines))
 
2285
 
 
2286
        transport_path = 'quack'
 
2287
        repo, client = self.setup_fake_client_and_repository(transport_path)
 
2288
        client.add_success_response_with_body(encoded_body, 'ok')
 
2289
        repo.lock_read()
 
2290
        # get_cached_parent_map should *not* trigger an RPC
 
2291
        self.assertEqual({}, repo.get_cached_parent_map([r1]))
 
2292
        self.assertEqual([], client._calls)
 
2293
        self.assertEqual({r2: (r1,)}, repo.get_parent_map([r2]))
 
2294
        self.assertEqual({r1: (NULL_REVISION,)},
 
2295
            repo.get_cached_parent_map([r1]))
 
2296
        self.assertEqual(
 
2297
            [('call_with_body_bytes_expecting_body',
 
2298
              'Repository.get_parent_map', ('quack/', 'include-missing:', r2),
 
2299
              '\n\n0')],
 
2300
            client._calls)
 
2301
        repo.unlock()
 
2302
 
2098
2303
 
2099
2304
class TestGetParentMapAllowsNew(tests.TestCaseWithTransport):
2100
2305
 
2257
2462
        self.setup_smart_server_with_call_log()
2258
2463
        tree = self.make_branch_and_memory_tree('.')
2259
2464
        tree.lock_write()
 
2465
        tree.add('')
2260
2466
        rev1 = tree.commit('First')
2261
2467
        rev2 = tree.commit('Second')
2262
2468
        tree.unlock()
2301
2507
        transport_path = 'quack'
2302
2508
        repo, client = self.setup_fake_client_and_repository(transport_path)
2303
2509
        client.add_success_response('ok', 'a token')
2304
 
        result = repo.lock_write()
 
2510
        token = repo.lock_write().repository_token
2305
2511
        self.assertEqual(
2306
2512
            [('call', 'Repository.lock_write', ('quack/', ''))],
2307
2513
            client._calls)
2308
 
        self.assertEqual('a token', result)
 
2514
        self.assertEqual('a token', token)
2309
2515
 
2310
2516
    def test_lock_write_already_locked(self):
2311
2517
        transport_path = 'quack'
2402
2608
        the client is finished.
2403
2609
        """
2404
2610
        sink = repo._get_sink()
2405
 
        fmt = repository.RepositoryFormat.get_default_format()
 
2611
        fmt = repository.format_registry.get_default()
2406
2612
        resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2407
2613
        self.assertEqual([], resume_tokens)
2408
2614
        self.assertEqual(set(), missing_keys)
2508
2714
                return True
2509
2715
        repo._real_repository = FakeRealRepository()
2510
2716
        sink = repo._get_sink()
2511
 
        fmt = repository.RepositoryFormat.get_default_format()
 
2717
        fmt = repository.format_registry.get_default()
2512
2718
        stream = self.make_stream_with_inv_deltas(fmt)
2513
2719
        resume_tokens, missing_keys = sink.insert_stream(stream, fmt, [])
2514
2720
        # Every record from the first inventory delta should have been sent to
2734
2940
             ('pack collection autopack',)],
2735
2941
            client._calls)
2736
2942
 
 
2943
    def test_oom_error_reporting(self):
 
2944
        """An out-of-memory condition on the server is reported clearly"""
 
2945
        transport_path = 'quack'
 
2946
        repo, client = self.setup_fake_client_and_repository(transport_path)
 
2947
        client.add_expected_call(
 
2948
            'PackRepository.autopack', ('quack/',),
 
2949
            'error', ('MemoryError',))
 
2950
        err = self.assertRaises(errors.BzrError, repo.autopack)
 
2951
        self.assertContainsRe(str(err), "^remote server out of mem")
 
2952
 
2737
2953
 
2738
2954
class TestErrorTranslationBase(tests.TestCaseWithMemoryTransport):
2739
2955
    """Base class for unit tests for bzrlib.remote._translate_error."""
2812
3028
            detail='extra detail')
2813
3029
        self.assertEqual(expected_error, translated_error)
2814
3030
 
 
3031
    def test_norepository(self):
 
3032
        bzrdir = self.make_bzrdir('')
 
3033
        translated_error = self.translateTuple(('norepository',),
 
3034
            bzrdir=bzrdir)
 
3035
        expected_error = errors.NoRepositoryPresent(bzrdir)
 
3036
        self.assertEqual(expected_error, translated_error)
 
3037
 
2815
3038
    def test_LockContention(self):
2816
3039
        translated_error = self.translateTuple(('LockContention',))
2817
3040
        expected_error = errors.LockContention('(remote lock)')
2845
3068
        expected_error = errors.DivergedBranches(branch, other_branch)
2846
3069
        self.assertEqual(expected_error, translated_error)
2847
3070
 
 
3071
    def test_NotStacked(self):
 
3072
        branch = self.make_branch('')
 
3073
        translated_error = self.translateTuple(('NotStacked',), branch=branch)
 
3074
        expected_error = errors.NotStacked(branch)
 
3075
        self.assertEqual(expected_error, translated_error)
 
3076
 
2848
3077
    def test_ReadError_no_args(self):
2849
3078
        path = 'a path'
2850
3079
        translated_error = self.translateTuple(('ReadError',), path=path)
2866
3095
 
2867
3096
    def test_PermissionDenied_no_args(self):
2868
3097
        path = 'a path'
2869
 
        translated_error = self.translateTuple(('PermissionDenied',), path=path)
 
3098
        translated_error = self.translateTuple(('PermissionDenied',),
 
3099
            path=path)
2870
3100
        expected_error = errors.PermissionDenied(path)
2871
3101
        self.assertEqual(expected_error, translated_error)
2872
3102
 
2895
3125
        expected_error = errors.PermissionDenied(path, extra)
2896
3126
        self.assertEqual(expected_error, translated_error)
2897
3127
 
 
3128
    # GZ 2011-03-02: TODO test for PermissionDenied with non-ascii 'extra'
 
3129
 
 
3130
    def test_NoSuchFile_context_path(self):
 
3131
        local_path = "local path"
 
3132
        translated_error = self.translateTuple(('ReadError', "remote path"),
 
3133
            path=local_path)
 
3134
        expected_error = errors.ReadError(local_path)
 
3135
        self.assertEqual(expected_error, translated_error)
 
3136
 
 
3137
    def test_NoSuchFile_without_context(self):
 
3138
        remote_path = "remote path"
 
3139
        translated_error = self.translateTuple(('ReadError', remote_path))
 
3140
        expected_error = errors.ReadError(remote_path)
 
3141
        self.assertEqual(expected_error, translated_error)
 
3142
 
 
3143
    def test_ReadOnlyError(self):
 
3144
        translated_error = self.translateTuple(('ReadOnlyError',))
 
3145
        expected_error = errors.TransportNotPossible("readonly transport")
 
3146
        self.assertEqual(expected_error, translated_error)
 
3147
 
 
3148
    def test_MemoryError(self):
 
3149
        translated_error = self.translateTuple(('MemoryError',))
 
3150
        self.assertStartsWith(str(translated_error),
 
3151
            "remote server out of memory")
 
3152
 
 
3153
    def test_generic_IndexError_no_classname(self):
 
3154
        err = errors.ErrorFromSmartServer(('error', "list index out of range"))
 
3155
        translated_error = self.translateErrorFromSmartServer(err)
 
3156
        expected_error = errors.UnknownErrorFromSmartServer(err)
 
3157
        self.assertEqual(expected_error, translated_error)
 
3158
 
 
3159
    # GZ 2011-03-02: TODO test generic non-ascii error string
 
3160
 
 
3161
    def test_generic_KeyError(self):
 
3162
        err = errors.ErrorFromSmartServer(('error', 'KeyError', "1"))
 
3163
        translated_error = self.translateErrorFromSmartServer(err)
 
3164
        expected_error = errors.UnknownErrorFromSmartServer(err)
 
3165
        self.assertEqual(expected_error, translated_error)
 
3166
 
2898
3167
 
2899
3168
class TestErrorTranslationRobustness(TestErrorTranslationBase):
2900
3169
    """Unit tests for bzrlib.remote._translate_error's robustness.
3045
3314
        _, stacked = branch_factory()
3046
3315
        source = stacked.repository._get_source(target_repository_format)
3047
3316
        tip = stacked.last_revision()
3048
 
        revs = stacked.repository.get_ancestry(tip)
3049
 
        search = graph.PendingAncestryResult([tip], stacked.repository)
 
3317
        stacked.repository._ensure_real()
 
3318
        graph = stacked.repository.get_graph()
 
3319
        revs = [r for (r,ps) in graph.iter_ancestry([tip])
 
3320
                if r != NULL_REVISION]
 
3321
        revs.reverse()
 
3322
        search = _mod_graph.PendingAncestryResult([tip], stacked.repository)
3050
3323
        self.reset_smart_call_log()
3051
3324
        stream = source.get_stream(search)
3052
 
        if None in revs:
3053
 
            revs.remove(None)
3054
3325
        # We trust that if a revision is in the stream the rest of the new
3055
3326
        # content for it is too, as per our main fetch tests; here we are
3056
3327
        # checking that the revisions are actually included at all, and their
3095
3366
        self.assertEqual(expected_revs, rev_ord)
3096
3367
        # Getting topological sort requires VFS calls still - one of which is
3097
3368
        # pushing up from the bound branch.
3098
 
        self.assertLength(13, self.hpss_calls)
 
3369
        self.assertLength(14, self.hpss_calls)
3099
3370
 
3100
3371
    def test_stacked_get_stream_groupcompress(self):
3101
3372
        # Repository._get_source.get_stream() from a stacked repository with
3142
3413
 
3143
3414
    def test_copy_content_into_avoids_revision_history(self):
3144
3415
        local = self.make_branch('local')
3145
 
        remote_backing_tree = self.make_branch_and_tree('remote')
3146
 
        remote_backing_tree.commit("Commit.")
 
3416
        builder = self.make_branch_builder('remote')
 
3417
        builder.build_commit(message="Commit.")
3147
3418
        remote_branch_url = self.smart_server.get_url() + 'remote'
3148
3419
        remote_branch = bzrdir.BzrDir.open(remote_branch_url).open_branch()
3149
3420
        local.repository.fetch(remote_branch.repository)
3150
3421
        self.hpss_calls = []
3151
3422
        remote_branch.copy_content_into(local)
3152
3423
        self.assertFalse('Branch.revision_history' in self.hpss_calls)
 
3424
 
 
3425
    def test_fetch_everything_needs_just_one_call(self):
 
3426
        local = self.make_branch('local')
 
3427
        builder = self.make_branch_builder('remote')
 
3428
        builder.build_commit(message="Commit.")
 
3429
        remote_branch_url = self.smart_server.get_url() + 'remote'
 
3430
        remote_branch = bzrdir.BzrDir.open(remote_branch_url).open_branch()
 
3431
        self.hpss_calls = []
 
3432
        local.repository.fetch(
 
3433
            remote_branch.repository,
 
3434
            fetch_spec=_mod_graph.EverythingResult(remote_branch.repository))
 
3435
        self.assertEqual(['Repository.get_stream_1.19'], self.hpss_calls)
 
3436
 
 
3437
    def override_verb(self, verb_name, verb):
 
3438
        request_handlers = request.request_handlers
 
3439
        orig_verb = request_handlers.get(verb_name)
 
3440
        request_handlers.register(verb_name, verb, override_existing=True)
 
3441
        self.addCleanup(request_handlers.register, verb_name, orig_verb,
 
3442
                override_existing=True)
 
3443
 
 
3444
    def test_fetch_everything_backwards_compat(self):
 
3445
        """Can fetch with EverythingResult even with pre 2.4 servers.
 
3446
        
 
3447
        Pre-2.4 do not support 'everything' searches with the
 
3448
        Repository.get_stream_1.19 verb.
 
3449
        """
 
3450
        verb_log = []
 
3451
        class OldGetStreamVerb(SmartServerRepositoryGetStream_1_19):
 
3452
            """A version of the Repository.get_stream_1.19 verb patched to
 
3453
            reject 'everything' searches the way 2.3 and earlier do.
 
3454
            """
 
3455
            def recreate_search(self, repository, search_bytes,
 
3456
                                discard_excess=False):
 
3457
                verb_log.append(search_bytes.split('\n', 1)[0])
 
3458
                if search_bytes == 'everything':
 
3459
                    return (None,
 
3460
                            request.FailedSmartServerResponse(('BadSearch',)))
 
3461
                return super(OldGetStreamVerb,
 
3462
                        self).recreate_search(repository, search_bytes,
 
3463
                            discard_excess=discard_excess)
 
3464
        self.override_verb('Repository.get_stream_1.19', OldGetStreamVerb)
 
3465
        local = self.make_branch('local')
 
3466
        builder = self.make_branch_builder('remote')
 
3467
        builder.build_commit(message="Commit.")
 
3468
        remote_branch_url = self.smart_server.get_url() + 'remote'
 
3469
        remote_branch = bzrdir.BzrDir.open(remote_branch_url).open_branch()
 
3470
        self.hpss_calls = []
 
3471
        local.repository.fetch(
 
3472
            remote_branch.repository,
 
3473
            fetch_spec=_mod_graph.EverythingResult(remote_branch.repository))
 
3474
        # make sure the overridden verb was used
 
3475
        self.assertLength(1, verb_log)
 
3476
        # more than one HPSS call is needed, but because it's a VFS callback
 
3477
        # its hard to predict exactly how many.
 
3478
        self.assertTrue(len(self.hpss_calls) > 1)
 
3479
 
 
3480
 
 
3481
class TestUpdateBoundBranchWithModifiedBoundLocation(
 
3482
    tests.TestCaseWithTransport):
 
3483
    """Ensure correct handling of bound_location modifications.
 
3484
 
 
3485
    This is tested against a smart server as http://pad.lv/786980 was about a
 
3486
    ReadOnlyError (write attempt during a read-only transaction) which can only
 
3487
    happen in this context.
 
3488
    """
 
3489
 
 
3490
    def setUp(self):
 
3491
        super(TestUpdateBoundBranchWithModifiedBoundLocation, self).setUp()
 
3492
        self.transport_server = test_server.SmartTCPServer_for_testing
 
3493
 
 
3494
    def make_master_and_checkout(self, master_name, checkout_name):
 
3495
        # Create the master branch and its associated checkout
 
3496
        self.master = self.make_branch_and_tree(master_name)
 
3497
        self.checkout = self.master.branch.create_checkout(checkout_name)
 
3498
        # Modify the master branch so there is something to update
 
3499
        self.master.commit('add stuff')
 
3500
        self.last_revid = self.master.commit('even more stuff')
 
3501
        self.bound_location = self.checkout.branch.get_bound_location()
 
3502
 
 
3503
    def assertUpdateSucceeds(self, new_location):
 
3504
        self.checkout.branch.set_bound_location(new_location)
 
3505
        self.checkout.update()
 
3506
        self.assertEquals(self.last_revid, self.checkout.last_revision())
 
3507
 
 
3508
    def test_without_final_slash(self):
 
3509
        self.make_master_and_checkout('master', 'checkout')
 
3510
        # For unclear reasons some users have a bound_location without a final
 
3511
        # '/', simulate that by forcing such a value
 
3512
        self.assertEndsWith(self.bound_location, '/')
 
3513
        self.assertUpdateSucceeds(self.bound_location.rstrip('/'))
 
3514
 
 
3515
    def test_plus_sign(self):
 
3516
        self.make_master_and_checkout('+master', 'checkout')
 
3517
        self.assertUpdateSucceeds(self.bound_location.replace('%2B', '+', 1))
 
3518
 
 
3519
    def test_tilda(self):
 
3520
        # Embed ~ in the middle of the path just to avoid any $HOME
 
3521
        # interpretation
 
3522
        self.make_master_and_checkout('mas~ter', 'checkout')
 
3523
        self.assertUpdateSucceeds(self.bound_location.replace('%2E', '~', 1))