~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: John Arbash Meinel
  • Date: 2008-10-30 00:55:00 UTC
  • mto: (3815.2.5 prepare-1.9)
  • mto: This revision was merged to the branch mainline in revision 3811.
  • Revision ID: john@arbash-meinel.com-20081030005500-r5cej1cxflqhs3io
Switch so that we are using a simple timestamp as the first action.

Show diffs side-by-side

added added

removed removed

Lines of Context:
323
323
 
324
324
    def __init__(self):
325
325
        self.finished_reading = False
326
 
        self._in_buffer = ''
 
326
        self._in_buffer_list = []
 
327
        self._in_buffer_len = 0
327
328
        self.unused_data = ''
328
329
        self.bytes_left = None
329
330
        self._number_needed_bytes = None
330
331
 
 
332
    def _get_in_buffer(self):
 
333
        if len(self._in_buffer_list) == 1:
 
334
            return self._in_buffer_list[0]
 
335
        in_buffer = ''.join(self._in_buffer_list)
 
336
        if len(in_buffer) != self._in_buffer_len:
 
337
            raise AssertionError(
 
338
                "Length of buffer did not match expected value: %s != %s"
 
339
                % self._in_buffer_len, len(in_buffer))
 
340
        self._in_buffer_list = [in_buffer]
 
341
        return in_buffer
 
342
 
 
343
    def _get_in_bytes(self, count):
 
344
        """Grab X bytes from the input_buffer.
 
345
 
 
346
        Callers should have already checked that self._in_buffer_len is >
 
347
        count. Note, this does not consume the bytes from the buffer. The
 
348
        caller will still need to call _get_in_buffer() and then
 
349
        _set_in_buffer() if they actually need to consume the bytes.
 
350
        """
 
351
        # check if we can yield the bytes from just the first entry in our list
 
352
        if len(self._in_buffer_list) == 0:
 
353
            raise AssertionError('Callers must be sure we have buffered bytes'
 
354
                ' before calling _get_in_bytes')
 
355
        if len(self._in_buffer_list[0]) > count:
 
356
            return self._in_buffer_list[0][:count]
 
357
        # We can't yield it from the first buffer, so collapse all buffers, and
 
358
        # yield it from that
 
359
        in_buf = self._get_in_buffer()
 
360
        return in_buf[:count]
 
361
 
 
362
    def _set_in_buffer(self, new_buf):
 
363
        if new_buf is not None:
 
364
            self._in_buffer_list = [new_buf]
 
365
            self._in_buffer_len = len(new_buf)
 
366
        else:
 
367
            self._in_buffer_list = []
 
368
            self._in_buffer_len = 0
 
369
 
331
370
    def accept_bytes(self, bytes):
332
371
        """Decode as much of bytes as possible.
333
372
 
338
377
        data will be appended to self.unused_data.
339
378
        """
340
379
        # accept_bytes is allowed to change the state
341
 
        current_state = self.state_accept
342
380
        self._number_needed_bytes = None
343
 
        self._in_buffer += bytes
 
381
        # lsprof puts a very large amount of time on this specific call for
 
382
        # large readv arrays
 
383
        self._in_buffer_list.append(bytes)
 
384
        self._in_buffer_len += len(bytes)
344
385
        try:
345
386
            # Run the function for the current state.
 
387
            current_state = self.state_accept
346
388
            self.state_accept()
347
389
            while current_state != self.state_accept:
348
390
                # The current state has changed.  Run the function for the new
379
421
            # the rest of this chunk plus an END chunk.
380
422
            return self.bytes_left + 4
381
423
        elif self.state_accept == self._state_accept_expecting_length:
382
 
            if self._in_buffer == '':
 
424
            if self._in_buffer_len == 0:
383
425
                # We're expecting a chunk length.  There's at least two bytes
384
426
                # left: a digit plus '\n'.
385
427
                return 2
390
432
        elif self.state_accept == self._state_accept_reading_unused:
391
433
            return 1
392
434
        elif self.state_accept == self._state_accept_expecting_header:
393
 
            return max(0, len('chunked\n') - len(self._in_buffer))
 
435
            return max(0, len('chunked\n') - self._in_buffer_len)
394
436
        else:
395
437
            raise AssertionError("Impossible state: %r" % (self.state_accept,))
396
438
 
401
443
            return None
402
444
 
403
445
    def _extract_line(self):
404
 
        pos = self._in_buffer.find('\n')
 
446
        in_buf = self._get_in_buffer()
 
447
        pos = in_buf.find('\n')
405
448
        if pos == -1:
406
449
            # We haven't read a complete line yet, so request more bytes before
407
450
            # we continue.
408
451
            raise _NeedMoreBytes(1)
409
 
        line = self._in_buffer[:pos]
 
452
        line = in_buf[:pos]
410
453
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
411
 
        self._in_buffer = self._in_buffer[pos+1:]
 
454
        self._set_in_buffer(in_buf[pos+1:])
412
455
        return line
413
456
 
414
457
    def _finished(self):
415
 
        self.unused_data = self._in_buffer
416
 
        self._in_buffer = ''
 
458
        self.unused_data = self._get_in_buffer()
 
459
        self._in_buffer_list = []
 
460
        self._in_buffer_len = 0
417
461
        self.state_accept = self._state_accept_reading_unused
418
462
        if self.error:
419
463
            error_args = tuple(self.error_in_progress)
448
492
            self.state_accept = self._state_accept_reading_chunk
449
493
 
450
494
    def _state_accept_reading_chunk(self):
451
 
        in_buffer_len = len(self._in_buffer)
452
 
        self.chunk_in_progress += self._in_buffer[:self.bytes_left]
453
 
        self._in_buffer = self._in_buffer[self.bytes_left:]
 
495
        in_buf = self._get_in_buffer()
 
496
        in_buffer_len = len(in_buf)
 
497
        self.chunk_in_progress += in_buf[:self.bytes_left]
 
498
        self._set_in_buffer(in_buf[self.bytes_left:])
454
499
        self.bytes_left -= in_buffer_len
455
500
        if self.bytes_left <= 0:
456
501
            # Finished with chunk
463
508
            self.state_accept = self._state_accept_expecting_length
464
509
        
465
510
    def _state_accept_reading_unused(self):
466
 
        self.unused_data += self._in_buffer
467
 
        self._in_buffer = ''
 
511
        self.unused_data += self._get_in_buffer()
 
512
        self._in_buffer_list = []
468
513
 
469
514
 
470
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
498
543
        return self.state_read()
499
544
 
500
545
    def _state_accept_expecting_length(self):
501
 
        pos = self._in_buffer.find('\n')
 
546
        in_buf = self._get_in_buffer()
 
547
        pos = in_buf.find('\n')
502
548
        if pos == -1:
503
549
            return
504
 
        self.bytes_left = int(self._in_buffer[:pos])
505
 
        self._in_buffer = self._in_buffer[pos+1:]
 
550
        self.bytes_left = int(in_buf[:pos])
 
551
        self._set_in_buffer(in_buf[pos+1:])
506
552
        self.state_accept = self._state_accept_reading_body
507
553
        self.state_read = self._state_read_body_buffer
508
554
 
509
555
    def _state_accept_reading_body(self):
510
 
        self._body += self._in_buffer
511
 
        self.bytes_left -= len(self._in_buffer)
512
 
        self._in_buffer = ''
 
556
        in_buf = self._get_in_buffer()
 
557
        self._body += in_buf
 
558
        self.bytes_left -= len(in_buf)
 
559
        self._set_in_buffer(None)
513
560
        if self.bytes_left <= 0:
514
561
            # Finished with body
515
562
            if self.bytes_left != 0:
519
566
            self.state_accept = self._state_accept_reading_trailer
520
567
        
521
568
    def _state_accept_reading_trailer(self):
522
 
        self._trailer_buffer += self._in_buffer
523
 
        self._in_buffer = ''
 
569
        self._trailer_buffer += self._get_in_buffer()
 
570
        self._set_in_buffer(None)
524
571
        # TODO: what if the trailer does not match "done\n"?  Should this raise
525
572
        # a ProtocolViolation exception?
526
573
        if self._trailer_buffer.startswith('done\n'):
529
576
            self.finished_reading = True
530
577
    
531
578
    def _state_accept_reading_unused(self):
532
 
        self.unused_data += self._in_buffer
533
 
        self._in_buffer = ''
 
579
        self.unused_data += self._get_in_buffer()
 
580
        self._set_in_buffer(None)
534
581
 
535
582
    def _state_read_no_data(self):
536
583
        return ''
698
745
            return self._body_buffer.read(count)
699
746
        _body_decoder = LengthPrefixedBodyDecoder()
700
747
 
701
 
        # Read no more than 64k at a time so that we don't risk error 10055 (no
702
 
        # buffer space available) on Windows.
703
 
        max_read = 64 * 1024
704
748
        while not _body_decoder.finished_reading:
705
 
            bytes_wanted = min(_body_decoder.next_read_size(), max_read)
706
 
            bytes = self._request.read_bytes(bytes_wanted)
 
749
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
 
750
            if bytes == '':
 
751
                # end of file encountered reading from server
 
752
                raise errors.ConnectionReset(
 
753
                    "Connection lost while reading response body.")
707
754
            _body_decoder.accept_bytes(bytes)
708
755
        self._request.finished_reading()
709
756
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
715
762
 
716
763
    def _recv_tuple(self):
717
764
        """Receive a tuple from the medium request."""
718
 
        return _decode_tuple(self._recv_line())
719
 
 
720
 
    def _recv_line(self):
721
 
        """Read an entire line from the medium request."""
722
 
        line = ''
723
 
        while not line or line[-1] != '\n':
724
 
            # TODO: this is inefficient - but tuples are short.
725
 
            new_char = self._request.read_bytes(1)
726
 
            if new_char == '':
727
 
                # end of file encountered reading from server
728
 
                raise errors.ConnectionReset(
729
 
                    "please check connectivity and permissions",
730
 
                    "(and try -Dhpss if further diagnosis is required)")
731
 
            line += new_char
732
 
        return line
 
765
        return _decode_tuple(self._request.read_line())
733
766
 
734
767
    def query_version(self):
735
768
        """Return protocol version number of the server."""
772
805
        if version != self.response_marker:
773
806
            self._request.finished_reading()
774
807
            raise errors.UnexpectedProtocolVersionMarker(version)
775
 
        response_status = self._recv_line()
 
808
        response_status = self._request.read_line()
776
809
        result = SmartClientRequestProtocolOne._read_response_tuple(self)
777
810
        self._response_is_unknown_method(result)
778
811
        if response_status == 'success\n':
800
833
        """
801
834
        # Read no more than 64k at a time so that we don't risk error 10055 (no
802
835
        # buffer space available) on Windows.
803
 
        max_read = 64 * 1024
804
836
        _body_decoder = ChunkedBodyDecoder()
805
837
        while not _body_decoder.finished_reading:
806
 
            bytes_wanted = min(_body_decoder.next_read_size(), max_read)
807
 
            bytes = self._request.read_bytes(bytes_wanted)
 
838
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
 
839
            if bytes == '':
 
840
                # end of file encountered reading from server
 
841
                raise errors.ConnectionReset(
 
842
                    "Connection lost while reading streamed body.")
808
843
            _body_decoder.accept_bytes(bytes)
809
844
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
810
845
                if 'hpss' in debug.debug_flags and type(body_bytes) is str:
877
912
            self.message_handler.protocol_error(exception)
878
913
 
879
914
    def _extract_length_prefixed_bytes(self):
880
 
        if len(self._in_buffer) < 4:
 
915
        if self._in_buffer_len < 4:
881
916
            # A length prefix by itself is 4 bytes, and we don't even have that
882
917
            # many yet.
883
918
            raise _NeedMoreBytes(4)
884
 
        (length,) = struct.unpack('!L', self._in_buffer[:4])
 
919
        (length,) = struct.unpack('!L', self._get_in_bytes(4))
885
920
        end_of_bytes = 4 + length
886
 
        if len(self._in_buffer) < end_of_bytes:
 
921
        if self._in_buffer_len < end_of_bytes:
887
922
            # We haven't yet read as many bytes as the length-prefix says there
888
923
            # are.
889
924
            raise _NeedMoreBytes(end_of_bytes)
890
925
        # Extract the bytes from the buffer.
891
 
        bytes = self._in_buffer[4:end_of_bytes]
892
 
        self._in_buffer = self._in_buffer[end_of_bytes:]
 
926
        in_buf = self._get_in_buffer()
 
927
        bytes = in_buf[4:end_of_bytes]
 
928
        self._set_in_buffer(in_buf[end_of_bytes:])
893
929
        return bytes
894
930
 
895
931
    def _extract_prefixed_bencoded_data(self):
902
938
        return decoded
903
939
 
904
940
    def _extract_single_byte(self):
905
 
        if self._in_buffer == '':
 
941
        if self._in_buffer_len == 0:
906
942
            # The buffer is empty
907
943
            raise _NeedMoreBytes(1)
908
 
        one_byte = self._in_buffer[0]
909
 
        self._in_buffer = self._in_buffer[1:]
 
944
        in_buf = self._get_in_buffer()
 
945
        one_byte = in_buf[0]
 
946
        self._set_in_buffer(in_buf[1:])
910
947
        return one_byte
911
948
 
912
949
    def _state_accept_expecting_protocol_version(self):
913
 
        needed_bytes = len(MESSAGE_VERSION_THREE) - len(self._in_buffer)
 
950
        needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
 
951
        in_buf = self._get_in_buffer()
914
952
        if needed_bytes > 0:
915
953
            # We don't have enough bytes to check if the protocol version
916
954
            # marker is right.  But we can check if it is already wrong by
920
958
            # len(MESSAGE_VERSION_THREE) bytes.  So if the bytes we have so far
921
959
            # are wrong then we should just raise immediately rather than
922
960
            # stall.]
923
 
            if not MESSAGE_VERSION_THREE.startswith(self._in_buffer):
 
961
            if not MESSAGE_VERSION_THREE.startswith(in_buf):
924
962
                # We have enough bytes to know the protocol version is wrong
925
 
                raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
 
963
                raise errors.UnexpectedProtocolVersionMarker(in_buf)
926
964
            raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
927
 
        if not self._in_buffer.startswith(MESSAGE_VERSION_THREE):
928
 
            raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
929
 
        self._in_buffer = self._in_buffer[len(MESSAGE_VERSION_THREE):]
 
965
        if not in_buf.startswith(MESSAGE_VERSION_THREE):
 
966
            raise errors.UnexpectedProtocolVersionMarker(in_buf)
 
967
        self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
930
968
        self.state_accept = self._state_accept_expecting_headers
931
969
 
932
970
    def _state_accept_expecting_headers(self):
981
1019
            raise errors.SmartMessageHandlerError(sys.exc_info())
982
1020
 
983
1021
    def done(self):
984
 
        self.unused_data = self._in_buffer
985
 
        self._in_buffer = ''
 
1022
        self.unused_data = self._get_in_buffer()
 
1023
        self._set_in_buffer(None)
986
1024
        self.state_accept = self._state_accept_reading_unused
987
1025
        try:
988
1026
            self.message_handler.end_received()
990
1028
            raise errors.SmartMessageHandlerError(sys.exc_info())
991
1029
 
992
1030
    def _state_accept_reading_unused(self):
993
 
        self.unused_data += self._in_buffer
994
 
        self._in_buffer = ''
 
1031
        self.unused_data = self._get_in_buffer()
 
1032
        self._set_in_buffer(None)
995
1033
 
996
1034
    def next_read_size(self):
997
1035
        if self.state_accept == self._state_accept_reading_unused:
1004
1042
            return 0
1005
1043
        else:
1006
1044
            if self._number_needed_bytes is not None:
1007
 
                return self._number_needed_bytes - len(self._in_buffer)
 
1045
                return self._number_needed_bytes - self._in_buffer_len
1008
1046
            else:
1009
1047
                raise AssertionError("don't know how many bytes are expected!")
1010
1048