59
63
except errors.ErrorFromSmartServer, err:
60
64
self._translate_error(err, **err_context)
62
def _call_with_body_bytes(self, method, args, body_bytes, **err_context):
64
return self._client.call_with_body_bytes(method, args, body_bytes)
65
except errors.ErrorFromSmartServer, err:
66
self._translate_error(err, **err_context)
68
66
def _call_with_body_bytes_expecting_body(self, method, args, body_bytes,
685
675
return self._real_repository.get_missing_parent_inventories(
686
676
check_for_missing_texts=check_for_missing_texts)
688
def _get_rev_id_for_revno_vfs(self, revno, known_pair):
690
return self._real_repository.get_rev_id_for_revno(
693
def get_rev_id_for_revno(self, revno, known_pair):
694
"""See Repository.get_rev_id_for_revno."""
695
path = self.bzrdir._path_for_remote_call(self._client)
697
if self._client._medium._is_remote_before((1, 17)):
698
return self._get_rev_id_for_revno_vfs(revno, known_pair)
699
response = self._call(
700
'Repository.get_rev_id_for_revno', path, revno, known_pair)
701
except errors.UnknownSmartMethod:
702
self._client._medium._remember_remote_is_before((1, 17))
703
return self._get_rev_id_for_revno_vfs(revno, known_pair)
704
if response[0] == 'ok':
705
return True, response[1]
706
elif response[0] == 'history-incomplete':
707
known_pair = response[1:3]
708
for fallback in self._fallback_repositories:
709
found, result = fallback.get_rev_id_for_revno(revno, known_pair)
714
# Not found in any fallbacks
715
return False, known_pair
717
raise errors.UnexpectedSmartServerResponse(response)
719
678
def _ensure_real(self):
720
679
"""Ensure that there is a _real_repository set.
820
779
result.add(_mod_revision.NULL_REVISION)
823
def _has_same_fallbacks(self, other_repo):
824
"""Returns true if the repositories have the same fallbacks."""
825
# XXX: copied from Repository; it should be unified into a base class
826
# <https://bugs.edge.launchpad.net/bzr/+bug/401622>
827
my_fb = self._fallback_repositories
828
other_fb = other_repo._fallback_repositories
829
if len(my_fb) != len(other_fb):
831
for f, g in zip(my_fb, other_fb):
832
if not f.has_same_location(g):
836
782
def has_same_location(self, other):
837
# TODO: Move to RepositoryBase and unify with the regular Repository
838
# one; unfortunately the tests rely on slightly different behaviour at
839
# present -- mbp 20090710
840
783
return (self.__class__ is other.__class__ and
841
784
self.bzrdir.transport.base == other.bzrdir.transport.base)
1685
1624
def insert_stream(self, stream, src_format, resume_tokens):
1686
1625
target = self.target_repo
1687
1626
target._unstacked_provider.missing_keys.clear()
1688
candidate_calls = [('Repository.insert_stream_1.19', (1, 19))]
1689
1627
if target._lock_token:
1690
candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
1691
lock_args = (target._lock_token or '',)
1628
verb = 'Repository.insert_stream_locked'
1629
extra_args = (target._lock_token or '',)
1630
required_version = (1, 14)
1693
candidate_calls.append(('Repository.insert_stream', (1, 13)))
1632
verb = 'Repository.insert_stream'
1634
required_version = (1, 13)
1695
1635
client = target._client
1696
1636
medium = client._medium
1637
if medium._is_remote_before(required_version):
1638
# No possible way this can work.
1639
return self._insert_real(stream, src_format, resume_tokens)
1697
1640
path = target.bzrdir._path_for_remote_call(client)
1698
# Probe for the verb to use with an empty stream before sending the
1699
# real stream to it. We do this both to avoid the risk of sending a
1700
# large request that is then rejected, and because we don't want to
1701
# implement a way to buffer, rewind, or restart the stream.
1703
for verb, required_version in candidate_calls:
1704
if medium._is_remote_before(required_version):
1707
# We've already done the probing (and set _is_remote_before) on
1708
# a previous insert.
1641
if not resume_tokens:
1642
# XXX: Ugly but important for correctness, *will* be fixed during
1643
# 1.13 cycle. Pushing a stream that is interrupted results in a
1644
# fallback to the _real_repositories sink *with a partial stream*.
1645
# Thats bad because we insert less data than bzr expected. To avoid
1646
# this we do a trial push to make sure the verb is accessible, and
1647
# do not fallback when actually pushing the stream. A cleanup patch
1648
# is going to look at rewinding/restarting the stream/partial
1711
1650
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1713
1652
response = client.call_with_body_stream(
1714
(verb, path, '') + lock_args, byte_stream)
1653
(verb, path, '') + extra_args, byte_stream)
1715
1654
except errors.UnknownSmartMethod:
1716
1655
medium._remember_remote_is_before(required_version)
1722
return self._insert_real(stream, src_format, resume_tokens)
1723
self._last_inv_record = None
1724
self._last_substream = None
1725
if required_version < (1, 19):
1726
# Remote side doesn't support inventory deltas. Wrap the stream to
1727
# make sure we don't send any. If the stream contains inventory
1728
# deltas we'll interrupt the smart insert_stream request and
1730
stream = self._stop_stream_if_inventory_delta(stream)
1656
return self._insert_real(stream, src_format, resume_tokens)
1731
1657
byte_stream = smart_repo._stream_to_byte_stream(
1732
1658
stream, src_format)
1733
1659
resume_tokens = ' '.join(resume_tokens)
1734
1660
response = client.call_with_body_stream(
1735
(verb, path, resume_tokens) + lock_args, byte_stream)
1661
(verb, path, resume_tokens) + extra_args, byte_stream)
1736
1662
if response[0][0] not in ('ok', 'missing-basis'):
1737
1663
raise errors.UnexpectedSmartServerResponse(response)
1738
if self._last_substream is not None:
1739
# The stream included an inventory-delta record, but the remote
1740
# side isn't new enough to support them. So we need to send the
1741
# rest of the stream via VFS.
1742
return self._resume_stream_with_vfs(response, src_format)
1743
1664
if response[0][0] == 'missing-basis':
1744
1665
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1745
1666
resume_tokens = tokens
1748
1669
self.target_repo.refresh_data()
1749
1670
return [], set()
1751
def _resume_stream_with_vfs(self, response, src_format):
1752
"""Resume sending a stream via VFS, first resending the record and
1753
substream that couldn't be sent via an insert_stream verb.
1755
if response[0][0] == 'missing-basis':
1756
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1757
# Ignore missing_keys, we haven't finished inserting yet
1760
def resume_substream():
1761
# Yield the substream that was interrupted.
1762
for record in self._last_substream:
1764
self._last_substream = None
1765
def resume_stream():
1766
# Finish sending the interrupted substream
1767
yield ('inventory-deltas', resume_substream())
1768
# Then simply continue sending the rest of the stream.
1769
for substream_kind, substream in self._last_stream:
1770
yield substream_kind, substream
1771
return self._insert_real(resume_stream(), src_format, tokens)
1773
def _stop_stream_if_inventory_delta(self, stream):
1774
"""Normally this just lets the original stream pass-through unchanged.
1776
However if any 'inventory-deltas' substream occurs it will stop
1777
streaming, and store the interrupted substream and stream in
1778
self._last_substream and self._last_stream so that the stream can be
1779
resumed by _resume_stream_with_vfs.
1782
stream_iter = iter(stream)
1783
for substream_kind, substream in stream_iter:
1784
if substream_kind == 'inventory-deltas':
1785
self._last_substream = substream
1786
self._last_stream = stream_iter
1789
yield substream_kind, substream
1792
1673
class RemoteStreamSource(repository.StreamSource):
1793
1674
"""Stream data from a remote server."""
1796
1677
if (self.from_repository._fallback_repositories and
1797
1678
self.to_format._fetch_order == 'topological'):
1798
1679
return self._real_stream(self.from_repository, search)
1801
repos = [self.from_repository]
1807
repos.extend(repo._fallback_repositories)
1808
sources.append(repo)
1809
return self.missing_parents_chain(search, sources)
1811
def get_stream_for_missing_keys(self, missing_keys):
1812
self.from_repository._ensure_real()
1813
real_repo = self.from_repository._real_repository
1814
real_source = real_repo._get_source(self.to_format)
1815
return real_source.get_stream_for_missing_keys(missing_keys)
1680
return self.missing_parents_chain(search, [self.from_repository] +
1681
self.from_repository._fallback_repositories)
1817
1683
def _real_stream(self, repo, search):
1818
1684
"""Get a stream for search from repo.
1849
1714
return self._real_stream(repo, search)
1850
1715
client = repo._client
1851
1716
medium = client._medium
1717
if medium._is_remote_before((1, 13)):
1718
# streaming was added in 1.13
1719
return self._real_stream(repo, search)
1852
1720
path = repo.bzrdir._path_for_remote_call(client)
1853
search_bytes = repo._serialise_search_result(search)
1854
args = (path, self.to_format.network_name())
1856
('Repository.get_stream_1.19', (1, 19)),
1857
('Repository.get_stream', (1, 13))]
1859
for verb, version in candidate_verbs:
1860
if medium._is_remote_before(version):
1863
response = repo._call_with_body_bytes_expecting_body(
1864
verb, args, search_bytes)
1865
except errors.UnknownSmartMethod:
1866
medium._remember_remote_is_before(version)
1868
response_tuple, response_handler = response
1722
search_bytes = repo._serialise_search_result(search)
1723
response = repo._call_with_body_bytes_expecting_body(
1724
'Repository.get_stream',
1725
(path, self.to_format.network_name()), search_bytes)
1726
response_tuple, response_handler = response
1727
except errors.UnknownSmartMethod:
1728
medium._remember_remote_is_before((1,13))
1872
1729
return self._real_stream(repo, search)
1873
1730
if response_tuple[0] != 'ok':
1874
1731
raise errors.UnexpectedSmartServerResponse(response_tuple)
2259
2103
return self._vfs_get_tags_bytes()
2260
2104
return response[0]
2262
def _vfs_set_tags_bytes(self, bytes):
2264
return self._real_branch._set_tags_bytes(bytes)
2266
def _set_tags_bytes(self, bytes):
2267
medium = self._client._medium
2268
if medium._is_remote_before((1, 18)):
2269
self._vfs_set_tags_bytes(bytes)
2272
self._remote_path(), self._lock_token, self._repo_lock_token)
2273
response = self._call_with_body_bytes(
2274
'Branch.set_tags_bytes', args, bytes)
2275
except errors.UnknownSmartMethod:
2276
medium._remember_remote_is_before((1, 18))
2277
self._vfs_set_tags_bytes(bytes)
2279
2106
def lock_read(self):
2280
2107
self.repository.lock_read()
2281
2108
if not self._lock_mode:
2389
2220
raise NotImplementedError(self.dont_leave_lock_in_place)
2390
2221
self._leave_lock = False
2392
def get_rev_id(self, revno, history=None):
2394
return _mod_revision.NULL_REVISION
2395
last_revision_info = self.last_revision_info()
2396
ok, result = self.repository.get_rev_id_for_revno(
2397
revno, last_revision_info)
2400
missing_parent = result[1]
2401
# Either the revision named by the server is missing, or its parent
2402
# is. Call get_parent_map to determine which, so that we report a
2404
parent_map = self.repository.get_parent_map([missing_parent])
2405
if missing_parent in parent_map:
2406
missing_parent = parent_map[missing_parent]
2407
raise errors.RevisionNotPresent(missing_parent, self.repository)
2409
2223
def _last_revision_info(self):
2410
2224
response = self._call('Branch.last_revision_info', self._remote_path())
2411
2225
if response[0] != 'ok':