~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-08-27 02:27:19 UTC
  • mfrom: (4634.3.19 gc-batching)
  • Revision ID: pqm@pqm.ubuntu.com-20090827022719-bl2yoqhpj3fcfczu
(andrew) Fix #402657: 2a fetch over dumb transport reads one group at
        a time.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Wire-level encoding and decoding of requests and responses for the smart
18
18
client and server.
29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
 
from bzrlib.util.bencode import bdecode, bencode
 
32
from bzrlib.bencode import bdecode_as_tuple, bencode
33
33
 
34
34
 
35
35
# Protocol version strings.  These are sent as prefixes of bzr requests and
109
109
        for start, length in offsets:
110
110
            txt.append('%d,%d' % (start, length))
111
111
        return '\n'.join(txt)
112
 
        
 
112
 
113
113
 
114
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
115
    """Server-side encoding and decoding logic for smart version 1."""
116
 
    
 
116
 
117
117
    def __init__(self, backing_transport, write_func, root_client_path='/'):
118
118
        self._backing_transport = backing_transport
119
119
        self._root_client_path = root_client_path
127
127
 
128
128
    def accept_bytes(self, bytes):
129
129
        """Take bytes, and advance the internal state machine appropriately.
130
 
        
 
130
 
131
131
        :param bytes: must be a byte string
132
132
        """
133
133
        if not isinstance(bytes, str):
169
169
 
170
170
        if self._has_dispatched:
171
171
            if self._finished:
172
 
                # nothing to do.XXX: this routine should be a single state 
 
172
                # nothing to do.XXX: this routine should be a single state
173
173
                # machine too.
174
174
                self.unused_data += self.in_buffer
175
175
                self.in_buffer = ''
211
211
 
212
212
    def _write_protocol_version(self):
213
213
        """Write any prefixes this protocol requires.
214
 
        
 
214
 
215
215
        Version one doesn't send protocol versions.
216
216
        """
217
217
 
234
234
 
235
235
class SmartServerRequestProtocolTwo(SmartServerRequestProtocolOne):
236
236
    r"""Version two of the server side of the smart protocol.
237
 
   
 
237
 
238
238
    This prefixes responses with the value of RESPONSE_VERSION_TWO.
239
239
    """
240
240
 
250
250
 
251
251
    def _write_protocol_version(self):
252
252
        r"""Write any prefixes this protocol requires.
253
 
        
 
253
 
254
254
        Version two sends the value of RESPONSE_VERSION_TWO.
255
255
        """
256
256
        self._write_func(self.response_marker)
412
412
        self.chunks = collections.deque()
413
413
        self.error = False
414
414
        self.error_in_progress = None
415
 
    
 
415
 
416
416
    def next_read_size(self):
417
417
        # Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
418
        # end-of-body marker is 4 bytes: 'END\n'.
456
456
 
457
457
    def _finished(self):
458
458
        self.unused_data = self._get_in_buffer()
459
 
        # self._in_buffer = None
460
459
        self._in_buffer_list = []
461
460
        self._in_buffer_len = 0
462
461
        self.state_accept = self._state_accept_reading_unused
507
506
                self.chunks.append(self.chunk_in_progress)
508
507
            self.chunk_in_progress = None
509
508
            self.state_accept = self._state_accept_expecting_length
510
 
        
 
509
 
511
510
    def _state_accept_reading_unused(self):
512
511
        self.unused_data += self._get_in_buffer()
513
512
        self._in_buffer_list = []
515
514
 
516
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
517
516
    """Decodes the length-prefixed bulk data."""
518
 
    
 
517
 
519
518
    def __init__(self):
520
519
        _StatefulDecoder.__init__(self)
521
520
        self.state_accept = self._state_accept_expecting_length
522
521
        self.state_read = self._state_read_no_data
523
522
        self._body = ''
524
523
        self._trailer_buffer = ''
525
 
    
 
524
 
526
525
    def next_read_size(self):
527
526
        if self.bytes_left is not None:
528
527
            # Ideally we want to read all the remainder of the body and the
538
537
        else:
539
538
            # Reading excess data.  Either way, 1 byte at a time is fine.
540
539
            return 1
541
 
        
 
540
 
542
541
    def read_pending_data(self):
543
542
        """Return any pending data that has been decoded."""
544
543
        return self.state_read()
565
564
                self._body = self._body[:self.bytes_left]
566
565
            self.bytes_left = None
567
566
            self.state_accept = self._state_accept_reading_trailer
568
 
        
 
567
 
569
568
    def _state_accept_reading_trailer(self):
570
569
        self._trailer_buffer += self._get_in_buffer()
571
570
        self._set_in_buffer(None)
575
574
            self.unused_data = self._trailer_buffer[len('done\n'):]
576
575
            self.state_accept = self._state_accept_reading_unused
577
576
            self.finished_reading = True
578
 
    
 
577
 
579
578
    def _state_accept_reading_unused(self):
580
579
        self.unused_data += self._get_in_buffer()
581
580
        self._set_in_buffer(None)
657
656
            mutter('              %d bytes in readv request', len(readv_bytes))
658
657
        self._last_verb = args[0]
659
658
 
 
659
    def call_with_body_stream(self, args, stream):
 
660
        # Protocols v1 and v2 don't support body streams.  So it's safe to
 
661
        # assume that a v1/v2 server doesn't support whatever method we're
 
662
        # trying to call with a body stream.
 
663
        self._request.finished_writing()
 
664
        self._request.finished_reading()
 
665
        raise errors.UnknownSmartMethod(args[0])
 
666
 
660
667
    def cancel_read_body(self):
661
668
        """After expecting a body, a response code may indicate one otherwise.
662
669
 
722
729
    def _response_is_unknown_method(self, result_tuple):
723
730
        """Raise UnexpectedSmartServerResponse if the response is an 'unknonwn
724
731
        method' response to the request.
725
 
        
 
732
 
726
733
        :param response: The response from a smart client call_expecting_body
727
734
            call.
728
735
        :param verb: The verb used in that call.
735
742
            # The response will have no body, so we've finished reading.
736
743
            self._request.finished_reading()
737
744
            raise errors.UnknownSmartMethod(self._last_verb)
738
 
        
 
745
 
739
746
    def read_body_bytes(self, count=-1):
740
747
        """Read bytes from the body, decoding into a byte stream.
741
 
        
742
 
        We read all bytes at once to ensure we've checked the trailer for 
 
748
 
 
749
        We read all bytes at once to ensure we've checked the trailer for
743
750
        errors, and then feed the buffer back as read_body_bytes is called.
744
751
        """
745
752
        if self._body_buffer is not None:
783
790
 
784
791
    def _write_protocol_version(self):
785
792
        """Write any prefixes this protocol requires.
786
 
        
 
793
 
787
794
        Version one doesn't send protocol versions.
788
795
        """
789
796
 
790
797
 
791
798
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
792
799
    """Version two of the client side of the smart protocol.
793
 
    
 
800
 
794
801
    This prefixes the request with the value of REQUEST_VERSION_TWO.
795
802
    """
796
803
 
824
831
 
825
832
    def _write_protocol_version(self):
826
833
        """Write any prefixes this protocol requires.
827
 
        
 
834
 
828
835
        Version two sends the value of REQUEST_VERSION_TWO.
829
836
        """
830
837
        self._request.accept_bytes(self.request_marker)
890
897
            # We do *not* set self.decoding_failed here.  The message handler
891
898
            # has raised an error, but the decoder is still able to parse bytes
892
899
            # and determine when this message ends.
893
 
            log_exception_quietly()
 
900
            if not isinstance(exception.exc_value, errors.UnknownSmartMethod):
 
901
                log_exception_quietly()
894
902
            self.message_handler.protocol_error(exception.exc_value)
895
903
            # The state machine is ready to continue decoding, but the
896
904
            # exception has interrupted the loop that runs the state machine.
932
940
    def _extract_prefixed_bencoded_data(self):
933
941
        prefixed_bytes = self._extract_length_prefixed_bytes()
934
942
        try:
935
 
            decoded = bdecode(prefixed_bytes)
 
943
            decoded = bdecode_as_tuple(prefixed_bytes)
936
944
        except ValueError:
937
945
            raise errors.SmartProtocolError(
938
946
                'Bytes %r not bencoded' % (prefixed_bytes,))
978
986
            self.message_handler.headers_received(decoded)
979
987
        except:
980
988
            raise errors.SmartMessageHandlerError(sys.exc_info())
981
 
    
 
989
 
982
990
    def _state_accept_expecting_message_part(self):
983
991
        message_part_kind = self._extract_single_byte()
984
992
        if message_part_kind == 'o':
1029
1037
            raise errors.SmartMessageHandlerError(sys.exc_info())
1030
1038
 
1031
1039
    def _state_accept_reading_unused(self):
1032
 
        self.unused_data = self._get_in_buffer()
 
1040
        self.unused_data += self._get_in_buffer()
1033
1041
        self._set_in_buffer(None)
1034
1042
 
1035
1043
    def next_read_size(self):
1053
1061
    response_marker = request_marker = MESSAGE_VERSION_THREE
1054
1062
 
1055
1063
    def __init__(self, write_func):
1056
 
        self._buf = ''
 
1064
        self._buf = []
1057
1065
        self._real_write_func = write_func
1058
1066
 
1059
1067
    def _write_func(self, bytes):
1060
 
        self._buf += bytes
 
1068
        self._buf.append(bytes)
 
1069
        if len(self._buf) > 100:
 
1070
            self.flush()
1061
1071
 
1062
1072
    def flush(self):
1063
1073
        if self._buf:
1064
 
            self._real_write_func(self._buf)
1065
 
            self._buf = ''
 
1074
            self._real_write_func(''.join(self._buf))
 
1075
            del self._buf[:]
1066
1076
 
1067
1077
    def _serialise_offsets(self, offsets):
1068
1078
        """Serialise a readv offset list."""
1070
1080
        for start, length in offsets:
1071
1081
            txt.append('%d,%d' % (start, length))
1072
1082
        return '\n'.join(txt)
1073
 
        
 
1083
 
1074
1084
    def _write_protocol_version(self):
1075
1085
        self._write_func(MESSAGE_VERSION_THREE)
1076
1086
 
1101
1111
        self._write_func(struct.pack('!L', len(bytes)))
1102
1112
        self._write_func(bytes)
1103
1113
 
 
1114
    def _write_chunked_body_start(self):
 
1115
        self._write_func('oC')
 
1116
 
1104
1117
    def _write_error_status(self):
1105
1118
        self._write_func('oE')
1106
1119
 
1148
1161
        if response.body is not None:
1149
1162
            self._write_prefixed_body(response.body)
1150
1163
        elif response.body_stream is not None:
1151
 
            for chunk in response.body_stream:
1152
 
                self._write_prefixed_body(chunk)
1153
 
                self.flush()
 
1164
            for exc_info, chunk in _iter_with_errors(response.body_stream):
 
1165
                if exc_info is not None:
 
1166
                    self._write_error_status()
 
1167
                    error_struct = request._translate_error(exc_info[1])
 
1168
                    self._write_structure(error_struct)
 
1169
                    break
 
1170
                else:
 
1171
                    if isinstance(chunk, request.FailedSmartServerResponse):
 
1172
                        self._write_error_status()
 
1173
                        self._write_structure(chunk.args)
 
1174
                        break
 
1175
                    self._write_prefixed_body(chunk)
1154
1176
        self._write_end()
1155
 
        
 
1177
 
 
1178
 
 
1179
def _iter_with_errors(iterable):
 
1180
    """Handle errors from iterable.next().
 
1181
 
 
1182
    Use like::
 
1183
 
 
1184
        for exc_info, value in _iter_with_errors(iterable):
 
1185
            ...
 
1186
 
 
1187
    This is a safer alternative to::
 
1188
 
 
1189
        try:
 
1190
            for value in iterable:
 
1191
               ...
 
1192
        except:
 
1193
            ...
 
1194
 
 
1195
    Because the latter will catch errors from the for-loop body, not just
 
1196
    iterable.next()
 
1197
 
 
1198
    If an error occurs, exc_info will be a exc_info tuple, and the generator
 
1199
    will terminate.  Otherwise exc_info will be None, and value will be the
 
1200
    value from iterable.next().  Note that KeyboardInterrupt and SystemExit
 
1201
    will not be itercepted.
 
1202
    """
 
1203
    iterator = iter(iterable)
 
1204
    while True:
 
1205
        try:
 
1206
            yield None, iterator.next()
 
1207
        except StopIteration:
 
1208
            return
 
1209
        except (KeyboardInterrupt, SystemExit):
 
1210
            raise
 
1211
        except Exception:
 
1212
            mutter('_iter_with_errors caught error')
 
1213
            log_exception_quietly()
 
1214
            yield sys.exc_info(), None
 
1215
            return
 
1216
 
1156
1217
 
1157
1218
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1158
1219
 
1163
1224
 
1164
1225
    def set_headers(self, headers):
1165
1226
        self._headers = headers.copy()
1166
 
        
 
1227
 
1167
1228
    def call(self, *args):
1168
1229
        if 'hpss' in debug.debug_flags:
1169
1230
            mutter('hpss call:   %s', repr(args)[1:-1])
1218
1279
        self._write_end()
1219
1280
        self._medium_request.finished_writing()
1220
1281
 
 
1282
    def call_with_body_stream(self, args, stream):
 
1283
        if 'hpss' in debug.debug_flags:
 
1284
            mutter('hpss call w/body stream: %r', args)
 
1285
            path = getattr(self._medium_request._medium, '_path', None)
 
1286
            if path is not None:
 
1287
                mutter('                  (to %s)', path)
 
1288
            self._request_start_time = time.time()
 
1289
        self._write_protocol_version()
 
1290
        self._write_headers(self._headers)
 
1291
        self._write_structure(args)
 
1292
        # TODO: notice if the server has sent an early error reply before we
 
1293
        #       have finished sending the stream.  We would notice at the end
 
1294
        #       anyway, but if the medium can deliver it early then it's good
 
1295
        #       to short-circuit the whole request...
 
1296
        for exc_info, part in _iter_with_errors(stream):
 
1297
            if exc_info is not None:
 
1298
                # Iterating the stream failed.  Cleanly abort the request.
 
1299
                self._write_error_status()
 
1300
                # Currently the client unconditionally sends ('error',) as the
 
1301
                # error args.
 
1302
                self._write_structure(('error',))
 
1303
                self._write_end()
 
1304
                self._medium_request.finished_writing()
 
1305
                raise exc_info[0], exc_info[1], exc_info[2]
 
1306
            else:
 
1307
                self._write_prefixed_body(part)
 
1308
                self.flush()
 
1309
        self._write_end()
 
1310
        self._medium_request.finished_writing()
 
1311