~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/remote.py

Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
# TODO: At some point, handle upgrades by just passing the whole request
18
18
# across to run on the server.
19
19
 
 
20
import bz2
20
21
from cStringIO import StringIO
21
22
 
22
23
from bzrlib import (
23
24
    branch,
 
25
    debug,
24
26
    errors,
25
27
    graph,
26
28
    lockdir,
40
42
    zero_ninetyone,
41
43
    )
42
44
from bzrlib.revision import NULL_REVISION
43
 
from bzrlib.trace import note
 
45
from bzrlib.trace import mutter, note, warning
44
46
 
45
47
# Note: RemoteBzrDirFormat is in bzrdir.py
46
48
 
129
131
        else:
130
132
            raise errors.UnexpectedSmartServerResponse(response)
131
133
 
 
134
    def _get_tree_branch(self):
 
135
        """See BzrDir._get_tree_branch()."""
 
136
        return None, self.open_branch()
 
137
 
132
138
    def open_branch(self, _unsupported=False):
133
139
        assert _unsupported == False, 'unsupported flag support not implemented yet.'
134
140
        reference_url = self.get_branch_reference()
142
148
                
143
149
    def open_repository(self):
144
150
        path = self._path_for_remote_call(self._client)
145
 
        response = self._client.call('BzrDir.find_repository', path)
 
151
        verb = 'BzrDir.find_repositoryV2'
 
152
        response = self._client.call(verb, path)
 
153
        if (response == ('error', "Generic bzr smart protocol error: "
 
154
                "bad request '%s'" % verb) or
 
155
              response == ('error', "Generic bzr smart protocol error: "
 
156
                "bad request u'%s'" % verb)):
 
157
            verb = 'BzrDir.find_repository'
 
158
            response = self._client.call(verb, path)
146
159
        assert response[0] in ('ok', 'norepository'), \
147
160
            'unexpected response code %s' % (response,)
148
161
        if response[0] == 'norepository':
149
162
            raise errors.NoRepositoryPresent(self)
150
 
        assert len(response) == 4, 'incorrect response length %s' % (response,)
 
163
        if verb == 'BzrDir.find_repository':
 
164
            # servers that don't support the V2 method don't support external
 
165
            # references either.
 
166
            response = response + ('no', )
 
167
        assert len(response) == 5, 'incorrect response length %s' % (response,)
151
168
        if response[1] == '':
152
169
            format = RemoteRepositoryFormat()
153
170
            format.rich_root_data = (response[2] == 'yes')
154
171
            format.supports_tree_reference = (response[3] == 'yes')
 
172
            # No wire format to check this yet.
 
173
            format.supports_external_lookups = (response[4] == 'yes')
155
174
            return RemoteRepository(self, format)
156
175
        else:
157
176
            raise errors.NoRepositoryPresent(self)
267
286
        self._leave_lock = False
268
287
        # A cache of looked up revision parent data; reset at unlock time.
269
288
        self._parents_map = None
 
289
        if 'hpss' in debug.debug_flags:
 
290
            self._requested_parents = None
270
291
        # For tests:
271
292
        # These depend on the actual remote format, so force them off for
272
293
        # maximum compatibility. XXX: In future these should depend on the
471
492
            self._lock_mode = 'r'
472
493
            self._lock_count = 1
473
494
            self._parents_map = {}
 
495
            if 'hpss' in debug.debug_flags:
 
496
                self._requested_parents = set()
474
497
            if self._real_repository is not None:
475
498
                self._real_repository.lock_read()
476
499
        else:
509
532
            self._lock_mode = 'w'
510
533
            self._lock_count = 1
511
534
            self._parents_map = {}
 
535
            if 'hpss' in debug.debug_flags:
 
536
                self._requested_parents = set()
512
537
        elif self._lock_mode == 'r':
513
538
            raise errors.ReadOnlyError(self)
514
539
        else:
569
594
        if self._lock_count > 0:
570
595
            return
571
596
        self._parents_map = None
 
597
        if 'hpss' in debug.debug_flags:
 
598
            self._requested_parents = None
572
599
        old_mode = self._lock_mode
573
600
        self._lock_mode = None
574
601
        try:
648
675
                committer=committer, revprops=revprops, revision_id=revision_id)
649
676
        return builder
650
677
 
651
 
    @needs_write_lock
652
678
    def add_inventory(self, revid, inv, parents):
653
679
        self._ensure_real()
654
680
        return self._real_repository.add_inventory(revid, inv, parents)
655
681
 
656
 
    @needs_write_lock
657
682
    def add_revision(self, rev_id, rev, inv=None, config=None):
658
683
        self._ensure_real()
659
684
        return self._real_repository.add_revision(
691
716
        """RemoteRepositories never create working trees by default."""
692
717
        return False
693
718
 
 
719
    def revision_ids_to_search_result(self, result_set):
 
720
        """Convert a set of revision ids to a graph SearchResult."""
 
721
        result_parents = set()
 
722
        for parents in self.get_graph().get_parent_map(
 
723
            result_set).itervalues():
 
724
            result_parents.update(parents)
 
725
        included_keys = result_set.intersection(result_parents)
 
726
        start_keys = result_set.difference(included_keys)
 
727
        exclude_keys = result_parents.difference(result_set)
 
728
        result = graph.SearchResult(start_keys, exclude_keys,
 
729
            len(result_set), result_set)
 
730
        return result
 
731
 
 
732
    @needs_read_lock
 
733
    def search_missing_revision_ids(self, other, revision_id=None, find_ghosts=True):
 
734
        """Return the revision ids that other has that this does not.
 
735
        
 
736
        These are returned in topological order.
 
737
 
 
738
        revision_id: only return revision ids included by revision_id.
 
739
        """
 
740
        return repository.InterRepository.get(
 
741
            other, self).search_missing_revision_ids(revision_id, find_ghosts)
 
742
 
694
743
    def fetch(self, source, revision_id=None, pb=None):
695
744
        if self.has_same_location(source):
696
745
            # check that last_revision is in 'from' and then return a
741
790
        """See bzrlib.Graph.get_parent_map()."""
742
791
        # Hack to build up the caching logic.
743
792
        ancestry = self._parents_map
744
 
        missing_revisions = set(key for key in keys if key not in ancestry)
 
793
        if ancestry is None:
 
794
            # Repository is not locked, so there's no cache.
 
795
            missing_revisions = set(keys)
 
796
            ancestry = {}
 
797
        else:
 
798
            missing_revisions = set(key for key in keys if key not in ancestry)
745
799
        if missing_revisions:
746
 
            self._parents_map.update(self._get_parent_map(missing_revisions))
747
 
        return dict((k, ancestry[k]) for k in keys if k in ancestry)
 
800
            parent_map = self._get_parent_map(missing_revisions)
 
801
            if 'hpss' in debug.debug_flags:
 
802
                mutter('retransmitted revisions: %d of %d',
 
803
                        len(set(ancestry).intersection(parent_map)),
 
804
                        len(parent_map))
 
805
            ancestry.update(parent_map)
 
806
        present_keys = [k for k in keys if k in ancestry]
 
807
        if 'hpss' in debug.debug_flags:
 
808
            self._requested_parents.update(present_keys)
 
809
            mutter('Current RemoteRepository graph hit rate: %d%%',
 
810
                100.0 * len(self._requested_parents) / len(ancestry))
 
811
        return dict((k, ancestry[k]) for k in present_keys)
748
812
 
749
813
    def _response_is_unknown_method(self, response, verb):
750
814
        """Return True if response is an unknonwn method response to verb.
766
830
 
767
831
    def _get_parent_map(self, keys):
768
832
        """Helper for get_parent_map that performs the RPC."""
 
833
        medium = self._client.get_smart_medium()
 
834
        if not medium._remote_is_at_least_1_2:
 
835
            # We already found out that the server can't understand
 
836
            # Repository.get_parent_map requests, so just fetch the whole
 
837
            # graph.
 
838
            return self.get_revision_graph()
 
839
 
769
840
        keys = set(keys)
770
841
        if NULL_REVISION in keys:
771
842
            keys.discard(NULL_REVISION)
774
845
                return found_parents
775
846
        else:
776
847
            found_parents = {}
 
848
        # TODO(Needs analysis): We could assume that the keys being requested
 
849
        # from get_parent_map are in a breadth first search, so typically they
 
850
        # will all be depth N from some common parent, and we don't have to
 
851
        # have the server iterate from the root parent, but rather from the
 
852
        # keys we're searching; and just tell the server the keyspace we
 
853
        # already have; but this may be more traffic again.
 
854
 
 
855
        # Transform self._parents_map into a search request recipe.
 
856
        # TODO: Manage this incrementally to avoid covering the same path
 
857
        # repeatedly. (The server will have to on each request, but the less
 
858
        # work done the better).
 
859
        parents_map = self._parents_map
 
860
        if parents_map is None:
 
861
            # Repository is not locked, so there's no cache.
 
862
            parents_map = {}
 
863
        start_set = set(parents_map)
 
864
        result_parents = set()
 
865
        for parents in parents_map.itervalues():
 
866
            result_parents.update(parents)
 
867
        stop_keys = result_parents.difference(start_set)
 
868
        included_keys = start_set.intersection(result_parents)
 
869
        start_set.difference_update(included_keys)
 
870
        recipe = (start_set, stop_keys, len(parents_map))
 
871
        body = self._serialise_search_recipe(recipe)
777
872
        path = self.bzrdir._path_for_remote_call(self._client)
778
873
        for key in keys:
779
874
            assert type(key) is str
780
875
        verb = 'Repository.get_parent_map'
781
 
        response = self._client.call_expecting_body(
782
 
            verb, path, *keys)
 
876
        args = (path,) + tuple(keys)
 
877
        response = self._client.call_with_body_bytes_expecting_body(
 
878
            verb, args, self._serialise_search_recipe(recipe))
783
879
        if self._response_is_unknown_method(response, verb):
784
 
            # Server that does not support this method, get the whole graph.
785
 
            response = self._client.call_expecting_body(
786
 
                'Repository.get_revision_graph', path, '')
787
 
            if response[0][0] not in ['ok', 'nosuchrevision']:
788
 
                reponse[1].cancel_read_body()
789
 
                raise errors.UnexpectedSmartServerResponse(response[0])
 
880
            # Server does not support this method, so get the whole graph.
 
881
            # Worse, we have to force a disconnection, because the server now
 
882
            # doesn't realise it has a body on the wire to consume, so the
 
883
            # only way to recover is to abandon the connection.
 
884
            warning(
 
885
                'Server is too old for fast get_parent_map, reconnecting.  '
 
886
                '(Upgrade the server to Bazaar 1.2 to avoid this)')
 
887
            medium.disconnect()
 
888
            # To avoid having to disconnect repeatedly, we keep track of the
 
889
            # fact the server doesn't understand remote methods added in 1.2.
 
890
            medium._remote_is_at_least_1_2 = False
 
891
            return self.get_revision_graph()
790
892
        elif response[0][0] not in ['ok']:
791
893
            reponse[1].cancel_read_body()
792
894
            raise errors.UnexpectedSmartServerResponse(response[0])
793
895
        if response[0][0] == 'ok':
794
 
            coded = response[1].read_body_bytes()
 
896
            coded = bz2.decompress(response[1].read_body_bytes())
795
897
            if coded == '':
796
898
                # no revisions found
797
899
                return {}
941
1043
        self._ensure_real()
942
1044
        return self._real_repository.has_signature_for_revision_id(revision_id)
943
1045
 
944
 
    def get_data_stream(self, revision_ids):
 
1046
    def get_data_stream_for_search(self, search):
 
1047
        medium = self._client.get_smart_medium()
 
1048
        if not medium._remote_is_at_least_1_2:
 
1049
            self._ensure_real()
 
1050
            return self._real_repository.get_data_stream_for_search(search)
945
1051
        REQUEST_NAME = 'Repository.stream_revisions_chunked'
946
1052
        path = self.bzrdir._path_for_remote_call(self._client)
947
 
        response, protocol = self._client.call_expecting_body(
948
 
            REQUEST_NAME, path, *revision_ids)
 
1053
        body = self._serialise_search_recipe(search.get_recipe())
 
1054
        response, protocol = self._client.call_with_body_bytes_expecting_body(
 
1055
            REQUEST_NAME, (path,), body)
 
1056
 
 
1057
        if self._response_is_unknown_method((response, protocol), REQUEST_NAME):
 
1058
            # Server does not support this method, so fall back to VFS.
 
1059
            # Worse, we have to force a disconnection, because the server now
 
1060
            # doesn't realise it has a body on the wire to consume, so the
 
1061
            # only way to recover is to abandon the connection.
 
1062
            warning(
 
1063
                'Server is too old for streaming pull, reconnecting.  '
 
1064
                '(Upgrade the server to Bazaar 1.2 to avoid this)')
 
1065
            medium.disconnect()
 
1066
            # To avoid having to disconnect repeatedly, we keep track of the
 
1067
            # fact the server doesn't understand this remote method.
 
1068
            medium._remote_is_at_least_1_2 = False
 
1069
            self._ensure_real()
 
1070
            return self._real_repository.get_data_stream_for_search(search)
949
1071
 
950
1072
        if response == ('ok',):
951
1073
            return self._deserialise_stream(protocol)
952
 
        elif (response == ('error', "Generic bzr smart protocol error: "
953
 
                "bad request '%s'" % REQUEST_NAME) or
954
 
              response == ('error', "Generic bzr smart protocol error: "
955
 
                "bad request u'%s'" % REQUEST_NAME)):
956
 
            protocol.cancel_read_body()
957
 
            self._ensure_real()
958
 
            return self._real_repository.get_data_stream(revision_ids)
 
1074
        if response == ('NoSuchRevision', ):
 
1075
            # We cannot easily identify the revision that is missing in this
 
1076
            # situation without doing much more network IO. For now, bail.
 
1077
            raise NoSuchRevision(self, "unknown")
959
1078
        else:
960
1079
            raise errors.UnexpectedSmartServerResponse(response)
961
1080
 
1000
1119
    def _make_parents_provider(self):
1001
1120
        return self
1002
1121
 
 
1122
    def _serialise_search_recipe(self, recipe):
 
1123
        """Serialise a graph search recipe.
 
1124
 
 
1125
        :param recipe: A search recipe (start, stop, count).
 
1126
        :return: Serialised bytes.
 
1127
        """
 
1128
        start_keys = ' '.join(recipe[0])
 
1129
        stop_keys = ' '.join(recipe[1])
 
1130
        count = str(recipe[2])
 
1131
        return '\n'.join((start_keys, stop_keys, count))
 
1132
 
1003
1133
 
1004
1134
class RemoteBranchLockableFiles(LockableFiles):
1005
1135
    """A 'LockableFiles' implementation that talks to a smart server.