14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
# TODO: At some point, handle upgrades by just passing the whole request
18
# across to run on the server.
22
19
from bzrlib import (
31
revision as _mod_revision,
36
34
from bzrlib.branch import BranchReferenceFormat
37
35
from bzrlib.bzrdir import BzrDir, RemoteBzrDirFormat
38
from bzrlib.decorators import needs_read_lock, needs_write_lock
36
from bzrlib.decorators import needs_read_lock, needs_write_lock, only_raises
39
37
from bzrlib.errors import (
41
39
SmartProtocolError,
62
59
except errors.ErrorFromSmartServer, err:
63
60
self._translate_error(err, **err_context)
62
def _call_with_body_bytes(self, method, args, body_bytes, **err_context):
64
return self._client.call_with_body_bytes(method, args, body_bytes)
65
except errors.ErrorFromSmartServer, err:
66
self._translate_error(err, **err_context)
65
68
def _call_with_body_bytes_expecting_body(self, method, args, body_bytes,
105
109
self._client = client._SmartClient(medium)
107
111
self._client = _client
117
def _probe_bzrdir(self):
118
medium = self._client._medium
110
119
path = self._path_for_remote_call(self._client)
120
if medium._is_remote_before((2, 1)):
124
self._rpc_open_2_1(path)
126
except errors.UnknownSmartMethod:
127
medium._remember_remote_is_before((2, 1))
130
def _rpc_open_2_1(self, path):
131
response = self._call('BzrDir.open_2.1', path)
132
if response == ('no',):
133
raise errors.NotBranchError(path=self.root_transport.base)
134
elif response[0] == 'yes':
135
if response[1] == 'yes':
136
self._has_working_tree = True
137
elif response[1] == 'no':
138
self._has_working_tree = False
140
raise errors.UnexpectedSmartServerResponse(response)
142
raise errors.UnexpectedSmartServerResponse(response)
144
def _rpc_open(self, path):
111
145
response = self._call('BzrDir.open', path)
112
146
if response not in [('yes',), ('no',)]:
113
147
raise errors.UnexpectedSmartServerResponse(response)
114
148
if response == ('no',):
115
raise errors.NotBranchError(path=transport.base)
149
raise errors.NotBranchError(path=self.root_transport.base)
117
151
def _ensure_real(self):
118
152
"""Ensure that there is a _real_bzrdir set.
544
601
return self._custom_format._fetch_reconcile
546
603
def get_format_description(self):
547
return 'bzr remote repository'
605
return 'Remote: ' + self._custom_format.get_format_description()
549
607
def __eq__(self, other):
550
608
return self.__class__ is other.__class__
552
def check_conversion_target(self, target_format):
553
if self.rich_root_data and not target_format.rich_root_data:
554
raise errors.BadConversionTarget(
555
'Does not support rich root data.', target_format)
556
if (self.supports_tree_reference and
557
not getattr(target_format, 'supports_tree_reference', False)):
558
raise errors.BadConversionTarget(
559
'Does not support nested trees', target_format)
561
610
def network_name(self):
562
611
if self._network_name:
563
612
return self._network_name
665
721
self._ensure_real()
666
722
return self._real_repository.suspend_write_group()
724
def get_missing_parent_inventories(self, check_for_missing_texts=True):
726
return self._real_repository.get_missing_parent_inventories(
727
check_for_missing_texts=check_for_missing_texts)
729
def _get_rev_id_for_revno_vfs(self, revno, known_pair):
731
return self._real_repository.get_rev_id_for_revno(
734
def get_rev_id_for_revno(self, revno, known_pair):
735
"""See Repository.get_rev_id_for_revno."""
736
path = self.bzrdir._path_for_remote_call(self._client)
738
if self._client._medium._is_remote_before((1, 17)):
739
return self._get_rev_id_for_revno_vfs(revno, known_pair)
740
response = self._call(
741
'Repository.get_rev_id_for_revno', path, revno, known_pair)
742
except errors.UnknownSmartMethod:
743
self._client._medium._remember_remote_is_before((1, 17))
744
return self._get_rev_id_for_revno_vfs(revno, known_pair)
745
if response[0] == 'ok':
746
return True, response[1]
747
elif response[0] == 'history-incomplete':
748
known_pair = response[1:3]
749
for fallback in self._fallback_repositories:
750
found, result = fallback.get_rev_id_for_revno(revno, known_pair)
755
# Not found in any fallbacks
756
return False, known_pair
758
raise errors.UnexpectedSmartServerResponse(response)
668
760
def _ensure_real(self):
669
761
"""Ensure that there is a _real_repository set.
744
841
"""Return a source for streaming from this repository."""
745
842
return RemoteStreamSource(self, to_format)
747
845
def has_revision(self, revision_id):
748
"""See Repository.has_revision()."""
749
if revision_id == NULL_REVISION:
750
# The null revision is always present.
752
path = self.bzrdir._path_for_remote_call(self._client)
753
response = self._call('Repository.has_revision', path, revision_id)
754
if response[0] not in ('yes', 'no'):
755
raise errors.UnexpectedSmartServerResponse(response)
756
if response[0] == 'yes':
758
for fallback_repo in self._fallback_repositories:
759
if fallback_repo.has_revision(revision_id):
846
"""True if this repository has a copy of the revision."""
847
# Copy of bzrlib.repository.Repository.has_revision
848
return revision_id in self.has_revisions((revision_id,))
763
851
def has_revisions(self, revision_ids):
764
"""See Repository.has_revisions()."""
765
# FIXME: This does many roundtrips, particularly when there are
766
# fallback repositories. -- mbp 20080905
768
for revision_id in revision_ids:
769
if self.has_revision(revision_id):
770
result.add(revision_id)
852
"""Probe to find out the presence of multiple revisions.
854
:param revision_ids: An iterable of revision_ids.
855
:return: A set of the revision_ids that were present.
857
# Copy of bzrlib.repository.Repository.has_revisions
858
parent_map = self.get_parent_map(revision_ids)
859
result = set(parent_map)
860
if _mod_revision.NULL_REVISION in revision_ids:
861
result.add(_mod_revision.NULL_REVISION)
864
def _has_same_fallbacks(self, other_repo):
865
"""Returns true if the repositories have the same fallbacks."""
866
# XXX: copied from Repository; it should be unified into a base class
867
# <https://bugs.edge.launchpad.net/bzr/+bug/401622>
868
my_fb = self._fallback_repositories
869
other_fb = other_repo._fallback_repositories
870
if len(my_fb) != len(other_fb):
872
for f, g in zip(my_fb, other_fb):
873
if not f.has_same_location(g):
773
877
def has_same_location(self, other):
878
# TODO: Move to RepositoryBase and unify with the regular Repository
879
# one; unfortunately the tests rely on slightly different behaviour at
880
# present -- mbp 20090710
774
881
return (self.__class__ is other.__class__ and
775
882
self.bzrdir.transport.base == other.bzrdir.transport.base)
844
951
def is_write_locked(self):
845
952
return self._lock_mode == 'w'
954
def _warn_if_deprecated(self, branch=None):
955
# If we have a real repository, the check will be done there, if we
956
# don't the check will be done remotely.
847
959
def lock_read(self):
848
960
# wrong eventually - want a local lock cache context
849
961
if not self._lock_mode:
850
963
self._lock_mode = 'r'
851
964
self._lock_count = 1
852
965
self._unstacked_provider.enable_cache(cache_misses=True)
853
966
if self._real_repository is not None:
854
967
self._real_repository.lock_read()
968
for repo in self._fallback_repositories:
856
971
self._lock_count += 1
857
for repo in self._fallback_repositories:
860
973
def _remote_lock_write(self, token):
861
974
path = self.bzrdir._path_for_remote_call(self._client)
1605
1734
def insert_stream(self, stream, src_format, resume_tokens):
1606
1735
target = self.target_repo
1736
target._unstacked_provider.missing_keys.clear()
1737
candidate_calls = [('Repository.insert_stream_1.19', (1, 19))]
1607
1738
if target._lock_token:
1608
verb = 'Repository.insert_stream_locked'
1609
extra_args = (target._lock_token or '',)
1610
required_version = (1, 14)
1739
candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
1740
lock_args = (target._lock_token or '',)
1612
verb = 'Repository.insert_stream'
1614
required_version = (1, 13)
1742
candidate_calls.append(('Repository.insert_stream', (1, 13)))
1615
1744
client = target._client
1616
1745
medium = client._medium
1617
if medium._is_remote_before(required_version):
1618
# No possible way this can work.
1619
return self._insert_real(stream, src_format, resume_tokens)
1620
1746
path = target.bzrdir._path_for_remote_call(client)
1621
if not resume_tokens:
1622
# XXX: Ugly but important for correctness, *will* be fixed during
1623
# 1.13 cycle. Pushing a stream that is interrupted results in a
1624
# fallback to the _real_repositories sink *with a partial stream*.
1625
# Thats bad because we insert less data than bzr expected. To avoid
1626
# this we do a trial push to make sure the verb is accessible, and
1627
# do not fallback when actually pushing the stream. A cleanup patch
1628
# is going to look at rewinding/restarting the stream/partial
1747
# Probe for the verb to use with an empty stream before sending the
1748
# real stream to it. We do this both to avoid the risk of sending a
1749
# large request that is then rejected, and because we don't want to
1750
# implement a way to buffer, rewind, or restart the stream.
1752
for verb, required_version in candidate_calls:
1753
if medium._is_remote_before(required_version):
1756
# We've already done the probing (and set _is_remote_before) on
1757
# a previous insert.
1630
1760
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1632
1762
response = client.call_with_body_stream(
1633
(verb, path, '') + extra_args, byte_stream)
1763
(verb, path, '') + lock_args, byte_stream)
1634
1764
except errors.UnknownSmartMethod:
1635
1765
medium._remember_remote_is_before(required_version)
1636
return self._insert_real(stream, src_format, resume_tokens)
1771
return self._insert_real(stream, src_format, resume_tokens)
1772
self._last_inv_record = None
1773
self._last_substream = None
1774
if required_version < (1, 19):
1775
# Remote side doesn't support inventory deltas. Wrap the stream to
1776
# make sure we don't send any. If the stream contains inventory
1777
# deltas we'll interrupt the smart insert_stream request and
1779
stream = self._stop_stream_if_inventory_delta(stream)
1637
1780
byte_stream = smart_repo._stream_to_byte_stream(
1638
1781
stream, src_format)
1639
1782
resume_tokens = ' '.join(resume_tokens)
1640
1783
response = client.call_with_body_stream(
1641
(verb, path, resume_tokens) + extra_args, byte_stream)
1784
(verb, path, resume_tokens) + lock_args, byte_stream)
1642
1785
if response[0][0] not in ('ok', 'missing-basis'):
1643
1786
raise errors.UnexpectedSmartServerResponse(response)
1787
if self._last_substream is not None:
1788
# The stream included an inventory-delta record, but the remote
1789
# side isn't new enough to support them. So we need to send the
1790
# rest of the stream via VFS.
1791
self.target_repo.refresh_data()
1792
return self._resume_stream_with_vfs(response, src_format)
1644
1793
if response[0][0] == 'missing-basis':
1645
1794
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1646
1795
resume_tokens = tokens
1649
1798
self.target_repo.refresh_data()
1650
1799
return [], set()
1801
def _resume_stream_with_vfs(self, response, src_format):
1802
"""Resume sending a stream via VFS, first resending the record and
1803
substream that couldn't be sent via an insert_stream verb.
1805
if response[0][0] == 'missing-basis':
1806
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1807
# Ignore missing_keys, we haven't finished inserting yet
1810
def resume_substream():
1811
# Yield the substream that was interrupted.
1812
for record in self._last_substream:
1814
self._last_substream = None
1815
def resume_stream():
1816
# Finish sending the interrupted substream
1817
yield ('inventory-deltas', resume_substream())
1818
# Then simply continue sending the rest of the stream.
1819
for substream_kind, substream in self._last_stream:
1820
yield substream_kind, substream
1821
return self._insert_real(resume_stream(), src_format, tokens)
1823
def _stop_stream_if_inventory_delta(self, stream):
1824
"""Normally this just lets the original stream pass-through unchanged.
1826
However if any 'inventory-deltas' substream occurs it will stop
1827
streaming, and store the interrupted substream and stream in
1828
self._last_substream and self._last_stream so that the stream can be
1829
resumed by _resume_stream_with_vfs.
1832
stream_iter = iter(stream)
1833
for substream_kind, substream in stream_iter:
1834
if substream_kind == 'inventory-deltas':
1835
self._last_substream = substream
1836
self._last_stream = stream_iter
1839
yield substream_kind, substream
1653
1842
class RemoteStreamSource(repository.StreamSource):
1654
1843
"""Stream data from a remote server."""
1657
1846
if (self.from_repository._fallback_repositories and
1658
1847
self.to_format._fetch_order == 'topological'):
1659
1848
return self._real_stream(self.from_repository, search)
1660
return self.missing_parents_chain(search, [self.from_repository] +
1661
self.from_repository._fallback_repositories)
1851
repos = [self.from_repository]
1857
repos.extend(repo._fallback_repositories)
1858
sources.append(repo)
1859
return self.missing_parents_chain(search, sources)
1861
def get_stream_for_missing_keys(self, missing_keys):
1862
self.from_repository._ensure_real()
1863
real_repo = self.from_repository._real_repository
1864
real_source = real_repo._get_source(self.to_format)
1865
return real_source.get_stream_for_missing_keys(missing_keys)
1663
1867
def _real_stream(self, repo, search):
1664
1868
"""Get a stream for search from repo.
1694
1899
return self._real_stream(repo, search)
1695
1900
client = repo._client
1696
1901
medium = client._medium
1697
if medium._is_remote_before((1, 13)):
1698
# streaming was added in 1.13
1699
return self._real_stream(repo, search)
1700
1902
path = repo.bzrdir._path_for_remote_call(client)
1702
search_bytes = repo._serialise_search_result(search)
1703
response = repo._call_with_body_bytes_expecting_body(
1704
'Repository.get_stream',
1705
(path, self.to_format.network_name()), search_bytes)
1706
response_tuple, response_handler = response
1707
except errors.UnknownSmartMethod:
1708
medium._remember_remote_is_before((1,13))
1903
search_bytes = repo._serialise_search_result(search)
1904
args = (path, self.to_format.network_name())
1906
('Repository.get_stream_1.19', (1, 19)),
1907
('Repository.get_stream', (1, 13))]
1909
for verb, version in candidate_verbs:
1910
if medium._is_remote_before(version):
1913
response = repo._call_with_body_bytes_expecting_body(
1914
verb, args, search_bytes)
1915
except errors.UnknownSmartMethod:
1916
medium._remember_remote_is_before(version)
1918
response_tuple, response_handler = response
1709
1922
return self._real_stream(repo, search)
1710
1923
if response_tuple[0] != 'ok':
1711
1924
raise errors.UnexpectedSmartServerResponse(response_tuple)
2083
2311
return self._vfs_get_tags_bytes()
2084
2312
return response[0]
2314
def _vfs_set_tags_bytes(self, bytes):
2316
return self._real_branch._set_tags_bytes(bytes)
2318
def _set_tags_bytes(self, bytes):
2319
medium = self._client._medium
2320
if medium._is_remote_before((1, 18)):
2321
self._vfs_set_tags_bytes(bytes)
2325
self._remote_path(), self._lock_token, self._repo_lock_token)
2326
response = self._call_with_body_bytes(
2327
'Branch.set_tags_bytes', args, bytes)
2328
except errors.UnknownSmartMethod:
2329
medium._remember_remote_is_before((1, 18))
2330
self._vfs_set_tags_bytes(bytes)
2086
2332
def lock_read(self):
2087
2333
self.repository.lock_read()
2088
2334
if not self._lock_mode:
2335
self._note_lock('r')
2089
2336
self._lock_mode = 'r'
2090
2337
self._lock_count = 1
2091
2338
if self._real_branch is not None:
2200
2445
raise NotImplementedError(self.dont_leave_lock_in_place)
2201
2446
self._leave_lock = False
2449
def get_rev_id(self, revno, history=None):
2451
return _mod_revision.NULL_REVISION
2452
last_revision_info = self.last_revision_info()
2453
ok, result = self.repository.get_rev_id_for_revno(
2454
revno, last_revision_info)
2457
missing_parent = result[1]
2458
# Either the revision named by the server is missing, or its parent
2459
# is. Call get_parent_map to determine which, so that we report a
2461
parent_map = self.repository.get_parent_map([missing_parent])
2462
if missing_parent in parent_map:
2463
missing_parent = parent_map[missing_parent]
2464
raise errors.RevisionNotPresent(missing_parent, self.repository)
2203
2466
def _last_revision_info(self):
2204
2467
response = self._call('Branch.last_revision_info', self._remote_path())
2205
2468
if response[0] != 'ok':