63
59
except errors.ErrorFromSmartServer, err:
64
60
self._translate_error(err, **err_context)
62
def _call_with_body_bytes(self, method, args, body_bytes, **err_context):
64
return self._client.call_with_body_bytes(method, args, body_bytes)
65
except errors.ErrorFromSmartServer, err:
66
self._translate_error(err, **err_context)
66
68
def _call_with_body_bytes_expecting_body(self, method, args, body_bytes,
675
685
return self._real_repository.get_missing_parent_inventories(
676
686
check_for_missing_texts=check_for_missing_texts)
688
def _get_rev_id_for_revno_vfs(self, revno, known_pair):
690
return self._real_repository.get_rev_id_for_revno(
693
def get_rev_id_for_revno(self, revno, known_pair):
694
"""See Repository.get_rev_id_for_revno."""
695
path = self.bzrdir._path_for_remote_call(self._client)
697
if self._client._medium._is_remote_before((1, 17)):
698
return self._get_rev_id_for_revno_vfs(revno, known_pair)
699
response = self._call(
700
'Repository.get_rev_id_for_revno', path, revno, known_pair)
701
except errors.UnknownSmartMethod:
702
self._client._medium._remember_remote_is_before((1, 17))
703
return self._get_rev_id_for_revno_vfs(revno, known_pair)
704
if response[0] == 'ok':
705
return True, response[1]
706
elif response[0] == 'history-incomplete':
707
known_pair = response[1:3]
708
for fallback in self._fallback_repositories:
709
found, result = fallback.get_rev_id_for_revno(revno, known_pair)
714
# Not found in any fallbacks
715
return False, known_pair
717
raise errors.UnexpectedSmartServerResponse(response)
678
719
def _ensure_real(self):
679
720
"""Ensure that there is a _real_repository set.
779
820
result.add(_mod_revision.NULL_REVISION)
823
def _has_same_fallbacks(self, other_repo):
824
"""Returns true if the repositories have the same fallbacks."""
825
# XXX: copied from Repository; it should be unified into a base class
826
# <https://bugs.edge.launchpad.net/bzr/+bug/401622>
827
my_fb = self._fallback_repositories
828
other_fb = other_repo._fallback_repositories
829
if len(my_fb) != len(other_fb):
831
for f, g in zip(my_fb, other_fb):
832
if not f.has_same_location(g):
782
836
def has_same_location(self, other):
    """Tell whether `other` is the same kind of object at the same place.

    :param other: The object to compare against.
    :return: False when `other` is not exactly this class; otherwise the
        result of comparing the two bzrdir transport base values.
    """
    # TODO: Move to RepositoryBase and unify with the regular Repository
    # one; unfortunately the tests rely on slightly different behaviour at
    # present -- mbp 20090710
    if self.__class__ is not other.__class__:
        # Different classes never count as the same location; bail out
        # before touching other.bzrdir at all.
        return False
    ours = self.bzrdir.transport.base
    theirs = other.bzrdir.transport.base
    return ours == theirs
1624
1685
def insert_stream(self, stream, src_format, resume_tokens):
1625
1686
target = self.target_repo
1626
1687
target._unstacked_provider.missing_keys.clear()
1688
candidate_calls = [('Repository.insert_stream_1.19', (1, 19))]
1627
1689
if target._lock_token:
1628
verb = 'Repository.insert_stream_locked'
1629
extra_args = (target._lock_token or '',)
1630
required_version = (1, 14)
1690
candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
1691
lock_args = (target._lock_token or '',)
1632
verb = 'Repository.insert_stream'
1634
required_version = (1, 13)
1693
candidate_calls.append(('Repository.insert_stream', (1, 13)))
1635
1695
client = target._client
1636
1696
medium = client._medium
1637
if medium._is_remote_before(required_version):
1638
# No possible way this can work.
1639
return self._insert_real(stream, src_format, resume_tokens)
1640
1697
path = target.bzrdir._path_for_remote_call(client)
1641
if not resume_tokens:
1642
# XXX: Ugly but important for correctness, *will* be fixed during
1643
# 1.13 cycle. Pushing a stream that is interrupted results in a
1644
# fallback to the _real_repositories sink *with a partial stream*.
1645
# That's bad because we insert less data than bzr expected. To avoid
1646
# this we do a trial push to make sure the verb is accessible, and
1647
# do not fallback when actually pushing the stream. A cleanup patch
1648
# is going to look at rewinding/restarting the stream/partial
1698
# Probe for the verb to use with an empty stream before sending the
1699
# real stream to it. We do this both to avoid the risk of sending a
1700
# large request that is then rejected, and because we don't want to
1701
# implement a way to buffer, rewind, or restart the stream.
1703
for verb, required_version in candidate_calls:
1704
if medium._is_remote_before(required_version):
1707
# We've already done the probing (and set _is_remote_before) on
1708
# a previous insert.
1650
1711
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1652
1713
response = client.call_with_body_stream(
1653
(verb, path, '') + extra_args, byte_stream)
1714
(verb, path, '') + lock_args, byte_stream)
1654
1715
except errors.UnknownSmartMethod:
1655
1716
medium._remember_remote_is_before(required_version)
1656
return self._insert_real(stream, src_format, resume_tokens)
1722
return self._insert_real(stream, src_format, resume_tokens)
1723
self._last_inv_record = None
1724
self._last_substream = None
1725
if required_version < (1, 19):
1726
# Remote side doesn't support inventory deltas. Wrap the stream to
1727
# make sure we don't send any. If the stream contains inventory
1728
# deltas we'll interrupt the smart insert_stream request and
1730
stream = self._stop_stream_if_inventory_delta(stream)
1657
1731
byte_stream = smart_repo._stream_to_byte_stream(
1658
1732
stream, src_format)
1659
1733
resume_tokens = ' '.join(resume_tokens)
1660
1734
response = client.call_with_body_stream(
1661
(verb, path, resume_tokens) + extra_args, byte_stream)
1735
(verb, path, resume_tokens) + lock_args, byte_stream)
1662
1736
if response[0][0] not in ('ok', 'missing-basis'):
1663
1737
raise errors.UnexpectedSmartServerResponse(response)
1738
if self._last_substream is not None:
1739
# The stream included an inventory-delta record, but the remote
1740
# side isn't new enough to support them. So we need to send the
1741
# rest of the stream via VFS.
1742
return self._resume_stream_with_vfs(response, src_format)
1664
1743
if response[0][0] == 'missing-basis':
1665
1744
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1666
1745
resume_tokens = tokens
1669
1748
self.target_repo.refresh_data()
1670
1749
return [], set()
1751
def _resume_stream_with_vfs(self, response, src_format):
1752
"""Resume sending a stream via VFS, first resending the record and
1753
substream that couldn't be sent via an insert_stream verb.
1755
if response[0][0] == 'missing-basis':
1756
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1757
# Ignore missing_keys, we haven't finished inserting yet
1760
def resume_substream():
1761
# Yield the substream that was interrupted.
1762
for record in self._last_substream:
1764
self._last_substream = None
1765
def resume_stream():
1766
# Finish sending the interrupted substream
1767
yield ('inventory-deltas', resume_substream())
1768
# Then simply continue sending the rest of the stream.
1769
for substream_kind, substream in self._last_stream:
1770
yield substream_kind, substream
1771
return self._insert_real(resume_stream(), src_format, tokens)
1773
def _stop_stream_if_inventory_delta(self, stream):
1774
"""Normally this just lets the original stream pass-through unchanged.
1776
However if any 'inventory-deltas' substream occurs it will stop
1777
streaming, and store the interrupted substream and stream in
1778
self._last_substream and self._last_stream so that the stream can be
1779
resumed by _resume_stream_with_vfs.
1782
stream_iter = iter(stream)
1783
for substream_kind, substream in stream_iter:
1784
if substream_kind == 'inventory-deltas':
1785
self._last_substream = substream
1786
self._last_stream = stream_iter
1789
yield substream_kind, substream
1673
1792
class RemoteStreamSource(repository.StreamSource):
1674
1793
"""Stream data from a remote server."""
1677
1796
if (self.from_repository._fallback_repositories and
1678
1797
self.to_format._fetch_order == 'topological'):
1679
1798
return self._real_stream(self.from_repository, search)
1680
return self.missing_parents_chain(search, [self.from_repository] +
1681
self.from_repository._fallback_repositories)
1801
repos = [self.from_repository]
1807
repos.extend(repo._fallback_repositories)
1808
sources.append(repo)
1809
return self.missing_parents_chain(search, sources)
1811
def get_stream_for_missing_keys(self, missing_keys):
    """Fetch a stream for `missing_keys` via the real repository's source.

    The from_repository is asked to `_ensure_real()` first; the request is
    then delegated to the source object that its `_real_repository`
    provides for `self.to_format`.

    :param missing_keys: Passed through unchanged to the real source.
    :return: Whatever the real source's get_stream_for_missing_keys
        returns.
    """
    source_repo = self.from_repository
    # The _real_repository attribute is only read after _ensure_real().
    source_repo._ensure_real()
    delegate = source_repo._real_repository._get_source(self.to_format)
    return delegate.get_stream_for_missing_keys(missing_keys)
1683
1817
def _real_stream(self, repo, search):
1684
1818
"""Get a stream for search from repo.
1714
1849
return self._real_stream(repo, search)
1715
1850
client = repo._client
1716
1851
medium = client._medium
1717
if medium._is_remote_before((1, 13)):
1718
# streaming was added in 1.13
1719
return self._real_stream(repo, search)
1720
1852
path = repo.bzrdir._path_for_remote_call(client)
1722
search_bytes = repo._serialise_search_result(search)
1723
response = repo._call_with_body_bytes_expecting_body(
1724
'Repository.get_stream',
1725
(path, self.to_format.network_name()), search_bytes)
1726
response_tuple, response_handler = response
1727
except errors.UnknownSmartMethod:
1728
medium._remember_remote_is_before((1,13))
1853
search_bytes = repo._serialise_search_result(search)
1854
args = (path, self.to_format.network_name())
1856
('Repository.get_stream_1.19', (1, 19)),
1857
('Repository.get_stream', (1, 13))]
1859
for verb, version in candidate_verbs:
1860
if medium._is_remote_before(version):
1863
response = repo._call_with_body_bytes_expecting_body(
1864
verb, args, search_bytes)
1865
except errors.UnknownSmartMethod:
1866
medium._remember_remote_is_before(version)
1868
response_tuple, response_handler = response
1729
1872
return self._real_stream(repo, search)
1730
1873
if response_tuple[0] != 'ok':
1731
1874
raise errors.UnexpectedSmartServerResponse(response_tuple)
2112
2259
return self._vfs_get_tags_bytes()
2113
2260
return response[0]
2262
def _vfs_set_tags_bytes(self, bytes):
2264
return self._real_branch._set_tags_bytes(bytes)
2266
def _set_tags_bytes(self, bytes):
2267
medium = self._client._medium
2268
if medium._is_remote_before((1, 18)):
2269
self._vfs_set_tags_bytes(bytes)
2272
self._remote_path(), self._lock_token, self._repo_lock_token)
2273
response = self._call_with_body_bytes(
2274
'Branch.set_tags_bytes', args, bytes)
2275
except errors.UnknownSmartMethod:
2276
medium._remember_remote_is_before((1, 18))
2277
self._vfs_set_tags_bytes(bytes)
2115
2279
def lock_read(self):
2116
2280
self.repository.lock_read()
2117
2281
if not self._lock_mode:
2229
2389
raise NotImplementedError(self.dont_leave_lock_in_place)
2230
2390
self._leave_lock = False
2392
def get_rev_id(self, revno, history=None):
2394
return _mod_revision.NULL_REVISION
2395
last_revision_info = self.last_revision_info()
2396
ok, result = self.repository.get_rev_id_for_revno(
2397
revno, last_revision_info)
2400
missing_parent = result[1]
2401
# Either the revision named by the server is missing, or its parent
2402
# is. Call get_parent_map to determine which, so that we report a
2404
parent_map = self.repository.get_parent_map([missing_parent])
2405
if missing_parent in parent_map:
2406
missing_parent = parent_map[missing_parent]
2407
raise errors.RevisionNotPresent(missing_parent, self.repository)
2232
2409
def _last_revision_info(self):
2233
2410
response = self._call('Branch.last_revision_info', self._remote_path())
2234
2411
if response[0] != 'ok':