324
324
def __init__(self):
    """Initialise the decoder with an empty input buffer and no progress."""
    # Nothing has been fully decoded yet.
    self.finished_reading = False
    # Received input is held as a list of chunks (_in_buffer_list) with the
    # combined length of those chunks tracked alongside (_in_buffer_len).
    self._in_buffer_list, self._in_buffer_len = [], 0
    # Bytes received beyond the end of the decoded stream.
    self.unused_data = ''
    self.bytes_left = self._number_needed_bytes = None
332
def _get_in_buffer(self):
    """Return all buffered input as a single string.

    The pending chunks in self._in_buffer_list are joined, and the list is
    collapsed to that single joined chunk so later calls take the fast path.
    The bytes are not consumed; callers use _set_in_buffer() for that.

    :return: the concatenation of every buffered chunk.
    :raises AssertionError: if the joined length disagrees with
        self._in_buffer_len, i.e. the length bookkeeping is out of sync.
    """
    if len(self._in_buffer_list) == 1:
        # Already collapsed: nothing to join.
        return self._in_buffer_list[0]
    in_buffer = ''.join(self._in_buffer_list)
    if len(in_buffer) != self._in_buffer_len:
        # Both values must be supplied as one tuple. Otherwise '%' binds only
        # to the first value and a TypeError is raised instead of this error.
        raise AssertionError(
            "Length of buffer did not match expected value: %s != %s"
            % (self._in_buffer_len, len(in_buffer)))
    self._in_buffer_list = [in_buffer]
    return in_buffer
343
def _get_in_bytes(self, count):
    """Return the first *count* buffered bytes without consuming them.

    Callers should have already checked that self._in_buffer_len is >=
    count; if fewer bytes are buffered, only the available ones are
    returned. Note, this does not consume the bytes from the buffer. The
    caller will still need to call _get_in_buffer() and then
    _set_in_buffer() if they actually need to consume the bytes.

    :raises AssertionError: if no chunks are buffered at all.
    """
    if not self._in_buffer_list:
        raise AssertionError('Callers must be sure we have buffered bytes'
            ' before calling _get_in_bytes')
    first_chunk = self._in_buffer_list[0]
    # Fast path: the first chunk alone is long enough, so there is no need
    # to join every pending chunk. '>=' lets an exact-size chunk qualify.
    if len(first_chunk) >= count:
        return first_chunk[:count]
    # The first chunk is too short: collapse all chunks and slice the result.
    return self._get_in_buffer()[:count]
362
def _set_in_buffer(self, new_buf):
    """Replace the buffered input with *new_buf*.

    :param new_buf: the bytes that remain unconsumed, or None to empty the
        buffer entirely.
    """
    if new_buf is None:
        self._in_buffer_list = []
        self._in_buffer_len = 0
    else:
        # Keep the chunk list and the cached length consistent.
        self._in_buffer_list = [new_buf]
        self._in_buffer_len = len(new_buf)
331
370
def accept_bytes(self, bytes):
332
371
"""Decode as much of bytes as possible.
338
377
data will be appended to self.unused_data.
340
379
# accept_bytes is allowed to change the state
341
current_state = self.state_accept
342
380
self._number_needed_bytes = None
343
self._in_buffer += bytes
381
# lsprof puts a very large amount of time on this specific call for
383
self._in_buffer_list.append(bytes)
384
self._in_buffer_len += len(bytes)
345
386
# Run the function for the current state.
387
current_state = self.state_accept
346
388
self.state_accept()
347
389
while current_state != self.state_accept:
348
390
# The current state has changed. Run the function for the new
379
421
# the rest of this chunk plus an END chunk.
380
422
return self.bytes_left + 4
381
423
elif self.state_accept == self._state_accept_expecting_length:
382
if self._in_buffer == '':
424
if self._in_buffer_len == 0:
383
425
# We're expecting a chunk length. There's at least two bytes
384
426
# left: a digit plus '\n'.
390
432
elif self.state_accept == self._state_accept_reading_unused:
392
434
elif self.state_accept == self._state_accept_expecting_header:
393
return max(0, len('chunked\n') - len(self._in_buffer))
435
return max(0, len('chunked\n') - self._in_buffer_len)
395
437
raise AssertionError("Impossible state: %r" % (self.state_accept,))
403
445
def _extract_line(self):
404
pos = self._in_buffer.find('\n')
446
in_buf = self._get_in_buffer()
447
pos = in_buf.find('\n')
406
449
# We haven't read a complete line yet, so request more bytes before
408
451
raise _NeedMoreBytes(1)
409
line = self._in_buffer[:pos]
410
453
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
411
self._in_buffer = self._in_buffer[pos+1:]
454
self._set_in_buffer(in_buf[pos+1:])
414
457
def _finished(self):
415
self.unused_data = self._in_buffer
458
self.unused_data = self._get_in_buffer()
459
self._in_buffer_list = []
460
self._in_buffer_len = 0
417
461
self.state_accept = self._state_accept_reading_unused
419
463
error_args = tuple(self.error_in_progress)
448
492
self.state_accept = self._state_accept_reading_chunk
450
494
def _state_accept_reading_chunk(self):
451
in_buffer_len = len(self._in_buffer)
452
self.chunk_in_progress += self._in_buffer[:self.bytes_left]
453
self._in_buffer = self._in_buffer[self.bytes_left:]
495
in_buf = self._get_in_buffer()
496
in_buffer_len = len(in_buf)
497
self.chunk_in_progress += in_buf[:self.bytes_left]
498
self._set_in_buffer(in_buf[self.bytes_left:])
454
499
self.bytes_left -= in_buffer_len
455
500
if self.bytes_left <= 0:
456
501
# Finished with chunk
463
508
self.state_accept = self._state_accept_expecting_length
465
510
def _state_accept_reading_unused(self):
    """Move every buffered byte onto the end of self.unused_data.

    The buffer is emptied afterwards, so each call appends only the bytes
    received since the previous call.
    """
    self.unused_data += self._get_in_buffer()
    # Reset via _set_in_buffer(None) so _in_buffer_len is cleared together
    # with the chunk list. Clearing only the list would leave a stale length
    # that later fails the consistency check in _get_in_buffer().
    self._set_in_buffer(None)
470
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
498
543
return self.state_read()
500
545
def _state_accept_expecting_length(self):
501
pos = self._in_buffer.find('\n')
546
in_buf = self._get_in_buffer()
547
pos = in_buf.find('\n')
504
self.bytes_left = int(self._in_buffer[:pos])
505
self._in_buffer = self._in_buffer[pos+1:]
550
self.bytes_left = int(in_buf[:pos])
551
self._set_in_buffer(in_buf[pos+1:])
506
552
self.state_accept = self._state_accept_reading_body
507
553
self.state_read = self._state_read_body_buffer
509
555
def _state_accept_reading_body(self):
510
self._body += self._in_buffer
511
self.bytes_left -= len(self._in_buffer)
556
in_buf = self._get_in_buffer()
558
self.bytes_left -= len(in_buf)
559
self._set_in_buffer(None)
513
560
if self.bytes_left <= 0:
514
561
# Finished with body
515
562
if self.bytes_left != 0:
519
566
self.state_accept = self._state_accept_reading_trailer
521
568
def _state_accept_reading_trailer(self):
522
self._trailer_buffer += self._in_buffer
569
self._trailer_buffer += self._get_in_buffer()
570
self._set_in_buffer(None)
524
571
# TODO: what if the trailer does not match "done\n"? Should this raise
525
572
# a ProtocolViolation exception?
526
573
if self._trailer_buffer.startswith('done\n'):
529
576
self.finished_reading = True
531
578
def _state_accept_reading_unused(self):
    """Move every buffered byte onto the end of self.unused_data.

    The buffer is emptied afterwards, so each call appends only the bytes
    received since the previous call.
    """
    # Read through _get_in_buffer(): input is now kept as a list of chunks,
    # so there is no single string buffer attribute to read directly.
    self.unused_data += self._get_in_buffer()
    self._set_in_buffer(None)
535
582
def _state_read_no_data(self):
698
745
return self._body_buffer.read(count)
699
746
_body_decoder = LengthPrefixedBodyDecoder()
701
# Read no more than 64k at a time so that we don't risk error 10055 (no
702
# buffer space available) on Windows.
704
748
while not _body_decoder.finished_reading:
705
bytes_wanted = min(_body_decoder.next_read_size(), max_read)
706
bytes = self._request.read_bytes(bytes_wanted)
749
bytes = self._request.read_bytes(_body_decoder.next_read_size())
751
# end of file encountered reading from server
752
raise errors.ConnectionReset(
753
"Connection lost while reading response body.")
707
754
_body_decoder.accept_bytes(bytes)
708
755
self._request.finished_reading()
709
756
self._body_buffer = StringIO(_body_decoder.read_pending_data())
716
763
def _recv_tuple(self):
717
764
"""Receive a tuple from the medium request."""
718
return _decode_tuple(self._recv_line())
720
def _recv_line(self):
721
"""Read an entire line from the medium request."""
723
while not line or line[-1] != '\n':
724
# TODO: this is inefficient - but tuples are short.
725
new_char = self._request.read_bytes(1)
727
# end of file encountered reading from server
728
raise errors.ConnectionReset(
729
"please check connectivity and permissions",
730
"(and try -Dhpss if further diagnosis is required)")
765
return _decode_tuple(self._request.read_line())
734
767
def query_version(self):
735
768
"""Return protocol version number of the server."""
772
805
if version != self.response_marker:
773
806
self._request.finished_reading()
774
807
raise errors.UnexpectedProtocolVersionMarker(version)
775
response_status = self._recv_line()
808
response_status = self._request.read_line()
776
809
result = SmartClientRequestProtocolOne._read_response_tuple(self)
777
810
self._response_is_unknown_method(result)
778
811
if response_status == 'success\n':
801
834
# Read no more than 64k at a time so that we don't risk error 10055 (no
802
835
# buffer space available) on Windows.
804
836
_body_decoder = ChunkedBodyDecoder()
805
837
while not _body_decoder.finished_reading:
806
bytes_wanted = min(_body_decoder.next_read_size(), max_read)
807
bytes = self._request.read_bytes(bytes_wanted)
838
bytes = self._request.read_bytes(_body_decoder.next_read_size())
840
# end of file encountered reading from server
841
raise errors.ConnectionReset(
842
"Connection lost while reading streamed body.")
808
843
_body_decoder.accept_bytes(bytes)
809
844
for body_bytes in iter(_body_decoder.read_next_chunk, None):
810
845
if 'hpss' in debug.debug_flags and type(body_bytes) is str:
877
912
self.message_handler.protocol_error(exception)
879
914
def _extract_length_prefixed_bytes(self):
880
if len(self._in_buffer) < 4:
915
if self._in_buffer_len < 4:
881
916
# A length prefix by itself is 4 bytes, and we don't even have that
883
918
raise _NeedMoreBytes(4)
884
(length,) = struct.unpack('!L', self._in_buffer[:4])
919
(length,) = struct.unpack('!L', self._get_in_bytes(4))
885
920
end_of_bytes = 4 + length
886
if len(self._in_buffer) < end_of_bytes:
921
if self._in_buffer_len < end_of_bytes:
887
922
# We haven't yet read as many bytes as the length-prefix says there
889
924
raise _NeedMoreBytes(end_of_bytes)
890
925
# Extract the bytes from the buffer.
891
bytes = self._in_buffer[4:end_of_bytes]
892
self._in_buffer = self._in_buffer[end_of_bytes:]
926
in_buf = self._get_in_buffer()
927
bytes = in_buf[4:end_of_bytes]
928
self._set_in_buffer(in_buf[end_of_bytes:])
895
931
def _extract_prefixed_bencoded_data(self):
904
940
def _extract_single_byte(self):
905
if self._in_buffer == '':
941
if self._in_buffer_len == 0:
906
942
# The buffer is empty
907
943
raise _NeedMoreBytes(1)
908
one_byte = self._in_buffer[0]
909
self._in_buffer = self._in_buffer[1:]
944
in_buf = self._get_in_buffer()
946
self._set_in_buffer(in_buf[1:])
912
949
def _state_accept_expecting_protocol_version(self):
    """Consume and validate the MESSAGE_VERSION_THREE marker.

    On success the marker is removed from the buffer and state_accept
    advances to _state_accept_expecting_headers.

    :raises errors.UnexpectedProtocolVersionMarker: as soon as the buffered
        bytes cannot be (a prefix of) MESSAGE_VERSION_THREE.
    :raises _NeedMoreBytes: if the marker is still incomplete but the bytes
        received so far are consistent with it.
    """
    needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
    in_buf = self._get_in_buffer()
    if needed_bytes > 0:
        # We don't yet have enough bytes to confirm the marker is right, but
        # we can already tell if it is wrong: the bytes so far must be a
        # prefix of MESSAGE_VERSION_THREE. If they are not, raise now
        # rather than waiting for more bytes.
        if not MESSAGE_VERSION_THREE.startswith(in_buf):
            # We have enough bytes to know the protocol version is wrong.
            raise errors.UnexpectedProtocolVersionMarker(in_buf)
        raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
    if not in_buf.startswith(MESSAGE_VERSION_THREE):
        raise errors.UnexpectedProtocolVersionMarker(in_buf)
    # Consume the marker and keep whatever follows it.
    self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
    self.state_accept = self._state_accept_expecting_headers
932
970
def _state_accept_expecting_headers(self):
981
1019
raise errors.SmartMessageHandlerError(sys.exc_info())
984
self.unused_data = self._in_buffer
1022
self.unused_data = self._get_in_buffer()
1023
self._set_in_buffer(None)
986
1024
self.state_accept = self._state_accept_reading_unused
988
1026
self.message_handler.end_received()
990
1028
raise errors.SmartMessageHandlerError(sys.exc_info())
992
1030
def _state_accept_reading_unused(self):
    """Move every buffered byte onto the end of self.unused_data.

    The buffer is emptied afterwards, so each call appends only the bytes
    received since the previous call.
    """
    # Append rather than assign: unused_data already holds bytes captured
    # when the message ended, plus anything from earlier calls. Assigning
    # would silently discard them.
    self.unused_data += self._get_in_buffer()
    self._set_in_buffer(None)
996
1034
def next_read_size(self):
997
1035
if self.state_accept == self._state_accept_reading_unused:
1006
1044
if self._number_needed_bytes is not None:
1007
return self._number_needed_bytes - len(self._in_buffer)
1045
return self._number_needed_bytes - self._in_buffer_len
1009
1047
raise AssertionError("don't know how many bytes are expected!")