40
40
SmartProtocolError,
42
42
from bzrlib.lockable_files import LockableFiles
43
from bzrlib.smart import client, vfs
43
from bzrlib.smart import client, vfs, repository as smart_repo
44
44
from bzrlib.revision import ensure_null, NULL_REVISION
45
45
from bzrlib.trace import mutter, note, warning
46
46
from bzrlib.util import bencode
47
from bzrlib.versionedfile import record_to_fulltext_bytes
50
49
class _RpcHelper(object):
624
623
"""See Repository._get_sink()."""
625
624
return RemoteStreamSink(self)
626
def _get_source(self, to_format):
    """Build the stream source used to read data out of this repository.

    :param to_format: Passed through unchanged to RemoteStreamSource
        (presumably the format the stream is destined for -- confirm
        against callers).
    :return: A new RemoteStreamSource constructed from this repository
        and to_format.
    """
    source = RemoteStreamSource(self, to_format)
    return source
627
630
def has_revision(self, revision_id):
628
631
"""See Repository.has_revision()."""
629
632
if revision_id == NULL_REVISION:
1433
1433
# do not fallback when actually pushing the stream. A cleanup patch
1434
1434
# is going to look at rewinding/restarting the stream/partial
1435
1435
# buffering etc.
1436
byte_stream = self._stream_to_byte_stream([], src_format)
1436
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1438
1438
response = client.call_with_body_stream(
1439
1439
('Repository.insert_stream', path, ''), byte_stream)
1440
1440
except errors.UnknownSmartMethod:
1441
1441
medium._remember_remote_is_before((1,13))
1442
1442
return self._insert_real(stream, src_format, resume_tokens)
1443
byte_stream = self._stream_to_byte_stream(stream, src_format)
1443
byte_stream = smart_repo._stream_to_byte_stream(
1444
1445
resume_tokens = ' '.join(resume_tokens)
1445
1446
response = client.call_with_body_stream(
1446
1447
('Repository.insert_stream', path, resume_tokens), byte_stream)
1458
1459
collection.reload_pack_names()
1459
1460
return [], set()
1461
def _stream_to_byte_stream(self, stream, src_format):
1463
pack_writer = pack.ContainerWriter(bytes.append)
1465
pack_writer.add_bytes_record(src_format.network_name(), '')
1467
def get_adapter(adapter_key):
1469
return adapters[adapter_key]
1471
adapter_factory = adapter_registry.get(adapter_key)
1472
adapter = adapter_factory(self)
1473
adapters[adapter_key] = adapter
1475
for substream_type, substream in stream:
1476
for record in substream:
1477
if record.storage_kind in ('chunked', 'fulltext'):
1478
serialised = record_to_fulltext_bytes(record)
1480
serialised = record.get_bytes_as(record.storage_kind)
1482
# Some streams embed the whole stream into the wire
1483
# representation of the first record, which means that
1484
# later records have no wire representation: we skip them.
1485
pack_writer.add_bytes_record(serialised, [(substream_type,)])
1463
class RemoteStreamSource(repository.StreamSource):
1464
"""Stream data from a remote server."""
1466
def get_stream(self, search):
1467
# streaming with fallback repositories is not well defined yet: The
1468
# remote repository cannot see the fallback repositories, and thus
1469
# cannot satisfy the entire search in the general case. Likewise the
1470
# fallback repositories cannot reify the search to determine what they
1471
# should send. It likely needs a return value in the stream listing the
1472
# edge of the search to resume from in fallback repositories.
1473
if self.from_repository._fallback_repositories:
1474
return repository.StreamSource.get_stream(self, search)
1475
repo = self.from_repository
1476
client = repo._client
1477
medium = client._medium
1478
if medium._is_remote_before((1, 13)):
1479
# No possible way this can work.
1480
return repository.StreamSource.get_stream(self, search)
1481
path = repo.bzrdir._path_for_remote_call(client)
1483
recipe = repo._serialise_search_recipe(search._recipe)
1484
response = repo._call_with_body_bytes_expecting_body(
1485
'Repository.StreamSource.get_stream',
1486
(path, self.to_format.network_name()), recipe)
1487
response_tuple, response_handler = response
1488
except errors.UnknownSmartMethod:
1489
medium._remember_remote_is_before((1,13))
1490
return repository.StreamSource.get_stream(self, search)
1491
if response_tuple[0] != 'ok':
1492
raise errors.UnexpectedSmartServerResponse(response_tuple)
1493
byte_stream = response_handler.read_streamed_body()
1494
src_format, stream = smart_repo._byte_stream_to_stream(byte_stream)
1495
if src_format.network_name() != repo._format.network_name():
1496
raise AssertionError(
1497
"Mismatched RemoteRepository and stream src %r, %r" % (
1498
src_format.network_name(), repo._format.network_name()))
1494
1502
class RemoteBranchLockableFiles(LockableFiles):