~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/remote.py

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007, 2008 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
# TODO: At some point, handle upgrades by just passing the whole request
18
 
# across to run on the server.
19
 
 
20
17
import bz2
21
18
 
22
19
from bzrlib import (
 
20
    bencode,
23
21
    branch,
24
22
    bzrdir,
25
23
    config,
26
24
    debug,
27
25
    errors,
28
26
    graph,
 
27
    lock,
29
28
    lockdir,
30
 
    pack,
31
29
    repository,
 
30
    repository as _mod_repository,
32
31
    revision,
 
32
    revision as _mod_revision,
 
33
    static_tuple,
33
34
    symbol_versioning,
34
 
    urlutils,
35
35
)
36
 
from bzrlib.branch import BranchReferenceFormat
 
36
from bzrlib.branch import BranchReferenceFormat, BranchWriteLockResult
37
37
from bzrlib.bzrdir import BzrDir, RemoteBzrDirFormat
38
 
from bzrlib.decorators import needs_read_lock, needs_write_lock
 
38
from bzrlib.decorators import needs_read_lock, needs_write_lock, only_raises
39
39
from bzrlib.errors import (
40
40
    NoSuchRevision,
41
41
    SmartProtocolError,
43
43
from bzrlib.lockable_files import LockableFiles
44
44
from bzrlib.smart import client, vfs, repository as smart_repo
45
45
from bzrlib.revision import ensure_null, NULL_REVISION
 
46
from bzrlib.repository import RepositoryWriteLockResult
46
47
from bzrlib.trace import mutter, note, warning
47
 
from bzrlib.util import bencode
48
48
 
49
49
 
50
50
class _RpcHelper(object):
62
62
        except errors.ErrorFromSmartServer, err:
63
63
            self._translate_error(err, **err_context)
64
64
 
 
65
    def _call_with_body_bytes(self, method, args, body_bytes, **err_context):
 
66
        try:
 
67
            return self._client.call_with_body_bytes(method, args, body_bytes)
 
68
        except errors.ErrorFromSmartServer, err:
 
69
            self._translate_error(err, **err_context)
 
70
 
65
71
    def _call_with_body_bytes_expecting_body(self, method, args, body_bytes,
66
72
                                             **err_context):
67
73
        try:
86
92
class RemoteBzrDir(BzrDir, _RpcHelper):
87
93
    """Control directory on a remote server, accessed via bzr:// or similar."""
88
94
 
89
 
    def __init__(self, transport, format, _client=None):
 
95
    def __init__(self, transport, format, _client=None, _force_probe=False):
90
96
        """Construct a RemoteBzrDir.
91
97
 
92
98
        :param _client: Private parameter for testing. Disables probing and the
96
102
        # this object holds a delegated bzrdir that uses file-level operations
97
103
        # to talk to the other side
98
104
        self._real_bzrdir = None
 
105
        self._has_working_tree = None
99
106
        # 1-shot cache for the call pattern 'create_branch; open_branch' - see
100
107
        # create_branch for details.
101
108
        self._next_open_branch_result = None
105
112
            self._client = client._SmartClient(medium)
106
113
        else:
107
114
            self._client = _client
108
 
            return
109
 
 
 
115
            if not _force_probe:
 
116
                return
 
117
 
 
118
        self._probe_bzrdir()
 
119
 
 
120
    def __repr__(self):
 
121
        return '%s(%r)' % (self.__class__.__name__, self._client)
 
122
 
 
123
    def _probe_bzrdir(self):
 
124
        medium = self._client._medium
110
125
        path = self._path_for_remote_call(self._client)
 
126
        if medium._is_remote_before((2, 1)):
 
127
            self._rpc_open(path)
 
128
            return
 
129
        try:
 
130
            self._rpc_open_2_1(path)
 
131
            return
 
132
        except errors.UnknownSmartMethod:
 
133
            medium._remember_remote_is_before((2, 1))
 
134
            self._rpc_open(path)
 
135
 
 
136
    def _rpc_open_2_1(self, path):
 
137
        response = self._call('BzrDir.open_2.1', path)
 
138
        if response == ('no',):
 
139
            raise errors.NotBranchError(path=self.root_transport.base)
 
140
        elif response[0] == 'yes':
 
141
            if response[1] == 'yes':
 
142
                self._has_working_tree = True
 
143
            elif response[1] == 'no':
 
144
                self._has_working_tree = False
 
145
            else:
 
146
                raise errors.UnexpectedSmartServerResponse(response)
 
147
        else:
 
148
            raise errors.UnexpectedSmartServerResponse(response)
 
149
 
 
150
    def _rpc_open(self, path):
111
151
        response = self._call('BzrDir.open', path)
112
152
        if response not in [('yes',), ('no',)]:
113
153
            raise errors.UnexpectedSmartServerResponse(response)
114
154
        if response == ('no',):
115
 
            raise errors.NotBranchError(path=transport.base)
 
155
            raise errors.NotBranchError(path=self.root_transport.base)
116
156
 
117
157
    def _ensure_real(self):
118
158
        """Ensure that there is a _real_bzrdir set.
120
160
        Used before calls to self._real_bzrdir.
121
161
        """
122
162
        if not self._real_bzrdir:
 
163
            if 'hpssvfs' in debug.debug_flags:
 
164
                import traceback
 
165
                warning('VFS BzrDir access triggered\n%s',
 
166
                    ''.join(traceback.format_stack()))
123
167
            self._real_bzrdir = BzrDir.open_from_transport(
124
168
                self.root_transport, _server_formats=False)
125
169
            self._format._network_name = \
201
245
        self._ensure_real()
202
246
        self._real_bzrdir.destroy_repository()
203
247
 
204
 
    def create_branch(self):
 
248
    def create_branch(self, name=None):
205
249
        # as per meta1 formats - just delegate to the format object which may
206
250
        # be parameterised.
207
 
        real_branch = self._format.get_branch_format().initialize(self)
 
251
        real_branch = self._format.get_branch_format().initialize(self,
 
252
            name=name)
208
253
        if not isinstance(real_branch, RemoteBranch):
209
 
            result = RemoteBranch(self, self.find_repository(), real_branch)
 
254
            result = RemoteBranch(self, self.find_repository(), real_branch,
 
255
                                  name=name)
210
256
        else:
211
257
            result = real_branch
212
258
        # BzrDir.clone_on_transport() uses the result of create_branch but does
218
264
        self._next_open_branch_result = result
219
265
        return result
220
266
 
221
 
    def destroy_branch(self):
 
267
    def destroy_branch(self, name=None):
222
268
        """See BzrDir.destroy_branch"""
223
269
        self._ensure_real()
224
 
        self._real_bzrdir.destroy_branch()
 
270
        self._real_bzrdir.destroy_branch(name=name)
225
271
        self._next_open_branch_result = None
226
272
 
227
273
    def create_workingtree(self, revision_id=None, from_branch=None):
228
274
        raise errors.NotLocalUrl(self.transport.base)
229
275
 
230
 
    def find_branch_format(self):
 
276
    def find_branch_format(self, name=None):
231
277
        """Find the branch 'format' for this bzrdir.
232
278
 
233
279
        This might be a synthetic object for e.g. RemoteBranch and SVN.
234
280
        """
235
 
        b = self.open_branch()
 
281
        b = self.open_branch(name=name)
236
282
        return b._format
237
283
 
238
 
    def get_branch_reference(self):
 
284
    def get_branch_reference(self, name=None):
239
285
        """See BzrDir.get_branch_reference()."""
 
286
        if name is not None:
 
287
            # XXX JRV20100304: Support opening colocated branches
 
288
            raise errors.NoColocatedBranchSupport(self)
240
289
        response = self._get_branch_reference()
241
290
        if response[0] == 'ref':
242
291
            return response[1]
246
295
    def _get_branch_reference(self):
247
296
        path = self._path_for_remote_call(self._client)
248
297
        medium = self._client._medium
249
 
        if not medium._is_remote_before((1, 13)):
 
298
        candidate_calls = [
 
299
            ('BzrDir.open_branchV3', (2, 1)),
 
300
            ('BzrDir.open_branchV2', (1, 13)),
 
301
            ('BzrDir.open_branch', None),
 
302
            ]
 
303
        for verb, required_version in candidate_calls:
 
304
            if required_version and medium._is_remote_before(required_version):
 
305
                continue
250
306
            try:
251
 
                response = self._call('BzrDir.open_branchV2', path)
252
 
                if response[0] not in ('ref', 'branch'):
253
 
                    raise errors.UnexpectedSmartServerResponse(response)
254
 
                return response
 
307
                response = self._call(verb, path)
255
308
            except errors.UnknownSmartMethod:
256
 
                medium._remember_remote_is_before((1, 13))
257
 
        response = self._call('BzrDir.open_branch', path)
258
 
        if response[0] != 'ok':
 
309
                if required_version is None:
 
310
                    raise
 
311
                medium._remember_remote_is_before(required_version)
 
312
            else:
 
313
                break
 
314
        if verb == 'BzrDir.open_branch':
 
315
            if response[0] != 'ok':
 
316
                raise errors.UnexpectedSmartServerResponse(response)
 
317
            if response[1] != '':
 
318
                return ('ref', response[1])
 
319
            else:
 
320
                return ('branch', '')
 
321
        if response[0] not in ('ref', 'branch'):
259
322
            raise errors.UnexpectedSmartServerResponse(response)
260
 
        if response[1] != '':
261
 
            return ('ref', response[1])
262
 
        else:
263
 
            return ('branch', '')
 
323
        return response
264
324
 
265
 
    def _get_tree_branch(self):
 
325
    def _get_tree_branch(self, name=None):
266
326
        """See BzrDir._get_tree_branch()."""
267
 
        return None, self.open_branch()
 
327
        return None, self.open_branch(name=name)
268
328
 
269
 
    def open_branch(self, _unsupported=False, ignore_fallbacks=False):
270
 
        if _unsupported:
 
329
    def open_branch(self, name=None, unsupported=False,
 
330
                    ignore_fallbacks=False):
 
331
        if unsupported:
271
332
            raise NotImplementedError('unsupported flag support not implemented yet.')
272
333
        if self._next_open_branch_result is not None:
273
334
            # See create_branch for details.
278
339
        if response[0] == 'ref':
279
340
            # a branch reference, use the existing BranchReference logic.
280
341
            format = BranchReferenceFormat()
281
 
            return format.open(self, _found=True, location=response[1],
282
 
                ignore_fallbacks=ignore_fallbacks)
 
342
            return format.open(self, name=name, _found=True,
 
343
                location=response[1], ignore_fallbacks=ignore_fallbacks)
283
344
        branch_format_name = response[1]
284
345
        if not branch_format_name:
285
346
            branch_format_name = None
286
347
        format = RemoteBranchFormat(network_name=branch_format_name)
287
348
        return RemoteBranch(self, self.find_repository(), format=format,
288
 
            setup_stacking=not ignore_fallbacks)
 
349
            setup_stacking=not ignore_fallbacks, name=name)
289
350
 
290
351
    def _open_repo_v1(self, path):
291
352
        verb = 'BzrDir.find_repository'
352
413
        else:
353
414
            raise errors.NoRepositoryPresent(self)
354
415
 
 
416
    def has_workingtree(self):
 
417
        if self._has_working_tree is None:
 
418
            self._ensure_real()
 
419
            self._has_working_tree = self._real_bzrdir.has_workingtree()
 
420
        return self._has_working_tree
 
421
 
355
422
    def open_workingtree(self, recommend_upgrade=True):
356
 
        self._ensure_real()
357
 
        if self._real_bzrdir.has_workingtree():
 
423
        if self.has_workingtree():
358
424
            raise errors.NotLocalUrl(self.root_transport)
359
425
        else:
360
426
            raise errors.NoWorkingTree(self.root_transport.base)
363
429
        """Return the path to be used for this bzrdir in a remote call."""
364
430
        return client.remote_path_from_transport(self.root_transport)
365
431
 
366
 
    def get_branch_transport(self, branch_format):
 
432
    def get_branch_transport(self, branch_format, name=None):
367
433
        self._ensure_real()
368
 
        return self._real_bzrdir.get_branch_transport(branch_format)
 
434
        return self._real_bzrdir.get_branch_transport(branch_format, name=name)
369
435
 
370
436
    def get_repository_transport(self, repository_format):
371
437
        self._ensure_real()
392
458
        return self._real_bzrdir.clone(url, revision_id=revision_id,
393
459
            force_new_repo=force_new_repo, preserve_stacking=preserve_stacking)
394
460
 
395
 
    def get_config(self):
396
 
        self._ensure_real()
397
 
        return self._real_bzrdir.get_config()
 
461
    def _get_config(self):
 
462
        return RemoteBzrDirConfig(self)
398
463
 
399
464
 
400
465
class RemoteRepositoryFormat(repository.RepositoryFormat):
424
489
        self._custom_format = None
425
490
        self._network_name = None
426
491
        self._creating_bzrdir = None
 
492
        self._supports_chks = None
427
493
        self._supports_external_lookups = None
428
494
        self._supports_tree_reference = None
429
495
        self._rich_root_data = None
430
496
 
 
497
    def __repr__(self):
 
498
        return "%s(_network_name=%r)" % (self.__class__.__name__,
 
499
            self._network_name)
 
500
 
431
501
    @property
432
502
    def fast_deltas(self):
433
503
        self._ensure_real()
441
511
        return self._rich_root_data
442
512
 
443
513
    @property
 
514
    def supports_chks(self):
 
515
        if self._supports_chks is None:
 
516
            self._ensure_real()
 
517
            self._supports_chks = self._custom_format.supports_chks
 
518
        return self._supports_chks
 
519
 
 
520
    @property
444
521
    def supports_external_lookups(self):
445
522
        if self._supports_external_lookups is None:
446
523
            self._ensure_real()
492
569
        # 1) get the network name to use.
493
570
        if self._custom_format:
494
571
            network_name = self._custom_format.network_name()
 
572
        elif self._network_name:
 
573
            network_name = self._network_name
495
574
        else:
496
575
            # Select the current bzrlib default and ask for that.
497
576
            reference_bzrdir_format = bzrdir.format_registry.get('default')()
545
624
        return self._custom_format._fetch_reconcile
546
625
 
547
626
    def get_format_description(self):
548
 
        return 'bzr remote repository'
 
627
        self._ensure_real()
 
628
        return 'Remote: ' + self._custom_format.get_format_description()
549
629
 
550
630
    def __eq__(self, other):
551
631
        return self.__class__ is other.__class__
552
632
 
553
 
    def check_conversion_target(self, target_format):
554
 
        if self.rich_root_data and not target_format.rich_root_data:
555
 
            raise errors.BadConversionTarget(
556
 
                'Does not support rich root data.', target_format)
557
 
        if (self.supports_tree_reference and
558
 
            not getattr(target_format, 'supports_tree_reference', False)):
559
 
            raise errors.BadConversionTarget(
560
 
                'Does not support nested trees', target_format)
561
 
 
562
633
    def network_name(self):
563
634
        if self._network_name:
564
635
            return self._network_name
566
637
        return self._creating_repo._real_repository._format.network_name()
567
638
 
568
639
    @property
 
640
    def pack_compresses(self):
 
641
        self._ensure_real()
 
642
        return self._custom_format.pack_compresses
 
643
 
 
644
    @property
569
645
    def _serializer(self):
570
646
        self._ensure_real()
571
647
        return self._custom_format._serializer
572
648
 
573
649
 
574
 
class RemoteRepository(_RpcHelper):
 
650
class RemoteRepository(_RpcHelper, lock._RelockDebugMixin,
 
651
    bzrdir.ControlComponent):
575
652
    """Repository accessed over rpc.
576
653
 
577
654
    For the moment most operations are performed using local transport-backed
603
680
        self._lock_token = None
604
681
        self._lock_count = 0
605
682
        self._leave_lock = False
 
683
        # Cache of revision parents; misses are cached during read locks, and
 
684
        # write locks when no _real_repository has been set.
606
685
        self._unstacked_provider = graph.CachingParentsProvider(
607
686
            get_parent_map=self._get_parent_map_rpc)
608
687
        self._unstacked_provider.disable_cache()
618
697
        # Additional places to query for data.
619
698
        self._fallback_repositories = []
620
699
 
 
700
    @property
 
701
    def user_transport(self):
 
702
        return self.bzrdir.user_transport
 
703
 
 
704
    @property
 
705
    def control_transport(self):
 
706
        # XXX: Normally you shouldn't directly get at the remote repository
 
707
        # transport, but I'm not sure it's worth making this method
 
708
        # optional -- mbp 2010-04-21
 
709
        return self.bzrdir.get_repository_transport(None)
 
710
        
621
711
    def __str__(self):
622
712
        return "%s(%s)" % (self.__class__.__name__, self.base)
623
713
 
666
756
        self._ensure_real()
667
757
        return self._real_repository.suspend_write_group()
668
758
 
 
759
    def get_missing_parent_inventories(self, check_for_missing_texts=True):
 
760
        self._ensure_real()
 
761
        return self._real_repository.get_missing_parent_inventories(
 
762
            check_for_missing_texts=check_for_missing_texts)
 
763
 
 
764
    def _get_rev_id_for_revno_vfs(self, revno, known_pair):
 
765
        self._ensure_real()
 
766
        return self._real_repository.get_rev_id_for_revno(
 
767
            revno, known_pair)
 
768
 
 
769
    def get_rev_id_for_revno(self, revno, known_pair):
 
770
        """See Repository.get_rev_id_for_revno."""
 
771
        path = self.bzrdir._path_for_remote_call(self._client)
 
772
        try:
 
773
            if self._client._medium._is_remote_before((1, 17)):
 
774
                return self._get_rev_id_for_revno_vfs(revno, known_pair)
 
775
            response = self._call(
 
776
                'Repository.get_rev_id_for_revno', path, revno, known_pair)
 
777
        except errors.UnknownSmartMethod:
 
778
            self._client._medium._remember_remote_is_before((1, 17))
 
779
            return self._get_rev_id_for_revno_vfs(revno, known_pair)
 
780
        if response[0] == 'ok':
 
781
            return True, response[1]
 
782
        elif response[0] == 'history-incomplete':
 
783
            known_pair = response[1:3]
 
784
            for fallback in self._fallback_repositories:
 
785
                found, result = fallback.get_rev_id_for_revno(revno, known_pair)
 
786
                if found:
 
787
                    return True, result
 
788
                else:
 
789
                    known_pair = result
 
790
            # Not found in any fallbacks
 
791
            return False, known_pair
 
792
        else:
 
793
            raise errors.UnexpectedSmartServerResponse(response)
 
794
 
669
795
    def _ensure_real(self):
670
796
        """Ensure that there is a _real_repository set.
671
797
 
680
806
        invocation. If in doubt chat to the bzr network team.
681
807
        """
682
808
        if self._real_repository is None:
 
809
            if 'hpssvfs' in debug.debug_flags:
 
810
                import traceback
 
811
                warning('VFS Repository access triggered\n%s',
 
812
                    ''.join(traceback.format_stack()))
 
813
            self._unstacked_provider.missing_keys.clear()
683
814
            self.bzrdir._ensure_real()
684
815
            self._set_real_repository(
685
816
                self.bzrdir._real_bzrdir.open_repository())
745
876
        """Return a source for streaming from this repository."""
746
877
        return RemoteStreamSource(self, to_format)
747
878
 
 
879
    @needs_read_lock
748
880
    def has_revision(self, revision_id):
749
 
        """See Repository.has_revision()."""
750
 
        if revision_id == NULL_REVISION:
751
 
            # The null revision is always present.
752
 
            return True
753
 
        path = self.bzrdir._path_for_remote_call(self._client)
754
 
        response = self._call('Repository.has_revision', path, revision_id)
755
 
        if response[0] not in ('yes', 'no'):
756
 
            raise errors.UnexpectedSmartServerResponse(response)
757
 
        if response[0] == 'yes':
758
 
            return True
759
 
        for fallback_repo in self._fallback_repositories:
760
 
            if fallback_repo.has_revision(revision_id):
761
 
                return True
762
 
        return False
 
881
        """True if this repository has a copy of the revision."""
 
882
        # Copy of bzrlib.repository.Repository.has_revision
 
883
        return revision_id in self.has_revisions((revision_id,))
763
884
 
 
885
    @needs_read_lock
764
886
    def has_revisions(self, revision_ids):
765
 
        """See Repository.has_revisions()."""
766
 
        # FIXME: This does many roundtrips, particularly when there are
767
 
        # fallback repositories.  -- mbp 20080905
768
 
        result = set()
769
 
        for revision_id in revision_ids:
770
 
            if self.has_revision(revision_id):
771
 
                result.add(revision_id)
 
887
        """Probe to find out the presence of multiple revisions.
 
888
 
 
889
        :param revision_ids: An iterable of revision_ids.
 
890
        :return: A set of the revision_ids that were present.
 
891
        """
 
892
        # Copy of bzrlib.repository.Repository.has_revisions
 
893
        parent_map = self.get_parent_map(revision_ids)
 
894
        result = set(parent_map)
 
895
        if _mod_revision.NULL_REVISION in revision_ids:
 
896
            result.add(_mod_revision.NULL_REVISION)
772
897
        return result
773
898
 
 
899
    def _has_same_fallbacks(self, other_repo):
 
900
        """Returns true if the repositories have the same fallbacks."""
 
901
        # XXX: copied from Repository; it should be unified into a base class
 
902
        # <https://bugs.launchpad.net/bzr/+bug/401622>
 
903
        my_fb = self._fallback_repositories
 
904
        other_fb = other_repo._fallback_repositories
 
905
        if len(my_fb) != len(other_fb):
 
906
            return False
 
907
        for f, g in zip(my_fb, other_fb):
 
908
            if not f.has_same_location(g):
 
909
                return False
 
910
        return True
 
911
 
774
912
    def has_same_location(self, other):
 
913
        # TODO: Move to RepositoryBase and unify with the regular Repository
 
914
        # one; unfortunately the tests rely on slightly different behaviour at
 
915
        # present -- mbp 20090710
775
916
        return (self.__class__ is other.__class__ and
776
917
                self.bzrdir.transport.base == other.bzrdir.transport.base)
777
918
 
780
921
        parents_provider = self._make_parents_provider(other_repository)
781
922
        return graph.Graph(parents_provider)
782
923
 
 
924
    @needs_read_lock
 
925
    def get_known_graph_ancestry(self, revision_ids):
 
926
        """Return the known graph for a set of revision ids and their ancestors.
 
927
        """
 
928
        st = static_tuple.StaticTuple
 
929
        revision_keys = [st(r_id).intern() for r_id in revision_ids]
 
930
        known_graph = self.revisions.get_known_graph_ancestry(revision_keys)
 
931
        return graph.GraphThunkIdsToKeys(known_graph)
 
932
 
783
933
    def gather_stats(self, revid=None, committers=None):
784
934
        """See Repository.gather_stats()."""
785
935
        path = self.bzrdir._path_for_remote_call(self._client)
845
995
    def is_write_locked(self):
846
996
        return self._lock_mode == 'w'
847
997
 
 
998
    def _warn_if_deprecated(self, branch=None):
 
999
        # If we have a real repository, the check will be done there, if we
 
1000
        # don't the check will be done remotely.
 
1001
        pass
 
1002
 
848
1003
    def lock_read(self):
 
1004
        """Lock the repository for read operations.
 
1005
 
 
1006
        :return: A bzrlib.lock.LogicalLockResult.
 
1007
        """
849
1008
        # wrong eventually - want a local lock cache context
850
1009
        if not self._lock_mode:
 
1010
            self._note_lock('r')
851
1011
            self._lock_mode = 'r'
852
1012
            self._lock_count = 1
853
1013
            self._unstacked_provider.enable_cache(cache_misses=True)
854
1014
            if self._real_repository is not None:
855
1015
                self._real_repository.lock_read()
 
1016
            for repo in self._fallback_repositories:
 
1017
                repo.lock_read()
856
1018
        else:
857
1019
            self._lock_count += 1
858
 
        for repo in self._fallback_repositories:
859
 
            repo.lock_read()
 
1020
        return lock.LogicalLockResult(self.unlock)
860
1021
 
861
1022
    def _remote_lock_write(self, token):
862
1023
        path = self.bzrdir._path_for_remote_call(self._client)
873
1034
 
874
1035
    def lock_write(self, token=None, _skip_rpc=False):
875
1036
        if not self._lock_mode:
 
1037
            self._note_lock('w')
876
1038
            if _skip_rpc:
877
1039
                if self._lock_token is not None:
878
1040
                    if token != self._lock_token:
892
1054
                self._leave_lock = False
893
1055
            self._lock_mode = 'w'
894
1056
            self._lock_count = 1
895
 
            self._unstacked_provider.enable_cache(cache_misses=False)
 
1057
            cache_misses = self._real_repository is None
 
1058
            self._unstacked_provider.enable_cache(cache_misses=cache_misses)
 
1059
            for repo in self._fallback_repositories:
 
1060
                # Writes don't affect fallback repos
 
1061
                repo.lock_read()
896
1062
        elif self._lock_mode == 'r':
897
1063
            raise errors.ReadOnlyError(self)
898
1064
        else:
899
1065
            self._lock_count += 1
900
 
        for repo in self._fallback_repositories:
901
 
            # Writes don't affect fallback repos
902
 
            repo.lock_read()
903
 
        return self._lock_token or None
 
1066
        return RepositoryWriteLockResult(self.unlock, self._lock_token or None)
904
1067
 
905
1068
    def leave_lock_in_place(self):
906
1069
        if not self._lock_token:
980
1143
        else:
981
1144
            raise errors.UnexpectedSmartServerResponse(response)
982
1145
 
 
1146
    @only_raises(errors.LockNotHeld, errors.LockBroken)
983
1147
    def unlock(self):
984
1148
        if not self._lock_count:
985
 
            raise errors.LockNotHeld(self)
 
1149
            return lock.cant_unlock_not_held(self)
986
1150
        self._lock_count -= 1
987
1151
        if self._lock_count > 0:
988
1152
            return
1007
1171
                self._lock_token = None
1008
1172
                if not self._leave_lock:
1009
1173
                    self._unlock(old_token)
 
1174
        # Fallbacks are always 'lock_read()' so we don't pay attention to
 
1175
        # self._leave_lock
 
1176
        for repo in self._fallback_repositories:
 
1177
            repo.unlock()
1010
1178
 
1011
1179
    def break_lock(self):
1012
1180
        # should hand off to the network
1076
1244
        # We need to accumulate additional repositories here, to pass them in
1077
1245
        # on various RPC's.
1078
1246
        #
 
1247
        if self.is_locked():
 
1248
            # We will call fallback.unlock() when we transition to the unlocked
 
1249
            # state, so always add a lock here. If a caller passes us a locked
 
1250
            # repository, they are responsible for unlocking it later.
 
1251
            repository.lock_read()
 
1252
        self._check_fallback_repository(repository)
1079
1253
        self._fallback_repositories.append(repository)
1080
1254
        # If self._real_repository was parameterised already (e.g. because a
1081
1255
        # _real_branch had its get_stacked_on_url method called), then the
1082
1256
        # repository to be added may already be in the _real_repositories list.
1083
1257
        if self._real_repository is not None:
1084
 
            fallback_locations = [repo.bzrdir.root_transport.base for repo in
 
1258
            fallback_locations = [repo.user_url for repo in
1085
1259
                self._real_repository._fallback_repositories]
1086
 
            if repository.bzrdir.root_transport.base not in fallback_locations:
 
1260
            if repository.user_url not in fallback_locations:
1087
1261
                self._real_repository.add_fallback_repository(repository)
1088
1262
 
 
1263
    def _check_fallback_repository(self, repository):
 
1264
        """Check that this repository can fallback to repository safely.
 
1265
 
 
1266
        Raise an error if not.
 
1267
 
 
1268
        :param repository: A repository to fallback to.
 
1269
        """
 
1270
        return _mod_repository.InterRepository._assert_same_model(
 
1271
            self, repository)
 
1272
 
1089
1273
    def add_inventory(self, revid, inv, parents):
1090
1274
        self._ensure_real()
1091
1275
        return self._real_repository.add_inventory(revid, inv, parents)
1092
1276
 
1093
1277
    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
1094
 
                               parents):
 
1278
            parents, basis_inv=None, propagate_caches=False):
1095
1279
        self._ensure_real()
1096
1280
        return self._real_repository.add_inventory_by_delta(basis_revision_id,
1097
 
            delta, new_revision_id, parents)
 
1281
            delta, new_revision_id, parents, basis_inv=basis_inv,
 
1282
            propagate_caches=propagate_caches)
1098
1283
 
1099
1284
    def add_revision(self, rev_id, rev, inv=None, config=None):
1100
1285
        self._ensure_real()
1106
1291
        self._ensure_real()
1107
1292
        return self._real_repository.get_inventory(revision_id)
1108
1293
 
1109
 
    def iter_inventories(self, revision_ids):
 
1294
    def iter_inventories(self, revision_ids, ordering=None):
1110
1295
        self._ensure_real()
1111
 
        return self._real_repository.iter_inventories(revision_ids)
 
1296
        return self._real_repository.iter_inventories(revision_ids, ordering)
1112
1297
 
1113
1298
    @needs_read_lock
1114
1299
    def get_revision(self, revision_id):
1130
1315
        return self._real_repository.make_working_trees()
1131
1316
 
1132
1317
    def refresh_data(self):
1133
 
        """Re-read any data needed to to synchronise with disk.
 
1318
        """Re-read any data needed to synchronise with disk.
1134
1319
 
1135
1320
        This method is intended to be called after another repository instance
1136
1321
        (such as one used by a smart server) has inserted data into the
1137
 
        repository. It may not be called during a write group, but may be
1138
 
        called at any other time.
 
1322
        repository. On all repositories this will work outside of write groups.
 
1323
        Some repository formats (pack and newer for bzrlib native formats)
 
1324
        support refresh_data inside write groups. If called inside a write
 
1325
        group on a repository that does not support refreshing in a write group
 
1326
        IsInWriteGroupError will be raised.
1139
1327
        """
1140
 
        if self.is_in_write_group():
1141
 
            raise errors.InternalBzrError(
1142
 
                "May not refresh_data while in a write group.")
1143
1328
        if self._real_repository is not None:
1144
1329
            self._real_repository.refresh_data()
1145
1330
 
1178
1363
            raise errors.InternalBzrError(
1179
1364
                "May not fetch while in a write group.")
1180
1365
        # fast path same-url fetch operations
1181
 
        if self.has_same_location(source) and fetch_spec is None:
 
1366
        if (self.has_same_location(source)
 
1367
            and fetch_spec is None
 
1368
            and self._has_same_fallbacks(source)):
1182
1369
            # check that last_revision is in 'from' and then return a
1183
1370
            # no-operation.
1184
1371
            if (revision_id is not None and
1357
1544
        return self._real_repository.get_signature_text(revision_id)
1358
1545
 
1359
1546
    @needs_read_lock
1360
 
    def get_inventory_xml(self, revision_id):
1361
 
        self._ensure_real()
1362
 
        return self._real_repository.get_inventory_xml(revision_id)
1363
 
 
1364
 
    def deserialise_inventory(self, revision_id, xml):
1365
 
        self._ensure_real()
1366
 
        return self._real_repository.deserialise_inventory(revision_id, xml)
 
1547
    def _get_inventory_xml(self, revision_id):
 
1548
        self._ensure_real()
 
1549
        return self._real_repository._get_inventory_xml(revision_id)
1367
1550
 
1368
1551
    def reconcile(self, other=None, thorough=False):
1369
1552
        self._ensure_real()
1396
1579
        return self._real_repository.get_revision_reconcile(revision_id)
1397
1580
 
1398
1581
    @needs_read_lock
1399
 
    def check(self, revision_ids=None):
 
1582
    def check(self, revision_ids=None, callback_refs=None, check_repo=True):
1400
1583
        self._ensure_real()
1401
 
        return self._real_repository.check(revision_ids=revision_ids)
 
1584
        return self._real_repository.check(revision_ids=revision_ids,
 
1585
            callback_refs=callback_refs, check_repo=check_repo)
1402
1586
 
1403
1587
    def copy_content_into(self, destination, revision_id=None):
1404
1588
        self._ensure_real()
1444
1628
        return self._real_repository.inventories
1445
1629
 
1446
1630
    @needs_write_lock
1447
 
    def pack(self):
 
1631
    def pack(self, hint=None, clean_obsolete_packs=False):
1448
1632
        """Compress the data within the repository.
1449
1633
 
1450
1634
        This is not currently implemented within the smart server.
1451
1635
        """
1452
1636
        self._ensure_real()
1453
 
        return self._real_repository.pack()
 
1637
        return self._real_repository.pack(hint=hint, clean_obsolete_packs=clean_obsolete_packs)
1454
1638
 
1455
1639
    @property
1456
1640
    def revisions(self):
1544
1728
        self._ensure_real()
1545
1729
        return self._real_repository.revision_graph_can_have_wrong_parents()
1546
1730
 
1547
 
    def _find_inconsistent_revision_parents(self):
 
1731
    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
1548
1732
        self._ensure_real()
1549
 
        return self._real_repository._find_inconsistent_revision_parents()
 
1733
        return self._real_repository._find_inconsistent_revision_parents(
 
1734
            revisions_iterator)
1550
1735
 
1551
1736
    def _check_for_inconsistent_revision_parents(self):
1552
1737
        self._ensure_real()
1558
1743
            providers.insert(0, other)
1559
1744
        providers.extend(r._make_parents_provider() for r in
1560
1745
                         self._fallback_repositories)
1561
 
        return graph._StackedParentsProvider(providers)
 
1746
        return graph.StackedParentsProvider(providers)
1562
1747
 
1563
1748
    def _serialise_search_recipe(self, recipe):
1564
1749
        """Serialise a graph search recipe.
1605
1790
 
1606
1791
    def insert_stream(self, stream, src_format, resume_tokens):
1607
1792
        target = self.target_repo
 
1793
        target._unstacked_provider.missing_keys.clear()
 
1794
        candidate_calls = [('Repository.insert_stream_1.19', (1, 19))]
1608
1795
        if target._lock_token:
1609
 
            verb = 'Repository.insert_stream_locked'
1610
 
            extra_args = (target._lock_token or '',)
1611
 
            required_version = (1, 14)
 
1796
            candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
 
1797
            lock_args = (target._lock_token or '',)
1612
1798
        else:
1613
 
            verb = 'Repository.insert_stream'
1614
 
            extra_args = ()
1615
 
            required_version = (1, 13)
 
1799
            candidate_calls.append(('Repository.insert_stream', (1, 13)))
 
1800
            lock_args = ()
1616
1801
        client = target._client
1617
1802
        medium = client._medium
1618
 
        if medium._is_remote_before(required_version):
1619
 
            # No possible way this can work.
1620
 
            return self._insert_real(stream, src_format, resume_tokens)
1621
1803
        path = target.bzrdir._path_for_remote_call(client)
1622
 
        if not resume_tokens:
1623
 
            # XXX: Ugly but important for correctness, *will* be fixed during
1624
 
            # 1.13 cycle. Pushing a stream that is interrupted results in a
1625
 
            # fallback to the _real_repositories sink *with a partial stream*.
1626
 
            # Thats bad because we insert less data than bzr expected. To avoid
1627
 
            # this we do a trial push to make sure the verb is accessible, and
1628
 
            # do not fallback when actually pushing the stream. A cleanup patch
1629
 
            # is going to look at rewinding/restarting the stream/partial
1630
 
            # buffering etc.
 
1804
        # Probe for the verb to use with an empty stream before sending the
 
1805
        # real stream to it.  We do this both to avoid the risk of sending a
 
1806
        # large request that is then rejected, and because we don't want to
 
1807
        # implement a way to buffer, rewind, or restart the stream.
 
1808
        found_verb = False
 
1809
        for verb, required_version in candidate_calls:
 
1810
            if medium._is_remote_before(required_version):
 
1811
                continue
 
1812
            if resume_tokens:
 
1813
                # We've already done the probing (and set _is_remote_before) on
 
1814
                # a previous insert.
 
1815
                found_verb = True
 
1816
                break
1631
1817
            byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1632
1818
            try:
1633
1819
                response = client.call_with_body_stream(
1634
 
                    (verb, path, '') + extra_args, byte_stream)
 
1820
                    (verb, path, '') + lock_args, byte_stream)
1635
1821
            except errors.UnknownSmartMethod:
1636
1822
                medium._remember_remote_is_before(required_version)
1637
 
                return self._insert_real(stream, src_format, resume_tokens)
 
1823
            else:
 
1824
                found_verb = True
 
1825
                break
 
1826
        if not found_verb:
 
1827
            # Have to use VFS.
 
1828
            return self._insert_real(stream, src_format, resume_tokens)
 
1829
        self._last_inv_record = None
 
1830
        self._last_substream = None
 
1831
        if required_version < (1, 19):
 
1832
            # Remote side doesn't support inventory deltas.  Wrap the stream to
 
1833
            # make sure we don't send any.  If the stream contains inventory
 
1834
            # deltas we'll interrupt the smart insert_stream request and
 
1835
            # fallback to VFS.
 
1836
            stream = self._stop_stream_if_inventory_delta(stream)
1638
1837
        byte_stream = smart_repo._stream_to_byte_stream(
1639
1838
            stream, src_format)
1640
1839
        resume_tokens = ' '.join(resume_tokens)
1641
1840
        response = client.call_with_body_stream(
1642
 
            (verb, path, resume_tokens) + extra_args, byte_stream)
 
1841
            (verb, path, resume_tokens) + lock_args, byte_stream)
1643
1842
        if response[0][0] not in ('ok', 'missing-basis'):
1644
1843
            raise errors.UnexpectedSmartServerResponse(response)
 
1844
        if self._last_substream is not None:
 
1845
            # The stream included an inventory-delta record, but the remote
 
1846
            # side isn't new enough to support them.  So we need to send the
 
1847
            # rest of the stream via VFS.
 
1848
            self.target_repo.refresh_data()
 
1849
            return self._resume_stream_with_vfs(response, src_format)
1645
1850
        if response[0][0] == 'missing-basis':
1646
1851
            tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1647
1852
            resume_tokens = tokens
1648
 
            return resume_tokens, missing_keys
 
1853
            return resume_tokens, set(missing_keys)
1649
1854
        else:
1650
1855
            self.target_repo.refresh_data()
1651
1856
            return [], set()
1652
1857
 
 
1858
    def _resume_stream_with_vfs(self, response, src_format):
 
1859
        """Resume sending a stream via VFS, first resending the record and
 
1860
        substream that couldn't be sent via an insert_stream verb.
 
1861
        """
 
1862
        if response[0][0] == 'missing-basis':
 
1863
            tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
 
1864
            # Ignore missing_keys, we haven't finished inserting yet
 
1865
        else:
 
1866
            tokens = []
 
1867
        def resume_substream():
 
1868
            # Yield the substream that was interrupted.
 
1869
            for record in self._last_substream:
 
1870
                yield record
 
1871
            self._last_substream = None
 
1872
        def resume_stream():
 
1873
            # Finish sending the interrupted substream
 
1874
            yield ('inventory-deltas', resume_substream())
 
1875
            # Then simply continue sending the rest of the stream.
 
1876
            for substream_kind, substream in self._last_stream:
 
1877
                yield substream_kind, substream
 
1878
        return self._insert_real(resume_stream(), src_format, tokens)
 
1879
 
 
1880
    def _stop_stream_if_inventory_delta(self, stream):
 
1881
        """Normally this just lets the original stream pass-through unchanged.
 
1882
 
 
1883
        However if any 'inventory-deltas' substream occurs it will stop
 
1884
        streaming, and store the interrupted substream and stream in
 
1885
        self._last_substream and self._last_stream so that the stream can be
 
1886
        resumed by _resume_stream_with_vfs.
 
1887
        """
 
1888
                    
 
1889
        stream_iter = iter(stream)
 
1890
        for substream_kind, substream in stream_iter:
 
1891
            if substream_kind == 'inventory-deltas':
 
1892
                self._last_substream = substream
 
1893
                self._last_stream = stream_iter
 
1894
                return
 
1895
            else:
 
1896
                yield substream_kind, substream
 
1897
            
1653
1898
 
1654
1899
class RemoteStreamSource(repository.StreamSource):
1655
1900
    """Stream data from a remote server."""
1658
1903
        if (self.from_repository._fallback_repositories and
1659
1904
            self.to_format._fetch_order == 'topological'):
1660
1905
            return self._real_stream(self.from_repository, search)
1661
 
        return self.missing_parents_chain(search, [self.from_repository] +
1662
 
            self.from_repository._fallback_repositories)
 
1906
        sources = []
 
1907
        seen = set()
 
1908
        repos = [self.from_repository]
 
1909
        while repos:
 
1910
            repo = repos.pop(0)
 
1911
            if repo in seen:
 
1912
                continue
 
1913
            seen.add(repo)
 
1914
            repos.extend(repo._fallback_repositories)
 
1915
            sources.append(repo)
 
1916
        return self.missing_parents_chain(search, sources)
 
1917
 
 
1918
    def get_stream_for_missing_keys(self, missing_keys):
 
1919
        self.from_repository._ensure_real()
 
1920
        real_repo = self.from_repository._real_repository
 
1921
        real_source = real_repo._get_source(self.to_format)
 
1922
        return real_source.get_stream_for_missing_keys(missing_keys)
1663
1923
 
1664
1924
    def _real_stream(self, repo, search):
1665
1925
        """Get a stream for search from repo.
1672
1932
        """
1673
1933
        source = repo._get_source(self.to_format)
1674
1934
        if isinstance(source, RemoteStreamSource):
1675
 
            return repository.StreamSource.get_stream(source, search)
 
1935
            repo._ensure_real()
 
1936
            source = repo._real_repository._get_source(self.to_format)
1676
1937
        return source.get_stream(search)
1677
1938
 
1678
1939
    def _get_stream(self, repo, search):
1695
1956
            return self._real_stream(repo, search)
1696
1957
        client = repo._client
1697
1958
        medium = client._medium
1698
 
        if medium._is_remote_before((1, 13)):
1699
 
            # streaming was added in 1.13
1700
 
            return self._real_stream(repo, search)
1701
1959
        path = repo.bzrdir._path_for_remote_call(client)
1702
 
        try:
1703
 
            search_bytes = repo._serialise_search_result(search)
1704
 
            response = repo._call_with_body_bytes_expecting_body(
1705
 
                'Repository.get_stream',
1706
 
                (path, self.to_format.network_name()), search_bytes)
1707
 
            response_tuple, response_handler = response
1708
 
        except errors.UnknownSmartMethod:
1709
 
            medium._remember_remote_is_before((1,13))
 
1960
        search_bytes = repo._serialise_search_result(search)
 
1961
        args = (path, self.to_format.network_name())
 
1962
        candidate_verbs = [
 
1963
            ('Repository.get_stream_1.19', (1, 19)),
 
1964
            ('Repository.get_stream', (1, 13))]
 
1965
        found_verb = False
 
1966
        for verb, version in candidate_verbs:
 
1967
            if medium._is_remote_before(version):
 
1968
                continue
 
1969
            try:
 
1970
                response = repo._call_with_body_bytes_expecting_body(
 
1971
                    verb, args, search_bytes)
 
1972
            except errors.UnknownSmartMethod:
 
1973
                medium._remember_remote_is_before(version)
 
1974
            else:
 
1975
                response_tuple, response_handler = response
 
1976
                found_verb = True
 
1977
                break
 
1978
        if not found_verb:
1710
1979
            return self._real_stream(repo, search)
1711
1980
        if response_tuple[0] != 'ok':
1712
1981
            raise errors.UnexpectedSmartServerResponse(response_tuple)
1713
1982
        byte_stream = response_handler.read_streamed_body()
1714
 
        src_format, stream = smart_repo._byte_stream_to_stream(byte_stream)
 
1983
        src_format, stream = smart_repo._byte_stream_to_stream(byte_stream,
 
1984
            self._record_counter)
1715
1985
        if src_format.network_name() != repo._format.network_name():
1716
1986
            raise AssertionError(
1717
1987
                "Mismatched RemoteRepository and stream src %r, %r" % (
1724
1994
        :param search: The overall search to satisfy with streams.
1725
1995
        :param sources: A list of Repository objects to query.
1726
1996
        """
1727
 
        self.serialiser = self.to_format._serializer
 
1997
        self.from_serialiser = self.from_repository._format._serializer
1728
1998
        self.seen_revs = set()
1729
1999
        self.referenced_revs = set()
1730
2000
        # If there are heads in the search, or the key count is > 0, we are not
1747
2017
    def missing_parents_rev_handler(self, substream):
1748
2018
        for content in substream:
1749
2019
            revision_bytes = content.get_bytes_as('fulltext')
1750
 
            revision = self.serialiser.read_revision_from_string(revision_bytes)
 
2020
            revision = self.from_serialiser.read_revision_from_string(
 
2021
                revision_bytes)
1751
2022
            self.seen_revs.add(content.key[-1])
1752
2023
            self.referenced_revs.update(revision.parent_ids)
1753
2024
            yield content
1792
2063
                self._network_name)
1793
2064
 
1794
2065
    def get_format_description(self):
1795
 
        return 'Remote BZR Branch'
 
2066
        self._ensure_real()
 
2067
        return 'Remote: ' + self._custom_format.get_format_description()
1796
2068
 
1797
2069
    def network_name(self):
1798
2070
        return self._network_name
1799
2071
 
1800
 
    def open(self, a_bzrdir, ignore_fallbacks=False):
1801
 
        return a_bzrdir.open_branch(ignore_fallbacks=ignore_fallbacks)
 
2072
    def open(self, a_bzrdir, name=None, ignore_fallbacks=False):
 
2073
        return a_bzrdir.open_branch(name=name, 
 
2074
            ignore_fallbacks=ignore_fallbacks)
1802
2075
 
1803
 
    def _vfs_initialize(self, a_bzrdir):
 
2076
    def _vfs_initialize(self, a_bzrdir, name):
1804
2077
        # Initialisation when using a local bzrdir object, or a non-vfs init
1805
2078
        # method is not available on the server.
1806
2079
        # self._custom_format is always set - the start of initialize ensures
1807
2080
        # that.
1808
2081
        if isinstance(a_bzrdir, RemoteBzrDir):
1809
2082
            a_bzrdir._ensure_real()
1810
 
            result = self._custom_format.initialize(a_bzrdir._real_bzrdir)
 
2083
            result = self._custom_format.initialize(a_bzrdir._real_bzrdir,
 
2084
                name)
1811
2085
        else:
1812
2086
            # We assume the bzrdir is parameterised; it may not be.
1813
 
            result = self._custom_format.initialize(a_bzrdir)
 
2087
            result = self._custom_format.initialize(a_bzrdir, name)
1814
2088
        if (isinstance(a_bzrdir, RemoteBzrDir) and
1815
2089
            not isinstance(result, RemoteBranch)):
1816
 
            result = RemoteBranch(a_bzrdir, a_bzrdir.find_repository(), result)
 
2090
            result = RemoteBranch(a_bzrdir, a_bzrdir.find_repository(), result,
 
2091
                                  name=name)
1817
2092
        return result
1818
2093
 
1819
 
    def initialize(self, a_bzrdir):
 
2094
    def initialize(self, a_bzrdir, name=None):
1820
2095
        # 1) get the network name to use.
1821
2096
        if self._custom_format:
1822
2097
            network_name = self._custom_format.network_name()
1828
2103
            network_name = reference_format.network_name()
1829
2104
        # Being asked to create on a non RemoteBzrDir:
1830
2105
        if not isinstance(a_bzrdir, RemoteBzrDir):
1831
 
            return self._vfs_initialize(a_bzrdir)
 
2106
            return self._vfs_initialize(a_bzrdir, name=name)
1832
2107
        medium = a_bzrdir._client._medium
1833
2108
        if medium._is_remote_before((1, 13)):
1834
 
            return self._vfs_initialize(a_bzrdir)
 
2109
            return self._vfs_initialize(a_bzrdir, name=name)
1835
2110
        # Creating on a remote bzr dir.
1836
2111
        # 2) try direct creation via RPC
1837
2112
        path = a_bzrdir._path_for_remote_call(a_bzrdir._client)
 
2113
        if name is not None:
 
2114
            # XXX JRV20100304: Support creating colocated branches
 
2115
            raise errors.NoColocatedBranchSupport(self)
1838
2116
        verb = 'BzrDir.create_branch'
1839
2117
        try:
1840
2118
            response = a_bzrdir._call(verb, path, network_name)
1841
2119
        except errors.UnknownSmartMethod:
1842
2120
            # Fallback - use vfs methods
1843
2121
            medium._remember_remote_is_before((1, 13))
1844
 
            return self._vfs_initialize(a_bzrdir)
 
2122
            return self._vfs_initialize(a_bzrdir, name=name)
1845
2123
        if response[0] != 'ok':
1846
2124
            raise errors.UnexpectedSmartServerResponse(response)
1847
2125
        # Turn the response into a RemoteRepository object.
1855
2133
                a_bzrdir._client)
1856
2134
        remote_repo = RemoteRepository(repo_bzrdir, repo_format)
1857
2135
        remote_branch = RemoteBranch(a_bzrdir, remote_repo,
1858
 
            format=format, setup_stacking=False)
 
2136
            format=format, setup_stacking=False, name=name)
1859
2137
        # XXX: We know this is a new branch, so it must have revno 0, revid
1860
2138
        # NULL_REVISION. Creating the branch locked would make this be unable
1861
2139
        # to be wrong; here its simply very unlikely to be wrong. RBC 20090225
1876
2154
        self._ensure_real()
1877
2155
        return self._custom_format.supports_stacking()
1878
2156
 
1879
 
 
1880
 
class RemoteBranch(branch.Branch, _RpcHelper):
 
2157
    def supports_set_append_revisions_only(self):
 
2158
        self._ensure_real()
 
2159
        return self._custom_format.supports_set_append_revisions_only()
 
2160
 
 
2161
 
 
2162
class RemoteBranch(branch.Branch, _RpcHelper, lock._RelockDebugMixin):
1881
2163
    """Branch stored on a server accessed by HPSS RPC.
1882
2164
 
1883
2165
    At the moment most operations are mapped down to simple file operations.
1884
2166
    """
1885
2167
 
1886
2168
    def __init__(self, remote_bzrdir, remote_repository, real_branch=None,
1887
 
        _client=None, format=None, setup_stacking=True):
 
2169
        _client=None, format=None, setup_stacking=True, name=None):
1888
2170
        """Create a RemoteBranch instance.
1889
2171
 
1890
2172
        :param real_branch: An optional local implementation of the branch
1896
2178
        :param setup_stacking: If True make an RPC call to determine the
1897
2179
            stacked (or not) status of the branch. If False assume the branch
1898
2180
            is not stacked.
 
2181
        :param name: Colocated branch name
1899
2182
        """
1900
2183
        # We intentionally don't call the parent class's __init__, because it
1901
2184
        # will try to assign to self.tags, which is a property in this subclass.
1902
2185
        # And the parent's __init__ doesn't do much anyway.
1903
 
        self._revision_id_to_revno_cache = None
1904
 
        self._partial_revision_id_to_revno_cache = {}
1905
 
        self._revision_history_cache = None
1906
 
        self._last_revision_info_cache = None
1907
 
        self._merge_sorted_revisions_cache = None
1908
2186
        self.bzrdir = remote_bzrdir
1909
2187
        if _client is not None:
1910
2188
            self._client = _client
1924
2202
        else:
1925
2203
            self._real_branch = None
1926
2204
        # Fill out expected attributes of branch for bzrlib API users.
1927
 
        self.base = self.bzrdir.root_transport.base
 
2205
        self._clear_cached_state()
 
2206
        # TODO: deprecate self.base in favor of user_url
 
2207
        self.base = self.bzrdir.user_url
 
2208
        self._name = name
1928
2209
        self._control_files = None
1929
2210
        self._lock_mode = None
1930
2211
        self._lock_token = None
1941
2222
                    self._real_branch._format.network_name()
1942
2223
        else:
1943
2224
            self._format = format
 
2225
        # when we do _ensure_real we may need to pass ignore_fallbacks to the
 
2226
        # branch.open_branch method.
 
2227
        self._real_ignore_fallbacks = not setup_stacking
1944
2228
        if not self._format._network_name:
1945
2229
            # Did not get from open_branchV2 - old server.
1946
2230
            self._ensure_real()
1951
2235
        hooks = branch.Branch.hooks['open']
1952
2236
        for hook in hooks:
1953
2237
            hook(self)
 
2238
        self._is_stacked = False
1954
2239
        if setup_stacking:
1955
2240
            self._setup_stacking()
1956
2241
 
1962
2247
        except (errors.NotStacked, errors.UnstackableBranchFormat,
1963
2248
            errors.UnstackableRepositoryFormat), e:
1964
2249
            return
 
2250
        self._is_stacked = True
1965
2251
        self._activate_fallback_location(fallback_url)
1966
2252
 
1967
2253
    def _get_config(self):
1989
2275
                raise AssertionError('smart server vfs must be enabled '
1990
2276
                    'to use vfs implementation')
1991
2277
            self.bzrdir._ensure_real()
1992
 
            self._real_branch = self.bzrdir._real_bzrdir.open_branch()
 
2278
            self._real_branch = self.bzrdir._real_bzrdir.open_branch(
 
2279
                ignore_fallbacks=self._real_ignore_fallbacks, name=self._name)
1993
2280
            if self.repository._real_repository is None:
1994
2281
                # Give the remote repository the matching real repo.
1995
2282
                real_repo = self._real_branch.repository
2069
2356
            raise errors.UnexpectedSmartServerResponse(response)
2070
2357
        return response[1]
2071
2358
 
 
2359
    def set_stacked_on_url(self, url):
 
2360
        branch.Branch.set_stacked_on_url(self, url)
 
2361
        if not url:
 
2362
            self._is_stacked = False
 
2363
        else:
 
2364
            self._is_stacked = True
 
2365
        
2072
2366
    def _vfs_get_tags_bytes(self):
2073
2367
        self._ensure_real()
2074
2368
        return self._real_branch._get_tags_bytes()
2084
2378
            return self._vfs_get_tags_bytes()
2085
2379
        return response[0]
2086
2380
 
 
2381
    def _vfs_set_tags_bytes(self, bytes):
 
2382
        self._ensure_real()
 
2383
        return self._real_branch._set_tags_bytes(bytes)
 
2384
 
 
2385
    def _set_tags_bytes(self, bytes):
 
2386
        medium = self._client._medium
 
2387
        if medium._is_remote_before((1, 18)):
 
2388
            self._vfs_set_tags_bytes(bytes)
 
2389
            return
 
2390
        try:
 
2391
            args = (
 
2392
                self._remote_path(), self._lock_token, self._repo_lock_token)
 
2393
            response = self._call_with_body_bytes(
 
2394
                'Branch.set_tags_bytes', args, bytes)
 
2395
        except errors.UnknownSmartMethod:
 
2396
            medium._remember_remote_is_before((1, 18))
 
2397
            self._vfs_set_tags_bytes(bytes)
 
2398
 
2087
2399
    def lock_read(self):
 
2400
        """Lock the branch for read operations.
 
2401
 
 
2402
        :return: A bzrlib.lock.LogicalLockResult.
 
2403
        """
2088
2404
        self.repository.lock_read()
2089
2405
        if not self._lock_mode:
 
2406
            self._note_lock('r')
2090
2407
            self._lock_mode = 'r'
2091
2408
            self._lock_count = 1
2092
2409
            if self._real_branch is not None:
2093
2410
                self._real_branch.lock_read()
2094
2411
        else:
2095
2412
            self._lock_count += 1
 
2413
        return lock.LogicalLockResult(self.unlock)
2096
2414
 
2097
2415
    def _remote_lock_write(self, token):
2098
2416
        if token is None:
2099
2417
            branch_token = repo_token = ''
2100
2418
        else:
2101
2419
            branch_token = token
2102
 
            repo_token = self.repository.lock_write()
 
2420
            repo_token = self.repository.lock_write().repository_token
2103
2421
            self.repository.unlock()
2104
2422
        err_context = {'token': token}
2105
 
        response = self._call(
2106
 
            'Branch.lock_write', self._remote_path(), branch_token,
2107
 
            repo_token or '', **err_context)
 
2423
        try:
 
2424
            response = self._call(
 
2425
                'Branch.lock_write', self._remote_path(), branch_token,
 
2426
                repo_token or '', **err_context)
 
2427
        except errors.LockContention, e:
 
2428
            # The LockContention from the server doesn't have any
 
2429
            # information about the lock_url. We re-raise LockContention
 
2430
            # with valid lock_url.
 
2431
            raise errors.LockContention('(remote lock)',
 
2432
                self.repository.base.split('.bzr/')[0])
2108
2433
        if response[0] != 'ok':
2109
2434
            raise errors.UnexpectedSmartServerResponse(response)
2110
2435
        ok, branch_token, repo_token = response
2112
2437
 
2113
2438
    def lock_write(self, token=None):
2114
2439
        if not self._lock_mode:
 
2440
            self._note_lock('w')
2115
2441
            # Lock the branch and repo in one remote call.
2116
2442
            remote_tokens = self._remote_lock_write(token)
2117
2443
            self._lock_token, self._repo_lock_token = remote_tokens
2130
2456
            self._lock_mode = 'w'
2131
2457
            self._lock_count = 1
2132
2458
        elif self._lock_mode == 'r':
2133
 
            raise errors.ReadOnlyTransaction
 
2459
            raise errors.ReadOnlyError(self)
2134
2460
        else:
2135
2461
            if token is not None:
2136
2462
                # A token was given to lock_write, and we're relocking, so
2141
2467
            self._lock_count += 1
2142
2468
            # Re-lock the repository too.
2143
2469
            self.repository.lock_write(self._repo_lock_token)
2144
 
        return self._lock_token or None
2145
 
 
2146
 
    def _set_tags_bytes(self, bytes):
2147
 
        self._ensure_real()
2148
 
        return self._real_branch._set_tags_bytes(bytes)
 
2470
        return BranchWriteLockResult(self.unlock, self._lock_token or None)
2149
2471
 
2150
2472
    def _unlock(self, branch_token, repo_token):
2151
2473
        err_context = {'token': str((branch_token, repo_token))}
2156
2478
            return
2157
2479
        raise errors.UnexpectedSmartServerResponse(response)
2158
2480
 
 
2481
    @only_raises(errors.LockNotHeld, errors.LockBroken)
2159
2482
    def unlock(self):
2160
2483
        try:
2161
2484
            self._lock_count -= 1
2201
2524
            raise NotImplementedError(self.dont_leave_lock_in_place)
2202
2525
        self._leave_lock = False
2203
2526
 
 
2527
    @needs_read_lock
 
2528
    def get_rev_id(self, revno, history=None):
 
2529
        if revno == 0:
 
2530
            return _mod_revision.NULL_REVISION
 
2531
        last_revision_info = self.last_revision_info()
 
2532
        ok, result = self.repository.get_rev_id_for_revno(
 
2533
            revno, last_revision_info)
 
2534
        if ok:
 
2535
            return result
 
2536
        missing_parent = result[1]
 
2537
        # Either the revision named by the server is missing, or its parent
 
2538
        # is.  Call get_parent_map to determine which, so that we report a
 
2539
        # useful error.
 
2540
        parent_map = self.repository.get_parent_map([missing_parent])
 
2541
        if missing_parent in parent_map:
 
2542
            missing_parent = parent_map[missing_parent]
 
2543
        raise errors.RevisionNotPresent(missing_parent, self.repository)
 
2544
 
2204
2545
    def _last_revision_info(self):
2205
2546
        response = self._call('Branch.last_revision_info', self._remote_path())
2206
2547
        if response[0] != 'ok':
2211
2552
 
2212
2553
    def _gen_revision_history(self):
2213
2554
        """See Branch._gen_revision_history()."""
 
2555
        if self._is_stacked:
 
2556
            self._ensure_real()
 
2557
            return self._real_branch._gen_revision_history()
2214
2558
        response_tuple, response_handler = self._call_expecting_body(
2215
2559
            'Branch.revision_history', self._remote_path())
2216
2560
        if response_tuple[0] != 'ok':
2301
2645
        self._ensure_real()
2302
2646
        return self._real_branch._get_parent_location()
2303
2647
 
2304
 
    def set_parent(self, url):
2305
 
        self._ensure_real()
2306
 
        return self._real_branch.set_parent(url)
2307
 
 
2308
2648
    def _set_parent_location(self, url):
2309
 
        # Used by tests, to poke bad urls into branch configurations
2310
 
        if url is None:
2311
 
            self.set_parent(url)
2312
 
        else:
2313
 
            self._ensure_real()
2314
 
            return self._real_branch._set_parent_location(url)
 
2649
        medium = self._client._medium
 
2650
        if medium._is_remote_before((1, 15)):
 
2651
            return self._vfs_set_parent_location(url)
 
2652
        try:
 
2653
            call_url = url or ''
 
2654
            if type(call_url) is not str:
 
2655
                raise AssertionError('url must be a str or None (%s)' % url)
 
2656
            response = self._call('Branch.set_parent_location',
 
2657
                self._remote_path(), self._lock_token, self._repo_lock_token,
 
2658
                call_url)
 
2659
        except errors.UnknownSmartMethod:
 
2660
            medium._remember_remote_is_before((1, 15))
 
2661
            return self._vfs_set_parent_location(url)
 
2662
        if response != ():
 
2663
            raise errors.UnexpectedSmartServerResponse(response)
 
2664
 
 
2665
    def _vfs_set_parent_location(self, url):
 
2666
        self._ensure_real()
 
2667
        return self._real_branch._set_parent_location(url)
2315
2668
 
2316
2669
    @needs_write_lock
2317
2670
    def pull(self, source, overwrite=False, stop_revision=None,
2385
2738
        return self._real_branch.set_push_location(location)
2386
2739
 
2387
2740
 
2388
 
class RemoteBranchConfig(object):
2389
 
    """A Config that reads from a smart branch and writes via smart methods.
 
2741
class RemoteConfig(object):
 
2742
    """A Config that reads and writes from smart verbs.
2390
2743
 
2391
2744
    It is a low-level object that considers config data to be name/value pairs
2392
2745
    that may be associated with a section. Assigning meaning to the these
2393
2746
    values is done at higher levels like bzrlib.config.TreeConfig.
2394
2747
    """
2395
2748
 
2396
 
    def __init__(self, branch):
2397
 
        self._branch = branch
2398
 
 
2399
2749
    def get_option(self, name, section=None, default=None):
2400
2750
        """Return the value associated with a named option.
2401
2751
 
2404
2754
        :param default: The value to return if the value is not set
2405
2755
        :return: The value or default value
2406
2756
        """
2407
 
        configobj = self._get_configobj()
2408
 
        if section is None:
2409
 
            section_obj = configobj
2410
 
        else:
2411
 
            try:
2412
 
                section_obj = configobj[section]
2413
 
            except KeyError:
2414
 
                return default
2415
 
        return section_obj.get(name, default)
 
2757
        try:
 
2758
            configobj = self._get_configobj()
 
2759
            if section is None:
 
2760
                section_obj = configobj
 
2761
            else:
 
2762
                try:
 
2763
                    section_obj = configobj[section]
 
2764
                except KeyError:
 
2765
                    return default
 
2766
            return section_obj.get(name, default)
 
2767
        except errors.UnknownSmartMethod:
 
2768
            return self._vfs_get_option(name, section, default)
 
2769
 
 
2770
    def _response_to_configobj(self, response):
 
2771
        if len(response[0]) and response[0][0] != 'ok':
 
2772
            raise errors.UnexpectedSmartServerResponse(response)
 
2773
        lines = response[1].read_body_bytes().splitlines()
 
2774
        return config.ConfigObj(lines, encoding='utf-8')
 
2775
 
 
2776
 
 
2777
class RemoteBranchConfig(RemoteConfig):
 
2778
    """A RemoteConfig for Branches."""
 
2779
 
 
2780
    def __init__(self, branch):
 
2781
        self._branch = branch
2416
2782
 
2417
2783
    def _get_configobj(self):
2418
2784
        path = self._branch._remote_path()
2419
2785
        response = self._branch._client.call_expecting_body(
2420
2786
            'Branch.get_config_file', path)
2421
 
        if response[0][0] != 'ok':
2422
 
            raise UnexpectedSmartServerResponse(response)
2423
 
        lines = response[1].read_body_bytes().splitlines()
2424
 
        return config.ConfigObj(lines, encoding='utf-8')
 
2787
        return self._response_to_configobj(response)
2425
2788
 
2426
2789
    def set_option(self, value, name, section=None):
2427
2790
        """Set the value associated with a named option.
2433
2796
        medium = self._branch._client._medium
2434
2797
        if medium._is_remote_before((1, 14)):
2435
2798
            return self._vfs_set_option(value, name, section)
 
2799
        if isinstance(value, dict):
 
2800
            if medium._is_remote_before((2, 2)):
 
2801
                return self._vfs_set_option(value, name, section)
 
2802
            return self._set_config_option_dict(value, name, section)
 
2803
        else:
 
2804
            return self._set_config_option(value, name, section)
 
2805
 
 
2806
    def _set_config_option(self, value, name, section):
2436
2807
        try:
2437
2808
            path = self._branch._remote_path()
2438
2809
            response = self._branch._client.call('Branch.set_config_option',
2439
2810
                path, self._branch._lock_token, self._branch._repo_lock_token,
2440
2811
                value.encode('utf8'), name, section or '')
2441
2812
        except errors.UnknownSmartMethod:
 
2813
            medium = self._branch._client._medium
2442
2814
            medium._remember_remote_is_before((1, 14))
2443
2815
            return self._vfs_set_option(value, name, section)
2444
2816
        if response != ():
2445
2817
            raise errors.UnexpectedSmartServerResponse(response)
2446
2818
 
 
2819
    def _serialize_option_dict(self, option_dict):
 
2820
        utf8_dict = {}
 
2821
        for key, value in option_dict.items():
 
2822
            if isinstance(key, unicode):
 
2823
                key = key.encode('utf8')
 
2824
            if isinstance(value, unicode):
 
2825
                value = value.encode('utf8')
 
2826
            utf8_dict[key] = value
 
2827
        return bencode.bencode(utf8_dict)
 
2828
 
 
2829
    def _set_config_option_dict(self, value, name, section):
 
2830
        try:
 
2831
            path = self._branch._remote_path()
 
2832
            serialised_dict = self._serialize_option_dict(value)
 
2833
            response = self._branch._client.call(
 
2834
                'Branch.set_config_option_dict',
 
2835
                path, self._branch._lock_token, self._branch._repo_lock_token,
 
2836
                serialised_dict, name, section or '')
 
2837
        except errors.UnknownSmartMethod:
 
2838
            medium = self._branch._client._medium
 
2839
            medium._remember_remote_is_before((2, 2))
 
2840
            return self._vfs_set_option(value, name, section)
 
2841
        if response != ():
 
2842
            raise errors.UnexpectedSmartServerResponse(response)
 
2843
 
 
2844
    def _real_object(self):
 
2845
        self._branch._ensure_real()
 
2846
        return self._branch._real_branch
 
2847
 
2447
2848
    def _vfs_set_option(self, value, name, section=None):
2448
 
        self._branch._ensure_real()
2449
 
        return self._branch._real_branch._get_config().set_option(
2450
 
            value, name, section)
 
2849
        return self._real_object()._get_config().set_option(
 
2850
            value, name, section)
 
2851
 
 
2852
 
 
2853
class RemoteBzrDirConfig(RemoteConfig):
 
2854
    """A RemoteConfig for BzrDirs."""
 
2855
 
 
2856
    def __init__(self, bzrdir):
 
2857
        self._bzrdir = bzrdir
 
2858
 
 
2859
    def _get_configobj(self):
 
2860
        medium = self._bzrdir._client._medium
 
2861
        verb = 'BzrDir.get_config_file'
 
2862
        if medium._is_remote_before((1, 15)):
 
2863
            raise errors.UnknownSmartMethod(verb)
 
2864
        path = self._bzrdir._path_for_remote_call(self._bzrdir._client)
 
2865
        response = self._bzrdir._call_expecting_body(
 
2866
            verb, path)
 
2867
        return self._response_to_configobj(response)
 
2868
 
 
2869
    def _vfs_get_option(self, name, section, default):
 
2870
        return self._real_object()._get_config().get_option(
 
2871
            name, section, default)
 
2872
 
 
2873
    def set_option(self, value, name, section=None):
 
2874
        """Set the value associated with a named option.
 
2875
 
 
2876
        :param value: The value to set
 
2877
        :param name: The name of the value to set
 
2878
        :param section: The section the option is in (if any)
 
2879
        """
 
2880
        return self._real_object()._get_config().set_option(
 
2881
            value, name, section)
 
2882
 
 
2883
    def _real_object(self):
 
2884
        self._bzrdir._ensure_real()
 
2885
        return self._bzrdir._real_bzrdir
 
2886
 
2451
2887
 
2452
2888
 
2453
2889
def _extract_tar(tar, to_dir):
2493
2929
                    'Missing key %r in context %r', key_err.args[0], context)
2494
2930
                raise err
2495
2931
 
2496
 
    if err.error_verb == 'NoSuchRevision':
 
2932
    if err.error_verb == 'IncompatibleRepositories':
 
2933
        raise errors.IncompatibleRepositories(err.error_args[0],
 
2934
            err.error_args[1], err.error_args[2])
 
2935
    elif err.error_verb == 'NoSuchRevision':
2497
2936
        raise NoSuchRevision(find('branch'), err.error_args[0])
2498
2937
    elif err.error_verb == 'nosuchrevision':
2499
2938
        raise NoSuchRevision(find('repository'), err.error_args[0])
2500
 
    elif err.error_tuple == ('nobranch',):
2501
 
        raise errors.NotBranchError(path=find('bzrdir').root_transport.base)
 
2939
    elif err.error_verb == 'nobranch':
 
2940
        if len(err.error_args) >= 1:
 
2941
            extra = err.error_args[0]
 
2942
        else:
 
2943
            extra = None
 
2944
        raise errors.NotBranchError(path=find('bzrdir').root_transport.base,
 
2945
            detail=extra)
2502
2946
    elif err.error_verb == 'norepository':
2503
2947
        raise errors.NoRepositoryPresent(find('bzrdir'))
2504
2948
    elif err.error_verb == 'LockContention':