~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/remote.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-03-02 07:00:21 UTC
  • mfrom: (4060.1.4 branch.roundtrips)
  • Revision ID: pqm@pqm.ubuntu.com-20090302070021-dvcjdpf47t2aeari
(robertc) Streaming fetch from non-stacked branches on smart servers.
        (Robert Collins, Andrew Bennetts_

Show diffs side-by-side

added added

removed removed

Lines of Context:
40
40
    SmartProtocolError,
41
41
    )
42
42
from bzrlib.lockable_files import LockableFiles
43
 
from bzrlib.smart import client, vfs
 
43
from bzrlib.smart import client, vfs, repository as smart_repo
44
44
from bzrlib.revision import ensure_null, NULL_REVISION
45
45
from bzrlib.trace import mutter, note, warning
46
46
from bzrlib.util import bencode
47
 
from bzrlib.versionedfile import record_to_fulltext_bytes
48
47
 
49
48
 
50
49
class _RpcHelper(object):
624
623
        """See Repository._get_sink()."""
625
624
        return RemoteStreamSink(self)
626
625
 
 
626
    def _get_source(self, to_format):
 
627
        """Return a source for streaming from this repository."""
 
628
        return RemoteStreamSource(self, to_format)
 
629
 
627
630
    def has_revision(self, revision_id):
628
631
        """See Repository.has_revision()."""
629
632
        if revision_id == NULL_REVISION:
1405
1408
 
1406
1409
class RemoteStreamSink(repository.StreamSink):
1407
1410
 
1408
 
    def __init__(self, target_repo):
1409
 
        repository.StreamSink.__init__(self, target_repo)
1410
 
 
1411
1411
    def _insert_real(self, stream, src_format, resume_tokens):
1412
1412
        self.target_repo._ensure_real()
1413
1413
        sink = self.target_repo._real_repository._get_sink()
1433
1433
            # do not fallback when actually pushing the stream. A cleanup patch
1434
1434
            # is going to look at rewinding/restarting the stream/partial
1435
1435
            # buffering etc.
1436
 
            byte_stream = self._stream_to_byte_stream([], src_format)
 
1436
            byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1437
1437
            try:
1438
1438
                response = client.call_with_body_stream(
1439
1439
                    ('Repository.insert_stream', path, ''), byte_stream)
1440
1440
            except errors.UnknownSmartMethod:
1441
1441
                medium._remember_remote_is_before((1,13))
1442
1442
                return self._insert_real(stream, src_format, resume_tokens)
1443
 
        byte_stream = self._stream_to_byte_stream(stream, src_format)
 
1443
        byte_stream = smart_repo._stream_to_byte_stream(
 
1444
            stream, src_format)
1444
1445
        resume_tokens = ' '.join(resume_tokens)
1445
1446
        response = client.call_with_body_stream(
1446
1447
            ('Repository.insert_stream', path, resume_tokens), byte_stream)
1458
1459
                    collection.reload_pack_names()
1459
1460
            return [], set()
1460
1461
 
1461
 
    def _stream_to_byte_stream(self, stream, src_format):
1462
 
        bytes = []
1463
 
        pack_writer = pack.ContainerWriter(bytes.append)
1464
 
        pack_writer.begin()
1465
 
        pack_writer.add_bytes_record(src_format.network_name(), '')
1466
 
        adapters = {}
1467
 
        def get_adapter(adapter_key):
1468
 
            try:
1469
 
                return adapters[adapter_key]
1470
 
            except KeyError:
1471
 
                adapter_factory = adapter_registry.get(adapter_key)
1472
 
                adapter = adapter_factory(self)
1473
 
                adapters[adapter_key] = adapter
1474
 
                return adapter
1475
 
        for substream_type, substream in stream:
1476
 
            for record in substream:
1477
 
                if record.storage_kind in ('chunked', 'fulltext'):
1478
 
                    serialised = record_to_fulltext_bytes(record)
1479
 
                else:
1480
 
                    serialised = record.get_bytes_as(record.storage_kind)
1481
 
                if serialised:
1482
 
                    # Some streams embed the whole stream into the wire
1483
 
                    # representation of the first record, which means that
1484
 
                    # later records have no wire representation: we skip them.
1485
 
                    pack_writer.add_bytes_record(serialised, [(substream_type,)])
1486
 
                for b in bytes:
1487
 
                    yield b
1488
 
                del bytes[:]
1489
 
        pack_writer.end()
1490
 
        for b in bytes:
1491
 
            yield b
 
1462
 
 
1463
class RemoteStreamSource(repository.StreamSource):
 
1464
    """Stream data from a remote server."""
 
1465
 
 
1466
    def get_stream(self, search):
 
1467
        # streaming with fallback repositories is not well defined yet: The
 
1468
        # remote repository cannot see the fallback repositories, and thus
 
1469
        # cannot satisfy the entire search in the general case. Likewise the
 
1470
        # fallback repositories cannot reify the search to determine what they
 
1471
        # should send. It likely needs a return value in the stream listing the
 
1472
        # edge of the search to resume from in fallback repositories.
 
1473
        if self.from_repository._fallback_repositories:
 
1474
            return repository.StreamSource.get_stream(self, search)
 
1475
        repo = self.from_repository
 
1476
        client = repo._client
 
1477
        medium = client._medium
 
1478
        if medium._is_remote_before((1, 13)):
 
1479
            # No possible way this can work.
 
1480
            return repository.StreamSource.get_stream(self, search)
 
1481
        path = repo.bzrdir._path_for_remote_call(client)
 
1482
        try:
 
1483
            recipe = repo._serialise_search_recipe(search._recipe)
 
1484
            response = repo._call_with_body_bytes_expecting_body(
 
1485
                'Repository.StreamSource.get_stream',
 
1486
                (path, self.to_format.network_name()), recipe)
 
1487
            response_tuple, response_handler = response
 
1488
        except errors.UnknownSmartMethod:
 
1489
            medium._remember_remote_is_before((1,13))
 
1490
            return repository.StreamSource.get_stream(self, search)
 
1491
        if response_tuple[0] != 'ok':
 
1492
            raise errors.UnexpectedSmartServerResponse(response_tuple)
 
1493
        byte_stream = response_handler.read_streamed_body()
 
1494
        src_format, stream = smart_repo._byte_stream_to_stream(byte_stream)
 
1495
        if src_format.network_name() != repo._format.network_name():
 
1496
            raise AssertionError(
 
1497
                "Mismatched RemoteRepository and stream src %r, %r" % (
 
1498
                src_format.network_name(), repo._format.network_name()))
 
1499
        return stream
1492
1500
 
1493
1501
 
1494
1502
class RemoteBranchLockableFiles(LockableFiles):