~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Matt Nordhoff
  • Date: 2009-04-04 02:50:01 UTC
  • mfrom: (4253 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4256.
  • Revision ID: mnordhoff@mattnordhoff.com-20090404025001-z1403k0tatmc8l91
Merge bzr.dev, fixing conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Wire-level encoding and decoding of requests and responses for the smart
18
18
client and server.
109
109
        for start, length in offsets:
110
110
            txt.append('%d,%d' % (start, length))
111
111
        return '\n'.join(txt)
112
 
        
 
112
 
113
113
 
114
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
115
    """Server-side encoding and decoding logic for smart version 1."""
116
 
    
 
116
 
117
117
    def __init__(self, backing_transport, write_func, root_client_path='/'):
118
118
        self._backing_transport = backing_transport
119
119
        self._root_client_path = root_client_path
127
127
 
128
128
    def accept_bytes(self, bytes):
129
129
        """Take bytes, and advance the internal state machine appropriately.
130
 
        
 
130
 
131
131
        :param bytes: must be a byte string
132
132
        """
133
133
        if not isinstance(bytes, str):
169
169
 
170
170
        if self._has_dispatched:
171
171
            if self._finished:
172
 
                # nothing to do.XXX: this routine should be a single state 
 
172
                # nothing to do.XXX: this routine should be a single state
173
173
                # machine too.
174
174
                self.unused_data += self.in_buffer
175
175
                self.in_buffer = ''
211
211
 
212
212
    def _write_protocol_version(self):
213
213
        """Write any prefixes this protocol requires.
214
 
        
 
214
 
215
215
        Version one doesn't send protocol versions.
216
216
        """
217
217
 
234
234
 
235
235
class SmartServerRequestProtocolTwo(SmartServerRequestProtocolOne):
236
236
    r"""Version two of the server side of the smart protocol.
237
 
   
 
237
 
238
238
    This prefixes responses with the value of RESPONSE_VERSION_TWO.
239
239
    """
240
240
 
250
250
 
251
251
    def _write_protocol_version(self):
252
252
        r"""Write any prefixes this protocol requires.
253
 
        
 
253
 
254
254
        Version two sends the value of RESPONSE_VERSION_TWO.
255
255
        """
256
256
        self._write_func(self.response_marker)
412
412
        self.chunks = collections.deque()
413
413
        self.error = False
414
414
        self.error_in_progress = None
415
 
    
 
415
 
416
416
    def next_read_size(self):
417
417
        # Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
418
        # end-of-body marker is 4 bytes: 'END\n'.
506
506
                self.chunks.append(self.chunk_in_progress)
507
507
            self.chunk_in_progress = None
508
508
            self.state_accept = self._state_accept_expecting_length
509
 
        
 
509
 
510
510
    def _state_accept_reading_unused(self):
511
511
        self.unused_data += self._get_in_buffer()
512
512
        self._in_buffer_list = []
514
514
 
515
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
516
516
    """Decodes the length-prefixed bulk data."""
517
 
    
 
517
 
518
518
    def __init__(self):
519
519
        _StatefulDecoder.__init__(self)
520
520
        self.state_accept = self._state_accept_expecting_length
521
521
        self.state_read = self._state_read_no_data
522
522
        self._body = ''
523
523
        self._trailer_buffer = ''
524
 
    
 
524
 
525
525
    def next_read_size(self):
526
526
        if self.bytes_left is not None:
527
527
            # Ideally we want to read all the remainder of the body and the
537
537
        else:
538
538
            # Reading excess data.  Either way, 1 byte at a time is fine.
539
539
            return 1
540
 
        
 
540
 
541
541
    def read_pending_data(self):
542
542
        """Return any pending data that has been decoded."""
543
543
        return self.state_read()
564
564
                self._body = self._body[:self.bytes_left]
565
565
            self.bytes_left = None
566
566
            self.state_accept = self._state_accept_reading_trailer
567
 
        
 
567
 
568
568
    def _state_accept_reading_trailer(self):
569
569
        self._trailer_buffer += self._get_in_buffer()
570
570
        self._set_in_buffer(None)
574
574
            self.unused_data = self._trailer_buffer[len('done\n'):]
575
575
            self.state_accept = self._state_accept_reading_unused
576
576
            self.finished_reading = True
577
 
    
 
577
 
578
578
    def _state_accept_reading_unused(self):
579
579
        self.unused_data += self._get_in_buffer()
580
580
        self._set_in_buffer(None)
655
655
        if 'hpss' in debug.debug_flags:
656
656
            mutter('              %d bytes in readv request', len(readv_bytes))
657
657
        self._last_verb = args[0]
658
 
    
 
658
 
659
659
    def call_with_body_stream(self, args, stream):
660
660
        # Protocols v1 and v2 don't support body streams.  So it's safe to
661
661
        # assume that a v1/v2 server doesn't support whatever method we're
729
729
    def _response_is_unknown_method(self, result_tuple):
730
730
        """Raise UnexpectedSmartServerResponse if the response is an 'unknown
731
731
        method' response to the request.
732
 
        
 
732
 
733
733
        :param response: The response from a smart client call_expecting_body
734
734
            call.
735
735
        :param verb: The verb used in that call.
742
742
            # The response will have no body, so we've finished reading.
743
743
            self._request.finished_reading()
744
744
            raise errors.UnknownSmartMethod(self._last_verb)
745
 
        
 
745
 
746
746
    def read_body_bytes(self, count=-1):
747
747
        """Read bytes from the body, decoding into a byte stream.
748
 
        
749
 
        We read all bytes at once to ensure we've checked the trailer for 
 
748
 
 
749
        We read all bytes at once to ensure we've checked the trailer for
750
750
        errors, and then feed the buffer back as read_body_bytes is called.
751
751
        """
752
752
        if self._body_buffer is not None:
790
790
 
791
791
    def _write_protocol_version(self):
792
792
        """Write any prefixes this protocol requires.
793
 
        
 
793
 
794
794
        Version one doesn't send protocol versions.
795
795
        """
796
796
 
797
797
 
798
798
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
799
799
    """Version two of the client side of the smart protocol.
800
 
    
 
800
 
801
801
    This prefixes the request with the value of REQUEST_VERSION_TWO.
802
802
    """
803
803
 
831
831
 
832
832
    def _write_protocol_version(self):
833
833
        """Write any prefixes this protocol requires.
834
 
        
 
834
 
835
835
        Version two sends the value of REQUEST_VERSION_TWO.
836
836
        """
837
837
        self._request.accept_bytes(self.request_marker)
985
985
            self.message_handler.headers_received(decoded)
986
986
        except:
987
987
            raise errors.SmartMessageHandlerError(sys.exc_info())
988
 
    
 
988
 
989
989
    def _state_accept_expecting_message_part(self):
990
990
        message_part_kind = self._extract_single_byte()
991
991
        if message_part_kind == 'o':
1060
1060
    response_marker = request_marker = MESSAGE_VERSION_THREE
1061
1061
 
1062
1062
    def __init__(self, write_func):
1063
 
        self._buf = ''
 
1063
        self._buf = []
1064
1064
        self._real_write_func = write_func
1065
1065
 
1066
1066
    def _write_func(self, bytes):
1067
 
        self._buf += bytes
 
1067
        self._buf.append(bytes)
 
1068
        if len(self._buf) > 100:
 
1069
            self.flush()
1068
1070
 
1069
1071
    def flush(self):
1070
1072
        if self._buf:
1071
 
            self._real_write_func(self._buf)
1072
 
            self._buf = ''
 
1073
            self._real_write_func(''.join(self._buf))
 
1074
            del self._buf[:]
1073
1075
 
1074
1076
    def _serialise_offsets(self, offsets):
1075
1077
        """Serialise a readv offset list."""
1077
1079
        for start, length in offsets:
1078
1080
            txt.append('%d,%d' % (start, length))
1079
1081
        return '\n'.join(txt)
1080
 
        
 
1082
 
1081
1083
    def _write_protocol_version(self):
1082
1084
        self._write_func(MESSAGE_VERSION_THREE)
1083
1085
 
1158
1160
        if response.body is not None:
1159
1161
            self._write_prefixed_body(response.body)
1160
1162
        elif response.body_stream is not None:
1161
 
            for chunk in response.body_stream:
1162
 
                self._write_prefixed_body(chunk)
1163
 
                self.flush()
 
1163
            for exc_info, chunk in _iter_with_errors(response.body_stream):
 
1164
                if exc_info is not None:
 
1165
                    self._write_error_status()
 
1166
                    error_struct = request._translate_error(exc_info[1])
 
1167
                    self._write_structure(error_struct)
 
1168
                    break
 
1169
                else:
 
1170
                    if isinstance(chunk, request.FailedSmartServerResponse):
 
1171
                        self._write_error_status()
 
1172
                        self._write_structure(chunk.args)
 
1173
                        break
 
1174
                    self._write_prefixed_body(chunk)
1164
1175
        self._write_end()
1165
 
        
 
1176
 
 
1177
 
 
1178
def _iter_with_errors(iterable):
 
1179
    """Handle errors from iterable.next().
 
1180
 
 
1181
    Use like::
 
1182
 
 
1183
        for exc_info, value in _iter_with_errors(iterable):
 
1184
            ...
 
1185
 
 
1186
    This is a safer alternative to::
 
1187
 
 
1188
        try:
 
1189
            for value in iterable:
 
1190
               ...
 
1191
        except:
 
1192
            ...
 
1193
 
 
1194
    Because the latter will catch errors from the for-loop body, not just
 
1195
    iterable.next()
 
1196
 
 
1197
    If an error occurs, exc_info will be an exc_info tuple, and the generator
 
1198
    will terminate.  Otherwise exc_info will be None, and value will be the
 
1199
    value from iterable.next().  Note that KeyboardInterrupt and SystemExit
 
1200
    will not be intercepted.
 
1201
    """
 
1202
    iterator = iter(iterable)
 
1203
    while True:
 
1204
        try:
 
1205
            yield None, iterator.next()
 
1206
        except StopIteration:
 
1207
            return
 
1208
        except (KeyboardInterrupt, SystemExit):
 
1209
            raise
 
1210
        except Exception:
 
1211
            yield sys.exc_info(), None
 
1212
            return
 
1213
 
1166
1214
 
1167
1215
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1168
1216
 
1173
1221
 
1174
1222
    def set_headers(self, headers):
1175
1223
        self._headers = headers.copy()
1176
 
        
 
1224
 
1177
1225
    def call(self, *args):
1178
1226
        if 'hpss' in debug.debug_flags:
1179
1227
            mutter('hpss call:   %s', repr(args)[1:-1])
1242
1290
        #       have finished sending the stream.  We would notice at the end
1243
1291
        #       anyway, but if the medium can deliver it early then it's good
1244
1292
        #       to short-circuit the whole request...
1245
 
        try:
1246
 
            for part in stream:
 
1293
        for exc_info, part in _iter_with_errors(stream):
 
1294
            if exc_info is not None:
 
1295
                # Iterating the stream failed.  Cleanly abort the request.
 
1296
                self._write_error_status()
 
1297
                # Currently the client unconditionally sends ('error',) as the
 
1298
                # error args.
 
1299
                self._write_structure(('error',))
 
1300
                self._write_end()
 
1301
                self._medium_request.finished_writing()
 
1302
                raise exc_info[0], exc_info[1], exc_info[2]
 
1303
            else:
1247
1304
                self._write_prefixed_body(part)
1248
1305
                self.flush()
1249
 
        except Exception:
1250
 
            exc_info = sys.exc_info()
1251
 
            # Iterating the stream failed.  Cleanly abort the request.
1252
 
            self._write_error_status()
1253
 
            # Currently the client unconditionally sends ('error',) as the
1254
 
            # error args.
1255
 
            self._write_structure(('error',))
1256
 
            self._write_end()
1257
 
            self._medium_request.finished_writing()
1258
 
            raise exc_info[0], exc_info[1], exc_info[2]
1259
1306
        self._write_end()
1260
1307
        self._medium_request.finished_writing()
1261
1308