17
17
# TODO: At some point, handle upgrades by just passing the whole request
18
18
# across to run on the server.
20
21
from cStringIO import StringIO
22
23
from bzrlib import (
29
from bzrlib.branch import Branch, BranchReferenceFormat
32
from bzrlib.branch import BranchReferenceFormat
30
33
from bzrlib.bzrdir import BzrDir, RemoteBzrDirFormat
31
34
from bzrlib.config import BranchConfig, TreeConfig
32
35
from bzrlib.decorators import needs_read_lock, needs_write_lock
33
36
from bzrlib.errors import NoSuchRevision
34
37
from bzrlib.lockable_files import LockableFiles
35
from bzrlib.pack import ContainerReader
38
from bzrlib.pack import ContainerPushParser
36
39
from bzrlib.smart import client, vfs
37
40
from bzrlib.symbol_versioning import (
41
from bzrlib.trace import note
44
from bzrlib.revision import NULL_REVISION
45
from bzrlib.trace import mutter, note, warning
43
47
# Note: RemoteBzrDirFormat is in bzrdir.py
141
149
def open_repository(self):
    """Open the repository for this bzrdir by asking the smart server.

    The 'BzrDir.find_repositoryV2' verb is tried first.  If the server
    answers with its generic "bad request" error for that verb, the
    request is repeated with the older 'BzrDir.find_repository' verb.

    :return: A RemoteRepository built from this bzrdir and a
        RemoteRepositoryFormat whose rich_root_data,
        supports_tree_reference and supports_external_lookups flags are
        taken from the response.
    :raises errors.NoRepositoryPresent: if the server answers
        'norepository', or if the second field of an 'ok' response is
        not the empty string.
    """
    path = self._path_for_remote_call(self._client)
    verb = 'BzrDir.find_repositoryV2'
    response = self._client.call(verb, path)
    # The error text may quote the verb as a str or as a unicode repr, so
    # both spellings are recognised.
    if (response == ('error', "Generic bzr smart protocol error: "
            "bad request '%s'" % verb) or
          response == ('error', "Generic bzr smart protocol error: "
            "bad request u'%s'" % verb)):
        verb = 'BzrDir.find_repository'
        response = self._client.call(verb, path)
    assert response[0] in ('ok', 'norepository'), \
        'unexpected response code %s' % (response,)
    if response[0] == 'norepository':
        raise errors.NoRepositoryPresent(self)
    if verb == 'BzrDir.find_repository':
        # servers that don't support the V2 method don't support external
        # lookups either, so pad the shorter response with 'no'.
        response = response + ('no', )
    assert len(response) == 5, 'incorrect response length %s' % (response,)
    if response[1] == '':
        repo_format = RemoteRepositoryFormat()
        repo_format.rich_root_data = (response[2] == 'yes')
        repo_format.supports_tree_reference = (response[3] == 'yes')
        # No wire format to check this yet.
        repo_format.supports_external_lookups = (response[4] == 'yes')
        return RemoteRepository(self, repo_format)
    raise errors.NoRepositoryPresent(self)
375
403
assert response[0] in ('yes', 'no'), 'unexpected response code %s' % (response,)
376
404
return response[0] == 'yes'
406
def has_revisions(self, revision_ids):
    """See Repository.has_revisions().

    :param revision_ids: An iterable of revision ids to look up.
    :return: The set of those revision ids for which
        self.has_revision() is true.
    """
    # Each id is checked individually through has_revision().
    return set(revision_id for revision_id in revision_ids
               if self.has_revision(revision_id))
378
414
def has_same_location(self, other):
    """Return True if other is the same kind of object at the same place.

    :param other: The repository to compare against.
    :return: True when other has exactly the same class as self and both
        bzrdir transports report the same base; False otherwise.
    """
    return (self.__class__ == other.__class__ and
            self.bzrdir.transport.base == other.bzrdir.transport.base)
382
418
def get_graph(self, other_repository=None):
    """Return the graph for this repository format.

    :param other_repository: Optional second repository.  When given and
        its bzrdir transport base differs from this repository's, its
        parents provider (other_repository._make_parents_provider()) is
        stacked after this repository.
    :return: A graph.Graph whose parents provider is this repository, or
        a graph._StackedParentsProvider over both repositories.
    """
    parents_provider = self
    if (other_repository is not None and
        other_repository.bzrdir.transport.base !=
        self.bzrdir.transport.base):
        parents_provider = graph._StackedParentsProvider(
            [parents_provider, other_repository._make_parents_provider()])
    return graph.Graph(parents_provider)
387
428
def gather_stats(self, revid=None, committers=None):
388
429
"""See Repository.gather_stats()."""
661
719
"""RemoteRepositories never create working trees by default."""
722
def revision_ids_to_search_result(self, result_set):
    """Convert a set of revision ids to a graph SearchResult.

    :param result_set: A set of revision ids.
    :return: A graph.SearchResult built from:
        start keys - members of result_set that are not a parent of any
            member of result_set;
        exclude keys - parents of members that are not themselves in
            result_set;
        the size of result_set, and result_set itself.
    """
    # Collect every parent of every revision in the set.
    result_parents = set()
    for parents in self.get_graph().get_parent_map(
        result_set).itervalues():
        result_parents.update(parents)
    included_keys = result_set.intersection(result_parents)
    start_keys = result_set.difference(included_keys)
    exclude_keys = result_parents.difference(result_set)
    result = graph.SearchResult(start_keys, exclude_keys,
        len(result_set), result_set)
    return result
736
def search_missing_revision_ids(self, other, revision_id=None, find_ghosts=True):
    """Return the revision ids that other has that this does not.

    These are returned in topological order.

    The work is delegated to the InterRepository object obtained for
    (other, self).

    :param other: The repository to compare against.
    :param revision_id: only return revision ids included by revision_id.
    :param find_ghosts: Passed through unchanged to
        InterRepository.search_missing_revision_ids.
    """
    return repository.InterRepository.get(
        other, self).search_missing_revision_ids(revision_id, find_ghosts)
664
746
def fetch(self, source, revision_id=None, pb=None):
665
747
if self.has_same_location(source):
666
748
# check that last_revision is in 'from' and then return a
707
789
self._ensure_real()
708
790
return self._real_repository.iter_files_bytes(desired_files)
792
def get_parent_map(self, keys):
    """See bzrlib.Graph.get_parent_map().

    Answers from self._parents_map when it is available, and asks the
    server (via self._get_parent_map) only for keys that are not cached.

    :param keys: An iterable of revision ids.  It is iterated more than
        once, so it must not be a one-shot iterator.
    :return: A dict mapping each requested key that is known to its
        parents; unknown keys are omitted.
    """
    # Hack to build up the caching logic.
    ancestry = self._parents_map
    if ancestry is None:
        # Repository is not locked, so there's no cache.
        missing_revisions = set(keys)
        ancestry = {}
    else:
        missing_revisions = set(key for key in keys if key not in ancestry)
    if missing_revisions:
        parent_map = self._get_parent_map(missing_revisions)
        if 'hpss' in debug.debug_flags:
            mutter('retransmitted revisions: %d of %d',
                    len(set(ancestry).intersection(parent_map)),
                    len(parent_map))
        ancestry.update(parent_map)
    present_keys = [k for k in keys if k in ancestry]
    if 'hpss' in debug.debug_flags:
        self._requested_parents.update(present_keys)
        # An empty cache would make the hit-rate a division by zero; a
        # debug flag must not turn a lookup into a crash.
        if ancestry:
            mutter('Current RemoteRepository graph hit rate: %d%%',
                100.0 * len(self._requested_parents) / len(ancestry))
    return dict((k, ancestry[k]) for k in present_keys)
816
def _response_is_unknown_method(self, response, verb):
817
"""Return True if response is an unknonwn method response to verb.
819
:param response: The response from a smart client call_expecting_body
821
:param verb: The verb used in that call.
822
:return: True if an unknown method was encountered.
824
# This might live better on
825
# bzrlib.smart.protocol.SmartClientRequestProtocolOne
826
if (response[0] == ('error', "Generic bzr smart protocol error: "
827
"bad request '%s'" % verb) or
828
response[0] == ('error', "Generic bzr smart protocol error: "
829
"bad request u'%s'" % verb)):
830
response[1].cancel_read_body()
834
def _get_parent_map(self, keys):
835
"""Helper for get_parent_map that performs the RPC."""
836
medium = self._client.get_smart_medium()
837
if not medium._remote_is_at_least_1_2:
838
# We already found out that the server can't understand
839
# Repository.get_parent_map requests, so just fetch the whole
841
return self.get_revision_graph()
844
if NULL_REVISION in keys:
845
keys.discard(NULL_REVISION)
846
found_parents = {NULL_REVISION:()}
851
# TODO(Needs analysis): We could assume that the keys being requested
852
# from get_parent_map are in a breadth first search, so typically they
853
# will all be depth N from some common parent, and we don't have to
854
# have the server iterate from the root parent, but rather from the
855
# keys we're searching; and just tell the server the keyspace we
856
# already have; but this may be more traffic again.
858
# Transform self._parents_map into a search request recipe.
859
# TODO: Manage this incrementally to avoid covering the same path
860
# repeatedly. (The server will have to on each request, but the less
861
# work done the better).
862
parents_map = self._parents_map
863
if parents_map is None:
864
# Repository is not locked, so there's no cache.
866
start_set = set(parents_map)
867
result_parents = set()
868
for parents in parents_map.itervalues():
869
result_parents.update(parents)
870
stop_keys = result_parents.difference(start_set)
871
included_keys = start_set.intersection(result_parents)
872
start_set.difference_update(included_keys)
873
recipe = (start_set, stop_keys, len(parents_map))
874
body = self._serialise_search_recipe(recipe)
875
path = self.bzrdir._path_for_remote_call(self._client)
877
assert type(key) is str
878
verb = 'Repository.get_parent_map'
879
args = (path,) + tuple(keys)
880
response = self._client.call_with_body_bytes_expecting_body(
881
verb, args, self._serialise_search_recipe(recipe))
882
if self._response_is_unknown_method(response, verb):
883
# Server does not support this method, so get the whole graph.
884
# Worse, we have to force a disconnection, because the server now
885
# doesn't realise it has a body on the wire to consume, so the
886
# only way to recover is to abandon the connection.
888
'Server is too old for fast get_parent_map, reconnecting. '
889
'(Upgrade the server to Bazaar 1.2 to avoid this)')
891
# To avoid having to disconnect repeatedly, we keep track of the
892
# fact the server doesn't understand remote methods added in 1.2.
893
medium._remote_is_at_least_1_2 = False
894
return self.get_revision_graph()
895
elif response[0][0] not in ['ok']:
896
reponse[1].cancel_read_body()
897
raise errors.UnexpectedSmartServerResponse(response[0])
898
if response[0][0] == 'ok':
899
coded = bz2.decompress(response[1].read_body_bytes())
903
lines = coded.split('\n')
906
d = tuple(line.split())
908
revision_graph[d[0]] = d[1:]
910
# No parents - so give the Graph result (NULL_REVISION,).
911
revision_graph[d[0]] = (NULL_REVISION,)
912
return revision_graph
711
915
def get_signature_text(self, revision_id):
712
916
self._ensure_real()
843
1046
self._ensure_real()
844
1047
return self._real_repository.has_signature_for_revision_id(revision_id)
846
def get_data_stream_for_search(self, search):
    """Return a data stream for the revisions described by search.

    The stream is requested with 'Repository.stream_revisions_chunked',
    sending the serialised search recipe as the request body.  If the
    server is known, or turns out, not to support that verb, the request
    is delegated to self._real_repository instead.

    :param search: An object providing get_recipe(); the recipe is
        serialised with self._serialise_search_recipe.
    :return: The result of self._deserialise_stream() on an 'ok'
        response, or of the real repository's
        get_data_stream_for_search() on fallback.
    :raises NoSuchRevision: if the server answers ('NoSuchRevision',).
    :raises errors.UnexpectedSmartServerResponse: for any other response.
    """
    medium = self._client.get_smart_medium()
    if not medium._remote_is_at_least_1_2:
        self._ensure_real()
        return self._real_repository.get_data_stream_for_search(search)
    REQUEST_NAME = 'Repository.stream_revisions_chunked'
    path = self.bzrdir._path_for_remote_call(self._client)
    body = self._serialise_search_recipe(search.get_recipe())
    response, protocol = self._client.call_with_body_bytes_expecting_body(
        REQUEST_NAME, (path,), body)

    if self._response_is_unknown_method((response, protocol), REQUEST_NAME):
        # Server does not support this method, so fall back to VFS.
        # Worse, we have to force a disconnection, because the server now
        # doesn't realise it has a body on the wire to consume, so the
        # only way to recover is to abandon the connection.
        warning(
            'Server is too old for streaming pull, reconnecting. '
            '(Upgrade the server to Bazaar 1.2 to avoid this)')
        medium.disconnect()
        # To avoid having to disconnect repeatedly, we keep track of the
        # fact the server doesn't understand this remote method.
        medium._remote_is_at_least_1_2 = False
        self._ensure_real()
        return self._real_repository.get_data_stream_for_search(search)

    if response == ('ok',):
        return self._deserialise_stream(protocol)
    if response == ('NoSuchRevision', ):
        # We cannot easily identify the revision that is missing in this
        # situation without doing much more network IO. For now, bail.
        raise NoSuchRevision(self, "unknown")
    raise errors.UnexpectedSmartServerResponse(response)
862
1084
def _deserialise_stream(self, protocol):
    """Yield (name_tuple, record_bytes) pairs parsed from a streamed body.

    The body is consumed chunk by chunk from
    protocol.read_streamed_body(); each chunk is fed to a
    ContainerPushParser and every record that becomes available is
    yielded straight away.

    :param protocol: The protocol object whose streamed body is read.
    :raises errors.SmartProtocolError: if a record does not carry exactly
        one name.
    """
    stream = protocol.read_streamed_body()
    container_parser = ContainerPushParser()
    for chunk in stream:
        container_parser.accept_bytes(chunk)
        records = container_parser.read_pending_records()
        for record_names, record_bytes in records:
            if len(record_names) != 1:
                # These records should have only one name, and that name
                # should be a one-element tuple.
                raise errors.SmartProtocolError(
                    'Repository data stream had invalid record name %r'
                    % (record_names,))
            name_tuple = record_names[0]
            yield name_tuple, record_bytes
876
1100
def insert_data_stream(self, stream):
877
1101
self._ensure_real()