1685
1682
def insert_stream(self, stream, src_format, resume_tokens):
1686
1683
target = self.target_repo
1687
1684
target._unstacked_provider.missing_keys.clear()
1688
candidate_calls = [('Repository.insert_stream_1.19', (1, 19))]
1689
1685
if target._lock_token:
1690
candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
1691
lock_args = (target._lock_token or '',)
1686
verb = 'Repository.insert_stream_locked'
1687
extra_args = (target._lock_token or '',)
1688
required_version = (1, 14)
1693
candidate_calls.append(('Repository.insert_stream', (1, 13)))
1690
verb = 'Repository.insert_stream'
1692
required_version = (1, 13)
1695
1693
client = target._client
1696
1694
medium = client._medium
1695
if medium._is_remote_before(required_version):
1696
# No possible way this can work.
1697
return self._insert_real(stream, src_format, resume_tokens)
1697
1698
path = target.bzrdir._path_for_remote_call(client)
1698
# Probe for the verb to use with an empty stream before sending the
1699
# real stream to it. We do this both to avoid the risk of sending a
1700
# large request that is then rejected, and because we don't want to
1701
# implement a way to buffer, rewind, or restart the stream.
1703
for verb, required_version in candidate_calls:
1704
if medium._is_remote_before(required_version):
1707
# We've already done the probing (and set _is_remote_before) on
1708
# a previous insert.
1699
if not resume_tokens:
1700
# XXX: Ugly but important for correctness, *will* be fixed during
1701
# 1.13 cycle. Pushing a stream that is interrupted results in a
1702
# fallback to the _real_repositories sink *with a partial stream*.
1703
# Thats bad because we insert less data than bzr expected. To avoid
1704
# this we do a trial push to make sure the verb is accessible, and
1705
# do not fallback when actually pushing the stream. A cleanup patch
1706
# is going to look at rewinding/restarting the stream/partial
1711
1708
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1713
1710
response = client.call_with_body_stream(
1714
(verb, path, '') + lock_args, byte_stream)
1711
(verb, path, '') + extra_args, byte_stream)
1715
1712
except errors.UnknownSmartMethod:
1716
1713
medium._remember_remote_is_before(required_version)
1722
return self._insert_real(stream, src_format, resume_tokens)
1723
self._last_inv_record = None
1724
self._last_substream = None
1725
if required_version < (1, 19):
1726
# Remote side doesn't support inventory deltas. Wrap the stream to
1727
# make sure we don't send any. If the stream contains inventory
1728
# deltas we'll interrupt the smart insert_stream request and
1730
stream = self._stop_stream_if_inventory_delta(stream)
1714
return self._insert_real(stream, src_format, resume_tokens)
1731
1715
byte_stream = smart_repo._stream_to_byte_stream(
1732
1716
stream, src_format)
1733
1717
resume_tokens = ' '.join(resume_tokens)
1734
1718
response = client.call_with_body_stream(
1735
(verb, path, resume_tokens) + lock_args, byte_stream)
1719
(verb, path, resume_tokens) + extra_args, byte_stream)
1736
1720
if response[0][0] not in ('ok', 'missing-basis'):
1737
1721
raise errors.UnexpectedSmartServerResponse(response)
1738
if self._last_substream is not None:
1739
# The stream included an inventory-delta record, but the remote
1740
# side isn't new enough to support them. So we need to send the
1741
# rest of the stream via VFS.
1742
return self._resume_stream_with_vfs(response, src_format)
1743
1722
if response[0][0] == 'missing-basis':
1744
1723
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1745
1724
resume_tokens = tokens
1748
1727
self.target_repo.refresh_data()
1749
1728
return [], set()
1751
def _resume_stream_with_vfs(self, response, src_format):
1752
"""Resume sending a stream via VFS, first resending the record and
1753
substream that couldn't be sent via an insert_stream verb.
1755
if response[0][0] == 'missing-basis':
1756
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1757
# Ignore missing_keys, we haven't finished inserting yet
1760
def resume_substream():
1761
# Yield the substream that was interrupted.
1762
for record in self._last_substream:
1764
self._last_substream = None
1765
def resume_stream():
1766
# Finish sending the interrupted substream
1767
yield ('inventory-deltas', resume_substream())
1768
# Then simply continue sending the rest of the stream.
1769
for substream_kind, substream in self._last_stream:
1770
yield substream_kind, substream
1771
return self._insert_real(resume_stream(), src_format, tokens)
1773
def _stop_stream_if_inventory_delta(self, stream):
1774
"""Normally this just lets the original stream pass-through unchanged.
1776
However if any 'inventory-deltas' substream occurs it will stop
1777
streaming, and store the interrupted substream and stream in
1778
self._last_substream and self._last_stream so that the stream can be
1779
resumed by _resume_stream_with_vfs.
1782
stream_iter = iter(stream)
1783
for substream_kind, substream in stream_iter:
1784
if substream_kind == 'inventory-deltas':
1785
self._last_substream = substream
1786
self._last_stream = stream_iter
1789
yield substream_kind, substream
1792
1731
class RemoteStreamSource(repository.StreamSource):
1793
1732
"""Stream data from a remote server."""
1849
1781
return self._real_stream(repo, search)
1850
1782
client = repo._client
1851
1783
medium = client._medium
1784
if medium._is_remote_before((1, 13)):
1785
# streaming was added in 1.13
1786
return self._real_stream(repo, search)
1852
1787
path = repo.bzrdir._path_for_remote_call(client)
1853
search_bytes = repo._serialise_search_result(search)
1854
args = (path, self.to_format.network_name())
1856
('Repository.get_stream_1.19', (1, 19)),
1857
('Repository.get_stream', (1, 13))]
1859
for verb, version in candidate_verbs:
1860
if medium._is_remote_before(version):
1863
response = repo._call_with_body_bytes_expecting_body(
1864
verb, args, search_bytes)
1865
except errors.UnknownSmartMethod:
1866
medium._remember_remote_is_before(version)
1868
response_tuple, response_handler = response
1789
search_bytes = repo._serialise_search_result(search)
1790
response = repo._call_with_body_bytes_expecting_body(
1791
'Repository.get_stream',
1792
(path, self.to_format.network_name()), search_bytes)
1793
response_tuple, response_handler = response
1794
except errors.UnknownSmartMethod:
1795
medium._remember_remote_is_before((1,13))
1872
1796
return self._real_stream(repo, search)
1873
1797
if response_tuple[0] != 'ok':
1874
1798
raise errors.UnexpectedSmartServerResponse(response_tuple)