746
602
This prefixes the request with the value of REQUEST_VERSION_TWO.
749
response_marker = RESPONSE_VERSION_TWO
750
request_marker = REQUEST_VERSION_TWO
752
605
def read_response_tuple(self, expect_body=False):
753
606
"""Read a response tuple from the wire.
755
608
This should only be called once.
757
610
version = self._request.read_line()
758
if version != self.response_marker:
759
self._request.finished_reading()
760
raise errors.UnexpectedProtocolVersionMarker(version)
761
response_status = self._request.read_line()
762
result = SmartClientRequestProtocolOne._read_response_tuple(self)
763
self._response_is_unknown_method(result)
764
if response_status == 'success\n':
765
self.response_status = True
767
self._request.finished_reading()
769
elif response_status == 'failed\n':
770
self.response_status = False
771
self._request.finished_reading()
772
raise errors.ErrorFromSmartServer(result)
611
if version != RESPONSE_VERSION_TWO:
612
raise errors.SmartProtocolError('bad protocol marker %r' % version)
613
response_status = self._recv_line()
614
if response_status not in ('success\n', 'failed\n'):
774
615
raise errors.SmartProtocolError(
775
616
'bad protocol status %r' % response_status)
617
self.response_status = response_status == 'success\n'
618
return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
777
620
def _write_protocol_version(self):
778
621
"""Write any prefixes this protocol requires.
780
623
Version two sends the value of REQUEST_VERSION_TWO.
782
self._request.accept_bytes(self.request_marker)
625
self._request.accept_bytes(REQUEST_VERSION_TWO)
784
627
def read_streamed_body(self):
785
628
"""Read bytes from the body, decoding into a byte stream.
787
# Read no more than 64k at a time so that we don't risk error 10055 (no
788
# buffer space available) on Windows.
789
630
_body_decoder = ChunkedBodyDecoder()
790
631
while not _body_decoder.finished_reading:
791
bytes = self._request.read_bytes(_body_decoder.next_read_size())
793
# end of file encountered reading from server
794
raise errors.ConnectionReset(
795
"Connection lost while reading streamed body.")
632
bytes_wanted = _body_decoder.next_read_size()
633
bytes = self._request.read_bytes(bytes_wanted)
796
634
_body_decoder.accept_bytes(bytes)
797
635
for body_bytes in iter(_body_decoder.read_next_chunk, None):
798
if 'hpss' in debug.debug_flags and type(body_bytes) is str:
636
if 'hpss' in debug.debug_flags:
799
637
mutter(' %d byte chunk read',
802
640
self._request.finished_reading()
805
def build_server_protocol_three(backing_transport, write_func,
807
request_handler = request.SmartServerRequestHandler(
808
backing_transport, commands=request.request_handlers,
809
root_client_path=root_client_path)
810
responder = ProtocolThreeResponder(write_func)
811
message_handler = message.ConventionalRequestHandler(request_handler, responder)
812
return ProtocolThreeDecoder(message_handler)
815
class ProtocolThreeDecoder(_StatefulDecoder):
817
response_marker = RESPONSE_VERSION_THREE
818
request_marker = REQUEST_VERSION_THREE
820
def __init__(self, message_handler, expect_version_marker=False):
821
_StatefulDecoder.__init__(self)
822
self._has_dispatched = False
824
if expect_version_marker:
825
self.state_accept = self._state_accept_expecting_protocol_version
826
# We're expecting at least the protocol version marker + some
828
self._number_needed_bytes = len(MESSAGE_VERSION_THREE) + 4
830
self.state_accept = self._state_accept_expecting_headers
831
self._number_needed_bytes = 4
832
self.decoding_failed = False
833
self.request_handler = self.message_handler = message_handler
835
def accept_bytes(self, bytes):
836
self._number_needed_bytes = None
838
_StatefulDecoder.accept_bytes(self, bytes)
839
except KeyboardInterrupt:
841
except errors.SmartMessageHandlerError, exception:
842
# We do *not* set self.decoding_failed here. The message handler
843
# has raised an error, but the decoder is still able to parse bytes
844
# and determine when this message ends.
845
log_exception_quietly()
846
self.message_handler.protocol_error(exception.exc_value)
847
# The state machine is ready to continue decoding, but the
848
# exception has interrupted the loop that runs the state machine.
849
# So we call accept_bytes again to restart it.
850
self.accept_bytes('')
851
except Exception, exception:
852
# The decoder itself has raised an exception. We cannot continue
854
self.decoding_failed = True
855
if isinstance(exception, errors.UnexpectedProtocolVersionMarker):
856
# This happens during normal operation when the client tries a
857
# protocol version the server doesn't understand, so no need to
858
# log a traceback every time.
859
# Note that this can only happen when
860
# expect_version_marker=True, which is only the case on the
864
log_exception_quietly()
865
self.message_handler.protocol_error(exception)
867
def _extract_length_prefixed_bytes(self):
868
if len(self._in_buffer) < 4:
869
# A length prefix by itself is 4 bytes, and we don't even have that
871
raise _NeedMoreBytes(4)
872
(length,) = struct.unpack('!L', self._in_buffer[:4])
873
end_of_bytes = 4 + length
874
if len(self._in_buffer) < end_of_bytes:
875
# We haven't yet read as many bytes as the length-prefix says there
877
raise _NeedMoreBytes(end_of_bytes)
878
# Extract the bytes from the buffer.
879
bytes = self._in_buffer[4:end_of_bytes]
880
self._in_buffer = self._in_buffer[end_of_bytes:]
883
def _extract_prefixed_bencoded_data(self):
884
prefixed_bytes = self._extract_length_prefixed_bytes()
886
decoded = bdecode(prefixed_bytes)
888
raise errors.SmartProtocolError(
889
'Bytes %r not bencoded' % (prefixed_bytes,))
892
def _extract_single_byte(self):
893
if self._in_buffer == '':
894
# The buffer is empty
895
raise _NeedMoreBytes(1)
896
one_byte = self._in_buffer[0]
897
self._in_buffer = self._in_buffer[1:]
900
def _state_accept_expecting_protocol_version(self):
901
needed_bytes = len(MESSAGE_VERSION_THREE) - len(self._in_buffer)
903
# We don't have enough bytes to check if the protocol version
904
# marker is right. But we can check if it is already wrong by
905
# checking that the start of MESSAGE_VERSION_THREE matches what
907
# [In fact, if the remote end isn't bzr we might never receive
908
# len(MESSAGE_VERSION_THREE) bytes. So if the bytes we have so far
909
# are wrong then we should just raise immediately rather than
911
if not MESSAGE_VERSION_THREE.startswith(self._in_buffer):
912
# We have enough bytes to know the protocol version is wrong
913
raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
914
raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
915
if not self._in_buffer.startswith(MESSAGE_VERSION_THREE):
916
raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
917
self._in_buffer = self._in_buffer[len(MESSAGE_VERSION_THREE):]
918
self.state_accept = self._state_accept_expecting_headers
920
def _state_accept_expecting_headers(self):
921
decoded = self._extract_prefixed_bencoded_data()
922
if type(decoded) is not dict:
923
raise errors.SmartProtocolError(
924
'Header object %r is not a dict' % (decoded,))
925
self.state_accept = self._state_accept_expecting_message_part
927
self.message_handler.headers_received(decoded)
929
raise errors.SmartMessageHandlerError(sys.exc_info())
931
def _state_accept_expecting_message_part(self):
932
message_part_kind = self._extract_single_byte()
933
if message_part_kind == 'o':
934
self.state_accept = self._state_accept_expecting_one_byte
935
elif message_part_kind == 's':
936
self.state_accept = self._state_accept_expecting_structure
937
elif message_part_kind == 'b':
938
self.state_accept = self._state_accept_expecting_bytes
939
elif message_part_kind == 'e':
942
raise errors.SmartProtocolError(
943
'Bad message kind byte: %r' % (message_part_kind,))
945
def _state_accept_expecting_one_byte(self):
946
byte = self._extract_single_byte()
947
self.state_accept = self._state_accept_expecting_message_part
949
self.message_handler.byte_part_received(byte)
951
raise errors.SmartMessageHandlerError(sys.exc_info())
953
def _state_accept_expecting_bytes(self):
954
# XXX: this should not buffer whole message part, but instead deliver
955
# the bytes as they arrive.
956
prefixed_bytes = self._extract_length_prefixed_bytes()
957
self.state_accept = self._state_accept_expecting_message_part
959
self.message_handler.bytes_part_received(prefixed_bytes)
961
raise errors.SmartMessageHandlerError(sys.exc_info())
963
def _state_accept_expecting_structure(self):
964
structure = self._extract_prefixed_bencoded_data()
965
self.state_accept = self._state_accept_expecting_message_part
967
self.message_handler.structure_part_received(structure)
969
raise errors.SmartMessageHandlerError(sys.exc_info())
972
self.unused_data = self._in_buffer
974
self.state_accept = self._state_accept_reading_unused
976
self.message_handler.end_received()
978
raise errors.SmartMessageHandlerError(sys.exc_info())
980
def _state_accept_reading_unused(self):
981
self.unused_data += self._in_buffer
984
def next_read_size(self):
985
if self.state_accept == self._state_accept_reading_unused:
987
elif self.decoding_failed:
988
# An exception occured while processing this message, probably from
989
# self.message_handler. We're not sure that this state machine is
990
# in a consistent state, so just signal that we're done (i.e. give
994
if self._number_needed_bytes is not None:
995
return self._number_needed_bytes - len(self._in_buffer)
997
raise AssertionError("don't know how many bytes are expected!")
1000
class _ProtocolThreeEncoder(object):
1002
response_marker = request_marker = MESSAGE_VERSION_THREE
1004
def __init__(self, write_func):
1006
self._real_write_func = write_func
1008
def _write_func(self, bytes):
1013
self._real_write_func(self._buf)
1016
def _serialise_offsets(self, offsets):
1017
"""Serialise a readv offset list."""
1019
for start, length in offsets:
1020
txt.append('%d,%d' % (start, length))
1021
return '\n'.join(txt)
1023
def _write_protocol_version(self):
1024
self._write_func(MESSAGE_VERSION_THREE)
1026
def _write_prefixed_bencode(self, structure):
1027
bytes = bencode(structure)
1028
self._write_func(struct.pack('!L', len(bytes)))
1029
self._write_func(bytes)
1031
def _write_headers(self, headers):
1032
self._write_prefixed_bencode(headers)
1034
def _write_structure(self, args):
1035
self._write_func('s')
1038
if type(arg) is unicode:
1039
utf8_args.append(arg.encode('utf8'))
1041
utf8_args.append(arg)
1042
self._write_prefixed_bencode(utf8_args)
1044
def _write_end(self):
1045
self._write_func('e')
1048
def _write_prefixed_body(self, bytes):
1049
self._write_func('b')
1050
self._write_func(struct.pack('!L', len(bytes)))
1051
self._write_func(bytes)
1053
def _write_error_status(self):
1054
self._write_func('oE')
1056
def _write_success_status(self):
1057
self._write_func('oS')
1060
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1062
def __init__(self, write_func):
1063
_ProtocolThreeEncoder.__init__(self, write_func)
1064
self.response_sent = False
1065
self._headers = {'Software version': bzrlib.__version__}
1067
def send_error(self, exception):
1068
if self.response_sent:
1069
raise AssertionError(
1070
"send_error(%s) called, but response already sent."
1072
if isinstance(exception, errors.UnknownSmartMethod):
1073
failure = request.FailedSmartServerResponse(
1074
('UnknownMethod', exception.verb))
1075
self.send_response(failure)
1077
self.response_sent = True
1078
self._write_protocol_version()
1079
self._write_headers(self._headers)
1080
self._write_error_status()
1081
self._write_structure(('error', str(exception)))
1084
def send_response(self, response):
1085
if self.response_sent:
1086
raise AssertionError(
1087
"send_response(%r) called, but response already sent."
1089
self.response_sent = True
1090
self._write_protocol_version()
1091
self._write_headers(self._headers)
1092
if response.is_successful():
1093
self._write_success_status()
1095
self._write_error_status()
1096
self._write_structure(response.args)
1097
if response.body is not None:
1098
self._write_prefixed_body(response.body)
1099
elif response.body_stream is not None:
1100
for chunk in response.body_stream:
1101
self._write_prefixed_body(chunk)
1106
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1108
def __init__(self, medium_request):
1109
_ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1110
self._medium_request = medium_request
1113
def set_headers(self, headers):
1114
self._headers = headers.copy()
1116
def call(self, *args):
1117
if 'hpss' in debug.debug_flags:
1118
mutter('hpss call: %s', repr(args)[1:-1])
1119
base = getattr(self._medium_request._medium, 'base', None)
1120
if base is not None:
1121
mutter(' (to %s)', base)
1122
self._request_start_time = time.time()
1123
self._write_protocol_version()
1124
self._write_headers(self._headers)
1125
self._write_structure(args)
1127
self._medium_request.finished_writing()
1129
def call_with_body_bytes(self, args, body):
1130
"""Make a remote call of args with body bytes 'body'.
1132
After calling this, call read_response_tuple to find the result out.
1134
if 'hpss' in debug.debug_flags:
1135
mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
1136
path = getattr(self._medium_request._medium, '_path', None)
1137
if path is not None:
1138
mutter(' (to %s)', path)
1139
mutter(' %d bytes', len(body))
1140
self._request_start_time = time.time()
1141
self._write_protocol_version()
1142
self._write_headers(self._headers)
1143
self._write_structure(args)
1144
self._write_prefixed_body(body)
1146
self._medium_request.finished_writing()
1148
def call_with_body_readv_array(self, args, body):
1149
"""Make a remote call with a readv array.
1151
The body is encoded with one line per readv offset pair. The numbers in
1152
each pair are separated by a comma, and no trailing \n is emitted.
1154
if 'hpss' in debug.debug_flags:
1155
mutter('hpss call w/readv: %s', repr(args)[1:-1])
1156
path = getattr(self._medium_request._medium, '_path', None)
1157
if path is not None:
1158
mutter(' (to %s)', path)
1159
self._request_start_time = time.time()
1160
self._write_protocol_version()
1161
self._write_headers(self._headers)
1162
self._write_structure(args)
1163
readv_bytes = self._serialise_offsets(body)
1164
if 'hpss' in debug.debug_flags:
1165
mutter(' %d bytes in readv request', len(readv_bytes))
1166
self._write_prefixed_body(readv_bytes)
1168
self._medium_request.finished_writing()