~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Mark Hammond
  • Date: 2009-01-12 01:55:34 UTC
  • mto: (3995.8.2 prepare-1.12)
  • mto: This revision was merged to the branch mainline in revision 4007.
  • Revision ID: mhammond@skippinet.com.au-20090112015534-yfxg50p7mpds9j4v
Include all .html files from the tortoise doc directory.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Wire-level encoding and decoding of requests and responses for the smart
18
18
client and server.
29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
 
from bzrlib.bencode import bdecode_as_tuple, bencode
 
32
from bzrlib.util.bencode import bdecode, bencode
33
33
 
34
34
 
35
35
# Protocol version strings.  These are sent as prefixes of bzr requests and
109
109
        for start, length in offsets:
110
110
            txt.append('%d,%d' % (start, length))
111
111
        return '\n'.join(txt)
112
 
 
 
112
        
113
113
 
114
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
115
    """Server-side encoding and decoding logic for smart version 1."""
116
 
 
 
116
    
117
117
    def __init__(self, backing_transport, write_func, root_client_path='/'):
118
118
        self._backing_transport = backing_transport
119
119
        self._root_client_path = root_client_path
127
127
 
128
128
    def accept_bytes(self, bytes):
129
129
        """Take bytes, and advance the internal state machine appropriately.
130
 
 
 
130
        
131
131
        :param bytes: must be a byte string
132
132
        """
133
133
        if not isinstance(bytes, str):
169
169
 
170
170
        if self._has_dispatched:
171
171
            if self._finished:
172
 
                # nothing to do.XXX: this routine should be a single state
 
172
                # nothing to do.XXX: this routine should be a single state 
173
173
                # machine too.
174
174
                self.unused_data += self.in_buffer
175
175
                self.in_buffer = ''
211
211
 
212
212
    def _write_protocol_version(self):
213
213
        """Write any prefixes this protocol requires.
214
 
 
 
214
        
215
215
        Version one doesn't send protocol versions.
216
216
        """
217
217
 
234
234
 
235
235
class SmartServerRequestProtocolTwo(SmartServerRequestProtocolOne):
236
236
    r"""Version two of the server side of the smart protocol.
237
 
 
 
237
   
238
238
    This prefixes responses with the value of RESPONSE_VERSION_TWO.
239
239
    """
240
240
 
250
250
 
251
251
    def _write_protocol_version(self):
252
252
        r"""Write any prefixes this protocol requires.
253
 
 
 
253
        
254
254
        Version two sends the value of RESPONSE_VERSION_TWO.
255
255
        """
256
256
        self._write_func(self.response_marker)
412
412
        self.chunks = collections.deque()
413
413
        self.error = False
414
414
        self.error_in_progress = None
415
 
 
 
415
    
416
416
    def next_read_size(self):
417
417
        # Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
418
        # end-of-body marker is 4 bytes: 'END\n'.
506
506
                self.chunks.append(self.chunk_in_progress)
507
507
            self.chunk_in_progress = None
508
508
            self.state_accept = self._state_accept_expecting_length
509
 
 
 
509
        
510
510
    def _state_accept_reading_unused(self):
511
511
        self.unused_data += self._get_in_buffer()
512
512
        self._in_buffer_list = []
514
514
 
515
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
516
516
    """Decodes the length-prefixed bulk data."""
517
 
 
 
517
    
518
518
    def __init__(self):
519
519
        _StatefulDecoder.__init__(self)
520
520
        self.state_accept = self._state_accept_expecting_length
521
521
        self.state_read = self._state_read_no_data
522
522
        self._body = ''
523
523
        self._trailer_buffer = ''
524
 
 
 
524
    
525
525
    def next_read_size(self):
526
526
        if self.bytes_left is not None:
527
527
            # Ideally we want to read all the remainder of the body and the
537
537
        else:
538
538
            # Reading excess data.  Either way, 1 byte at a time is fine.
539
539
            return 1
540
 
 
 
540
        
541
541
    def read_pending_data(self):
542
542
        """Return any pending data that has been decoded."""
543
543
        return self.state_read()
564
564
                self._body = self._body[:self.bytes_left]
565
565
            self.bytes_left = None
566
566
            self.state_accept = self._state_accept_reading_trailer
567
 
 
 
567
        
568
568
    def _state_accept_reading_trailer(self):
569
569
        self._trailer_buffer += self._get_in_buffer()
570
570
        self._set_in_buffer(None)
574
574
            self.unused_data = self._trailer_buffer[len('done\n'):]
575
575
            self.state_accept = self._state_accept_reading_unused
576
576
            self.finished_reading = True
577
 
 
 
577
    
578
578
    def _state_accept_reading_unused(self):
579
579
        self.unused_data += self._get_in_buffer()
580
580
        self._set_in_buffer(None)
656
656
            mutter('              %d bytes in readv request', len(readv_bytes))
657
657
        self._last_verb = args[0]
658
658
 
659
 
    def call_with_body_stream(self, args, stream):
660
 
        # Protocols v1 and v2 don't support body streams.  So it's safe to
661
 
        # assume that a v1/v2 server doesn't support whatever method we're
662
 
        # trying to call with a body stream.
663
 
        self._request.finished_writing()
664
 
        self._request.finished_reading()
665
 
        raise errors.UnknownSmartMethod(args[0])
666
 
 
667
659
    def cancel_read_body(self):
668
660
        """After expecting a body, a response code may indicate one otherwise.
669
661
 
729
721
    def _response_is_unknown_method(self, result_tuple):
730
722
        """Raise UnexpectedSmartServerResponse if the response is an 'unknonwn
731
723
        method' response to the request.
732
 
 
 
724
        
733
725
        :param response: The response from a smart client call_expecting_body
734
726
            call.
735
727
        :param verb: The verb used in that call.
742
734
            # The response will have no body, so we've finished reading.
743
735
            self._request.finished_reading()
744
736
            raise errors.UnknownSmartMethod(self._last_verb)
745
 
 
 
737
        
746
738
    def read_body_bytes(self, count=-1):
747
739
        """Read bytes from the body, decoding into a byte stream.
748
 
 
749
 
        We read all bytes at once to ensure we've checked the trailer for
 
740
        
 
741
        We read all bytes at once to ensure we've checked the trailer for 
750
742
        errors, and then feed the buffer back as read_body_bytes is called.
751
743
        """
752
744
        if self._body_buffer is not None:
790
782
 
791
783
    def _write_protocol_version(self):
792
784
        """Write any prefixes this protocol requires.
793
 
 
 
785
        
794
786
        Version one doesn't send protocol versions.
795
787
        """
796
788
 
797
789
 
798
790
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
799
791
    """Version two of the client side of the smart protocol.
800
 
 
 
792
    
801
793
    This prefixes the request with the value of REQUEST_VERSION_TWO.
802
794
    """
803
795
 
831
823
 
832
824
    def _write_protocol_version(self):
833
825
        """Write any prefixes this protocol requires.
834
 
 
 
826
        
835
827
        Version two sends the value of REQUEST_VERSION_TWO.
836
828
        """
837
829
        self._request.accept_bytes(self.request_marker)
939
931
    def _extract_prefixed_bencoded_data(self):
940
932
        prefixed_bytes = self._extract_length_prefixed_bytes()
941
933
        try:
942
 
            decoded = bdecode_as_tuple(prefixed_bytes)
 
934
            decoded = bdecode(prefixed_bytes)
943
935
        except ValueError:
944
936
            raise errors.SmartProtocolError(
945
937
                'Bytes %r not bencoded' % (prefixed_bytes,))
985
977
            self.message_handler.headers_received(decoded)
986
978
        except:
987
979
            raise errors.SmartMessageHandlerError(sys.exc_info())
988
 
 
 
980
    
989
981
    def _state_accept_expecting_message_part(self):
990
982
        message_part_kind = self._extract_single_byte()
991
983
        if message_part_kind == 'o':
1036
1028
            raise errors.SmartMessageHandlerError(sys.exc_info())
1037
1029
 
1038
1030
    def _state_accept_reading_unused(self):
1039
 
        self.unused_data += self._get_in_buffer()
 
1031
        self.unused_data = self._get_in_buffer()
1040
1032
        self._set_in_buffer(None)
1041
1033
 
1042
1034
    def next_read_size(self):
1060
1052
    response_marker = request_marker = MESSAGE_VERSION_THREE
1061
1053
 
1062
1054
    def __init__(self, write_func):
1063
 
        self._buf = []
 
1055
        self._buf = ''
1064
1056
        self._real_write_func = write_func
1065
1057
 
1066
1058
    def _write_func(self, bytes):
1067
 
        self._buf.append(bytes)
1068
 
        if len(self._buf) > 100:
1069
 
            self.flush()
 
1059
        self._buf += bytes
1070
1060
 
1071
1061
    def flush(self):
1072
1062
        if self._buf:
1073
 
            self._real_write_func(''.join(self._buf))
1074
 
            del self._buf[:]
 
1063
            self._real_write_func(self._buf)
 
1064
            self._buf = ''
1075
1065
 
1076
1066
    def _serialise_offsets(self, offsets):
1077
1067
        """Serialise a readv offset list."""
1079
1069
        for start, length in offsets:
1080
1070
            txt.append('%d,%d' % (start, length))
1081
1071
        return '\n'.join(txt)
1082
 
 
 
1072
        
1083
1073
    def _write_protocol_version(self):
1084
1074
        self._write_func(MESSAGE_VERSION_THREE)
1085
1075
 
1110
1100
        self._write_func(struct.pack('!L', len(bytes)))
1111
1101
        self._write_func(bytes)
1112
1102
 
1113
 
    def _write_chunked_body_start(self):
1114
 
        self._write_func('oC')
1115
 
 
1116
1103
    def _write_error_status(self):
1117
1104
        self._write_func('oE')
1118
1105
 
1160
1147
        if response.body is not None:
1161
1148
            self._write_prefixed_body(response.body)
1162
1149
        elif response.body_stream is not None:
1163
 
            for exc_info, chunk in _iter_with_errors(response.body_stream):
1164
 
                if exc_info is not None:
1165
 
                    self._write_error_status()
1166
 
                    error_struct = request._translate_error(exc_info[1])
1167
 
                    self._write_structure(error_struct)
1168
 
                    break
1169
 
                else:
1170
 
                    if isinstance(chunk, request.FailedSmartServerResponse):
1171
 
                        self._write_error_status()
1172
 
                        self._write_structure(chunk.args)
1173
 
                        break
1174
 
                    self._write_prefixed_body(chunk)
 
1150
            for chunk in response.body_stream:
 
1151
                self._write_prefixed_body(chunk)
 
1152
                self.flush()
1175
1153
        self._write_end()
1176
 
 
1177
 
 
1178
 
def _iter_with_errors(iterable):
1179
 
    """Handle errors from iterable.next().
1180
 
 
1181
 
    Use like::
1182
 
 
1183
 
        for exc_info, value in _iter_with_errors(iterable):
1184
 
            ...
1185
 
 
1186
 
    This is a safer alternative to::
1187
 
 
1188
 
        try:
1189
 
            for value in iterable:
1190
 
               ...
1191
 
        except:
1192
 
            ...
1193
 
 
1194
 
    Because the latter will catch errors from the for-loop body, not just
1195
 
    iterable.next()
1196
 
 
1197
 
    If an error occurs, exc_info will be a exc_info tuple, and the generator
1198
 
    will terminate.  Otherwise exc_info will be None, and value will be the
1199
 
    value from iterable.next().  Note that KeyboardInterrupt and SystemExit
1200
 
    will not be itercepted.
1201
 
    """
1202
 
    iterator = iter(iterable)
1203
 
    while True:
1204
 
        try:
1205
 
            yield None, iterator.next()
1206
 
        except StopIteration:
1207
 
            return
1208
 
        except (KeyboardInterrupt, SystemExit):
1209
 
            raise
1210
 
        except Exception:
1211
 
            mutter('_iter_with_errors caught error')
1212
 
            log_exception_quietly()
1213
 
            yield sys.exc_info(), None
1214
 
            return
1215
 
 
 
1154
        
1216
1155
 
1217
1156
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1218
1157
 
1223
1162
 
1224
1163
    def set_headers(self, headers):
1225
1164
        self._headers = headers.copy()
1226
 
 
 
1165
        
1227
1166
    def call(self, *args):
1228
1167
        if 'hpss' in debug.debug_flags:
1229
1168
            mutter('hpss call:   %s', repr(args)[1:-1])
1278
1217
        self._write_end()
1279
1218
        self._medium_request.finished_writing()
1280
1219
 
1281
 
    def call_with_body_stream(self, args, stream):
1282
 
        if 'hpss' in debug.debug_flags:
1283
 
            mutter('hpss call w/body stream: %r', args)
1284
 
            path = getattr(self._medium_request._medium, '_path', None)
1285
 
            if path is not None:
1286
 
                mutter('                  (to %s)', path)
1287
 
            self._request_start_time = time.time()
1288
 
        self._write_protocol_version()
1289
 
        self._write_headers(self._headers)
1290
 
        self._write_structure(args)
1291
 
        # TODO: notice if the server has sent an early error reply before we
1292
 
        #       have finished sending the stream.  We would notice at the end
1293
 
        #       anyway, but if the medium can deliver it early then it's good
1294
 
        #       to short-circuit the whole request...
1295
 
        for exc_info, part in _iter_with_errors(stream):
1296
 
            if exc_info is not None:
1297
 
                # Iterating the stream failed.  Cleanly abort the request.
1298
 
                self._write_error_status()
1299
 
                # Currently the client unconditionally sends ('error',) as the
1300
 
                # error args.
1301
 
                self._write_structure(('error',))
1302
 
                self._write_end()
1303
 
                self._medium_request.finished_writing()
1304
 
                raise exc_info[0], exc_info[1], exc_info[2]
1305
 
            else:
1306
 
                self._write_prefixed_body(part)
1307
 
                self.flush()
1308
 
        self._write_end()
1309
 
        self._medium_request.finished_writing()
1310