def insert_stream(self, stream, src_format, resume_tokens):
    """Insert ``stream`` into the target repository via a smart-server verb.

    Candidate verbs are tried newest first.  Unless ``resume_tokens`` is
    non-empty, each candidate is probed with an empty stream before the real
    stream is sent, so that an unsupported verb is detected before any real
    data has been consumed.  If no candidate verb is usable the whole stream
    is handed to ``self._insert_real`` instead.

    :param stream: iterable of ``(substream_kind, substream)`` pairs.
    :param src_format: format object passed through to
        ``smart_repo._stream_to_byte_stream`` and ``self._insert_real``.
    :param resume_tokens: sequence of tokens from a previous, incomplete
        insert; joined with spaces and sent as the verb's third argument.
    :return: ``([], set())`` after refreshing the target when the server
        answers ``'ok'``; the decoded tokens and missing keys when it answers
        ``'missing-basis'``; otherwise whatever ``self._insert_real`` or
        ``self._resume_stream_with_vfs`` returns.
    :raises errors.UnexpectedSmartServerResponse: if the server's response is
        neither ``'ok'`` nor ``'missing-basis'``.
    """
    target = self.target_repo
    target._unstacked_provider.missing_keys.clear()
    candidate_calls = [('Repository.insert_stream_1.19', (1, 19))]
    if target._lock_token:
        candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
        lock_args = (target._lock_token or '',)
    else:
        candidate_calls.append(('Repository.insert_stream', (1, 13)))
        # The unlocked verb takes no extra arguments.
        lock_args = ()
    client = target._client
    medium = client._medium
    path = target.bzrdir._path_for_remote_call(client)
    # Probe for the verb to use with an empty stream before sending the
    # real stream to it.  We do this both to avoid the risk of sending a
    # large request that is then rejected, and because we don't want to
    # implement a way to buffer, rewind, or restart the stream.
    for verb, required_version in candidate_calls:
        if medium._is_remote_before(required_version):
            # Already known to be unsupported by this server.
            continue
        if resume_tokens:
            # We've already done the probing (and set _is_remote_before) on
            # a previous insert.
            break
        byte_stream = smart_repo._stream_to_byte_stream([], src_format)
        try:
            client.call_with_body_stream(
                (verb, path, '') + lock_args, byte_stream)
        except errors.UnknownSmartMethod:
            medium._remember_remote_is_before(required_version)
        else:
            break
    else:
        # No candidate verb is usable: no possible way this can work over
        # the smart protocol, so insert the whole stream the other way.
        return self._insert_real(stream, src_format, resume_tokens)
    self._last_inv_record = None
    self._last_substream = None
    if required_version < (1, 19):
        # Remote side doesn't support inventory deltas.  Wrap the stream to
        # make sure we don't send any.  If the stream contains inventory
        # deltas we'll interrupt the smart insert_stream request and
        # finish the insert via _resume_stream_with_vfs.
        stream = self._stop_stream_if_inventory_delta(stream)
    byte_stream = smart_repo._stream_to_byte_stream(stream, src_format)
    resume_tokens = ' '.join(resume_tokens)
    response = client.call_with_body_stream(
        (verb, path, resume_tokens) + lock_args, byte_stream)
    if response[0][0] not in ('ok', 'missing-basis'):
        raise errors.UnexpectedSmartServerResponse(response)
    if self._last_substream is not None:
        # The stream included an inventory-delta record, but the remote
        # side isn't new enough to support them.  So we need to send the
        # rest of the stream via VFS.
        return self._resume_stream_with_vfs(response, src_format)
    if response[0][0] == 'missing-basis':
        tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
        resume_tokens = tokens
        # NOTE(review): return shape mirrors the 'ok' branch below
        # (tokens, set of keys) -- confirm against callers.
        return resume_tokens, set(missing_keys)
    self.target_repo.refresh_data()
    return [], set()

1756
def _resume_stream_with_vfs(self, response, src_format):
1757
"""Resume sending a stream via VFS, first resending the record and
1758
substream that couldn't be sent via an insert_stream verb.
1760
if response[0][0] == 'missing-basis':
1761
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1762
# Ignore missing_keys, we haven't finished inserting yet
1765
def resume_substream():
1766
# Yield the substream that was interrupted.
1767
for record in self._last_substream:
1769
self._last_substream = None
1770
def resume_stream():
1771
# Finish sending the interrupted substream
1772
yield ('inventory-deltas', resume_substream())
1773
# Then simply continue sending the rest of the stream.
1774
for substream_kind, substream in self._last_stream:
1775
yield substream_kind, substream
1776
return self._insert_real(resume_stream(), src_format, tokens)
1778
def _stop_stream_if_inventory_delta(self, stream):
1779
"""Normally this just lets the original stream pass-through unchanged.
1781
However if any 'inventory-deltas' substream occurs it will stop
1782
streaming, and store the interrupted substream and stream in
1783
self._last_substream and self._last_stream so that the stream can be
1784
resumed by _resume_stream_with_vfs.
1787
stream_iter = iter(stream)
1788
for substream_kind, substream in stream_iter:
1789
if substream_kind == 'inventory-deltas':
1790
self._last_substream = substream
1791
self._last_stream = stream_iter
1794
yield substream_kind, substream
1731
1797
class RemoteStreamSource(repository.StreamSource):
1732
1798
"""Stream data from a remote server."""
1782
1854
return self._real_stream(repo, search)
1783
1855
client = repo._client
1784
1856
medium = client._medium
1785
if medium._is_remote_before((1, 13)):
1786
# streaming was added in 1.13
1787
return self._real_stream(repo, search)
1788
1857
path = repo.bzrdir._path_for_remote_call(client)
1790
search_bytes = repo._serialise_search_result(search)
1791
response = repo._call_with_body_bytes_expecting_body(
1792
'Repository.get_stream',
1793
(path, self.to_format.network_name()), search_bytes)
1794
response_tuple, response_handler = response
1795
except errors.UnknownSmartMethod:
1796
medium._remember_remote_is_before((1,13))
1858
search_bytes = repo._serialise_search_result(search)
1859
args = (path, self.to_format.network_name())
1861
('Repository.get_stream_1.19', (1, 19)),
1862
('Repository.get_stream', (1, 13))]
1864
for verb, version in candidate_verbs:
1865
if medium._is_remote_before(version):
1868
response = repo._call_with_body_bytes_expecting_body(
1869
verb, args, search_bytes)
1870
except errors.UnknownSmartMethod:
1871
medium._remember_remote_is_before(version)
1873
response_tuple, response_handler = response
1797
1877
return self._real_stream(repo, search)
1798
1878
if response_tuple[0] != 'ok':
1799
1879
raise errors.UnexpectedSmartServerResponse(response_tuple)