29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
from bzrlib.util.bencode import bdecode_as_tuple, bencode
32
from bzrlib.util.bencode import bdecode, bencode
35
35
# Protocol version strings. These are sent as prefixes of bzr requests and
324
324
def __init__(self):
    """Set up an empty decoder: nothing buffered, nothing finished."""
    # Incoming data is kept as a list of chunks together with a running
    # total of their lengths.
    self._in_buffer_list, self._in_buffer_len = [], 0
    # Data that was received but does not belong to this decoder.
    self.unused_data = ''
    # Neither of these is known until some input has been examined.
    self.bytes_left = self._number_needed_bytes = None
    self.finished_reading = False
332
def _get_in_buffer(self):
    """Return all buffered input as a single string.

    If exactly one chunk is buffered it is returned as-is.  Otherwise the
    chunks in self._in_buffer_list are joined, the joined length is checked
    against self._in_buffer_len, and the chunk list is replaced by the
    single joined string so that the next call takes the one-chunk path.

    :return: the buffered bytes ('' if nothing is buffered).
    :raises AssertionError: if the joined length does not equal
        self._in_buffer_len.
    """
    if len(self._in_buffer_list) == 1:
        return self._in_buffer_list[0]
    in_buffer = ''.join(self._in_buffer_list)
    if len(in_buffer) != self._in_buffer_len:
        # Both values have to be handed to '%' as one tuple; passing them
        # separately makes the formatting itself fail with a TypeError and
        # hides the real problem.
        raise AssertionError(
            "Length of buffer did not match expected value: %s != %s"
            % (self._in_buffer_len, len(in_buffer)))
    self._in_buffer_list = [in_buffer]
    # Callers slice and search the result, so the collapsed buffer must be
    # returned on this path as well.
    return in_buffer
343
def _get_in_bytes(self, count):
    """Peek at the first `count` bytes of buffered input.

    Nothing is removed from the buffer; a caller that wants to consume the
    bytes must follow up with _get_in_buffer() and _set_in_buffer().
    Callers are expected to have checked self._in_buffer_len against
    `count` beforehand.

    :param count: number of leading bytes wanted.
    :return: at most `count` bytes from the front of the buffer.
    :raises AssertionError: if no chunk has been buffered at all.
    """
    chunks = self._in_buffer_list
    if not chunks:
        raise AssertionError('Callers must be sure we have buffered bytes'
            ' before calling _get_in_bytes')
    head = chunks[0]
    if len(head) > count:
        # The first chunk alone is long enough, so avoid joining.
        return head[:count]
    # Otherwise collapse every chunk into one string and slice that.
    return self._get_in_buffer()[:count]
362
def _set_in_buffer(self, new_buf):
    """Replace the buffered input with `new_buf`.

    :param new_buf: the string that becomes the whole buffer, or None to
        leave the buffer empty.
    """
    if new_buf is not None:
        self._in_buffer_list = [new_buf]
        self._in_buffer_len = len(new_buf)
    else:
        # Only clear when asked to; resetting unconditionally would throw
        # away the buffer that was stored just above.
        self._in_buffer_list = []
        self._in_buffer_len = 0
370
331
def accept_bytes(self, bytes):
371
332
"""Decode as much of bytes as possible.
377
338
data will be appended to self.unused_data.
379
340
# accept_bytes is allowed to change the state
341
current_state = self.state_accept
380
342
self._number_needed_bytes = None
381
# lsprof puts a very large amount of time on this specific call for
383
self._in_buffer_list.append(bytes)
384
self._in_buffer_len += len(bytes)
343
self._in_buffer += bytes
386
345
# Run the function for the current state.
387
current_state = self.state_accept
388
346
self.state_accept()
389
347
while current_state != self.state_accept:
390
348
# The current state has changed. Run the function for the new
421
379
# the rest of this chunk plus an END chunk.
422
380
return self.bytes_left + 4
423
381
elif self.state_accept == self._state_accept_expecting_length:
424
if self._in_buffer_len == 0:
382
if self._in_buffer == '':
425
383
# We're expecting a chunk length. There's at least two bytes
426
384
# left: a digit plus '\n'.
432
390
elif self.state_accept == self._state_accept_reading_unused:
434
392
elif self.state_accept == self._state_accept_expecting_header:
435
return max(0, len('chunked\n') - self._in_buffer_len)
393
return max(0, len('chunked\n') - len(self._in_buffer))
437
395
raise AssertionError("Impossible state: %r" % (self.state_accept,))
445
403
def _extract_line(self):
446
in_buf = self._get_in_buffer()
447
pos = in_buf.find('\n')
404
pos = self._in_buffer.find('\n')
449
406
# We haven't read a complete line yet, so request more bytes before
451
408
raise _NeedMoreBytes(1)
409
line = self._in_buffer[:pos]
453
410
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
454
self._set_in_buffer(in_buf[pos+1:])
411
self._in_buffer = self._in_buffer[pos+1:]
457
414
def _finished(self):
458
self.unused_data = self._get_in_buffer()
459
self._in_buffer_list = []
460
self._in_buffer_len = 0
415
self.unused_data = self._in_buffer
461
417
self.state_accept = self._state_accept_reading_unused
463
419
error_args = tuple(self.error_in_progress)
492
448
self.state_accept = self._state_accept_reading_chunk
494
450
def _state_accept_reading_chunk(self):
495
in_buf = self._get_in_buffer()
496
in_buffer_len = len(in_buf)
497
self.chunk_in_progress += in_buf[:self.bytes_left]
498
self._set_in_buffer(in_buf[self.bytes_left:])
451
in_buffer_len = len(self._in_buffer)
452
self.chunk_in_progress += self._in_buffer[:self.bytes_left]
453
self._in_buffer = self._in_buffer[self.bytes_left:]
499
454
self.bytes_left -= in_buffer_len
500
455
if self.bytes_left <= 0:
501
456
# Finished with chunk
508
463
self.state_accept = self._state_accept_expecting_length
510
465
def _state_accept_reading_unused(self):
511
self.unused_data += self._get_in_buffer()
512
self._in_buffer_list = []
466
self.unused_data += self._in_buffer
515
470
class LengthPrefixedBodyDecoder(_StatefulDecoder):
543
498
return self.state_read()
545
500
def _state_accept_expecting_length(self):
546
in_buf = self._get_in_buffer()
547
pos = in_buf.find('\n')
501
pos = self._in_buffer.find('\n')
550
self.bytes_left = int(in_buf[:pos])
551
self._set_in_buffer(in_buf[pos+1:])
504
self.bytes_left = int(self._in_buffer[:pos])
505
self._in_buffer = self._in_buffer[pos+1:]
552
506
self.state_accept = self._state_accept_reading_body
553
507
self.state_read = self._state_read_body_buffer
555
509
def _state_accept_reading_body(self):
556
in_buf = self._get_in_buffer()
558
self.bytes_left -= len(in_buf)
559
self._set_in_buffer(None)
510
self._body += self._in_buffer
511
self.bytes_left -= len(self._in_buffer)
560
513
if self.bytes_left <= 0:
561
514
# Finished with body
562
515
if self.bytes_left != 0:
566
519
self.state_accept = self._state_accept_reading_trailer
568
521
def _state_accept_reading_trailer(self):
569
self._trailer_buffer += self._get_in_buffer()
570
self._set_in_buffer(None)
522
self._trailer_buffer += self._in_buffer
571
524
# TODO: what if the trailer does not match "done\n"? Should this raise
572
525
# a ProtocolViolation exception?
573
526
if self._trailer_buffer.startswith('done\n'):
576
529
self.finished_reading = True
578
531
def _state_accept_reading_unused(self):
579
self.unused_data += self._get_in_buffer()
580
self._set_in_buffer(None)
532
self.unused_data += self._in_buffer
582
535
def _state_read_no_data(self):
655
608
if 'hpss' in debug.debug_flags:
656
609
mutter(' %d bytes in readv request', len(readv_bytes))
657
610
self._last_verb = args[0]
659
def call_with_body_stream(self, args, stream):
    """Refuse a call that carries a body stream.

    Protocols v1 and v2 don't support body streams, so it is safe to
    assume that a v1/v2 server doesn't support whatever method is being
    called with one.  The medium request is closed for writing and for
    reading, and the call is then rejected.  `stream` is never read.

    :param args: the call arguments; args[0] is reported as the unknown
        method.
    :param stream: the body stream (unused).
    :raises errors.UnknownSmartMethod: always.
    """
    medium_request = self._request
    medium_request.finished_writing()
    medium_request.finished_reading()
    method_name = args[0]
    raise errors.UnknownSmartMethod(method_name)
667
612
def cancel_read_body(self):
668
613
"""After expecting a body, a response code may indicate one otherwise.
753
698
return self._body_buffer.read(count)
754
699
_body_decoder = LengthPrefixedBodyDecoder()
701
# Read no more than 64k at a time so that we don't risk error 10055 (no
702
# buffer space available) on Windows.
756
704
while not _body_decoder.finished_reading:
757
bytes = self._request.read_bytes(_body_decoder.next_read_size())
705
bytes_wanted = min(_body_decoder.next_read_size(), max_read)
706
bytes = self._request.read_bytes(bytes_wanted)
759
708
# end of file encountered reading from server
760
709
raise errors.ConnectionReset(
771
720
def _recv_tuple(self):
772
721
"""Receive a tuple from the medium request."""
773
return _decode_tuple(self._request.read_line())
722
return _decode_tuple(self._recv_line())
724
def _recv_line(self):
725
"""Read an entire line from the medium request."""
727
while not line or line[-1] != '\n':
728
# TODO: this is inefficient - but tuples are short.
729
new_char = self._request.read_bytes(1)
731
# end of file encountered reading from server
732
raise errors.ConnectionReset(
733
"please check connectivity and permissions",
734
"(and try -Dhpss if further diagnosis is required)")
775
738
def query_version(self):
776
739
"""Return protocol version number of the server."""
813
776
if version != self.response_marker:
814
777
self._request.finished_reading()
815
778
raise errors.UnexpectedProtocolVersionMarker(version)
816
response_status = self._request.read_line()
779
response_status = self._recv_line()
817
780
result = SmartClientRequestProtocolOne._read_response_tuple(self)
818
781
self._response_is_unknown_method(result)
819
782
if response_status == 'success\n':
842
805
# Read no more than 64k at a time so that we don't risk error 10055 (no
843
806
# buffer space available) on Windows.
844
808
_body_decoder = ChunkedBodyDecoder()
845
809
while not _body_decoder.finished_reading:
846
bytes = self._request.read_bytes(_body_decoder.next_read_size())
810
bytes_wanted = min(_body_decoder.next_read_size(), max_read)
811
bytes = self._request.read_bytes(bytes_wanted)
848
813
# end of file encountered reading from server
849
814
raise errors.ConnectionReset(
920
885
self.message_handler.protocol_error(exception)
922
887
def _extract_length_prefixed_bytes(self):
923
if self._in_buffer_len < 4:
888
if len(self._in_buffer) < 4:
924
889
# A length prefix by itself is 4 bytes, and we don't even have that
926
891
raise _NeedMoreBytes(4)
927
(length,) = struct.unpack('!L', self._get_in_bytes(4))
892
(length,) = struct.unpack('!L', self._in_buffer[:4])
928
893
end_of_bytes = 4 + length
929
if self._in_buffer_len < end_of_bytes:
894
if len(self._in_buffer) < end_of_bytes:
930
895
# We haven't yet read as many bytes as the length-prefix says there
932
897
raise _NeedMoreBytes(end_of_bytes)
933
898
# Extract the bytes from the buffer.
934
in_buf = self._get_in_buffer()
935
bytes = in_buf[4:end_of_bytes]
936
self._set_in_buffer(in_buf[end_of_bytes:])
899
bytes = self._in_buffer[4:end_of_bytes]
900
self._in_buffer = self._in_buffer[end_of_bytes:]
939
903
def _extract_prefixed_bencoded_data(self):
940
904
prefixed_bytes = self._extract_length_prefixed_bytes()
942
decoded = bdecode_as_tuple(prefixed_bytes)
906
decoded = bdecode(prefixed_bytes)
943
907
except ValueError:
944
908
raise errors.SmartProtocolError(
945
909
'Bytes %r not bencoded' % (prefixed_bytes,))
948
912
def _extract_single_byte(self):
949
if self._in_buffer_len == 0:
913
if self._in_buffer == '':
950
914
# The buffer is empty
951
915
raise _NeedMoreBytes(1)
952
in_buf = self._get_in_buffer()
954
self._set_in_buffer(in_buf[1:])
916
one_byte = self._in_buffer[0]
917
self._in_buffer = self._in_buffer[1:]
957
920
def _state_accept_expecting_protocol_version(self):
958
needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
959
in_buf = self._get_in_buffer()
921
needed_bytes = len(MESSAGE_VERSION_THREE) - len(self._in_buffer)
960
922
if needed_bytes > 0:
961
923
# We don't have enough bytes to check if the protocol version
962
924
# marker is right. But we can check if it is already wrong by
966
928
# len(MESSAGE_VERSION_THREE) bytes. So if the bytes we have so far
967
929
# are wrong then we should just raise immediately rather than
969
if not MESSAGE_VERSION_THREE.startswith(in_buf):
931
if not MESSAGE_VERSION_THREE.startswith(self._in_buffer):
970
932
# We have enough bytes to know the protocol version is wrong
971
raise errors.UnexpectedProtocolVersionMarker(in_buf)
933
raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
972
934
raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
973
if not in_buf.startswith(MESSAGE_VERSION_THREE):
974
raise errors.UnexpectedProtocolVersionMarker(in_buf)
975
self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
935
if not self._in_buffer.startswith(MESSAGE_VERSION_THREE):
936
raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
937
self._in_buffer = self._in_buffer[len(MESSAGE_VERSION_THREE):]
976
938
self.state_accept = self._state_accept_expecting_headers
978
940
def _state_accept_expecting_headers(self):
1027
989
raise errors.SmartMessageHandlerError(sys.exc_info())
1030
self.unused_data = self._get_in_buffer()
1031
self._set_in_buffer(None)
992
self.unused_data = self._in_buffer
1032
994
self.state_accept = self._state_accept_reading_unused
1034
996
self.message_handler.end_received()
1036
998
raise errors.SmartMessageHandlerError(sys.exc_info())
1038
1000
def _state_accept_reading_unused(self):
1039
self.unused_data = self._get_in_buffer()
1040
self._set_in_buffer(None)
1001
self.unused_data += self._in_buffer
1002
self._in_buffer = ''
1042
1004
def next_read_size(self):
1043
1005
if self.state_accept == self._state_accept_reading_unused:
1228
1187
self._write_end()
1229
1188
self._medium_request.finished_writing()
1231
def call_with_body_stream(self, args, stream):
1232
if 'hpss' in debug.debug_flags:
1233
mutter('hpss call w/body stream: %r', args)
1234
path = getattr(self._medium_request._medium, '_path', None)
1235
if path is not None:
1236
mutter(' (to %s)', path)
1237
self._request_start_time = time.time()
1238
self._write_protocol_version()
1239
self._write_headers(self._headers)
1240
self._write_structure(args)
1241
# TODO: notice if the server has sent an early error reply before we
1242
# have finished sending the stream. We would notice at the end
1243
# anyway, but if the medium can deliver it early then it's good
1244
# to short-circuit the whole request...
1247
self._write_prefixed_body(part)
1250
# Iterating the stream failed. Cleanly abort the request.
1251
self._write_error_status()
1252
# Currently the client unconditionally sends ('error',) as the
1254
self._write_structure(('error',))
1256
self._medium_request.finished_writing()