62
59
except errors.ErrorFromSmartServer, err:
63
60
self._translate_error(err, **err_context)
62
def _call_with_body_bytes(self, method, args, body_bytes, **err_context):
64
return self._client.call_with_body_bytes(method, args, body_bytes)
65
except errors.ErrorFromSmartServer, err:
66
self._translate_error(err, **err_context)
65
68
def _call_with_body_bytes_expecting_body(self, method, args, body_bytes,
665
680
self._ensure_real()
666
681
return self._real_repository.suspend_write_group()
683
def get_missing_parent_inventories(self, check_for_missing_texts=True):
685
return self._real_repository.get_missing_parent_inventories(
686
check_for_missing_texts=check_for_missing_texts)
688
def _get_rev_id_for_revno_vfs(self, revno, known_pair):
690
return self._real_repository.get_rev_id_for_revno(
693
def get_rev_id_for_revno(self, revno, known_pair):
694
"""See Repository.get_rev_id_for_revno."""
695
path = self.bzrdir._path_for_remote_call(self._client)
697
if self._client._medium._is_remote_before((1, 17)):
698
return self._get_rev_id_for_revno_vfs(revno, known_pair)
699
response = self._call(
700
'Repository.get_rev_id_for_revno', path, revno, known_pair)
701
except errors.UnknownSmartMethod:
702
self._client._medium._remember_remote_is_before((1, 17))
703
return self._get_rev_id_for_revno_vfs(revno, known_pair)
704
if response[0] == 'ok':
705
return True, response[1]
706
elif response[0] == 'history-incomplete':
707
known_pair = response[1:3]
708
for fallback in self._fallback_repositories:
709
found, result = fallback.get_rev_id_for_revno(revno, known_pair)
714
# Not found in any fallbacks
715
return False, known_pair
717
raise errors.UnexpectedSmartServerResponse(response)
668
719
def _ensure_real(self):
669
720
"""Ensure that there is a _real_repository set.
744
800
"""Return a source for streaming from this repository."""
745
801
return RemoteStreamSource(self, to_format)
747
804
def has_revision(self, revision_id):
748
"""See Repository.has_revision()."""
749
if revision_id == NULL_REVISION:
750
# The null revision is always present.
752
path = self.bzrdir._path_for_remote_call(self._client)
753
response = self._call('Repository.has_revision', path, revision_id)
754
if response[0] not in ('yes', 'no'):
755
raise errors.UnexpectedSmartServerResponse(response)
756
if response[0] == 'yes':
758
for fallback_repo in self._fallback_repositories:
759
if fallback_repo.has_revision(revision_id):
805
"""True if this repository has a copy of the revision."""
806
# Copy of bzrlib.repository.Repository.has_revision
807
return revision_id in self.has_revisions((revision_id,))
763
810
def has_revisions(self, revision_ids):
764
"""See Repository.has_revisions()."""
765
# FIXME: This does many roundtrips, particularly when there are
766
# fallback repositories. -- mbp 20080905
768
for revision_id in revision_ids:
769
if self.has_revision(revision_id):
770
result.add(revision_id)
811
"""Probe to find out the presence of multiple revisions.
813
:param revision_ids: An iterable of revision_ids.
814
:return: A set of the revision_ids that were present.
816
# Copy of bzrlib.repository.Repository.has_revisions
817
parent_map = self.get_parent_map(revision_ids)
818
result = set(parent_map)
819
if _mod_revision.NULL_REVISION in revision_ids:
820
result.add(_mod_revision.NULL_REVISION)
823
def _has_same_fallbacks(self, other_repo):
824
"""Returns true if the repositories have the same fallbacks."""
825
# XXX: copied from Repository; it should be unified into a base class
826
# <https://bugs.edge.launchpad.net/bzr/+bug/401622>
827
my_fb = self._fallback_repositories
828
other_fb = other_repo._fallback_repositories
829
if len(my_fb) != len(other_fb):
831
for f, g in zip(my_fb, other_fb):
832
if not f.has_same_location(g):
773
836
def has_same_location(self, other):
837
# TODO: Move to RepositoryBase and unify with the regular Repository
838
# one; unfortunately the tests rely on slightly different behaviour at
839
# present -- mbp 20090710
774
840
return (self.__class__ is other.__class__ and
775
841
self.bzrdir.transport.base == other.bzrdir.transport.base)
1605
1685
def insert_stream(self, stream, src_format, resume_tokens):
1606
1686
target = self.target_repo
1687
target._unstacked_provider.missing_keys.clear()
1688
candidate_calls = [('Repository.insert_stream_1.19', (1, 19))]
1607
1689
if target._lock_token:
1608
verb = 'Repository.insert_stream_locked'
1609
extra_args = (target._lock_token or '',)
1610
required_version = (1, 14)
1690
candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
1691
lock_args = (target._lock_token or '',)
1612
verb = 'Repository.insert_stream'
1614
required_version = (1, 13)
1693
candidate_calls.append(('Repository.insert_stream', (1, 13)))
1615
1695
client = target._client
1616
1696
medium = client._medium
1617
if medium._is_remote_before(required_version):
1618
# No possible way this can work.
1619
return self._insert_real(stream, src_format, resume_tokens)
1620
1697
path = target.bzrdir._path_for_remote_call(client)
1621
if not resume_tokens:
1622
# XXX: Ugly but important for correctness, *will* be fixed during
1623
# 1.13 cycle. Pushing a stream that is interrupted results in a
1624
# fallback to the _real_repositories sink *with a partial stream*.
1625
# Thats bad because we insert less data than bzr expected. To avoid
1626
# this we do a trial push to make sure the verb is accessible, and
1627
# do not fallback when actually pushing the stream. A cleanup patch
1628
# is going to look at rewinding/restarting the stream/partial
1698
# Probe for the verb to use with an empty stream before sending the
1699
# real stream to it. We do this both to avoid the risk of sending a
1700
# large request that is then rejected, and because we don't want to
1701
# implement a way to buffer, rewind, or restart the stream.
1703
for verb, required_version in candidate_calls:
1704
if medium._is_remote_before(required_version):
1707
# We've already done the probing (and set _is_remote_before) on
1708
# a previous insert.
1630
1711
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1632
1713
response = client.call_with_body_stream(
1633
(verb, path, '') + extra_args, byte_stream)
1714
(verb, path, '') + lock_args, byte_stream)
1634
1715
except errors.UnknownSmartMethod:
1635
1716
medium._remember_remote_is_before(required_version)
1636
return self._insert_real(stream, src_format, resume_tokens)
1722
return self._insert_real(stream, src_format, resume_tokens)
1723
self._last_inv_record = None
1724
self._last_substream = None
1725
if required_version < (1, 19):
1726
# Remote side doesn't support inventory deltas. Wrap the stream to
1727
# make sure we don't send any. If the stream contains inventory
1728
# deltas we'll interrupt the smart insert_stream request and
1730
stream = self._stop_stream_if_inventory_delta(stream)
1637
1731
byte_stream = smart_repo._stream_to_byte_stream(
1638
1732
stream, src_format)
1639
1733
resume_tokens = ' '.join(resume_tokens)
1640
1734
response = client.call_with_body_stream(
1641
(verb, path, resume_tokens) + extra_args, byte_stream)
1735
(verb, path, resume_tokens) + lock_args, byte_stream)
1642
1736
if response[0][0] not in ('ok', 'missing-basis'):
1643
1737
raise errors.UnexpectedSmartServerResponse(response)
1738
if self._last_substream is not None:
1739
# The stream included an inventory-delta record, but the remote
1740
# side isn't new enough to support them. So we need to send the
1741
# rest of the stream via VFS.
1742
self.target_repo.refresh_data()
1743
return self._resume_stream_with_vfs(response, src_format)
1644
1744
if response[0][0] == 'missing-basis':
1645
1745
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1646
1746
resume_tokens = tokens
1649
1749
self.target_repo.refresh_data()
1650
1750
return [], set()
1752
def _resume_stream_with_vfs(self, response, src_format):
1753
"""Resume sending a stream via VFS, first resending the record and
1754
substream that couldn't be sent via an insert_stream verb.
1756
if response[0][0] == 'missing-basis':
1757
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1758
# Ignore missing_keys, we haven't finished inserting yet
1761
def resume_substream():
1762
# Yield the substream that was interrupted.
1763
for record in self._last_substream:
1765
self._last_substream = None
1766
def resume_stream():
1767
# Finish sending the interrupted substream
1768
yield ('inventory-deltas', resume_substream())
1769
# Then simply continue sending the rest of the stream.
1770
for substream_kind, substream in self._last_stream:
1771
yield substream_kind, substream
1772
return self._insert_real(resume_stream(), src_format, tokens)
1774
def _stop_stream_if_inventory_delta(self, stream):
1775
"""Normally this just lets the original stream pass-through unchanged.
1777
However if any 'inventory-deltas' substream occurs it will stop
1778
streaming, and store the interrupted substream and stream in
1779
self._last_substream and self._last_stream so that the stream can be
1780
resumed by _resume_stream_with_vfs.
1783
stream_iter = iter(stream)
1784
for substream_kind, substream in stream_iter:
1785
if substream_kind == 'inventory-deltas':
1786
self._last_substream = substream
1787
self._last_stream = stream_iter
1790
yield substream_kind, substream
1653
1793
class RemoteStreamSource(repository.StreamSource):
1654
1794
"""Stream data from a remote server."""
1657
1797
if (self.from_repository._fallback_repositories and
1658
1798
self.to_format._fetch_order == 'topological'):
1659
1799
return self._real_stream(self.from_repository, search)
1660
return self.missing_parents_chain(search, [self.from_repository] +
1661
self.from_repository._fallback_repositories)
1802
repos = [self.from_repository]
1808
repos.extend(repo._fallback_repositories)
1809
sources.append(repo)
1810
return self.missing_parents_chain(search, sources)
1812
def get_stream_for_missing_keys(self, missing_keys):
1813
self.from_repository._ensure_real()
1814
real_repo = self.from_repository._real_repository
1815
real_source = real_repo._get_source(self.to_format)
1816
return real_source.get_stream_for_missing_keys(missing_keys)
1663
1818
def _real_stream(self, repo, search):
1664
1819
"""Get a stream for search from repo.
1694
1850
return self._real_stream(repo, search)
1695
1851
client = repo._client
1696
1852
medium = client._medium
1697
if medium._is_remote_before((1, 13)):
1698
# streaming was added in 1.13
1699
return self._real_stream(repo, search)
1700
1853
path = repo.bzrdir._path_for_remote_call(client)
1702
search_bytes = repo._serialise_search_result(search)
1703
response = repo._call_with_body_bytes_expecting_body(
1704
'Repository.get_stream',
1705
(path, self.to_format.network_name()), search_bytes)
1706
response_tuple, response_handler = response
1707
except errors.UnknownSmartMethod:
1708
medium._remember_remote_is_before((1,13))
1854
search_bytes = repo._serialise_search_result(search)
1855
args = (path, self.to_format.network_name())
1857
('Repository.get_stream_1.19', (1, 19)),
1858
('Repository.get_stream', (1, 13))]
1860
for verb, version in candidate_verbs:
1861
if medium._is_remote_before(version):
1864
response = repo._call_with_body_bytes_expecting_body(
1865
verb, args, search_bytes)
1866
except errors.UnknownSmartMethod:
1867
medium._remember_remote_is_before(version)
1869
response_tuple, response_handler = response
1709
1873
return self._real_stream(repo, search)
1710
1874
if response_tuple[0] != 'ok':
1711
1875
raise errors.UnexpectedSmartServerResponse(response_tuple)
2083
2261
return self._vfs_get_tags_bytes()
2084
2262
return response[0]
2264
def _vfs_set_tags_bytes(self, bytes):
2266
return self._real_branch._set_tags_bytes(bytes)
2268
def _set_tags_bytes(self, bytes):
2269
medium = self._client._medium
2270
if medium._is_remote_before((1, 18)):
2271
self._vfs_set_tags_bytes(bytes)
2275
self._remote_path(), self._lock_token, self._repo_lock_token)
2276
response = self._call_with_body_bytes(
2277
'Branch.set_tags_bytes', args, bytes)
2278
except errors.UnknownSmartMethod:
2279
medium._remember_remote_is_before((1, 18))
2280
self._vfs_set_tags_bytes(bytes)
2086
2282
def lock_read(self):
2087
2283
self.repository.lock_read()
2088
2284
if not self._lock_mode:
2200
2392
raise NotImplementedError(self.dont_leave_lock_in_place)
2201
2393
self._leave_lock = False
2396
def get_rev_id(self, revno, history=None):
2398
return _mod_revision.NULL_REVISION
2399
last_revision_info = self.last_revision_info()
2400
ok, result = self.repository.get_rev_id_for_revno(
2401
revno, last_revision_info)
2404
missing_parent = result[1]
2405
# Either the revision named by the server is missing, or its parent
2406
# is. Call get_parent_map to determine which, so that we report a
2408
parent_map = self.repository.get_parent_map([missing_parent])
2409
if missing_parent in parent_map:
2410
missing_parent = parent_map[missing_parent]
2411
raise errors.RevisionNotPresent(missing_parent, self.repository)
2203
2413
def _last_revision_info(self):
2204
2414
response = self._call('Branch.last_revision_info', self._remote_path())
2205
2415
if response[0] != 'ok':