~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/remote.py

  • Committer: Matt Nordhoff
  • Date: 2009-04-04 02:50:01 UTC
  • mfrom: (4253 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4256.
  • Revision ID: mnordhoff@mattnordhoff.com-20090404025001-z1403k0tatmc8l91
Merge bzr.dev, fixing conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
# TODO: At some point, handle upgrades by just passing the whole request
18
18
# across to run on the server.
22
22
from bzrlib import (
23
23
    branch,
24
24
    bzrdir,
 
25
    config,
25
26
    debug,
26
27
    errors,
27
28
    graph,
40
41
    SmartProtocolError,
41
42
    )
42
43
from bzrlib.lockable_files import LockableFiles
43
 
from bzrlib.smart import client, vfs
 
44
from bzrlib.smart import client, vfs, repository as smart_repo
44
45
from bzrlib.revision import ensure_null, NULL_REVISION
45
46
from bzrlib.trace import mutter, note, warning
46
 
from bzrlib.versionedfile import record_to_fulltext_bytes
 
47
from bzrlib.util import bencode
47
48
 
48
49
 
49
50
class _RpcHelper(object):
54
55
            return self._client.call(method, *args)
55
56
        except errors.ErrorFromSmartServer, err:
56
57
            self._translate_error(err, **err_context)
57
 
        
 
58
 
58
59
    def _call_expecting_body(self, method, *args, **err_context):
59
60
        try:
60
61
            return self._client.call_expecting_body(method, *args)
61
62
        except errors.ErrorFromSmartServer, err:
62
63
            self._translate_error(err, **err_context)
63
 
        
 
64
 
64
65
    def _call_with_body_bytes_expecting_body(self, method, args, body_bytes,
65
66
                                             **err_context):
66
67
        try:
68
69
                method, args, body_bytes)
69
70
        except errors.ErrorFromSmartServer, err:
70
71
            self._translate_error(err, **err_context)
71
 
        
 
72
 
 
73
 
 
74
def response_tuple_to_repo_format(response):
 
75
    """Convert a response tuple describing a repository format to a format."""
 
76
    format = RemoteRepositoryFormat()
 
77
    format._rich_root_data = (response[0] == 'yes')
 
78
    format._supports_tree_reference = (response[1] == 'yes')
 
79
    format._supports_external_lookups = (response[2] == 'yes')
 
80
    format._network_name = response[3]
 
81
    return format
 
82
 
 
83
 
72
84
# Note: RemoteBzrDirFormat is in bzrdir.py
73
85
 
74
86
class RemoteBzrDir(BzrDir, _RpcHelper):
84
96
        # this object holds a delegated bzrdir that uses file-level operations
85
97
        # to talk to the other side
86
98
        self._real_bzrdir = None
 
99
        # 1-shot cache for the call pattern 'create_branch; open_branch' - see
 
100
        # create_branch for details.
 
101
        self._next_open_branch_result = None
87
102
 
88
103
        if _client is None:
89
104
            medium = transport.get_smart_medium()
107
122
        if not self._real_bzrdir:
108
123
            self._real_bzrdir = BzrDir.open_from_transport(
109
124
                self.root_transport, _server_formats=False)
 
125
            self._format._network_name = \
 
126
                self._real_bzrdir._format.network_name()
110
127
 
111
128
    def _translate_error(self, err, **context):
112
129
        _translate_error(err, bzrdir=self, **context)
113
130
 
114
 
    def cloning_metadir(self, stacked=False):
 
131
    def break_lock(self):
 
132
        # Prevent aliasing problems in the next_open_branch_result cache.
 
133
        # See create_branch for rationale.
 
134
        self._next_open_branch_result = None
 
135
        return BzrDir.break_lock(self)
 
136
 
 
137
    def _vfs_cloning_metadir(self, require_stacking=False):
115
138
        self._ensure_real()
116
 
        return self._real_bzrdir.cloning_metadir(stacked)
 
139
        return self._real_bzrdir.cloning_metadir(
 
140
            require_stacking=require_stacking)
 
141
 
 
142
    def cloning_metadir(self, require_stacking=False):
 
143
        medium = self._client._medium
 
144
        if medium._is_remote_before((1, 13)):
 
145
            return self._vfs_cloning_metadir(require_stacking=require_stacking)
 
146
        verb = 'BzrDir.cloning_metadir'
 
147
        if require_stacking:
 
148
            stacking = 'True'
 
149
        else:
 
150
            stacking = 'False'
 
151
        path = self._path_for_remote_call(self._client)
 
152
        try:
 
153
            response = self._call(verb, path, stacking)
 
154
        except errors.UnknownSmartMethod:
 
155
            medium._remember_remote_is_before((1, 13))
 
156
            return self._vfs_cloning_metadir(require_stacking=require_stacking)
 
157
        except errors.UnknownErrorFromSmartServer, err:
 
158
            if err.error_tuple != ('BranchReference',):
 
159
                raise
 
160
            # We need to resolve the branch reference to determine the
 
161
            # cloning_metadir.  This causes unnecessary RPCs to open the
 
162
            # referenced branch (and bzrdir, etc) but only when the caller
 
163
            # didn't already resolve the branch reference.
 
164
            referenced_branch = self.open_branch()
 
165
            return referenced_branch.bzrdir.cloning_metadir()
 
166
        if len(response) != 3:
 
167
            raise errors.UnexpectedSmartServerResponse(response)
 
168
        control_name, repo_name, branch_info = response
 
169
        if len(branch_info) != 2:
 
170
            raise errors.UnexpectedSmartServerResponse(response)
 
171
        branch_ref, branch_name = branch_info
 
172
        format = bzrdir.network_format_registry.get(control_name)
 
173
        if repo_name:
 
174
            format.repository_format = repository.network_format_registry.get(
 
175
                repo_name)
 
176
        if branch_ref == 'ref':
 
177
            # XXX: we need possible_transports here to avoid reopening the
 
178
            # connection to the referenced location
 
179
            ref_bzrdir = BzrDir.open(branch_name)
 
180
            branch_format = ref_bzrdir.cloning_metadir().get_branch_format()
 
181
            format.set_branch_format(branch_format)
 
182
        elif branch_ref == 'branch':
 
183
            if branch_name:
 
184
                format.set_branch_format(
 
185
                    branch.network_format_registry.get(branch_name))
 
186
        else:
 
187
            raise errors.UnexpectedSmartServerResponse(response)
 
188
        return format
117
189
 
118
190
    def create_repository(self, shared=False):
119
191
        # as per meta1 formats - just delegate to the format object which may
134
206
        # be parameterised.
135
207
        real_branch = self._format.get_branch_format().initialize(self)
136
208
        if not isinstance(real_branch, RemoteBranch):
137
 
            return RemoteBranch(self, self.find_repository(), real_branch)
 
209
            result = RemoteBranch(self, self.find_repository(), real_branch)
138
210
        else:
139
 
            return real_branch
 
211
            result = real_branch
 
212
        # BzrDir.clone_on_transport() uses the result of create_branch but does
 
213
        # not return it to its callers; we save approximately 8% of our round
 
214
        # trips by handing the branch we created back to the first caller to
 
215
        # open_branch rather than probing anew. Long term we need a API in
 
216
        # bzrdir that doesn't discard result objects (like result_branch).
 
217
        # RBC 20090225
 
218
        self._next_open_branch_result = result
 
219
        return result
140
220
 
141
221
    def destroy_branch(self):
142
222
        """See BzrDir.destroy_branch"""
143
223
        self._ensure_real()
144
224
        self._real_bzrdir.destroy_branch()
 
225
        self._next_open_branch_result = None
145
226
 
146
227
    def create_workingtree(self, revision_id=None, from_branch=None):
147
228
        raise errors.NotLocalUrl(self.transport.base)
156
237
 
157
238
    def get_branch_reference(self):
158
239
        """See BzrDir.get_branch_reference()."""
 
240
        response = self._get_branch_reference()
 
241
        if response[0] == 'ref':
 
242
            return response[1]
 
243
        else:
 
244
            return None
 
245
 
 
246
    def _get_branch_reference(self):
159
247
        path = self._path_for_remote_call(self._client)
 
248
        medium = self._client._medium
 
249
        if not medium._is_remote_before((1, 13)):
 
250
            try:
 
251
                response = self._call('BzrDir.open_branchV2', path)
 
252
                if response[0] not in ('ref', 'branch'):
 
253
                    raise errors.UnexpectedSmartServerResponse(response)
 
254
                return response
 
255
            except errors.UnknownSmartMethod:
 
256
                medium._remember_remote_is_before((1, 13))
160
257
        response = self._call('BzrDir.open_branch', path)
161
 
        if response[0] == 'ok':
162
 
            if response[1] == '':
163
 
                # branch at this location.
164
 
                return None
165
 
            else:
166
 
                # a branch reference, use the existing BranchReference logic.
167
 
                return response[1]
 
258
        if response[0] != 'ok':
 
259
            raise errors.UnexpectedSmartServerResponse(response)
 
260
        if response[1] != '':
 
261
            return ('ref', response[1])
168
262
        else:
169
 
            raise errors.UnexpectedSmartServerResponse(response)
 
263
            return ('branch', '')
170
264
 
171
265
    def _get_tree_branch(self):
172
266
        """See BzrDir._get_tree_branch()."""
173
267
        return None, self.open_branch()
174
268
 
175
 
    def open_branch(self, _unsupported=False):
 
269
    def open_branch(self, _unsupported=False, ignore_fallbacks=False):
176
270
        if _unsupported:
177
271
            raise NotImplementedError('unsupported flag support not implemented yet.')
178
 
        reference_url = self.get_branch_reference()
179
 
        if reference_url is None:
180
 
            # branch at this location.
181
 
            return RemoteBranch(self, self.find_repository())
182
 
        else:
 
272
        if self._next_open_branch_result is not None:
 
273
            # See create_branch for details.
 
274
            result = self._next_open_branch_result
 
275
            self._next_open_branch_result = None
 
276
            return result
 
277
        response = self._get_branch_reference()
 
278
        if response[0] == 'ref':
183
279
            # a branch reference, use the existing BranchReference logic.
184
280
            format = BranchReferenceFormat()
185
 
            return format.open(self, _found=True, location=reference_url)
186
 
                
 
281
            return format.open(self, _found=True, location=response[1],
 
282
                ignore_fallbacks=ignore_fallbacks)
 
283
        branch_format_name = response[1]
 
284
        if not branch_format_name:
 
285
            branch_format_name = None
 
286
        format = RemoteBranchFormat(network_name=branch_format_name)
 
287
        return RemoteBranch(self, self.find_repository(), format=format,
 
288
            setup_stacking=not ignore_fallbacks)
 
289
 
 
290
    def _open_repo_v1(self, path):
 
291
        verb = 'BzrDir.find_repository'
 
292
        response = self._call(verb, path)
 
293
        if response[0] != 'ok':
 
294
            raise errors.UnexpectedSmartServerResponse(response)
 
295
        # servers that only support the v1 method don't support external
 
296
        # references either.
 
297
        self._ensure_real()
 
298
        repo = self._real_bzrdir.open_repository()
 
299
        response = response + ('no', repo._format.network_name())
 
300
        return response, repo
 
301
 
 
302
    def _open_repo_v2(self, path):
 
303
        verb = 'BzrDir.find_repositoryV2'
 
304
        response = self._call(verb, path)
 
305
        if response[0] != 'ok':
 
306
            raise errors.UnexpectedSmartServerResponse(response)
 
307
        self._ensure_real()
 
308
        repo = self._real_bzrdir.open_repository()
 
309
        response = response + (repo._format.network_name(),)
 
310
        return response, repo
 
311
 
 
312
    def _open_repo_v3(self, path):
 
313
        verb = 'BzrDir.find_repositoryV3'
 
314
        medium = self._client._medium
 
315
        if medium._is_remote_before((1, 13)):
 
316
            raise errors.UnknownSmartMethod(verb)
 
317
        try:
 
318
            response = self._call(verb, path)
 
319
        except errors.UnknownSmartMethod:
 
320
            medium._remember_remote_is_before((1, 13))
 
321
            raise
 
322
        if response[0] != 'ok':
 
323
            raise errors.UnexpectedSmartServerResponse(response)
 
324
        return response, None
 
325
 
187
326
    def open_repository(self):
188
327
        path = self._path_for_remote_call(self._client)
189
 
        verb = 'BzrDir.find_repositoryV2'
190
 
        try:
191
 
            response = self._call(verb, path)
192
 
        except errors.UnknownSmartMethod:
193
 
            verb = 'BzrDir.find_repository'
194
 
            response = self._call(verb, path)
 
328
        response = None
 
329
        for probe in [self._open_repo_v3, self._open_repo_v2,
 
330
            self._open_repo_v1]:
 
331
            try:
 
332
                response, real_repo = probe(path)
 
333
                break
 
334
            except errors.UnknownSmartMethod:
 
335
                pass
 
336
        if response is None:
 
337
            raise errors.UnknownSmartMethod('BzrDir.find_repository{3,2,}')
195
338
        if response[0] != 'ok':
196
339
            raise errors.UnexpectedSmartServerResponse(response)
197
 
        if verb == 'BzrDir.find_repository':
198
 
            # servers that don't support the V2 method don't support external
199
 
            # references either.
200
 
            response = response + ('no', )
201
 
        if not (len(response) == 5):
 
340
        if len(response) != 6:
202
341
            raise SmartProtocolError('incorrect response length %s' % (response,))
203
342
        if response[1] == '':
204
 
            format = RemoteRepositoryFormat()
205
 
            format.rich_root_data = (response[2] == 'yes')
206
 
            format.supports_tree_reference = (response[3] == 'yes')
207
 
            # No wire format to check this yet.
208
 
            format.supports_external_lookups = (response[4] == 'yes')
 
343
            # repo is at this dir.
 
344
            format = response_tuple_to_repo_format(response[2:])
209
345
            # Used to support creating a real format instance when needed.
210
346
            format._creating_bzrdir = self
211
347
            remote_repo = RemoteRepository(self, format)
212
348
            format._creating_repo = remote_repo
 
349
            if real_repo is not None:
 
350
                remote_repo._set_real_repository(real_repo)
213
351
            return remote_repo
214
352
        else:
215
353
            raise errors.NoRepositoryPresent(self)
271
409
    on a per instance basis, and are not set (and should not be) at
272
410
    the class level.
273
411
 
274
 
    :ivar _custom_format: If set, a specific concrete repository format that 
 
412
    :ivar _custom_format: If set, a specific concrete repository format that
275
413
        will be used when initializing a repository with this
276
414
        RemoteRepositoryFormat.
277
415
    :ivar _creating_repo: If set, the repository object that this
286
424
        self._custom_format = None
287
425
        self._network_name = None
288
426
        self._creating_bzrdir = None
 
427
        self._supports_external_lookups = None
 
428
        self._supports_tree_reference = None
 
429
        self._rich_root_data = None
 
430
 
 
431
    @property
 
432
    def fast_deltas(self):
 
433
        self._ensure_real()
 
434
        return self._custom_format.fast_deltas
 
435
 
 
436
    @property
 
437
    def rich_root_data(self):
 
438
        if self._rich_root_data is None:
 
439
            self._ensure_real()
 
440
            self._rich_root_data = self._custom_format.rich_root_data
 
441
        return self._rich_root_data
 
442
 
 
443
    @property
 
444
    def supports_external_lookups(self):
 
445
        if self._supports_external_lookups is None:
 
446
            self._ensure_real()
 
447
            self._supports_external_lookups = \
 
448
                self._custom_format.supports_external_lookups
 
449
        return self._supports_external_lookups
 
450
 
 
451
    @property
 
452
    def supports_tree_reference(self):
 
453
        if self._supports_tree_reference is None:
 
454
            self._ensure_real()
 
455
            self._supports_tree_reference = \
 
456
                self._custom_format.supports_tree_reference
 
457
        return self._supports_tree_reference
289
458
 
290
459
    def _vfs_initialize(self, a_bzrdir, shared):
291
460
        """Helper for common code in initialize."""
339
508
            response = a_bzrdir._call(verb, path, network_name, shared_str)
340
509
        except errors.UnknownSmartMethod:
341
510
            # Fallback - use vfs methods
 
511
            medium._remember_remote_is_before((1, 13))
342
512
            return self._vfs_initialize(a_bzrdir, shared)
343
513
        else:
344
514
            # Turn the response into a RemoteRepository object.
345
 
            format = RemoteRepositoryFormat()
346
 
            format.rich_root_data = (response[1] == 'yes')
347
 
            format.supports_tree_reference = (response[2] == 'yes')
348
 
            format.supports_external_lookups = (response[3] == 'yes')
349
 
            format._network_name = response[4]
 
515
            format = response_tuple_to_repo_format(response[1:])
350
516
            # Used to support creating a real format instance when needed.
351
517
            format._creating_bzrdir = a_bzrdir
352
518
            remote_repo = RemoteRepository(a_bzrdir, format)
353
519
            format._creating_repo = remote_repo
354
520
            return remote_repo
355
 
    
 
521
 
356
522
    def open(self, a_bzrdir):
357
523
        if not isinstance(a_bzrdir, RemoteBzrDir):
358
524
            raise AssertionError('%r is not a RemoteBzrDir' % (a_bzrdir,))
359
525
        return a_bzrdir.open_repository()
360
526
 
 
527
    def _ensure_real(self):
 
528
        if self._custom_format is None:
 
529
            self._custom_format = repository.network_format_registry.get(
 
530
                self._network_name)
 
531
 
 
532
    @property
 
533
    def _fetch_order(self):
 
534
        self._ensure_real()
 
535
        return self._custom_format._fetch_order
 
536
 
 
537
    @property
 
538
    def _fetch_uses_deltas(self):
 
539
        self._ensure_real()
 
540
        return self._custom_format._fetch_uses_deltas
 
541
 
 
542
    @property
 
543
    def _fetch_reconcile(self):
 
544
        self._ensure_real()
 
545
        return self._custom_format._fetch_reconcile
 
546
 
361
547
    def get_format_description(self):
362
548
        return 'bzr remote repository'
363
549
 
364
550
    def __eq__(self, other):
365
 
        return self.__class__ == other.__class__
 
551
        return self.__class__ is other.__class__
366
552
 
367
553
    def check_conversion_target(self, target_format):
368
554
        if self.rich_root_data and not target_format.rich_root_data:
381
567
 
382
568
    @property
383
569
    def _serializer(self):
384
 
        # We should only be getting asked for the serializer for
385
 
        # RemoteRepositoryFormat objects when the RemoteRepositoryFormat object
386
 
        # is a concrete instance for a RemoteRepository. In this case we know
387
 
        # the creating_repo and can use it to supply the serializer.
388
 
        self._creating_repo._ensure_real()
389
 
        return self._creating_repo._real_repository._format._serializer
 
570
        self._ensure_real()
 
571
        return self._custom_format._serializer
390
572
 
391
573
 
392
574
class RemoteRepository(_RpcHelper):
398
580
 
399
581
    def __init__(self, remote_bzrdir, format, real_repository=None, _client=None):
400
582
        """Create a RemoteRepository instance.
401
 
        
 
583
 
402
584
        :param remote_bzrdir: The bzrdir hosting this repository.
403
585
        :param format: The RemoteFormat object to use.
404
586
        :param real_repository: If not None, a local implementation of the
443
625
 
444
626
    def abort_write_group(self, suppress_errors=False):
445
627
        """Complete a write group on the decorated repository.
446
 
        
 
628
 
447
629
        Smart methods perform operations in a single step so this api
448
630
        is not really applicable except as a compatibility thunk
449
631
        for older plugins that don't use e.g. the CommitBuilder
457
639
 
458
640
    def commit_write_group(self):
459
641
        """Complete a write group on the decorated repository.
460
 
        
 
642
 
461
643
        Smart methods perform operations in a single step so this api
462
644
        is not really applicable except as a compatibility thunk
463
645
        for older plugins that don't use e.g. the CommitBuilder
478
660
        """Ensure that there is a _real_repository set.
479
661
 
480
662
        Used before calls to self._real_repository.
 
663
 
 
664
        Note that _ensure_real causes many roundtrips to the server which are
 
665
        not desirable, and prevents the use of smart one-roundtrip RPC's to
 
666
        perform complex operations (such as accessing parent data, streaming
 
667
        revisions etc). Adding calls to _ensure_real should only be done when
 
668
        bringing up new functionality, adding fallbacks for smart methods that
 
669
        require a fallback path, and never to replace an existing smart method
 
670
        invocation. If in doubt chat to the bzr network team.
481
671
        """
482
672
        if self._real_repository is None:
483
673
            self.bzrdir._ensure_real()
512
702
        self._ensure_real()
513
703
        return self._real_repository._generate_text_key_index()
514
704
 
515
 
    @symbol_versioning.deprecated_method(symbol_versioning.one_four)
516
 
    def get_revision_graph(self, revision_id=None):
517
 
        """See Repository.get_revision_graph()."""
518
 
        return self._get_revision_graph(revision_id)
519
 
 
520
705
    def _get_revision_graph(self, revision_id):
521
706
        """Private method for using with old (< 1.2) servers to fallback."""
522
707
        if revision_id is None:
539
724
        for line in lines:
540
725
            d = tuple(line.split())
541
726
            revision_graph[d[0]] = d[1:]
542
 
            
 
727
 
543
728
        return revision_graph
544
729
 
545
730
    def _get_sink(self):
546
731
        """See Repository._get_sink()."""
547
732
        return RemoteStreamSink(self)
548
733
 
 
734
    def _get_source(self, to_format):
 
735
        """Return a source for streaming from this repository."""
 
736
        return RemoteStreamSource(self, to_format)
 
737
 
549
738
    def has_revision(self, revision_id):
550
739
        """See Repository.has_revision()."""
551
740
        if revision_id == NULL_REVISION:
573
762
        return result
574
763
 
575
764
    def has_same_location(self, other):
576
 
        return (self.__class__ == other.__class__ and
 
765
        return (self.__class__ is other.__class__ and
577
766
                self.bzrdir.transport.base == other.bzrdir.transport.base)
578
767
 
579
768
    def get_graph(self, other_repository=None):
651
840
        if not self._lock_mode:
652
841
            self._lock_mode = 'r'
653
842
            self._lock_count = 1
654
 
            self._unstacked_provider.enable_cache(cache_misses=False)
 
843
            self._unstacked_provider.enable_cache(cache_misses=True)
655
844
            if self._real_repository is not None:
656
845
                self._real_repository.lock_read()
657
846
        else:
658
847
            self._lock_count += 1
 
848
        for repo in self._fallback_repositories:
 
849
            repo.lock_read()
659
850
 
660
851
    def _remote_lock_write(self, token):
661
852
        path = self.bzrdir._path_for_remote_call(self._client)
696
887
            raise errors.ReadOnlyError(self)
697
888
        else:
698
889
            self._lock_count += 1
 
890
        for repo in self._fallback_repositories:
 
891
            # Writes don't affect fallback repos
 
892
            repo.lock_read()
699
893
        return self._lock_token or None
700
894
 
701
895
    def leave_lock_in_place(self):
715
909
            implemented operations.
716
910
        """
717
911
        if self._real_repository is not None:
718
 
            raise AssertionError('_real_repository is already set')
 
912
            # Replacing an already set real repository.
 
913
            # We cannot do this [currently] if the repository is locked -
 
914
            # synchronised state might be lost.
 
915
            if self.is_locked():
 
916
                raise AssertionError('_real_repository is already set')
719
917
        if isinstance(repository, RemoteRepository):
720
918
            raise AssertionError()
721
919
        self._real_repository = repository
 
920
        # three code paths happen here:
 
921
        # 1) old servers, RemoteBranch.open() calls _ensure_real before setting
 
922
        # up stacking. In this case self._fallback_repositories is [], and the
 
923
        # real repo is already setup. Preserve the real repo and
 
924
        # RemoteRepository.add_fallback_repository will avoid adding
 
925
        # duplicates.
 
926
        # 2) new servers, RemoteBranch.open() sets up stacking, and when
 
927
        # ensure_real is triggered from a branch, the real repository to
 
928
        # set already has a matching list with separate instances, but
 
929
        # as they are also RemoteRepositories we don't worry about making the
 
930
        # lists be identical.
 
931
        # 3) new servers, RemoteRepository.ensure_real is triggered before
 
932
        # RemoteBranch.ensure real, in this case we get a repo with no fallbacks
 
933
        # and need to populate it.
 
934
        if (self._fallback_repositories and
 
935
            len(self._real_repository._fallback_repositories) !=
 
936
            len(self._fallback_repositories)):
 
937
            if len(self._real_repository._fallback_repositories):
 
938
                raise AssertionError(
 
939
                    "cannot cleanly remove existing _fallback_repositories")
722
940
        for fb in self._fallback_repositories:
723
941
            self._real_repository.add_fallback_repository(fb)
724
942
        if self._lock_mode == 'w':
730
948
 
731
949
    def start_write_group(self):
732
950
        """Start a write group on the decorated repository.
733
 
        
 
951
 
734
952
        Smart methods perform operations in a single step so this api
735
953
        is not really applicable except as a compatibility thunk
736
954
        for older plugins that don't use e.g. the CommitBuilder
753
971
            raise errors.UnexpectedSmartServerResponse(response)
754
972
 
755
973
    def unlock(self):
 
974
        if not self._lock_count:
 
975
            raise errors.LockNotHeld(self)
756
976
        self._lock_count -= 1
757
977
        if self._lock_count > 0:
758
978
            return
785
1005
 
786
1006
    def _get_tarball(self, compression):
787
1007
        """Return a TemporaryFile containing a repository tarball.
788
 
        
 
1008
 
789
1009
        Returns None if the server does not support sending tarballs.
790
1010
        """
791
1011
        import tempfile
837
1057
 
838
1058
    def add_fallback_repository(self, repository):
839
1059
        """Add a repository to use for looking up data not held locally.
840
 
        
 
1060
 
841
1061
        :param repository: A repository.
842
1062
        """
843
 
        # XXX: At the moment the RemoteRepository will allow fallbacks
844
 
        # unconditionally - however, a _real_repository will usually exist,
845
 
        # and may raise an error if it's not accommodated by the underlying
846
 
        # format.  Eventually we should check when opening the repository
847
 
        # whether it's willing to allow them or not.
848
 
        #
 
1063
        if not self._format.supports_external_lookups:
 
1064
            raise errors.UnstackableRepositoryFormat(
 
1065
                self._format.network_name(), self.base)
849
1066
        # We need to accumulate additional repositories here, to pass them in
850
1067
        # on various RPC's.
 
1068
        #
851
1069
        self._fallback_repositories.append(repository)
852
 
        # They are also seen by the fallback repository.  If it doesn't exist
853
 
        # yet they'll be added then.  This implicitly copies them.
854
 
        self._ensure_real()
 
1070
        # If self._real_repository was parameterised already (e.g. because a
 
1071
        # _real_branch had its get_stacked_on_url method called), then the
 
1072
        # repository to be added may already be in the _real_repositories list.
 
1073
        if self._real_repository is not None:
 
1074
            fallback_locations = [repo.bzrdir.root_transport.base for repo in
 
1075
                self._real_repository._fallback_repositories]
 
1076
            if repository.bzrdir.root_transport.base not in fallback_locations:
 
1077
                self._real_repository.add_fallback_repository(repository)
855
1078
 
856
1079
    def add_inventory(self, revid, inv, parents):
857
1080
        self._ensure_real()
896
1119
        self._ensure_real()
897
1120
        return self._real_repository.make_working_trees()
898
1121
 
 
1122
    def refresh_data(self):
 
1123
        """Re-read any data needed to to synchronise with disk.
 
1124
 
 
1125
        This method is intended to be called after another repository instance
 
1126
        (such as one used by a smart server) has inserted data into the
 
1127
        repository. It may not be called during a write group, but may be
 
1128
        called at any other time.
 
1129
        """
 
1130
        if self.is_in_write_group():
 
1131
            raise errors.InternalBzrError(
 
1132
                "May not refresh_data while in a write group.")
 
1133
        if self._real_repository is not None:
 
1134
            self._real_repository.refresh_data()
 
1135
 
899
1136
    def revision_ids_to_search_result(self, result_set):
900
1137
        """Convert a set of revision ids to a graph SearchResult."""
901
1138
        result_parents = set()
912
1149
    @needs_read_lock
913
1150
    def search_missing_revision_ids(self, other, revision_id=None, find_ghosts=True):
914
1151
        """Return the revision ids that other has that this does not.
915
 
        
 
1152
 
916
1153
        These are returned in topological order.
917
1154
 
918
1155
        revision_id: only return revision ids included by revision_id.
920
1157
        return repository.InterRepository.get(
921
1158
            other, self).search_missing_revision_ids(revision_id, find_ghosts)
922
1159
 
923
 
    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False):
924
 
        # Not delegated to _real_repository so that InterRepository.get has a
925
 
        # chance to find an InterRepository specialised for RemoteRepository.
926
 
        if self.has_same_location(source):
 
1160
    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
 
1161
            fetch_spec=None):
 
1162
        # No base implementation to use as RemoteRepository is not a subclass
 
1163
        # of Repository; so this is a copy of Repository.fetch().
 
1164
        if fetch_spec is not None and revision_id is not None:
 
1165
            raise AssertionError(
 
1166
                "fetch_spec and revision_id are mutually exclusive.")
 
1167
        if self.is_in_write_group():
 
1168
            raise errors.InternalBzrError(
 
1169
                "May not fetch while in a write group.")
 
1170
        # fast path same-url fetch operations
 
1171
        if self.has_same_location(source) and fetch_spec is None:
927
1172
            # check that last_revision is in 'from' and then return a
928
1173
            # no-operation.
929
1174
            if (revision_id is not None and
930
1175
                not revision.is_null(revision_id)):
931
1176
                self.get_revision(revision_id)
932
1177
            return 0, []
 
1178
        # if there is no specific appropriate InterRepository, this will get
 
1179
        # the InterRepository base class, which raises an
 
1180
        # IncompatibleRepositories when asked to fetch.
933
1181
        inter = repository.InterRepository.get(source, self)
934
 
        try:
935
 
            return inter.fetch(revision_id=revision_id, pb=pb, find_ghosts=find_ghosts)
936
 
        except NotImplementedError:
937
 
            raise errors.IncompatibleRepositories(source, self)
 
1182
        return inter.fetch(revision_id=revision_id, pb=pb,
 
1183
            find_ghosts=find_ghosts, fetch_spec=fetch_spec)
938
1184
 
939
1185
    def create_bundle(self, target, base, fileobj, format=None):
940
1186
        self._ensure_real()
953
1199
        self._ensure_real()
954
1200
        return self._real_repository._get_versioned_file_checker(
955
1201
            revisions, revision_versions_cache)
956
 
        
 
1202
 
957
1203
    def iter_files_bytes(self, desired_files):
958
1204
        """See Repository.iter_file_bytes.
959
1205
        """
960
1206
        self._ensure_real()
961
1207
        return self._real_repository.iter_files_bytes(desired_files)
962
1208
 
963
 
    @property
964
 
    def _fetch_order(self):
965
 
        """Decorate the real repository for now.
966
 
 
967
 
        In the long term getting this back from the remote repository as part
968
 
        of open would be more efficient.
969
 
        """
970
 
        self._ensure_real()
971
 
        return self._real_repository._fetch_order
972
 
 
973
 
    @property
974
 
    def _fetch_uses_deltas(self):
975
 
        """Decorate the real repository for now.
976
 
 
977
 
        In the long term getting this back from the remote repository as part
978
 
        of open would be more efficient.
979
 
        """
980
 
        self._ensure_real()
981
 
        return self._real_repository._fetch_uses_deltas
982
 
 
983
 
    @property
984
 
    def _fetch_reconcile(self):
985
 
        """Decorate the real repository for now.
986
 
 
987
 
        In the long term getting this back from the remote repository as part
988
 
        of open would be more efficient.
989
 
        """
990
 
        self._ensure_real()
991
 
        return self._real_repository._fetch_reconcile
992
 
 
993
1209
    def get_parent_map(self, revision_ids):
994
1210
        """See bzrlib.Graph.get_parent_map()."""
995
1211
        return self._make_parents_provider().get_parent_map(revision_ids)
1001
1217
            # We already found out that the server can't understand
1002
1218
            # Repository.get_parent_map requests, so just fetch the whole
1003
1219
            # graph.
1004
 
            # XXX: Note that this will issue a deprecation warning. This is ok
1005
 
            # :- its because we're working with a deprecated server anyway, and
1006
 
            # the user will almost certainly have seen a warning about the
1007
 
            # server version already.
1008
 
            rg = self.get_revision_graph()
 
1220
            #
 
1221
            # Note that this reads the whole graph, when only some keys are
 
1222
            # wanted.  On this old server there's no way (?) to get them all
 
1223
            # in one go, and the user probably will have seen a warning about
 
1224
            # the server being old anyhow.
 
1225
            rg = self._get_revision_graph(None)
1009
1226
            # There is an api discrepancy between get_parent_map and
1010
1227
            # get_revision_graph. Specifically, a "key:()" pair in
1011
1228
            # get_revision_graph just means a node has no parents. For
1042
1259
        # TODO: Manage this incrementally to avoid covering the same path
1043
1260
        # repeatedly. (The server will have to on each request, but the less
1044
1261
        # work done the better).
 
1262
        #
 
1263
        # Negative caching notes:
 
1264
        # new server sends missing when a request including the revid
 
1265
        # 'include-missing:' is present in the request.
 
1266
        # missing keys are serialised as missing:X, and we then call
 
1267
        # provider.note_missing(X) for-all X
1045
1268
        parents_map = self._unstacked_provider.get_cached_map()
1046
1269
        if parents_map is None:
1047
1270
            # Repository is not locked, so there's no cache.
1048
1271
            parents_map = {}
 
1272
        # start_set is all the keys in the cache
1049
1273
        start_set = set(parents_map)
 
1274
        # result set is all the references to keys in the cache
1050
1275
        result_parents = set()
1051
1276
        for parents in parents_map.itervalues():
1052
1277
            result_parents.update(parents)
1053
1278
        stop_keys = result_parents.difference(start_set)
 
1279
        # We don't need to send ghosts back to the server as a position to
 
1280
        # stop either.
 
1281
        stop_keys.difference_update(self._unstacked_provider.missing_keys)
 
1282
        key_count = len(parents_map)
 
1283
        if (NULL_REVISION in result_parents
 
1284
            and NULL_REVISION in self._unstacked_provider.missing_keys):
 
1285
            # If we pruned NULL_REVISION from the stop_keys because it's also
 
1286
            # in our cache of "missing" keys we need to increment our key count
 
1287
            # by 1, because the reconsitituted SearchResult on the server will
 
1288
            # still consider NULL_REVISION to be an included key.
 
1289
            key_count += 1
1054
1290
        included_keys = start_set.intersection(result_parents)
1055
1291
        start_set.difference_update(included_keys)
1056
 
        recipe = (start_set, stop_keys, len(parents_map))
 
1292
        recipe = ('manual', start_set, stop_keys, key_count)
1057
1293
        body = self._serialise_search_recipe(recipe)
1058
1294
        path = self.bzrdir._path_for_remote_call(self._client)
1059
1295
        for key in keys:
1061
1297
                raise ValueError(
1062
1298
                    "key %r not a plain string" % (key,))
1063
1299
        verb = 'Repository.get_parent_map'
1064
 
        args = (path,) + tuple(keys)
 
1300
        args = (path, 'include-missing:') + tuple(keys)
1065
1301
        try:
1066
1302
            response = self._call_with_body_bytes_expecting_body(
1067
1303
                verb, args, body)
1077
1313
            # To avoid having to disconnect repeatedly, we keep track of the
1078
1314
            # fact the server doesn't understand remote methods added in 1.2.
1079
1315
            medium._remember_remote_is_before((1, 2))
1080
 
            return self.get_revision_graph(None)
 
1316
            # Recurse just once and we should use the fallback code.
 
1317
            return self._get_parent_map_rpc(keys)
1081
1318
        response_tuple, response_handler = response
1082
1319
        if response_tuple[0] not in ['ok']:
1083
1320
            response_handler.cancel_read_body()
1094
1331
                if len(d) > 1:
1095
1332
                    revision_graph[d[0]] = d[1:]
1096
1333
                else:
1097
 
                    # No parents - so give the Graph result (NULL_REVISION,).
1098
 
                    revision_graph[d[0]] = (NULL_REVISION,)
 
1334
                    # No parents:
 
1335
                    if d[0].startswith('missing:'):
 
1336
                        revid = d[0][8:]
 
1337
                        self._unstacked_provider.note_missing_key(revid)
 
1338
                    else:
 
1339
                        # no parents - so give the Graph result
 
1340
                        # (NULL_REVISION,).
 
1341
                        revision_graph[d[0]] = (NULL_REVISION,)
1099
1342
            return revision_graph
1100
1343
 
1101
1344
    @needs_read_lock
1104
1347
        return self._real_repository.get_signature_text(revision_id)
1105
1348
 
1106
1349
    @needs_read_lock
1107
 
    @symbol_versioning.deprecated_method(symbol_versioning.one_three)
1108
 
    def get_revision_graph_with_ghosts(self, revision_ids=None):
1109
 
        self._ensure_real()
1110
 
        return self._real_repository.get_revision_graph_with_ghosts(
1111
 
            revision_ids=revision_ids)
1112
 
 
1113
 
    @needs_read_lock
1114
1350
    def get_inventory_xml(self, revision_id):
1115
1351
        self._ensure_real()
1116
1352
        return self._real_repository.get_inventory_xml(revision_id)
1122
1358
    def reconcile(self, other=None, thorough=False):
1123
1359
        self._ensure_real()
1124
1360
        return self._real_repository.reconcile(other=other, thorough=thorough)
1125
 
        
 
1361
 
1126
1362
    def all_revision_ids(self):
1127
1363
        self._ensure_real()
1128
1364
        return self._real_repository.all_revision_ids()
1129
 
    
1130
 
    @needs_read_lock
1131
 
    def get_deltas_for_revisions(self, revisions):
1132
 
        self._ensure_real()
1133
 
        return self._real_repository.get_deltas_for_revisions(revisions)
1134
 
 
1135
 
    @needs_read_lock
1136
 
    def get_revision_delta(self, revision_id):
1137
 
        self._ensure_real()
1138
 
        return self._real_repository.get_revision_delta(revision_id)
 
1365
 
 
1366
    @needs_read_lock
 
1367
    def get_deltas_for_revisions(self, revisions, specific_fileids=None):
 
1368
        self._ensure_real()
 
1369
        return self._real_repository.get_deltas_for_revisions(revisions,
 
1370
            specific_fileids=specific_fileids)
 
1371
 
 
1372
    @needs_read_lock
 
1373
    def get_revision_delta(self, revision_id, specific_fileids=None):
 
1374
        self._ensure_real()
 
1375
        return self._real_repository.get_revision_delta(revision_id,
 
1376
            specific_fileids=specific_fileids)
1139
1377
 
1140
1378
    @needs_read_lock
1141
1379
    def revision_trees(self, revision_ids):
1263
1501
        return self._real_repository.get_revisions(revision_ids)
1264
1502
 
1265
1503
    def supports_rich_root(self):
1266
 
        self._ensure_real()
1267
 
        return self._real_repository.supports_rich_root()
 
1504
        return self._format.rich_root_data
1268
1505
 
1269
1506
    def iter_reverse_revision_history(self, revision_id):
1270
1507
        self._ensure_real()
1272
1509
 
1273
1510
    @property
1274
1511
    def _serializer(self):
1275
 
        self._ensure_real()
1276
 
        return self._real_repository._serializer
 
1512
        return self._format._serializer
1277
1513
 
1278
1514
    def store_revision_signature(self, gpg_strategy, plaintext, revision_id):
1279
1515
        self._ensure_real()
1320
1556
        :param recipe: A search recipe (start, stop, count).
1321
1557
        :return: Serialised bytes.
1322
1558
        """
1323
 
        start_keys = ' '.join(recipe[0])
1324
 
        stop_keys = ' '.join(recipe[1])
1325
 
        count = str(recipe[2])
 
1559
        start_keys = ' '.join(recipe[1])
 
1560
        stop_keys = ' '.join(recipe[2])
 
1561
        count = str(recipe[3])
1326
1562
        return '\n'.join((start_keys, stop_keys, count))
1327
1563
 
 
1564
    def _serialise_search_result(self, search_result):
 
1565
        if isinstance(search_result, graph.PendingAncestryResult):
 
1566
            parts = ['ancestry-of']
 
1567
            parts.extend(search_result.heads)
 
1568
        else:
 
1569
            recipe = search_result.get_recipe()
 
1570
            parts = [recipe[0], self._serialise_search_recipe(recipe)]
 
1571
        return '\n'.join(parts)
 
1572
 
1328
1573
    def autopack(self):
1329
1574
        path = self.bzrdir._path_for_remote_call(self._client)
1330
1575
        try:
1333
1578
            self._ensure_real()
1334
1579
            self._real_repository._pack_collection.autopack()
1335
1580
            return
1336
 
        if self._real_repository is not None:
1337
 
            # Reset the real repository's cache of pack names.
1338
 
            # XXX: At some point we may be able to skip this and just rely on
1339
 
            # the automatic retry logic to do the right thing, but for now we
1340
 
            # err on the side of being correct rather than being optimal.
1341
 
            self._real_repository._pack_collection.reload_pack_names()
 
1581
        self.refresh_data()
1342
1582
        if response[0] != 'ok':
1343
1583
            raise errors.UnexpectedSmartServerResponse(response)
1344
1584
 
1345
1585
 
1346
1586
class RemoteStreamSink(repository.StreamSink):
1347
1587
 
1348
 
    def _insert_real(self, stream, src_format):
 
1588
    def _insert_real(self, stream, src_format, resume_tokens):
1349
1589
        self.target_repo._ensure_real()
1350
1590
        sink = self.target_repo._real_repository._get_sink()
1351
 
        return sink.insert_stream(stream, src_format)
1352
 
 
1353
 
    def insert_stream(self, stream, src_format):
1354
 
        repo = self.target_repo
1355
 
        # Until we can handle deltas in stack repositories we can't hand all
1356
 
        # the processing off to a remote server.
1357
 
        if self.target_repo._fallback_repositories:
1358
 
            return self._insert_real(stream, src_format)
 
1591
        result = sink.insert_stream(stream, src_format, resume_tokens)
 
1592
        if not result:
 
1593
            self.target_repo.autopack()
 
1594
        return result
 
1595
 
 
1596
    def insert_stream(self, stream, src_format, resume_tokens):
 
1597
        target = self.target_repo
 
1598
        if target._lock_token:
 
1599
            verb = 'Repository.insert_stream_locked'
 
1600
            extra_args = (target._lock_token or '',)
 
1601
            required_version = (1, 14)
 
1602
        else:
 
1603
            verb = 'Repository.insert_stream'
 
1604
            extra_args = ()
 
1605
            required_version = (1, 13)
 
1606
        client = target._client
 
1607
        medium = client._medium
 
1608
        if medium._is_remote_before(required_version):
 
1609
            # No possible way this can work.
 
1610
            return self._insert_real(stream, src_format, resume_tokens)
 
1611
        path = target.bzrdir._path_for_remote_call(client)
 
1612
        if not resume_tokens:
 
1613
            # XXX: Ugly but important for correctness, *will* be fixed during
 
1614
            # 1.13 cycle. Pushing a stream that is interrupted results in a
 
1615
            # fallback to the _real_repositories sink *with a partial stream*.
 
1616
            # Thats bad because we insert less data than bzr expected. To avoid
 
1617
            # this we do a trial push to make sure the verb is accessible, and
 
1618
            # do not fallback when actually pushing the stream. A cleanup patch
 
1619
            # is going to look at rewinding/restarting the stream/partial
 
1620
            # buffering etc.
 
1621
            byte_stream = smart_repo._stream_to_byte_stream([], src_format)
 
1622
            try:
 
1623
                response = client.call_with_body_stream(
 
1624
                    (verb, path, '') + extra_args, byte_stream)
 
1625
            except errors.UnknownSmartMethod:
 
1626
                medium._remember_remote_is_before(required_version)
 
1627
                return self._insert_real(stream, src_format, resume_tokens)
 
1628
        byte_stream = smart_repo._stream_to_byte_stream(
 
1629
            stream, src_format)
 
1630
        resume_tokens = ' '.join(resume_tokens)
 
1631
        response = client.call_with_body_stream(
 
1632
            (verb, path, resume_tokens) + extra_args, byte_stream)
 
1633
        if response[0][0] not in ('ok', 'missing-basis'):
 
1634
            raise errors.UnexpectedSmartServerResponse(response)
 
1635
        if response[0][0] == 'missing-basis':
 
1636
            tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
 
1637
            resume_tokens = tokens
 
1638
            return resume_tokens, missing_keys
 
1639
        else:
 
1640
            self.target_repo.refresh_data()
 
1641
            return [], set()
 
1642
 
 
1643
 
 
1644
class RemoteStreamSource(repository.StreamSource):
 
1645
    """Stream data from a remote server."""
 
1646
 
 
1647
    def get_stream(self, search):
 
1648
        if (self.from_repository._fallback_repositories and
 
1649
            self.to_format._fetch_order == 'topological'):
 
1650
            return self._real_stream(self.from_repository, search)
 
1651
        return self.missing_parents_chain(search, [self.from_repository] +
 
1652
            self.from_repository._fallback_repositories)
 
1653
 
 
1654
    def _real_stream(self, repo, search):
 
1655
        """Get a stream for search from repo.
 
1656
        
 
1657
        This never called RemoteStreamSource.get_stream, and is a heler
 
1658
        for RemoteStreamSource._get_stream to allow getting a stream 
 
1659
        reliably whether fallback back because of old servers or trying
 
1660
        to stream from a non-RemoteRepository (which the stacked support
 
1661
        code will do).
 
1662
        """
 
1663
        source = repo._get_source(self.to_format)
 
1664
        if isinstance(source, RemoteStreamSource):
 
1665
            return repository.StreamSource.get_stream(source, search)
 
1666
        return source.get_stream(search)
 
1667
 
 
1668
    def _get_stream(self, repo, search):
 
1669
        """Core worker to get a stream from repo for search.
 
1670
 
 
1671
        This is used by both get_stream and the stacking support logic. It
 
1672
        deliberately gets a stream for repo which does not need to be
 
1673
        self.from_repository. In the event that repo is not Remote, or
 
1674
        cannot do a smart stream, a fallback is made to the generic
 
1675
        repository._get_stream() interface, via self._real_stream.
 
1676
 
 
1677
        In the event of stacking, streams from _get_stream will not
 
1678
        contain all the data for search - this is normal (see get_stream).
 
1679
 
 
1680
        :param repo: A repository.
 
1681
        :param search: A search.
 
1682
        """
 
1683
        # Fallbacks may be non-smart
 
1684
        if not isinstance(repo, RemoteRepository):
 
1685
            return self._real_stream(repo, search)
1359
1686
        client = repo._client
1360
1687
        medium = client._medium
1361
 
        if medium._is_remote_before((1,13)):
1362
 
            # No possible way this can work.
1363
 
            return self._insert_real(stream, src_format)
 
1688
        if medium._is_remote_before((1, 13)):
 
1689
            # streaming was added in 1.13
 
1690
            return self._real_stream(repo, search)
1364
1691
        path = repo.bzrdir._path_for_remote_call(client)
1365
 
        # XXX: Ugly but important for correctness, *will* be fixed during 1.13
1366
 
        # cycle. Pushing a stream that is interrupted results in a fallback to
1367
 
        # the _real_repositories sink *with a partial stream*. Thats bad
1368
 
        # because we insert less data than bzr expected. To avoid this we do a
1369
 
        # trial push to make sure the verb is accessible, and do not fallback
1370
 
        # when actually pushing the stream. A cleanup patch is going to look at
1371
 
        # rewinding/restarting the stream/partial buffering etc.
1372
 
        byte_stream = self._stream_to_byte_stream([], src_format)
1373
1692
        try:
1374
 
            response = client.call_with_body_stream(
1375
 
                ('Repository.insert_stream', path), byte_stream)
 
1693
            search_bytes = repo._serialise_search_result(search)
 
1694
            response = repo._call_with_body_bytes_expecting_body(
 
1695
                'Repository.get_stream',
 
1696
                (path, self.to_format.network_name()), search_bytes)
 
1697
            response_tuple, response_handler = response
1376
1698
        except errors.UnknownSmartMethod:
1377
1699
            medium._remember_remote_is_before((1,13))
1378
 
            return self._insert_real(stream, src_format)
1379
 
        byte_stream = self._stream_to_byte_stream(stream, src_format)
1380
 
        response = client.call_with_body_stream(
1381
 
            ('Repository.insert_stream', path), byte_stream)
1382
 
        if response[0][0] not in ('ok', ):
1383
 
            raise errors.UnexpectedSmartServerResponse(response)
1384
 
            
1385
 
    def _stream_to_byte_stream(self, stream, src_format):
1386
 
        bytes = []
1387
 
        pack_writer = pack.ContainerWriter(bytes.append)
1388
 
        pack_writer.begin()
1389
 
        pack_writer.add_bytes_record(src_format.network_name(), '')
1390
 
        adapters = {}
1391
 
        def get_adapter(adapter_key):
1392
 
            try:
1393
 
                return adapters[adapter_key]
1394
 
            except KeyError:
1395
 
                adapter_factory = adapter_registry.get(adapter_key)
1396
 
                adapter = adapter_factory(self)
1397
 
                adapters[adapter_key] = adapter
1398
 
                return adapter
1399
 
        for substream_type, substream in stream:
1400
 
            for record in substream:
1401
 
                if record.storage_kind in ('chunked', 'fulltext'):
1402
 
                    serialised = record_to_fulltext_bytes(record)
 
1700
            return self._real_stream(repo, search)
 
1701
        if response_tuple[0] != 'ok':
 
1702
            raise errors.UnexpectedSmartServerResponse(response_tuple)
 
1703
        byte_stream = response_handler.read_streamed_body()
 
1704
        src_format, stream = smart_repo._byte_stream_to_stream(byte_stream)
 
1705
        if src_format.network_name() != repo._format.network_name():
 
1706
            raise AssertionError(
 
1707
                "Mismatched RemoteRepository and stream src %r, %r" % (
 
1708
                src_format.network_name(), repo._format.network_name()))
 
1709
        return stream
 
1710
 
 
1711
    def missing_parents_chain(self, search, sources):
 
1712
        """Chain multiple streams together to handle stacking.
 
1713
 
 
1714
        :param search: The overall search to satisfy with streams.
 
1715
        :param sources: A list of Repository objects to query.
 
1716
        """
 
1717
        self.serialiser = self.to_format._serializer
 
1718
        self.seen_revs = set()
 
1719
        self.referenced_revs = set()
 
1720
        # If there are heads in the search, or the key count is > 0, we are not
 
1721
        # done.
 
1722
        while not search.is_empty() and len(sources) > 1:
 
1723
            source = sources.pop(0)
 
1724
            stream = self._get_stream(source, search)
 
1725
            for kind, substream in stream:
 
1726
                if kind != 'revisions':
 
1727
                    yield kind, substream
1403
1728
                else:
1404
 
                    serialised = record.get_bytes_as(record.storage_kind)
1405
 
                pack_writer.add_bytes_record(serialised, [(substream_type,)])
1406
 
                for b in bytes:
1407
 
                    yield b
1408
 
                del bytes[:]
1409
 
        pack_writer.end()
1410
 
        for b in bytes:
1411
 
            yield b
 
1729
                    yield kind, self.missing_parents_rev_handler(substream)
 
1730
            search = search.refine(self.seen_revs, self.referenced_revs)
 
1731
            self.seen_revs = set()
 
1732
            self.referenced_revs = set()
 
1733
        if not search.is_empty():
 
1734
            for kind, stream in self._get_stream(sources[0], search):
 
1735
                yield kind, stream
 
1736
 
 
1737
    def missing_parents_rev_handler(self, substream):
 
1738
        for content in substream:
 
1739
            revision_bytes = content.get_bytes_as('fulltext')
 
1740
            revision = self.serialiser.read_revision_from_string(revision_bytes)
 
1741
            self.seen_revs.add(content.key[-1])
 
1742
            self.referenced_revs.update(revision.parent_ids)
 
1743
            yield content
1412
1744
 
1413
1745
 
1414
1746
class RemoteBranchLockableFiles(LockableFiles):
1415
1747
    """A 'LockableFiles' implementation that talks to a smart server.
1416
 
    
 
1748
 
1417
1749
    This is not a public interface class.
1418
1750
    """
1419
1751
 
1433
1765
 
1434
1766
class RemoteBranchFormat(branch.BranchFormat):
1435
1767
 
1436
 
    def __init__(self):
 
1768
    def __init__(self, network_name=None):
1437
1769
        super(RemoteBranchFormat, self).__init__()
1438
1770
        self._matchingbzrdir = RemoteBzrDirFormat()
1439
1771
        self._matchingbzrdir.set_branch_format(self)
 
1772
        self._custom_format = None
 
1773
        self._network_name = network_name
1440
1774
 
1441
1775
    def __eq__(self, other):
1442
 
        return (isinstance(other, RemoteBranchFormat) and 
 
1776
        return (isinstance(other, RemoteBranchFormat) and
1443
1777
            self.__dict__ == other.__dict__)
1444
1778
 
 
1779
    def _ensure_real(self):
 
1780
        if self._custom_format is None:
 
1781
            self._custom_format = branch.network_format_registry.get(
 
1782
                self._network_name)
 
1783
 
1445
1784
    def get_format_description(self):
1446
1785
        return 'Remote BZR Branch'
1447
1786
 
1448
 
    def get_format_string(self):
1449
 
        return 'Remote BZR Branch'
1450
 
 
1451
 
    def open(self, a_bzrdir):
1452
 
        return a_bzrdir.open_branch()
 
1787
    def network_name(self):
 
1788
        return self._network_name
 
1789
 
 
1790
    def open(self, a_bzrdir, ignore_fallbacks=False):
 
1791
        return a_bzrdir.open_branch(ignore_fallbacks=ignore_fallbacks)
 
1792
 
 
1793
    def _vfs_initialize(self, a_bzrdir):
 
1794
        # Initialisation when using a local bzrdir object, or a non-vfs init
 
1795
        # method is not available on the server.
 
1796
        # self._custom_format is always set - the start of initialize ensures
 
1797
        # that.
 
1798
        if isinstance(a_bzrdir, RemoteBzrDir):
 
1799
            a_bzrdir._ensure_real()
 
1800
            result = self._custom_format.initialize(a_bzrdir._real_bzrdir)
 
1801
        else:
 
1802
            # We assume the bzrdir is parameterised; it may not be.
 
1803
            result = self._custom_format.initialize(a_bzrdir)
 
1804
        if (isinstance(a_bzrdir, RemoteBzrDir) and
 
1805
            not isinstance(result, RemoteBranch)):
 
1806
            result = RemoteBranch(a_bzrdir, a_bzrdir.find_repository(), result)
 
1807
        return result
1453
1808
 
1454
1809
    def initialize(self, a_bzrdir):
1455
 
        # Delegate to a _real object here - the RemoteBzrDir format now
1456
 
        # supports delegating to parameterised branch formats and as such
1457
 
        # this RemoteBranchFormat method is only called when no specific format
1458
 
        # is selected.
 
1810
        # 1) get the network name to use.
 
1811
        if self._custom_format:
 
1812
            network_name = self._custom_format.network_name()
 
1813
        else:
 
1814
            # Select the current bzrlib default and ask for that.
 
1815
            reference_bzrdir_format = bzrdir.format_registry.get('default')()
 
1816
            reference_format = reference_bzrdir_format.get_branch_format()
 
1817
            self._custom_format = reference_format
 
1818
            network_name = reference_format.network_name()
 
1819
        # Being asked to create on a non RemoteBzrDir:
1459
1820
        if not isinstance(a_bzrdir, RemoteBzrDir):
1460
 
            result = a_bzrdir.create_branch()
 
1821
            return self._vfs_initialize(a_bzrdir)
 
1822
        medium = a_bzrdir._client._medium
 
1823
        if medium._is_remote_before((1, 13)):
 
1824
            return self._vfs_initialize(a_bzrdir)
 
1825
        # Creating on a remote bzr dir.
 
1826
        # 2) try direct creation via RPC
 
1827
        path = a_bzrdir._path_for_remote_call(a_bzrdir._client)
 
1828
        verb = 'BzrDir.create_branch'
 
1829
        try:
 
1830
            response = a_bzrdir._call(verb, path, network_name)
 
1831
        except errors.UnknownSmartMethod:
 
1832
            # Fallback - use vfs methods
 
1833
            medium._remember_remote_is_before((1, 13))
 
1834
            return self._vfs_initialize(a_bzrdir)
 
1835
        if response[0] != 'ok':
 
1836
            raise errors.UnexpectedSmartServerResponse(response)
 
1837
        # Turn the response into a RemoteRepository object.
 
1838
        format = RemoteBranchFormat(network_name=response[1])
 
1839
        repo_format = response_tuple_to_repo_format(response[3:])
 
1840
        if response[2] == '':
 
1841
            repo_bzrdir = a_bzrdir
1461
1842
        else:
1462
 
            a_bzrdir._ensure_real()
1463
 
            result = a_bzrdir._real_bzrdir.create_branch()
1464
 
        if not isinstance(result, RemoteBranch):
1465
 
            result = RemoteBranch(a_bzrdir, a_bzrdir.find_repository(), result)
1466
 
        return result
 
1843
            repo_bzrdir = RemoteBzrDir(
 
1844
                a_bzrdir.root_transport.clone(response[2]), a_bzrdir._format,
 
1845
                a_bzrdir._client)
 
1846
        remote_repo = RemoteRepository(repo_bzrdir, repo_format)
 
1847
        remote_branch = RemoteBranch(a_bzrdir, remote_repo,
 
1848
            format=format, setup_stacking=False)
 
1849
        # XXX: We know this is a new branch, so it must have revno 0, revid
 
1850
        # NULL_REVISION. Creating the branch locked would make this be unable
 
1851
        # to be wrong; here its simply very unlikely to be wrong. RBC 20090225
 
1852
        remote_branch._last_revision_info_cache = 0, NULL_REVISION
 
1853
        return remote_branch
 
1854
 
 
1855
    def make_tags(self, branch):
 
1856
        self._ensure_real()
 
1857
        return self._custom_format.make_tags(branch)
1467
1858
 
1468
1859
    def supports_tags(self):
1469
1860
        # Remote branches might support tags, but we won't know until we
1470
1861
        # access the real remote branch.
1471
 
        return True
 
1862
        self._ensure_real()
 
1863
        return self._custom_format.supports_tags()
 
1864
 
 
1865
    def supports_stacking(self):
 
1866
        self._ensure_real()
 
1867
        return self._custom_format.supports_stacking()
1472
1868
 
1473
1869
 
1474
1870
class RemoteBranch(branch.Branch, _RpcHelper):
1478
1874
    """
1479
1875
 
1480
1876
    def __init__(self, remote_bzrdir, remote_repository, real_branch=None,
1481
 
        _client=None):
 
1877
        _client=None, format=None, setup_stacking=True):
1482
1878
        """Create a RemoteBranch instance.
1483
1879
 
1484
1880
        :param real_branch: An optional local implementation of the branch
1485
1881
            format, usually accessing the data via the VFS.
1486
1882
        :param _client: Private parameter for testing.
 
1883
        :param format: A RemoteBranchFormat object, None to create one
 
1884
            automatically. If supplied it should have a network_name already
 
1885
            supplied.
 
1886
        :param setup_stacking: If True make an RPC call to determine the
 
1887
            stacked (or not) status of the branch. If False assume the branch
 
1888
            is not stacked.
1487
1889
        """
1488
1890
        # We intentionally don't call the parent class's __init__, because it
1489
1891
        # will try to assign to self.tags, which is a property in this subclass.
1512
1914
        else:
1513
1915
            self._real_branch = None
1514
1916
        # Fill out expected attributes of branch for bzrlib api users.
1515
 
        self._format = RemoteBranchFormat()
1516
1917
        self.base = self.bzrdir.root_transport.base
1517
1918
        self._control_files = None
1518
1919
        self._lock_mode = None
1520
1921
        self._repo_lock_token = None
1521
1922
        self._lock_count = 0
1522
1923
        self._leave_lock = False
 
1924
        # Setup a format: note that we cannot call _ensure_real until all the
 
1925
        # attributes above are set: This code cannot be moved higher up in this
 
1926
        # function.
 
1927
        if format is None:
 
1928
            self._format = RemoteBranchFormat()
 
1929
            if real_branch is not None:
 
1930
                self._format._network_name = \
 
1931
                    self._real_branch._format.network_name()
 
1932
        else:
 
1933
            self._format = format
 
1934
        if not self._format._network_name:
 
1935
            # Did not get from open_branchV2 - old server.
 
1936
            self._ensure_real()
 
1937
            self._format._network_name = \
 
1938
                self._real_branch._format.network_name()
 
1939
        self.tags = self._format.make_tags(self)
1523
1940
        # The base class init is not called, so we duplicate this:
1524
1941
        hooks = branch.Branch.hooks['open']
1525
1942
        for hook in hooks:
1526
1943
            hook(self)
1527
 
        self._setup_stacking()
 
1944
        if setup_stacking:
 
1945
            self._setup_stacking()
1528
1946
 
1529
1947
    def _setup_stacking(self):
1530
1948
        # configure stacking into the remote repository, by reading it from
1534
1952
        except (errors.NotStacked, errors.UnstackableBranchFormat,
1535
1953
            errors.UnstackableRepositoryFormat), e:
1536
1954
            return
1537
 
        # it's relative to this branch...
1538
 
        fallback_url = urlutils.join(self.base, fallback_url)
1539
 
        transports = [self.bzrdir.root_transport]
1540
 
        if self._real_branch is not None:
1541
 
            transports.append(self._real_branch._transport)
1542
 
        stacked_on = branch.Branch.open(fallback_url,
1543
 
                                        possible_transports=transports)
1544
 
        self.repository.add_fallback_repository(stacked_on.repository)
 
1955
        self._activate_fallback_location(fallback_url)
 
1956
 
 
1957
    def _get_config(self):
 
1958
        return RemoteBranchConfig(self)
1545
1959
 
1546
1960
    def _get_real_transport(self):
1547
1961
        # if we try vfs access, return the real branch's vfs transport
1600
2014
        too, in fact doing so might harm performance.
1601
2015
        """
1602
2016
        super(RemoteBranch, self)._clear_cached_state()
1603
 
        
 
2017
 
1604
2018
    @property
1605
2019
    def control_files(self):
1606
2020
        # Defer actually creating RemoteBranchLockableFiles until its needed,
1645
2059
            raise errors.UnexpectedSmartServerResponse(response)
1646
2060
        return response[1]
1647
2061
 
 
2062
    def _vfs_get_tags_bytes(self):
 
2063
        self._ensure_real()
 
2064
        return self._real_branch._get_tags_bytes()
 
2065
 
 
2066
    def _get_tags_bytes(self):
 
2067
        medium = self._client._medium
 
2068
        if medium._is_remote_before((1, 13)):
 
2069
            return self._vfs_get_tags_bytes()
 
2070
        try:
 
2071
            response = self._call('Branch.get_tags_bytes', self._remote_path())
 
2072
        except errors.UnknownSmartMethod:
 
2073
            medium._remember_remote_is_before((1, 13))
 
2074
            return self._vfs_get_tags_bytes()
 
2075
        return response[0]
 
2076
 
1648
2077
    def lock_read(self):
1649
2078
        self.repository.lock_read()
1650
2079
        if not self._lock_mode:
1670
2099
            raise errors.UnexpectedSmartServerResponse(response)
1671
2100
        ok, branch_token, repo_token = response
1672
2101
        return branch_token, repo_token
1673
 
            
 
2102
 
1674
2103
    def lock_write(self, token=None):
1675
2104
        if not self._lock_mode:
1676
2105
            # Lock the branch and repo in one remote call.
1704
2133
            self.repository.lock_write(self._repo_lock_token)
1705
2134
        return self._lock_token or None
1706
2135
 
 
2136
    def _set_tags_bytes(self, bytes):
 
2137
        self._ensure_real()
 
2138
        return self._real_branch._set_tags_bytes(bytes)
 
2139
 
1707
2140
    def _unlock(self, branch_token, repo_token):
1708
2141
        err_context = {'token': str((branch_token, repo_token))}
1709
2142
        response = self._call(
1838
2271
            hook(self, rev_history)
1839
2272
        self._cache_revision_history(rev_history)
1840
2273
 
1841
 
    def get_parent(self):
1842
 
        self._ensure_real()
1843
 
        return self._real_branch.get_parent()
1844
 
 
1845
2274
    def _get_parent_location(self):
1846
 
        # Used by tests, when checking normalisation of given vs stored paths.
 
2275
        medium = self._client._medium
 
2276
        if medium._is_remote_before((1, 13)):
 
2277
            return self._vfs_get_parent_location()
 
2278
        try:
 
2279
            response = self._call('Branch.get_parent', self._remote_path())
 
2280
        except errors.UnknownSmartMethod:
 
2281
            medium._remember_remote_is_before((1, 13))
 
2282
            return self._vfs_get_parent_location()
 
2283
        if len(response) != 1:
 
2284
            raise errors.UnexpectedSmartServerResponse(response)
 
2285
        parent_location = response[0]
 
2286
        if parent_location == '':
 
2287
            return None
 
2288
        return parent_location
 
2289
 
 
2290
    def _vfs_get_parent_location(self):
1847
2291
        self._ensure_real()
1848
2292
        return self._real_branch._get_parent_location()
1849
 
        
 
2293
 
1850
2294
    def set_parent(self, url):
1851
2295
        self._ensure_real()
1852
2296
        return self._real_branch.set_parent(url)
1858
2302
        else:
1859
2303
            self._ensure_real()
1860
2304
            return self._real_branch._set_parent_location(url)
1861
 
        
1862
 
    def set_stacked_on_url(self, stacked_location):
1863
 
        """Set the URL this branch is stacked against.
1864
 
 
1865
 
        :raises UnstackableBranchFormat: If the branch does not support
1866
 
            stacking.
1867
 
        :raises UnstackableRepositoryFormat: If the repository does not support
1868
 
            stacking.
1869
 
        """
1870
 
        self._ensure_real()
1871
 
        return self._real_branch.set_stacked_on_url(stacked_location)
1872
 
 
1873
 
    def sprout(self, to_bzrdir, revision_id=None):
1874
 
        branch_format = to_bzrdir._format._branch_format
1875
 
        if (branch_format is None or
1876
 
            isinstance(branch_format, RemoteBranchFormat)):
1877
 
            # The to_bzrdir specifies RemoteBranchFormat (or no format, which
1878
 
            # implies the same thing), but RemoteBranches can't be created at
1879
 
            # arbitrary URLs.  So create a branch in the same format as
1880
 
            # _real_branch instead.
1881
 
            # XXX: if to_bzrdir is a RemoteBzrDir, this should perhaps do
1882
 
            # to_bzrdir.create_branch to create a RemoteBranch after all...
1883
 
            self._ensure_real()
1884
 
            result = self._real_branch._format.initialize(to_bzrdir)
1885
 
            self.copy_content_into(result, revision_id=revision_id)
1886
 
            result.set_parent(self.bzrdir.root_transport.base)
1887
 
        else:
1888
 
            result = branch.Branch.sprout(
1889
 
                self, to_bzrdir, revision_id=revision_id)
1890
 
        return result
1891
2305
 
1892
2306
    @needs_write_lock
1893
2307
    def pull(self, source, overwrite=False, stop_revision=None,
1956
2370
        self.set_revision_history(self._lefthand_history(revision_id,
1957
2371
            last_rev=last_rev,other_branch=other_branch))
1958
2372
 
1959
 
    @property
1960
 
    def tags(self):
1961
 
        self._ensure_real()
1962
 
        return self._real_branch.tags
1963
 
 
1964
2373
    def set_push_location(self, location):
1965
2374
        self._ensure_real()
1966
2375
        return self._real_branch.set_push_location(location)
1967
2376
 
1968
 
    @needs_write_lock
1969
 
    def update_revisions(self, other, stop_revision=None, overwrite=False,
1970
 
                         graph=None):
1971
 
        """See Branch.update_revisions."""
1972
 
        other.lock_read()
 
2377
 
 
2378
class RemoteBranchConfig(object):
 
2379
    """A Config that reads from a smart branch and writes via smart methods.
 
2380
 
 
2381
    It is a low-level object that considers config data to be name/value pairs
 
2382
    that may be associated with a section. Assigning meaning to the these
 
2383
    values is done at higher levels like bzrlib.config.TreeConfig.
 
2384
    """
 
2385
 
 
2386
    def __init__(self, branch):
 
2387
        self._branch = branch
 
2388
 
 
2389
    def get_option(self, name, section=None, default=None):
 
2390
        """Return the value associated with a named option.
 
2391
 
 
2392
        :param name: The name of the value
 
2393
        :param section: The section the option is in (if any)
 
2394
        :param default: The value to return if the value is not set
 
2395
        :return: The value or default value
 
2396
        """
 
2397
        configobj = self._get_configobj()
 
2398
        if section is None:
 
2399
            section_obj = configobj
 
2400
        else:
 
2401
            try:
 
2402
                section_obj = configobj[section]
 
2403
            except KeyError:
 
2404
                return default
 
2405
        return section_obj.get(name, default)
 
2406
 
 
2407
    def _get_configobj(self):
 
2408
        path = self._branch._remote_path()
 
2409
        response = self._branch._client.call_expecting_body(
 
2410
            'Branch.get_config_file', path)
 
2411
        if response[0][0] != 'ok':
 
2412
            raise UnexpectedSmartServerResponse(response)
 
2413
        lines = response[1].read_body_bytes().splitlines()
 
2414
        return config.ConfigObj(lines, encoding='utf-8')
 
2415
 
 
2416
    def set_option(self, value, name, section=None):
 
2417
        """Set the value associated with a named option.
 
2418
 
 
2419
        :param value: The value to set
 
2420
        :param name: The name of the value to set
 
2421
        :param section: The section the option is in (if any)
 
2422
        """
 
2423
        medium = self._branch._client._medium
 
2424
        if medium._is_remote_before((1, 14)):
 
2425
            return self._vfs_set_option(value, name, section)
1973
2426
        try:
1974
 
            if stop_revision is None:
1975
 
                stop_revision = other.last_revision()
1976
 
                if revision.is_null(stop_revision):
1977
 
                    # if there are no commits, we're done.
1978
 
                    return
1979
 
            self.fetch(other, stop_revision)
 
2427
            path = self._branch._remote_path()
 
2428
            response = self._branch._client.call('Branch.set_config_option',
 
2429
                path, self._branch._lock_token, self._branch._repo_lock_token,
 
2430
                value.encode('utf8'), name, section or '')
 
2431
        except errors.UnknownSmartMethod:
 
2432
            medium._remember_remote_is_before((1, 14))
 
2433
            return self._vfs_set_option(value, name, section)
 
2434
        if response != ():
 
2435
            raise errors.UnexpectedSmartServerResponse(response)
1980
2436
 
1981
 
            if overwrite:
1982
 
                # Just unconditionally set the new revision.  We don't care if
1983
 
                # the branches have diverged.
1984
 
                self._set_last_revision(stop_revision)
1985
 
            else:
1986
 
                medium = self._client._medium
1987
 
                if not medium._is_remote_before((1, 6)):
1988
 
                    try:
1989
 
                        self._set_last_revision_descendant(stop_revision, other)
1990
 
                        return
1991
 
                    except errors.UnknownSmartMethod:
1992
 
                        medium._remember_remote_is_before((1, 6))
1993
 
                # Fallback for pre-1.6 servers: check for divergence
1994
 
                # client-side, then do _set_last_revision.
1995
 
                last_rev = revision.ensure_null(self.last_revision())
1996
 
                if graph is None:
1997
 
                    graph = self.repository.get_graph()
1998
 
                if self._check_if_descendant_or_diverged(
1999
 
                        stop_revision, last_rev, graph, other):
2000
 
                    # stop_revision is a descendant of last_rev, but we aren't
2001
 
                    # overwriting, so we're done.
2002
 
                    return
2003
 
                self._set_last_revision(stop_revision)
2004
 
        finally:
2005
 
            other.unlock()
 
2437
    def _vfs_set_option(self, value, name, section=None):
 
2438
        self._branch._ensure_real()
 
2439
        return self._branch._real_branch._get_config().set_option(
 
2440
            value, name, section)
2006
2441
 
2007
2442
 
2008
2443
def _extract_tar(tar, to_dir):