~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: John Arbash Meinel
  • Author(s): Mark Hammond
  • Date: 2008-09-09 17:02:21 UTC
  • mto: This revision was merged to the branch mainline in revision 3697.
  • Revision ID: john@arbash-meinel.com-20080909170221-svim3jw2mrz0amp3
An updated transparent icon for bzr.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Wire-level encoding and decoding of requests and responses for the smart
18
18
client and server.
29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
 
from bzrlib.bencode import bdecode_as_tuple, bencode
 
32
from bzrlib.util.bencode import bdecode, bencode
33
33
 
34
34
 
35
35
# Protocol version strings.  These are sent as prefixes of bzr requests and
109
109
        for start, length in offsets:
110
110
            txt.append('%d,%d' % (start, length))
111
111
        return '\n'.join(txt)
112
 
 
 
112
        
113
113
 
114
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
115
    """Server-side encoding and decoding logic for smart version 1."""
116
 
 
 
116
    
117
117
    def __init__(self, backing_transport, write_func, root_client_path='/'):
118
118
        self._backing_transport = backing_transport
119
119
        self._root_client_path = root_client_path
127
127
 
128
128
    def accept_bytes(self, bytes):
129
129
        """Take bytes, and advance the internal state machine appropriately.
130
 
 
 
130
        
131
131
        :param bytes: must be a byte string
132
132
        """
133
133
        if not isinstance(bytes, str):
169
169
 
170
170
        if self._has_dispatched:
171
171
            if self._finished:
172
 
                # nothing to do.XXX: this routine should be a single state
 
172
                # nothing to do.XXX: this routine should be a single state 
173
173
                # machine too.
174
174
                self.unused_data += self.in_buffer
175
175
                self.in_buffer = ''
211
211
 
212
212
    def _write_protocol_version(self):
213
213
        """Write any prefixes this protocol requires.
214
 
 
 
214
        
215
215
        Version one doesn't send protocol versions.
216
216
        """
217
217
 
234
234
 
235
235
class SmartServerRequestProtocolTwo(SmartServerRequestProtocolOne):
236
236
    r"""Version two of the server side of the smart protocol.
237
 
 
 
237
   
238
238
    This prefixes responses with the value of RESPONSE_VERSION_TWO.
239
239
    """
240
240
 
250
250
 
251
251
    def _write_protocol_version(self):
252
252
        r"""Write any prefixes this protocol requires.
253
 
 
 
253
        
254
254
        Version two sends the value of RESPONSE_VERSION_TWO.
255
255
        """
256
256
        self._write_func(self.response_marker)
412
412
        self.chunks = collections.deque()
413
413
        self.error = False
414
414
        self.error_in_progress = None
415
 
 
 
415
    
416
416
    def next_read_size(self):
417
417
        # Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
418
        # end-of-body marker is 4 bytes: 'END\n'.
456
456
 
457
457
    def _finished(self):
458
458
        self.unused_data = self._get_in_buffer()
 
459
        # self._in_buffer = None
459
460
        self._in_buffer_list = []
460
461
        self._in_buffer_len = 0
461
462
        self.state_accept = self._state_accept_reading_unused
506
507
                self.chunks.append(self.chunk_in_progress)
507
508
            self.chunk_in_progress = None
508
509
            self.state_accept = self._state_accept_expecting_length
509
 
 
 
510
        
510
511
    def _state_accept_reading_unused(self):
511
512
        self.unused_data += self._get_in_buffer()
512
513
        self._in_buffer_list = []
514
515
 
515
516
class LengthPrefixedBodyDecoder(_StatefulDecoder):
516
517
    """Decodes the length-prefixed bulk data."""
517
 
 
 
518
    
518
519
    def __init__(self):
519
520
        _StatefulDecoder.__init__(self)
520
521
        self.state_accept = self._state_accept_expecting_length
521
522
        self.state_read = self._state_read_no_data
522
523
        self._body = ''
523
524
        self._trailer_buffer = ''
524
 
 
 
525
    
525
526
    def next_read_size(self):
526
527
        if self.bytes_left is not None:
527
528
            # Ideally we want to read all the remainder of the body and the
537
538
        else:
538
539
            # Reading excess data.  Either way, 1 byte at a time is fine.
539
540
            return 1
540
 
 
 
541
        
541
542
    def read_pending_data(self):
542
543
        """Return any pending data that has been decoded."""
543
544
        return self.state_read()
564
565
                self._body = self._body[:self.bytes_left]
565
566
            self.bytes_left = None
566
567
            self.state_accept = self._state_accept_reading_trailer
567
 
 
 
568
        
568
569
    def _state_accept_reading_trailer(self):
569
570
        self._trailer_buffer += self._get_in_buffer()
570
571
        self._set_in_buffer(None)
574
575
            self.unused_data = self._trailer_buffer[len('done\n'):]
575
576
            self.state_accept = self._state_accept_reading_unused
576
577
            self.finished_reading = True
577
 
 
 
578
    
578
579
    def _state_accept_reading_unused(self):
579
580
        self.unused_data += self._get_in_buffer()
580
581
        self._set_in_buffer(None)
656
657
            mutter('              %d bytes in readv request', len(readv_bytes))
657
658
        self._last_verb = args[0]
658
659
 
659
 
    def call_with_body_stream(self, args, stream):
660
 
        # Protocols v1 and v2 don't support body streams.  So it's safe to
661
 
        # assume that a v1/v2 server doesn't support whatever method we're
662
 
        # trying to call with a body stream.
663
 
        self._request.finished_writing()
664
 
        self._request.finished_reading()
665
 
        raise errors.UnknownSmartMethod(args[0])
666
 
 
667
660
    def cancel_read_body(self):
668
661
        """After expecting a body, a response code may indicate one otherwise.
669
662
 
729
722
    def _response_is_unknown_method(self, result_tuple):
730
723
        """Raise UnexpectedSmartServerResponse if the response is an 'unknonwn
731
724
        method' response to the request.
732
 
 
 
725
        
733
726
        :param response: The response from a smart client call_expecting_body
734
727
            call.
735
728
        :param verb: The verb used in that call.
742
735
            # The response will have no body, so we've finished reading.
743
736
            self._request.finished_reading()
744
737
            raise errors.UnknownSmartMethod(self._last_verb)
745
 
 
 
738
        
746
739
    def read_body_bytes(self, count=-1):
747
740
        """Read bytes from the body, decoding into a byte stream.
748
 
 
749
 
        We read all bytes at once to ensure we've checked the trailer for
 
741
        
 
742
        We read all bytes at once to ensure we've checked the trailer for 
750
743
        errors, and then feed the buffer back as read_body_bytes is called.
751
744
        """
752
745
        if self._body_buffer is not None:
790
783
 
791
784
    def _write_protocol_version(self):
792
785
        """Write any prefixes this protocol requires.
793
 
 
 
786
        
794
787
        Version one doesn't send protocol versions.
795
788
        """
796
789
 
797
790
 
798
791
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
799
792
    """Version two of the client side of the smart protocol.
800
 
 
 
793
    
801
794
    This prefixes the request with the value of REQUEST_VERSION_TWO.
802
795
    """
803
796
 
831
824
 
832
825
    def _write_protocol_version(self):
833
826
        """Write any prefixes this protocol requires.
834
 
 
 
827
        
835
828
        Version two sends the value of REQUEST_VERSION_TWO.
836
829
        """
837
830
        self._request.accept_bytes(self.request_marker)
897
890
            # We do *not* set self.decoding_failed here.  The message handler
898
891
            # has raised an error, but the decoder is still able to parse bytes
899
892
            # and determine when this message ends.
900
 
            if not isinstance(exception.exc_value, errors.UnknownSmartMethod):
901
 
                log_exception_quietly()
 
893
            log_exception_quietly()
902
894
            self.message_handler.protocol_error(exception.exc_value)
903
895
            # The state machine is ready to continue decoding, but the
904
896
            # exception has interrupted the loop that runs the state machine.
940
932
    def _extract_prefixed_bencoded_data(self):
941
933
        prefixed_bytes = self._extract_length_prefixed_bytes()
942
934
        try:
943
 
            decoded = bdecode_as_tuple(prefixed_bytes)
 
935
            decoded = bdecode(prefixed_bytes)
944
936
        except ValueError:
945
937
            raise errors.SmartProtocolError(
946
938
                'Bytes %r not bencoded' % (prefixed_bytes,))
986
978
            self.message_handler.headers_received(decoded)
987
979
        except:
988
980
            raise errors.SmartMessageHandlerError(sys.exc_info())
989
 
 
 
981
    
990
982
    def _state_accept_expecting_message_part(self):
991
983
        message_part_kind = self._extract_single_byte()
992
984
        if message_part_kind == 'o':
1037
1029
            raise errors.SmartMessageHandlerError(sys.exc_info())
1038
1030
 
1039
1031
    def _state_accept_reading_unused(self):
1040
 
        self.unused_data += self._get_in_buffer()
 
1032
        self.unused_data = self._get_in_buffer()
1041
1033
        self._set_in_buffer(None)
1042
1034
 
1043
1035
    def next_read_size(self):
1061
1053
    response_marker = request_marker = MESSAGE_VERSION_THREE
1062
1054
 
1063
1055
    def __init__(self, write_func):
1064
 
        self._buf = []
 
1056
        self._buf = ''
1065
1057
        self._real_write_func = write_func
1066
1058
 
1067
1059
    def _write_func(self, bytes):
1068
 
        self._buf.append(bytes)
1069
 
        if len(self._buf) > 100:
1070
 
            self.flush()
 
1060
        self._buf += bytes
1071
1061
 
1072
1062
    def flush(self):
1073
1063
        if self._buf:
1074
 
            self._real_write_func(''.join(self._buf))
1075
 
            del self._buf[:]
 
1064
            self._real_write_func(self._buf)
 
1065
            self._buf = ''
1076
1066
 
1077
1067
    def _serialise_offsets(self, offsets):
1078
1068
        """Serialise a readv offset list."""
1080
1070
        for start, length in offsets:
1081
1071
            txt.append('%d,%d' % (start, length))
1082
1072
        return '\n'.join(txt)
1083
 
 
 
1073
        
1084
1074
    def _write_protocol_version(self):
1085
1075
        self._write_func(MESSAGE_VERSION_THREE)
1086
1076
 
1111
1101
        self._write_func(struct.pack('!L', len(bytes)))
1112
1102
        self._write_func(bytes)
1113
1103
 
1114
 
    def _write_chunked_body_start(self):
1115
 
        self._write_func('oC')
1116
 
 
1117
1104
    def _write_error_status(self):
1118
1105
        self._write_func('oE')
1119
1106
 
1161
1148
        if response.body is not None:
1162
1149
            self._write_prefixed_body(response.body)
1163
1150
        elif response.body_stream is not None:
1164
 
            for exc_info, chunk in _iter_with_errors(response.body_stream):
1165
 
                if exc_info is not None:
1166
 
                    self._write_error_status()
1167
 
                    error_struct = request._translate_error(exc_info[1])
1168
 
                    self._write_structure(error_struct)
1169
 
                    break
1170
 
                else:
1171
 
                    if isinstance(chunk, request.FailedSmartServerResponse):
1172
 
                        self._write_error_status()
1173
 
                        self._write_structure(chunk.args)
1174
 
                        break
1175
 
                    self._write_prefixed_body(chunk)
 
1151
            for chunk in response.body_stream:
 
1152
                self._write_prefixed_body(chunk)
 
1153
                self.flush()
1176
1154
        self._write_end()
1177
 
 
1178
 
 
1179
 
def _iter_with_errors(iterable):
1180
 
    """Handle errors from iterable.next().
1181
 
 
1182
 
    Use like::
1183
 
 
1184
 
        for exc_info, value in _iter_with_errors(iterable):
1185
 
            ...
1186
 
 
1187
 
    This is a safer alternative to::
1188
 
 
1189
 
        try:
1190
 
            for value in iterable:
1191
 
               ...
1192
 
        except:
1193
 
            ...
1194
 
 
1195
 
    Because the latter will catch errors from the for-loop body, not just
1196
 
    iterable.next()
1197
 
 
1198
 
    If an error occurs, exc_info will be a exc_info tuple, and the generator
1199
 
    will terminate.  Otherwise exc_info will be None, and value will be the
1200
 
    value from iterable.next().  Note that KeyboardInterrupt and SystemExit
1201
 
    will not be itercepted.
1202
 
    """
1203
 
    iterator = iter(iterable)
1204
 
    while True:
1205
 
        try:
1206
 
            yield None, iterator.next()
1207
 
        except StopIteration:
1208
 
            return
1209
 
        except (KeyboardInterrupt, SystemExit):
1210
 
            raise
1211
 
        except Exception:
1212
 
            yield sys.exc_info(), None
1213
 
            return
1214
 
 
 
1155
        
1215
1156
 
1216
1157
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1217
1158
 
1222
1163
 
1223
1164
    def set_headers(self, headers):
1224
1165
        self._headers = headers.copy()
1225
 
 
 
1166
        
1226
1167
    def call(self, *args):
1227
1168
        if 'hpss' in debug.debug_flags:
1228
1169
            mutter('hpss call:   %s', repr(args)[1:-1])
1277
1218
        self._write_end()
1278
1219
        self._medium_request.finished_writing()
1279
1220
 
1280
 
    def call_with_body_stream(self, args, stream):
1281
 
        if 'hpss' in debug.debug_flags:
1282
 
            mutter('hpss call w/body stream: %r', args)
1283
 
            path = getattr(self._medium_request._medium, '_path', None)
1284
 
            if path is not None:
1285
 
                mutter('                  (to %s)', path)
1286
 
            self._request_start_time = time.time()
1287
 
        self._write_protocol_version()
1288
 
        self._write_headers(self._headers)
1289
 
        self._write_structure(args)
1290
 
        # TODO: notice if the server has sent an early error reply before we
1291
 
        #       have finished sending the stream.  We would notice at the end
1292
 
        #       anyway, but if the medium can deliver it early then it's good
1293
 
        #       to short-circuit the whole request...
1294
 
        for exc_info, part in _iter_with_errors(stream):
1295
 
            if exc_info is not None:
1296
 
                # Iterating the stream failed.  Cleanly abort the request.
1297
 
                self._write_error_status()
1298
 
                # Currently the client unconditionally sends ('error',) as the
1299
 
                # error args.
1300
 
                self._write_structure(('error',))
1301
 
                self._write_end()
1302
 
                self._medium_request.finished_writing()
1303
 
                raise exc_info[0], exc_info[1], exc_info[2]
1304
 
            else:
1305
 
                self._write_prefixed_body(part)
1306
 
                self.flush()
1307
 
        self._write_end()
1308
 
        self._medium_request.finished_writing()
1309