~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/remote.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-08-20 05:20:56 UTC
  • mfrom: (5380.3.3 doc)
  • Revision ID: pqm@pqm.ubuntu.com-20100820052056-gwad7dz2otckrjax
(mbp) Start whatsnew for 2.3; update ppa developer docs (Martin Pool)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
27
27
    lock,
28
28
    lockdir,
29
29
    repository,
 
30
    repository as _mod_repository,
30
31
    revision,
31
32
    revision as _mod_revision,
 
33
    static_tuple,
32
34
    symbol_versioning,
33
35
)
34
 
from bzrlib.branch import BranchReferenceFormat
 
36
from bzrlib.branch import BranchReferenceFormat, BranchWriteLockResult
35
37
from bzrlib.bzrdir import BzrDir, RemoteBzrDirFormat
36
 
from bzrlib.decorators import needs_read_lock, needs_write_lock
 
38
from bzrlib.decorators import needs_read_lock, needs_write_lock, only_raises
37
39
from bzrlib.errors import (
38
40
    NoSuchRevision,
39
41
    SmartProtocolError,
41
43
from bzrlib.lockable_files import LockableFiles
42
44
from bzrlib.smart import client, vfs, repository as smart_repo
43
45
from bzrlib.revision import ensure_null, NULL_REVISION
 
46
from bzrlib.repository import RepositoryWriteLockResult
44
47
from bzrlib.trace import mutter, note, warning
45
48
 
46
49
 
89
92
class RemoteBzrDir(BzrDir, _RpcHelper):
90
93
    """Control directory on a remote server, accessed via bzr:// or similar."""
91
94
 
92
 
    def __init__(self, transport, format, _client=None):
 
95
    def __init__(self, transport, format, _client=None, _force_probe=False):
93
96
        """Construct a RemoteBzrDir.
94
97
 
95
98
        :param _client: Private parameter for testing. Disables probing and the
99
102
        # this object holds a delegated bzrdir that uses file-level operations
100
103
        # to talk to the other side
101
104
        self._real_bzrdir = None
 
105
        self._has_working_tree = None
102
106
        # 1-shot cache for the call pattern 'create_branch; open_branch' - see
103
107
        # create_branch for details.
104
108
        self._next_open_branch_result = None
108
112
            self._client = client._SmartClient(medium)
109
113
        else:
110
114
            self._client = _client
111
 
            return
112
 
 
 
115
            if not _force_probe:
 
116
                return
 
117
 
 
118
        self._probe_bzrdir()
 
119
 
 
120
    def __repr__(self):
 
121
        return '%s(%r)' % (self.__class__.__name__, self._client)
 
122
 
 
123
    def _probe_bzrdir(self):
 
124
        medium = self._client._medium
113
125
        path = self._path_for_remote_call(self._client)
 
126
        if medium._is_remote_before((2, 1)):
 
127
            self._rpc_open(path)
 
128
            return
 
129
        try:
 
130
            self._rpc_open_2_1(path)
 
131
            return
 
132
        except errors.UnknownSmartMethod:
 
133
            medium._remember_remote_is_before((2, 1))
 
134
            self._rpc_open(path)
 
135
 
 
136
    def _rpc_open_2_1(self, path):
 
137
        response = self._call('BzrDir.open_2.1', path)
 
138
        if response == ('no',):
 
139
            raise errors.NotBranchError(path=self.root_transport.base)
 
140
        elif response[0] == 'yes':
 
141
            if response[1] == 'yes':
 
142
                self._has_working_tree = True
 
143
            elif response[1] == 'no':
 
144
                self._has_working_tree = False
 
145
            else:
 
146
                raise errors.UnexpectedSmartServerResponse(response)
 
147
        else:
 
148
            raise errors.UnexpectedSmartServerResponse(response)
 
149
 
 
150
    def _rpc_open(self, path):
114
151
        response = self._call('BzrDir.open', path)
115
152
        if response not in [('yes',), ('no',)]:
116
153
            raise errors.UnexpectedSmartServerResponse(response)
117
154
        if response == ('no',):
118
 
            raise errors.NotBranchError(path=transport.base)
 
155
            raise errors.NotBranchError(path=self.root_transport.base)
119
156
 
120
157
    def _ensure_real(self):
121
158
        """Ensure that there is a _real_bzrdir set.
123
160
        Used before calls to self._real_bzrdir.
124
161
        """
125
162
        if not self._real_bzrdir:
 
163
            if 'hpssvfs' in debug.debug_flags:
 
164
                import traceback
 
165
                warning('VFS BzrDir access triggered\n%s',
 
166
                    ''.join(traceback.format_stack()))
126
167
            self._real_bzrdir = BzrDir.open_from_transport(
127
168
                self.root_transport, _server_formats=False)
128
169
            self._format._network_name = \
204
245
        self._ensure_real()
205
246
        self._real_bzrdir.destroy_repository()
206
247
 
207
 
    def create_branch(self):
 
248
    def create_branch(self, name=None):
208
249
        # as per meta1 formats - just delegate to the format object which may
209
250
        # be parameterised.
210
 
        real_branch = self._format.get_branch_format().initialize(self)
 
251
        real_branch = self._format.get_branch_format().initialize(self,
 
252
            name=name)
211
253
        if not isinstance(real_branch, RemoteBranch):
212
 
            result = RemoteBranch(self, self.find_repository(), real_branch)
 
254
            result = RemoteBranch(self, self.find_repository(), real_branch,
 
255
                                  name=name)
213
256
        else:
214
257
            result = real_branch
215
258
        # BzrDir.clone_on_transport() uses the result of create_branch but does
221
264
        self._next_open_branch_result = result
222
265
        return result
223
266
 
224
 
    def destroy_branch(self):
 
267
    def destroy_branch(self, name=None):
225
268
        """See BzrDir.destroy_branch"""
226
269
        self._ensure_real()
227
 
        self._real_bzrdir.destroy_branch()
 
270
        self._real_bzrdir.destroy_branch(name=name)
228
271
        self._next_open_branch_result = None
229
272
 
230
273
    def create_workingtree(self, revision_id=None, from_branch=None):
231
274
        raise errors.NotLocalUrl(self.transport.base)
232
275
 
233
 
    def find_branch_format(self):
 
276
    def find_branch_format(self, name=None):
234
277
        """Find the branch 'format' for this bzrdir.
235
278
 
236
279
        This might be a synthetic object for e.g. RemoteBranch and SVN.
237
280
        """
238
 
        b = self.open_branch()
 
281
        b = self.open_branch(name=name)
239
282
        return b._format
240
283
 
241
 
    def get_branch_reference(self):
 
284
    def get_branch_reference(self, name=None):
242
285
        """See BzrDir.get_branch_reference()."""
 
286
        if name is not None:
 
287
            # XXX JRV20100304: Support opening colocated branches
 
288
            raise errors.NoColocatedBranchSupport(self)
243
289
        response = self._get_branch_reference()
244
290
        if response[0] == 'ref':
245
291
            return response[1]
249
295
    def _get_branch_reference(self):
250
296
        path = self._path_for_remote_call(self._client)
251
297
        medium = self._client._medium
252
 
        if not medium._is_remote_before((1, 13)):
 
298
        candidate_calls = [
 
299
            ('BzrDir.open_branchV3', (2, 1)),
 
300
            ('BzrDir.open_branchV2', (1, 13)),
 
301
            ('BzrDir.open_branch', None),
 
302
            ]
 
303
        for verb, required_version in candidate_calls:
 
304
            if required_version and medium._is_remote_before(required_version):
 
305
                continue
253
306
            try:
254
 
                response = self._call('BzrDir.open_branchV2', path)
255
 
                if response[0] not in ('ref', 'branch'):
256
 
                    raise errors.UnexpectedSmartServerResponse(response)
257
 
                return response
 
307
                response = self._call(verb, path)
258
308
            except errors.UnknownSmartMethod:
259
 
                medium._remember_remote_is_before((1, 13))
260
 
        response = self._call('BzrDir.open_branch', path)
261
 
        if response[0] != 'ok':
 
309
                if required_version is None:
 
310
                    raise
 
311
                medium._remember_remote_is_before(required_version)
 
312
            else:
 
313
                break
 
314
        if verb == 'BzrDir.open_branch':
 
315
            if response[0] != 'ok':
 
316
                raise errors.UnexpectedSmartServerResponse(response)
 
317
            if response[1] != '':
 
318
                return ('ref', response[1])
 
319
            else:
 
320
                return ('branch', '')
 
321
        if response[0] not in ('ref', 'branch'):
262
322
            raise errors.UnexpectedSmartServerResponse(response)
263
 
        if response[1] != '':
264
 
            return ('ref', response[1])
265
 
        else:
266
 
            return ('branch', '')
 
323
        return response
267
324
 
268
 
    def _get_tree_branch(self):
 
325
    def _get_tree_branch(self, name=None):
269
326
        """See BzrDir._get_tree_branch()."""
270
 
        return None, self.open_branch()
 
327
        return None, self.open_branch(name=name)
271
328
 
272
 
    def open_branch(self, _unsupported=False, ignore_fallbacks=False):
273
 
        if _unsupported:
 
329
    def open_branch(self, name=None, unsupported=False,
 
330
                    ignore_fallbacks=False):
 
331
        if unsupported:
274
332
            raise NotImplementedError('unsupported flag support not implemented yet.')
275
333
        if self._next_open_branch_result is not None:
276
334
            # See create_branch for details.
281
339
        if response[0] == 'ref':
282
340
            # a branch reference, use the existing BranchReference logic.
283
341
            format = BranchReferenceFormat()
284
 
            return format.open(self, _found=True, location=response[1],
285
 
                ignore_fallbacks=ignore_fallbacks)
 
342
            return format.open(self, name=name, _found=True,
 
343
                location=response[1], ignore_fallbacks=ignore_fallbacks)
286
344
        branch_format_name = response[1]
287
345
        if not branch_format_name:
288
346
            branch_format_name = None
289
347
        format = RemoteBranchFormat(network_name=branch_format_name)
290
348
        return RemoteBranch(self, self.find_repository(), format=format,
291
 
            setup_stacking=not ignore_fallbacks)
 
349
            setup_stacking=not ignore_fallbacks, name=name)
292
350
 
293
351
    def _open_repo_v1(self, path):
294
352
        verb = 'BzrDir.find_repository'
355
413
        else:
356
414
            raise errors.NoRepositoryPresent(self)
357
415
 
 
416
    def has_workingtree(self):
 
417
        if self._has_working_tree is None:
 
418
            self._ensure_real()
 
419
            self._has_working_tree = self._real_bzrdir.has_workingtree()
 
420
        return self._has_working_tree
 
421
 
358
422
    def open_workingtree(self, recommend_upgrade=True):
359
 
        self._ensure_real()
360
 
        if self._real_bzrdir.has_workingtree():
 
423
        if self.has_workingtree():
361
424
            raise errors.NotLocalUrl(self.root_transport)
362
425
        else:
363
426
            raise errors.NoWorkingTree(self.root_transport.base)
366
429
        """Return the path to be used for this bzrdir in a remote call."""
367
430
        return client.remote_path_from_transport(self.root_transport)
368
431
 
369
 
    def get_branch_transport(self, branch_format):
 
432
    def get_branch_transport(self, branch_format, name=None):
370
433
        self._ensure_real()
371
 
        return self._real_bzrdir.get_branch_transport(branch_format)
 
434
        return self._real_bzrdir.get_branch_transport(branch_format, name=name)
372
435
 
373
436
    def get_repository_transport(self, repository_format):
374
437
        self._ensure_real()
426
489
        self._custom_format = None
427
490
        self._network_name = None
428
491
        self._creating_bzrdir = None
 
492
        self._supports_chks = None
429
493
        self._supports_external_lookups = None
430
494
        self._supports_tree_reference = None
431
495
        self._rich_root_data = None
432
496
 
 
497
    def __repr__(self):
 
498
        return "%s(_network_name=%r)" % (self.__class__.__name__,
 
499
            self._network_name)
 
500
 
433
501
    @property
434
502
    def fast_deltas(self):
435
503
        self._ensure_real()
443
511
        return self._rich_root_data
444
512
 
445
513
    @property
 
514
    def supports_chks(self):
 
515
        if self._supports_chks is None:
 
516
            self._ensure_real()
 
517
            self._supports_chks = self._custom_format.supports_chks
 
518
        return self._supports_chks
 
519
 
 
520
    @property
446
521
    def supports_external_lookups(self):
447
522
        if self._supports_external_lookups is None:
448
523
            self._ensure_real()
549
624
        return self._custom_format._fetch_reconcile
550
625
 
551
626
    def get_format_description(self):
552
 
        return 'bzr remote repository'
 
627
        self._ensure_real()
 
628
        return 'Remote: ' + self._custom_format.get_format_description()
553
629
 
554
630
    def __eq__(self, other):
555
631
        return self.__class__ is other.__class__
556
632
 
557
 
    def check_conversion_target(self, target_format):
558
 
        if self.rich_root_data and not target_format.rich_root_data:
559
 
            raise errors.BadConversionTarget(
560
 
                'Does not support rich root data.', target_format)
561
 
        if (self.supports_tree_reference and
562
 
            not getattr(target_format, 'supports_tree_reference', False)):
563
 
            raise errors.BadConversionTarget(
564
 
                'Does not support nested trees', target_format)
565
 
 
566
633
    def network_name(self):
567
634
        if self._network_name:
568
635
            return self._network_name
580
647
        return self._custom_format._serializer
581
648
 
582
649
 
583
 
class RemoteRepository(_RpcHelper):
 
650
class RemoteRepository(_RpcHelper, lock._RelockDebugMixin,
 
651
    bzrdir.ControlComponent):
584
652
    """Repository accessed over rpc.
585
653
 
586
654
    For the moment most operations are performed using local transport-backed
629
697
        # Additional places to query for data.
630
698
        self._fallback_repositories = []
631
699
 
 
700
    @property
 
701
    def user_transport(self):
 
702
        return self.bzrdir.user_transport
 
703
 
 
704
    @property
 
705
    def control_transport(self):
 
706
        # XXX: Normally you shouldn't directly get at the remote repository
 
707
        # transport, but I'm not sure it's worth making this method
 
708
        # optional -- mbp 2010-04-21
 
709
        return self.bzrdir.get_repository_transport(None)
 
710
        
632
711
    def __str__(self):
633
712
        return "%s(%s)" % (self.__class__.__name__, self.base)
634
713
 
820
899
    def _has_same_fallbacks(self, other_repo):
821
900
        """Returns true if the repositories have the same fallbacks."""
822
901
        # XXX: copied from Repository; it should be unified into a base class
823
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/401622>
 
902
        # <https://bugs.launchpad.net/bzr/+bug/401622>
824
903
        my_fb = self._fallback_repositories
825
904
        other_fb = other_repo._fallback_repositories
826
905
        if len(my_fb) != len(other_fb):
842
921
        parents_provider = self._make_parents_provider(other_repository)
843
922
        return graph.Graph(parents_provider)
844
923
 
 
924
    @needs_read_lock
 
925
    def get_known_graph_ancestry(self, revision_ids):
 
926
        """Return the known graph for a set of revision ids and their ancestors.
 
927
        """
 
928
        st = static_tuple.StaticTuple
 
929
        revision_keys = [st(r_id).intern() for r_id in revision_ids]
 
930
        known_graph = self.revisions.get_known_graph_ancestry(revision_keys)
 
931
        return graph.GraphThunkIdsToKeys(known_graph)
 
932
 
845
933
    def gather_stats(self, revid=None, committers=None):
846
934
        """See Repository.gather_stats()."""
847
935
        path = self.bzrdir._path_for_remote_call(self._client)
907
995
    def is_write_locked(self):
908
996
        return self._lock_mode == 'w'
909
997
 
 
998
    def _warn_if_deprecated(self, branch=None):
 
999
        # If we have a real repository, the check will be done there; if we
 
1000
        # don't, the check will be done remotely.
 
1001
        pass
 
1002
 
910
1003
    def lock_read(self):
 
1004
        """Lock the repository for read operations.
 
1005
 
 
1006
        :return: A bzrlib.lock.LogicalLockResult.
 
1007
        """
911
1008
        # wrong eventually - want a local lock cache context
912
1009
        if not self._lock_mode:
 
1010
            self._note_lock('r')
913
1011
            self._lock_mode = 'r'
914
1012
            self._lock_count = 1
915
1013
            self._unstacked_provider.enable_cache(cache_misses=True)
919
1017
                repo.lock_read()
920
1018
        else:
921
1019
            self._lock_count += 1
 
1020
        return lock.LogicalLockResult(self.unlock)
922
1021
 
923
1022
    def _remote_lock_write(self, token):
924
1023
        path = self.bzrdir._path_for_remote_call(self._client)
935
1034
 
936
1035
    def lock_write(self, token=None, _skip_rpc=False):
937
1036
        if not self._lock_mode:
 
1037
            self._note_lock('w')
938
1038
            if _skip_rpc:
939
1039
                if self._lock_token is not None:
940
1040
                    if token != self._lock_token:
963
1063
            raise errors.ReadOnlyError(self)
964
1064
        else:
965
1065
            self._lock_count += 1
966
 
        return self._lock_token or None
 
1066
        return RepositoryWriteLockResult(self.unlock, self._lock_token or None)
967
1067
 
968
1068
    def leave_lock_in_place(self):
969
1069
        if not self._lock_token:
1043
1143
        else:
1044
1144
            raise errors.UnexpectedSmartServerResponse(response)
1045
1145
 
 
1146
    @only_raises(errors.LockNotHeld, errors.LockBroken)
1046
1147
    def unlock(self):
1047
1148
        if not self._lock_count:
1048
1149
            return lock.cant_unlock_not_held(self)
1148
1249
            # state, so always add a lock here. If a caller passes us a locked
1149
1250
            # repository, they are responsible for unlocking it later.
1150
1251
            repository.lock_read()
 
1252
        self._check_fallback_repository(repository)
1151
1253
        self._fallback_repositories.append(repository)
1152
1254
        # If self._real_repository was parameterised already (e.g. because a
1153
1255
        # _real_branch had its get_stacked_on_url method called), then the
1154
1256
        # repository to be added may already be in the _real_repositories list.
1155
1257
        if self._real_repository is not None:
1156
 
            fallback_locations = [repo.bzrdir.root_transport.base for repo in
 
1258
            fallback_locations = [repo.user_url for repo in
1157
1259
                self._real_repository._fallback_repositories]
1158
 
            if repository.bzrdir.root_transport.base not in fallback_locations:
 
1260
            if repository.user_url not in fallback_locations:
1159
1261
                self._real_repository.add_fallback_repository(repository)
1160
1262
 
 
1263
    def _check_fallback_repository(self, repository):
 
1264
        """Check that this repository can fall back to repository safely.
 
1265
 
 
1266
        Raise an error if not.
 
1267
 
 
1268
        :param repository: A repository to fall back to.
 
1269
        """
 
1270
        return _mod_repository.InterRepository._assert_same_model(
 
1271
            self, repository)
 
1272
 
1161
1273
    def add_inventory(self, revid, inv, parents):
1162
1274
        self._ensure_real()
1163
1275
        return self._real_repository.add_inventory(revid, inv, parents)
1164
1276
 
1165
1277
    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
1166
 
                               parents):
 
1278
            parents, basis_inv=None, propagate_caches=False):
1167
1279
        self._ensure_real()
1168
1280
        return self._real_repository.add_inventory_by_delta(basis_revision_id,
1169
 
            delta, new_revision_id, parents)
 
1281
            delta, new_revision_id, parents, basis_inv=basis_inv,
 
1282
            propagate_caches=propagate_caches)
1170
1283
 
1171
1284
    def add_revision(self, rev_id, rev, inv=None, config=None):
1172
1285
        self._ensure_real()
1178
1291
        self._ensure_real()
1179
1292
        return self._real_repository.get_inventory(revision_id)
1180
1293
 
1181
 
    def iter_inventories(self, revision_ids):
 
1294
    def iter_inventories(self, revision_ids, ordering=None):
1182
1295
        self._ensure_real()
1183
 
        return self._real_repository.iter_inventories(revision_ids)
 
1296
        return self._real_repository.iter_inventories(revision_ids, ordering)
1184
1297
 
1185
1298
    @needs_read_lock
1186
1299
    def get_revision(self, revision_id):
1202
1315
        return self._real_repository.make_working_trees()
1203
1316
 
1204
1317
    def refresh_data(self):
1205
 
        """Re-read any data needed to to synchronise with disk.
 
1318
        """Re-read any data needed to synchronise with disk.
1206
1319
 
1207
1320
        This method is intended to be called after another repository instance
1208
1321
        (such as one used by a smart server) has inserted data into the
1209
 
        repository. It may not be called during a write group, but may be
1210
 
        called at any other time.
 
1322
        repository. On all repositories this will work outside of write groups.
 
1323
        Some repository formats (pack and newer for bzrlib native formats)
 
1324
        support refresh_data inside write groups. If called inside a write
 
1325
        group on a repository that does not support refreshing in a write group
 
1326
        IsInWriteGroupError will be raised.
1211
1327
        """
1212
 
        if self.is_in_write_group():
1213
 
            raise errors.InternalBzrError(
1214
 
                "May not refresh_data while in a write group.")
1215
1328
        if self._real_repository is not None:
1216
1329
            self._real_repository.refresh_data()
1217
1330
 
1431
1544
        return self._real_repository.get_signature_text(revision_id)
1432
1545
 
1433
1546
    @needs_read_lock
1434
 
    def get_inventory_xml(self, revision_id):
1435
 
        self._ensure_real()
1436
 
        return self._real_repository.get_inventory_xml(revision_id)
1437
 
 
1438
 
    def deserialise_inventory(self, revision_id, xml):
1439
 
        self._ensure_real()
1440
 
        return self._real_repository.deserialise_inventory(revision_id, xml)
 
1547
    def _get_inventory_xml(self, revision_id):
 
1548
        self._ensure_real()
 
1549
        return self._real_repository._get_inventory_xml(revision_id)
1441
1550
 
1442
1551
    def reconcile(self, other=None, thorough=False):
1443
1552
        self._ensure_real()
1519
1628
        return self._real_repository.inventories
1520
1629
 
1521
1630
    @needs_write_lock
1522
 
    def pack(self, hint=None):
 
1631
    def pack(self, hint=None, clean_obsolete_packs=False):
1523
1632
        """Compress the data within the repository.
1524
1633
 
1525
1634
        This is not currently implemented within the smart server.
1526
1635
        """
1527
1636
        self._ensure_real()
1528
 
        return self._real_repository.pack(hint=hint)
 
1637
        return self._real_repository.pack(hint=hint, clean_obsolete_packs=clean_obsolete_packs)
1529
1638
 
1530
1639
    @property
1531
1640
    def revisions(self):
1682
1791
    def insert_stream(self, stream, src_format, resume_tokens):
1683
1792
        target = self.target_repo
1684
1793
        target._unstacked_provider.missing_keys.clear()
 
1794
        candidate_calls = [('Repository.insert_stream_1.19', (1, 19))]
1685
1795
        if target._lock_token:
1686
 
            verb = 'Repository.insert_stream_locked'
1687
 
            extra_args = (target._lock_token or '',)
1688
 
            required_version = (1, 14)
 
1796
            candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
 
1797
            lock_args = (target._lock_token or '',)
1689
1798
        else:
1690
 
            verb = 'Repository.insert_stream'
1691
 
            extra_args = ()
1692
 
            required_version = (1, 13)
 
1799
            candidate_calls.append(('Repository.insert_stream', (1, 13)))
 
1800
            lock_args = ()
1693
1801
        client = target._client
1694
1802
        medium = client._medium
1695
 
        if medium._is_remote_before(required_version):
1696
 
            # No possible way this can work.
1697
 
            return self._insert_real(stream, src_format, resume_tokens)
1698
1803
        path = target.bzrdir._path_for_remote_call(client)
1699
 
        if not resume_tokens:
1700
 
            # XXX: Ugly but important for correctness, *will* be fixed during
1701
 
            # 1.13 cycle. Pushing a stream that is interrupted results in a
1702
 
            # fallback to the _real_repositories sink *with a partial stream*.
1703
 
            # Thats bad because we insert less data than bzr expected. To avoid
1704
 
            # this we do a trial push to make sure the verb is accessible, and
1705
 
            # do not fallback when actually pushing the stream. A cleanup patch
1706
 
            # is going to look at rewinding/restarting the stream/partial
1707
 
            # buffering etc.
 
1804
        # Probe for the verb to use with an empty stream before sending the
 
1805
        # real stream to it.  We do this both to avoid the risk of sending a
 
1806
        # large request that is then rejected, and because we don't want to
 
1807
        # implement a way to buffer, rewind, or restart the stream.
 
1808
        found_verb = False
 
1809
        for verb, required_version in candidate_calls:
 
1810
            if medium._is_remote_before(required_version):
 
1811
                continue
 
1812
            if resume_tokens:
 
1813
                # We've already done the probing (and set _is_remote_before) on
 
1814
                # a previous insert.
 
1815
                found_verb = True
 
1816
                break
1708
1817
            byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1709
1818
            try:
1710
1819
                response = client.call_with_body_stream(
1711
 
                    (verb, path, '') + extra_args, byte_stream)
 
1820
                    (verb, path, '') + lock_args, byte_stream)
1712
1821
            except errors.UnknownSmartMethod:
1713
1822
                medium._remember_remote_is_before(required_version)
1714
 
                return self._insert_real(stream, src_format, resume_tokens)
 
1823
            else:
 
1824
                found_verb = True
 
1825
                break
 
1826
        if not found_verb:
 
1827
            # Have to use VFS.
 
1828
            return self._insert_real(stream, src_format, resume_tokens)
 
1829
        self._last_inv_record = None
 
1830
        self._last_substream = None
 
1831
        if required_version < (1, 19):
 
1832
            # Remote side doesn't support inventory deltas.  Wrap the stream to
 
1833
            # make sure we don't send any.  If the stream contains inventory
 
1834
            # deltas we'll interrupt the smart insert_stream request and
 
1835
            # fall back to VFS.
 
1836
            stream = self._stop_stream_if_inventory_delta(stream)
1715
1837
        byte_stream = smart_repo._stream_to_byte_stream(
1716
1838
            stream, src_format)
1717
1839
        resume_tokens = ' '.join(resume_tokens)
1718
1840
        response = client.call_with_body_stream(
1719
 
            (verb, path, resume_tokens) + extra_args, byte_stream)
 
1841
            (verb, path, resume_tokens) + lock_args, byte_stream)
1720
1842
        if response[0][0] not in ('ok', 'missing-basis'):
1721
1843
            raise errors.UnexpectedSmartServerResponse(response)
 
1844
        if self._last_substream is not None:
 
1845
            # The stream included an inventory-delta record, but the remote
 
1846
            # side isn't new enough to support them.  So we need to send the
 
1847
            # rest of the stream via VFS.
 
1848
            self.target_repo.refresh_data()
 
1849
            return self._resume_stream_with_vfs(response, src_format)
1722
1850
        if response[0][0] == 'missing-basis':
1723
1851
            tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1724
1852
            resume_tokens = tokens
1727
1855
            self.target_repo.refresh_data()
1728
1856
            return [], set()
1729
1857
 
 
1858
    def _resume_stream_with_vfs(self, response, src_format):
 
1859
        """Resume sending a stream via VFS, first resending the record and
 
1860
        substream that couldn't be sent via an insert_stream verb.
 
1861
        """
 
1862
        if response[0][0] == 'missing-basis':
 
1863
            tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
 
1864
            # Ignore missing_keys, we haven't finished inserting yet
 
1865
        else:
 
1866
            tokens = []
 
1867
        def resume_substream():
 
1868
            # Yield the substream that was interrupted.
 
1869
            for record in self._last_substream:
 
1870
                yield record
 
1871
            self._last_substream = None
 
1872
        def resume_stream():
 
1873
            # Finish sending the interrupted substream
 
1874
            yield ('inventory-deltas', resume_substream())
 
1875
            # Then simply continue sending the rest of the stream.
 
1876
            for substream_kind, substream in self._last_stream:
 
1877
                yield substream_kind, substream
 
1878
        return self._insert_real(resume_stream(), src_format, tokens)
 
1879
 
 
1880
    def _stop_stream_if_inventory_delta(self, stream):
 
1881
        """Normally this just lets the original stream pass-through unchanged.
 
1882
 
 
1883
        However if any 'inventory-deltas' substream occurs it will stop
 
1884
        streaming, and store the interrupted substream and stream in
 
1885
        self._last_substream and self._last_stream so that the stream can be
 
1886
        resumed by _resume_stream_with_vfs.
 
1887
        """
 
1888
                    
 
1889
        stream_iter = iter(stream)
 
1890
        for substream_kind, substream in stream_iter:
 
1891
            if substream_kind == 'inventory-deltas':
 
1892
                self._last_substream = substream
 
1893
                self._last_stream = stream_iter
 
1894
                return
 
1895
            else:
 
1896
                yield substream_kind, substream
 
1897
            
1730
1898
 
1731
1899
class RemoteStreamSource(repository.StreamSource):
1732
1900
    """Stream data from a remote server."""
1747
1915
            sources.append(repo)
1748
1916
        return self.missing_parents_chain(search, sources)
1749
1917
 
 
1918
    def get_stream_for_missing_keys(self, missing_keys):
 
1919
        self.from_repository._ensure_real()
 
1920
        real_repo = self.from_repository._real_repository
 
1921
        real_source = real_repo._get_source(self.to_format)
 
1922
        return real_source.get_stream_for_missing_keys(missing_keys)
 
1923
 
1750
1924
    def _real_stream(self, repo, search):
1751
1925
        """Get a stream for search from repo.
1752
1926
        
1758
1932
        """
1759
1933
        source = repo._get_source(self.to_format)
1760
1934
        if isinstance(source, RemoteStreamSource):
1761
 
            return repository.StreamSource.get_stream(source, search)
 
1935
            repo._ensure_real()
 
1936
            source = repo._real_repository._get_source(self.to_format)
1762
1937
        return source.get_stream(search)
1763
1938
 
1764
1939
    def _get_stream(self, repo, search):
1781
1956
            return self._real_stream(repo, search)
1782
1957
        client = repo._client
1783
1958
        medium = client._medium
1784
 
        if medium._is_remote_before((1, 13)):
1785
 
            # streaming was added in 1.13
1786
 
            return self._real_stream(repo, search)
1787
1959
        path = repo.bzrdir._path_for_remote_call(client)
1788
 
        try:
1789
 
            search_bytes = repo._serialise_search_result(search)
1790
 
            response = repo._call_with_body_bytes_expecting_body(
1791
 
                'Repository.get_stream',
1792
 
                (path, self.to_format.network_name()), search_bytes)
1793
 
            response_tuple, response_handler = response
1794
 
        except errors.UnknownSmartMethod:
1795
 
            medium._remember_remote_is_before((1,13))
 
1960
        search_bytes = repo._serialise_search_result(search)
 
1961
        args = (path, self.to_format.network_name())
 
1962
        candidate_verbs = [
 
1963
            ('Repository.get_stream_1.19', (1, 19)),
 
1964
            ('Repository.get_stream', (1, 13))]
 
1965
        found_verb = False
 
1966
        for verb, version in candidate_verbs:
 
1967
            if medium._is_remote_before(version):
 
1968
                continue
 
1969
            try:
 
1970
                response = repo._call_with_body_bytes_expecting_body(
 
1971
                    verb, args, search_bytes)
 
1972
            except errors.UnknownSmartMethod:
 
1973
                medium._remember_remote_is_before(version)
 
1974
            else:
 
1975
                response_tuple, response_handler = response
 
1976
                found_verb = True
 
1977
                break
 
1978
        if not found_verb:
1796
1979
            return self._real_stream(repo, search)
1797
1980
        if response_tuple[0] != 'ok':
1798
1981
            raise errors.UnexpectedSmartServerResponse(response_tuple)
1799
1982
        byte_stream = response_handler.read_streamed_body()
1800
 
        src_format, stream = smart_repo._byte_stream_to_stream(byte_stream)
 
1983
        src_format, stream = smart_repo._byte_stream_to_stream(byte_stream,
 
1984
            self._record_counter)
1801
1985
        if src_format.network_name() != repo._format.network_name():
1802
1986
            raise AssertionError(
1803
1987
                "Mismatched RemoteRepository and stream src %r, %r" % (
1810
1994
        :param search: The overall search to satisfy with streams.
1811
1995
        :param sources: A list of Repository objects to query.
1812
1996
        """
1813
 
        self.serialiser = self.to_format._serializer
 
1997
        self.from_serialiser = self.from_repository._format._serializer
1814
1998
        self.seen_revs = set()
1815
1999
        self.referenced_revs = set()
1816
2000
        # If there are heads in the search, or the key count is > 0, we are not
1833
2017
    def missing_parents_rev_handler(self, substream):
1834
2018
        for content in substream:
1835
2019
            revision_bytes = content.get_bytes_as('fulltext')
1836
 
            revision = self.serialiser.read_revision_from_string(revision_bytes)
 
2020
            revision = self.from_serialiser.read_revision_from_string(
 
2021
                revision_bytes)
1837
2022
            self.seen_revs.add(content.key[-1])
1838
2023
            self.referenced_revs.update(revision.parent_ids)
1839
2024
            yield content
1878
2063
                self._network_name)
1879
2064
 
1880
2065
    def get_format_description(self):
1881
 
        return 'Remote BZR Branch'
 
2066
        self._ensure_real()
 
2067
        return 'Remote: ' + self._custom_format.get_format_description()
1882
2068
 
1883
2069
    def network_name(self):
1884
2070
        return self._network_name
1885
2071
 
1886
 
    def open(self, a_bzrdir, ignore_fallbacks=False):
1887
 
        return a_bzrdir.open_branch(ignore_fallbacks=ignore_fallbacks)
 
2072
    def open(self, a_bzrdir, name=None, ignore_fallbacks=False):
 
2073
        return a_bzrdir.open_branch(name=name, 
 
2074
            ignore_fallbacks=ignore_fallbacks)
1888
2075
 
1889
 
    def _vfs_initialize(self, a_bzrdir):
 
2076
    def _vfs_initialize(self, a_bzrdir, name):
1890
2077
        # Initialisation when using a local bzrdir object, or a non-vfs init
1891
2078
        # method is not available on the server.
1892
2079
        # self._custom_format is always set - the start of initialize ensures
1893
2080
        # that.
1894
2081
        if isinstance(a_bzrdir, RemoteBzrDir):
1895
2082
            a_bzrdir._ensure_real()
1896
 
            result = self._custom_format.initialize(a_bzrdir._real_bzrdir)
 
2083
            result = self._custom_format.initialize(a_bzrdir._real_bzrdir,
 
2084
                name)
1897
2085
        else:
1898
2086
            # We assume the bzrdir is parameterised; it may not be.
1899
 
            result = self._custom_format.initialize(a_bzrdir)
 
2087
            result = self._custom_format.initialize(a_bzrdir, name)
1900
2088
        if (isinstance(a_bzrdir, RemoteBzrDir) and
1901
2089
            not isinstance(result, RemoteBranch)):
1902
 
            result = RemoteBranch(a_bzrdir, a_bzrdir.find_repository(), result)
 
2090
            result = RemoteBranch(a_bzrdir, a_bzrdir.find_repository(), result,
 
2091
                                  name=name)
1903
2092
        return result
1904
2093
 
1905
 
    def initialize(self, a_bzrdir):
 
2094
    def initialize(self, a_bzrdir, name=None):
1906
2095
        # 1) get the network name to use.
1907
2096
        if self._custom_format:
1908
2097
            network_name = self._custom_format.network_name()
1914
2103
            network_name = reference_format.network_name()
1915
2104
        # Being asked to create on a non RemoteBzrDir:
1916
2105
        if not isinstance(a_bzrdir, RemoteBzrDir):
1917
 
            return self._vfs_initialize(a_bzrdir)
 
2106
            return self._vfs_initialize(a_bzrdir, name=name)
1918
2107
        medium = a_bzrdir._client._medium
1919
2108
        if medium._is_remote_before((1, 13)):
1920
 
            return self._vfs_initialize(a_bzrdir)
 
2109
            return self._vfs_initialize(a_bzrdir, name=name)
1921
2110
        # Creating on a remote bzr dir.
1922
2111
        # 2) try direct creation via RPC
1923
2112
        path = a_bzrdir._path_for_remote_call(a_bzrdir._client)
 
2113
        if name is not None:
 
2114
            # XXX JRV20100304: Support creating colocated branches
 
2115
            raise errors.NoColocatedBranchSupport(self)
1924
2116
        verb = 'BzrDir.create_branch'
1925
2117
        try:
1926
2118
            response = a_bzrdir._call(verb, path, network_name)
1927
2119
        except errors.UnknownSmartMethod:
1928
2120
            # Fallback - use vfs methods
1929
2121
            medium._remember_remote_is_before((1, 13))
1930
 
            return self._vfs_initialize(a_bzrdir)
 
2122
            return self._vfs_initialize(a_bzrdir, name=name)
1931
2123
        if response[0] != 'ok':
1932
2124
            raise errors.UnexpectedSmartServerResponse(response)
1933
2125
        # Turn the response into a RemoteRepository object.
1941
2133
                a_bzrdir._client)
1942
2134
        remote_repo = RemoteRepository(repo_bzrdir, repo_format)
1943
2135
        remote_branch = RemoteBranch(a_bzrdir, remote_repo,
1944
 
            format=format, setup_stacking=False)
 
2136
            format=format, setup_stacking=False, name=name)
1945
2137
        # XXX: We know this is a new branch, so it must have revno 0, revid
1946
2138
        # NULL_REVISION. Creating the branch locked would make this be unable
1947
2139
        # to be wrong; here it's simply very unlikely to be wrong. RBC 20090225
1967
2159
        return self._custom_format.supports_set_append_revisions_only()
1968
2160
 
1969
2161
 
1970
 
class RemoteBranch(branch.Branch, _RpcHelper):
 
2162
class RemoteBranch(branch.Branch, _RpcHelper, lock._RelockDebugMixin):
1971
2163
    """Branch stored on a server accessed by HPSS RPC.
1972
2164
 
1973
2165
    At the moment most operations are mapped down to simple file operations.
1974
2166
    """
1975
2167
 
1976
2168
    def __init__(self, remote_bzrdir, remote_repository, real_branch=None,
1977
 
        _client=None, format=None, setup_stacking=True):
 
2169
        _client=None, format=None, setup_stacking=True, name=None):
1978
2170
        """Create a RemoteBranch instance.
1979
2171
 
1980
2172
        :param real_branch: An optional local implementation of the branch
1986
2178
        :param setup_stacking: If True make an RPC call to determine the
1987
2179
            stacked (or not) status of the branch. If False assume the branch
1988
2180
            is not stacked.
 
2181
        :param name: Colocated branch name
1989
2182
        """
1990
2183
        # We intentionally don't call the parent class's __init__, because it
1991
2184
        # will try to assign to self.tags, which is a property in this subclass.
2010
2203
            self._real_branch = None
2011
2204
        # Fill out expected attributes of branch for bzrlib API users.
2012
2205
        self._clear_cached_state()
2013
 
        self.base = self.bzrdir.root_transport.base
 
2206
        # TODO: deprecate self.base in favor of user_url
 
2207
        self.base = self.bzrdir.user_url
 
2208
        self._name = name
2014
2209
        self._control_files = None
2015
2210
        self._lock_mode = None
2016
2211
        self._lock_token = None
2027
2222
                    self._real_branch._format.network_name()
2028
2223
        else:
2029
2224
            self._format = format
 
2225
        # when we do _ensure_real we may need to pass ignore_fallbacks to the
 
2226
        # branch.open_branch method.
 
2227
        self._real_ignore_fallbacks = not setup_stacking
2030
2228
        if not self._format._network_name:
2031
2229
            # Did not get from open_branchV2 - old server.
2032
2230
            self._ensure_real()
2077
2275
                raise AssertionError('smart server vfs must be enabled '
2078
2276
                    'to use vfs implementation')
2079
2277
            self.bzrdir._ensure_real()
2080
 
            self._real_branch = self.bzrdir._real_bzrdir.open_branch()
 
2278
            self._real_branch = self.bzrdir._real_bzrdir.open_branch(
 
2279
                ignore_fallbacks=self._real_ignore_fallbacks, name=self._name)
2081
2280
            if self.repository._real_repository is None:
2082
2281
                # Give the remote repository the matching real repo.
2083
2282
                real_repo = self._real_branch.repository
2187
2386
        medium = self._client._medium
2188
2387
        if medium._is_remote_before((1, 18)):
2189
2388
            self._vfs_set_tags_bytes(bytes)
 
2389
            return
2190
2390
        try:
2191
2391
            args = (
2192
2392
                self._remote_path(), self._lock_token, self._repo_lock_token)
2197
2397
            self._vfs_set_tags_bytes(bytes)
2198
2398
 
2199
2399
    def lock_read(self):
 
2400
        """Lock the branch for read operations.
 
2401
 
 
2402
        :return: A bzrlib.lock.LogicalLockResult.
 
2403
        """
2200
2404
        self.repository.lock_read()
2201
2405
        if not self._lock_mode:
 
2406
            self._note_lock('r')
2202
2407
            self._lock_mode = 'r'
2203
2408
            self._lock_count = 1
2204
2409
            if self._real_branch is not None:
2205
2410
                self._real_branch.lock_read()
2206
2411
        else:
2207
2412
            self._lock_count += 1
 
2413
        return lock.LogicalLockResult(self.unlock)
2208
2414
 
2209
2415
    def _remote_lock_write(self, token):
2210
2416
        if token is None:
2211
2417
            branch_token = repo_token = ''
2212
2418
        else:
2213
2419
            branch_token = token
2214
 
            repo_token = self.repository.lock_write()
 
2420
            repo_token = self.repository.lock_write().repository_token
2215
2421
            self.repository.unlock()
2216
2422
        err_context = {'token': token}
2217
 
        response = self._call(
2218
 
            'Branch.lock_write', self._remote_path(), branch_token,
2219
 
            repo_token or '', **err_context)
 
2423
        try:
 
2424
            response = self._call(
 
2425
                'Branch.lock_write', self._remote_path(), branch_token,
 
2426
                repo_token or '', **err_context)
 
2427
        except errors.LockContention, e:
 
2428
            # The LockContention from the server doesn't have any
 
2429
            # information about the lock_url. We re-raise LockContention
 
2430
            # with valid lock_url.
 
2431
            raise errors.LockContention('(remote lock)',
 
2432
                self.repository.base.split('.bzr/')[0])
2220
2433
        if response[0] != 'ok':
2221
2434
            raise errors.UnexpectedSmartServerResponse(response)
2222
2435
        ok, branch_token, repo_token = response
2224
2437
 
2225
2438
    def lock_write(self, token=None):
2226
2439
        if not self._lock_mode:
 
2440
            self._note_lock('w')
2227
2441
            # Lock the branch and repo in one remote call.
2228
2442
            remote_tokens = self._remote_lock_write(token)
2229
2443
            self._lock_token, self._repo_lock_token = remote_tokens
2242
2456
            self._lock_mode = 'w'
2243
2457
            self._lock_count = 1
2244
2458
        elif self._lock_mode == 'r':
2245
 
            raise errors.ReadOnlyTransaction
 
2459
            raise errors.ReadOnlyError(self)
2246
2460
        else:
2247
2461
            if token is not None:
2248
2462
                # A token was given to lock_write, and we're relocking, so
2253
2467
            self._lock_count += 1
2254
2468
            # Re-lock the repository too.
2255
2469
            self.repository.lock_write(self._repo_lock_token)
2256
 
        return self._lock_token or None
 
2470
        return BranchWriteLockResult(self.unlock, self._lock_token or None)
2257
2471
 
2258
2472
    def _unlock(self, branch_token, repo_token):
2259
2473
        err_context = {'token': str((branch_token, repo_token))}
2264
2478
            return
2265
2479
        raise errors.UnexpectedSmartServerResponse(response)
2266
2480
 
 
2481
    @only_raises(errors.LockNotHeld, errors.LockBroken)
2267
2482
    def unlock(self):
2268
2483
        try:
2269
2484
            self._lock_count -= 1
2309
2524
            raise NotImplementedError(self.dont_leave_lock_in_place)
2310
2525
        self._leave_lock = False
2311
2526
 
 
2527
    @needs_read_lock
2312
2528
    def get_rev_id(self, revno, history=None):
2313
2529
        if revno == 0:
2314
2530
            return _mod_revision.NULL_REVISION
2580
2796
        medium = self._branch._client._medium
2581
2797
        if medium._is_remote_before((1, 14)):
2582
2798
            return self._vfs_set_option(value, name, section)
 
2799
        if isinstance(value, dict):
 
2800
            if medium._is_remote_before((2, 2)):
 
2801
                return self._vfs_set_option(value, name, section)
 
2802
            return self._set_config_option_dict(value, name, section)
 
2803
        else:
 
2804
            return self._set_config_option(value, name, section)
 
2805
 
 
2806
    def _set_config_option(self, value, name, section):
2583
2807
        try:
2584
2808
            path = self._branch._remote_path()
2585
2809
            response = self._branch._client.call('Branch.set_config_option',
2586
2810
                path, self._branch._lock_token, self._branch._repo_lock_token,
2587
2811
                value.encode('utf8'), name, section or '')
2588
2812
        except errors.UnknownSmartMethod:
 
2813
            medium = self._branch._client._medium
2589
2814
            medium._remember_remote_is_before((1, 14))
2590
2815
            return self._vfs_set_option(value, name, section)
2591
2816
        if response != ():
2592
2817
            raise errors.UnexpectedSmartServerResponse(response)
2593
2818
 
 
2819
    def _serialize_option_dict(self, option_dict):
 
2820
        utf8_dict = {}
 
2821
        for key, value in option_dict.items():
 
2822
            if isinstance(key, unicode):
 
2823
                key = key.encode('utf8')
 
2824
            if isinstance(value, unicode):
 
2825
                value = value.encode('utf8')
 
2826
            utf8_dict[key] = value
 
2827
        return bencode.bencode(utf8_dict)
 
2828
 
 
2829
    def _set_config_option_dict(self, value, name, section):
 
2830
        try:
 
2831
            path = self._branch._remote_path()
 
2832
            serialised_dict = self._serialize_option_dict(value)
 
2833
            response = self._branch._client.call(
 
2834
                'Branch.set_config_option_dict',
 
2835
                path, self._branch._lock_token, self._branch._repo_lock_token,
 
2836
                serialised_dict, name, section or '')
 
2837
        except errors.UnknownSmartMethod:
 
2838
            medium = self._branch._client._medium
 
2839
            medium._remember_remote_is_before((2, 2))
 
2840
            return self._vfs_set_option(value, name, section)
 
2841
        if response != ():
 
2842
            raise errors.UnexpectedSmartServerResponse(response)
 
2843
 
2594
2844
    def _real_object(self):
2595
2845
        self._branch._ensure_real()
2596
2846
        return self._branch._real_branch
2679
2929
                    'Missing key %r in context %r', key_err.args[0], context)
2680
2930
                raise err
2681
2931
 
2682
 
    if err.error_verb == 'NoSuchRevision':
 
2932
    if err.error_verb == 'IncompatibleRepositories':
 
2933
        raise errors.IncompatibleRepositories(err.error_args[0],
 
2934
            err.error_args[1], err.error_args[2])
 
2935
    elif err.error_verb == 'NoSuchRevision':
2683
2936
        raise NoSuchRevision(find('branch'), err.error_args[0])
2684
2937
    elif err.error_verb == 'nosuchrevision':
2685
2938
        raise NoSuchRevision(find('repository'), err.error_args[0])
2686
 
    elif err.error_tuple == ('nobranch',):
2687
 
        raise errors.NotBranchError(path=find('bzrdir').root_transport.base)
 
2939
    elif err.error_verb == 'nobranch':
 
2940
        if len(err.error_args) >= 1:
 
2941
            extra = err.error_args[0]
 
2942
        else:
 
2943
            extra = None
 
2944
        raise errors.NotBranchError(path=find('bzrdir').root_transport.base,
 
2945
            detail=extra)
2688
2946
    elif err.error_verb == 'norepository':
2689
2947
        raise errors.NoRepositoryPresent(find('bzrdir'))
2690
2948
    elif err.error_verb == 'LockContention':