~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Robert Collins
  • Date: 2008-09-02 05:28:37 UTC
  • mfrom: (3675 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3677.
  • Revision ID: robertc@robertcollins.net-20080902052837-ec3qlv41q5e7f6fl
Resolve conflicts with NEWS.

Show diffs side-by-side

added added

removed removed

Lines of Context:
323
323
 
324
324
    def __init__(self):
325
325
        self.finished_reading = False
326
 
        self._in_buffer = ''
 
326
        self._in_buffer_list = []
 
327
        self._in_buffer_len = 0
327
328
        self.unused_data = ''
328
329
        self.bytes_left = None
329
330
        self._number_needed_bytes = None
330
331
 
 
332
    def _get_in_buffer(self):
 
333
        if len(self._in_buffer_list) == 1:
 
334
            return self._in_buffer_list[0]
 
335
        in_buffer = ''.join(self._in_buffer_list)
 
336
        if len(in_buffer) != self._in_buffer_len:
 
337
            raise AssertionError(
 
338
                "Length of buffer did not match expected value: %s != %s"
 
339
                % self._in_buffer_len, len(in_buffer))
 
340
        self._in_buffer_list = [in_buffer]
 
341
        return in_buffer
 
342
 
 
343
    def _get_in_bytes(self, count):
 
344
        """Grab X bytes from the input_buffer.
 
345
 
 
346
        Callers should have already checked that self._in_buffer_len is >
 
347
        count. Note, this does not consume the bytes from the buffer. The
 
348
        caller will still need to call _get_in_buffer() and then
 
349
        _set_in_buffer() if they actually need to consume the bytes.
 
350
        """
 
351
        # check if we can yield the bytes from just the first entry in our list
 
352
        if len(self._in_buffer_list) == 0:
 
353
            raise AssertionError('Callers must be sure we have buffered bytes'
 
354
                ' before calling _get_in_bytes')
 
355
        if len(self._in_buffer_list[0]) > count:
 
356
            return self._in_buffer_list[0][:count]
 
357
        # We can't yield it from the first buffer, so collapse all buffers, and
 
358
        # yield it from that
 
359
        in_buf = self._get_in_buffer()
 
360
        return in_buf[:count]
 
361
 
 
362
    def _set_in_buffer(self, new_buf):
 
363
        if new_buf is not None:
 
364
            self._in_buffer_list = [new_buf]
 
365
            self._in_buffer_len = len(new_buf)
 
366
        else:
 
367
            self._in_buffer_list = []
 
368
            self._in_buffer_len = 0
 
369
 
331
370
    def accept_bytes(self, bytes):
332
371
        """Decode as much of bytes as possible.
333
372
 
338
377
        data will be appended to self.unused_data.
339
378
        """
340
379
        # accept_bytes is allowed to change the state
341
 
        current_state = self.state_accept
342
380
        self._number_needed_bytes = None
343
 
        self._in_buffer += bytes
 
381
        # lsprof puts a very large amount of time on this specific call for
 
382
        # large readv arrays
 
383
        self._in_buffer_list.append(bytes)
 
384
        self._in_buffer_len += len(bytes)
344
385
        try:
345
386
            # Run the function for the current state.
 
387
            current_state = self.state_accept
346
388
            self.state_accept()
347
389
            while current_state != self.state_accept:
348
390
                # The current state has changed.  Run the function for the new
379
421
            # the rest of this chunk plus an END chunk.
380
422
            return self.bytes_left + 4
381
423
        elif self.state_accept == self._state_accept_expecting_length:
382
 
            if self._in_buffer == '':
 
424
            if self._in_buffer_len == 0:
383
425
                # We're expecting a chunk length.  There's at least two bytes
384
426
                # left: a digit plus '\n'.
385
427
                return 2
390
432
        elif self.state_accept == self._state_accept_reading_unused:
391
433
            return 1
392
434
        elif self.state_accept == self._state_accept_expecting_header:
393
 
            return max(0, len('chunked\n') - len(self._in_buffer))
 
435
            return max(0, len('chunked\n') - self._in_buffer_len)
394
436
        else:
395
437
            raise AssertionError("Impossible state: %r" % (self.state_accept,))
396
438
 
401
443
            return None
402
444
 
403
445
    def _extract_line(self):
404
 
        pos = self._in_buffer.find('\n')
 
446
        in_buf = self._get_in_buffer()
 
447
        pos = in_buf.find('\n')
405
448
        if pos == -1:
406
449
            # We haven't read a complete line yet, so request more bytes before
407
450
            # we continue.
408
451
            raise _NeedMoreBytes(1)
409
 
        line = self._in_buffer[:pos]
 
452
        line = in_buf[:pos]
410
453
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
411
 
        self._in_buffer = self._in_buffer[pos+1:]
 
454
        self._set_in_buffer(in_buf[pos+1:])
412
455
        return line
413
456
 
414
457
    def _finished(self):
415
 
        self.unused_data = self._in_buffer
416
 
        self._in_buffer = ''
 
458
        self.unused_data = self._get_in_buffer()
 
459
        # self._in_buffer = None
 
460
        self._in_buffer_list = []
 
461
        self._in_buffer_len = 0
417
462
        self.state_accept = self._state_accept_reading_unused
418
463
        if self.error:
419
464
            error_args = tuple(self.error_in_progress)
448
493
            self.state_accept = self._state_accept_reading_chunk
449
494
 
450
495
    def _state_accept_reading_chunk(self):
451
 
        in_buffer_len = len(self._in_buffer)
452
 
        self.chunk_in_progress += self._in_buffer[:self.bytes_left]
453
 
        self._in_buffer = self._in_buffer[self.bytes_left:]
 
496
        in_buf = self._get_in_buffer()
 
497
        in_buffer_len = len(in_buf)
 
498
        self.chunk_in_progress += in_buf[:self.bytes_left]
 
499
        self._set_in_buffer(in_buf[self.bytes_left:])
454
500
        self.bytes_left -= in_buffer_len
455
501
        if self.bytes_left <= 0:
456
502
            # Finished with chunk
463
509
            self.state_accept = self._state_accept_expecting_length
464
510
        
465
511
    def _state_accept_reading_unused(self):
466
 
        self.unused_data += self._in_buffer
467
 
        self._in_buffer = ''
 
512
        self.unused_data += self._get_in_buffer()
 
513
        self._in_buffer_list = []
468
514
 
469
515
 
470
516
class LengthPrefixedBodyDecoder(_StatefulDecoder):
498
544
        return self.state_read()
499
545
 
500
546
    def _state_accept_expecting_length(self):
501
 
        pos = self._in_buffer.find('\n')
 
547
        in_buf = self._get_in_buffer()
 
548
        pos = in_buf.find('\n')
502
549
        if pos == -1:
503
550
            return
504
 
        self.bytes_left = int(self._in_buffer[:pos])
505
 
        self._in_buffer = self._in_buffer[pos+1:]
 
551
        self.bytes_left = int(in_buf[:pos])
 
552
        self._set_in_buffer(in_buf[pos+1:])
506
553
        self.state_accept = self._state_accept_reading_body
507
554
        self.state_read = self._state_read_body_buffer
508
555
 
509
556
    def _state_accept_reading_body(self):
510
 
        self._body += self._in_buffer
511
 
        self.bytes_left -= len(self._in_buffer)
512
 
        self._in_buffer = ''
 
557
        in_buf = self._get_in_buffer()
 
558
        self._body += in_buf
 
559
        self.bytes_left -= len(in_buf)
 
560
        self._set_in_buffer(None)
513
561
        if self.bytes_left <= 0:
514
562
            # Finished with body
515
563
            if self.bytes_left != 0:
519
567
            self.state_accept = self._state_accept_reading_trailer
520
568
        
521
569
    def _state_accept_reading_trailer(self):
522
 
        self._trailer_buffer += self._in_buffer
523
 
        self._in_buffer = ''
 
570
        self._trailer_buffer += self._get_in_buffer()
 
571
        self._set_in_buffer(None)
524
572
        # TODO: what if the trailer does not match "done\n"?  Should this raise
525
573
        # a ProtocolViolation exception?
526
574
        if self._trailer_buffer.startswith('done\n'):
529
577
            self.finished_reading = True
530
578
    
531
579
    def _state_accept_reading_unused(self):
532
 
        self.unused_data += self._in_buffer
533
 
        self._in_buffer = ''
 
580
        self.unused_data += self._get_in_buffer()
 
581
        self._set_in_buffer(None)
534
582
 
535
583
    def _state_read_no_data(self):
536
584
        return ''
865
913
            self.message_handler.protocol_error(exception)
866
914
 
867
915
    def _extract_length_prefixed_bytes(self):
868
 
        if len(self._in_buffer) < 4:
 
916
        if self._in_buffer_len < 4:
869
917
            # A length prefix by itself is 4 bytes, and we don't even have that
870
918
            # many yet.
871
919
            raise _NeedMoreBytes(4)
872
 
        (length,) = struct.unpack('!L', self._in_buffer[:4])
 
920
        (length,) = struct.unpack('!L', self._get_in_bytes(4))
873
921
        end_of_bytes = 4 + length
874
 
        if len(self._in_buffer) < end_of_bytes:
 
922
        if self._in_buffer_len < end_of_bytes:
875
923
            # We haven't yet read as many bytes as the length-prefix says there
876
924
            # are.
877
925
            raise _NeedMoreBytes(end_of_bytes)
878
926
        # Extract the bytes from the buffer.
879
 
        bytes = self._in_buffer[4:end_of_bytes]
880
 
        self._in_buffer = self._in_buffer[end_of_bytes:]
 
927
        in_buf = self._get_in_buffer()
 
928
        bytes = in_buf[4:end_of_bytes]
 
929
        self._set_in_buffer(in_buf[end_of_bytes:])
881
930
        return bytes
882
931
 
883
932
    def _extract_prefixed_bencoded_data(self):
890
939
        return decoded
891
940
 
892
941
    def _extract_single_byte(self):
893
 
        if self._in_buffer == '':
 
942
        if self._in_buffer_len == 0:
894
943
            # The buffer is empty
895
944
            raise _NeedMoreBytes(1)
896
 
        one_byte = self._in_buffer[0]
897
 
        self._in_buffer = self._in_buffer[1:]
 
945
        in_buf = self._get_in_buffer()
 
946
        one_byte = in_buf[0]
 
947
        self._set_in_buffer(in_buf[1:])
898
948
        return one_byte
899
949
 
900
950
    def _state_accept_expecting_protocol_version(self):
901
 
        needed_bytes = len(MESSAGE_VERSION_THREE) - len(self._in_buffer)
 
951
        needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
 
952
        in_buf = self._get_in_buffer()
902
953
        if needed_bytes > 0:
903
954
            # We don't have enough bytes to check if the protocol version
904
955
            # marker is right.  But we can check if it is already wrong by
908
959
            # len(MESSAGE_VERSION_THREE) bytes.  So if the bytes we have so far
909
960
            # are wrong then we should just raise immediately rather than
910
961
            # stall.]
911
 
            if not MESSAGE_VERSION_THREE.startswith(self._in_buffer):
 
962
            if not MESSAGE_VERSION_THREE.startswith(in_buf):
912
963
                # We have enough bytes to know the protocol version is wrong
913
 
                raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
 
964
                raise errors.UnexpectedProtocolVersionMarker(in_buf)
914
965
            raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
915
 
        if not self._in_buffer.startswith(MESSAGE_VERSION_THREE):
916
 
            raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
917
 
        self._in_buffer = self._in_buffer[len(MESSAGE_VERSION_THREE):]
 
966
        if not in_buf.startswith(MESSAGE_VERSION_THREE):
 
967
            raise errors.UnexpectedProtocolVersionMarker(in_buf)
 
968
        self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
918
969
        self.state_accept = self._state_accept_expecting_headers
919
970
 
920
971
    def _state_accept_expecting_headers(self):
969
1020
            raise errors.SmartMessageHandlerError(sys.exc_info())
970
1021
 
971
1022
    def done(self):
972
 
        self.unused_data = self._in_buffer
973
 
        self._in_buffer = ''
 
1023
        self.unused_data = self._get_in_buffer()
 
1024
        self._set_in_buffer(None)
974
1025
        self.state_accept = self._state_accept_reading_unused
975
1026
        try:
976
1027
            self.message_handler.end_received()
978
1029
            raise errors.SmartMessageHandlerError(sys.exc_info())
979
1030
 
980
1031
    def _state_accept_reading_unused(self):
981
 
        self.unused_data += self._in_buffer
982
 
        self._in_buffer = ''
 
1032
        self.unused_data = self._get_in_buffer()
 
1033
        self._set_in_buffer(None)
983
1034
 
984
1035
    def next_read_size(self):
985
1036
        if self.state_accept == self._state_accept_reading_unused:
992
1043
            return 0
993
1044
        else:
994
1045
            if self._number_needed_bytes is not None:
995
 
                return self._number_needed_bytes - len(self._in_buffer)
 
1046
                return self._number_needed_bytes - self._in_buffer_len
996
1047
            else:
997
1048
                raise AssertionError("don't know how many bytes are expected!")
998
1049