654
679
self._request.finished_reading()
682
def build_server_protocol_three(backing_transport, write_func):
    """Assemble the server-side decoder for version three of the protocol.

    :param backing_transport: handed to SmartServerRequestHandler.
    :param write_func: callable given to ProtocolThreeResponder for output.
    :return: a _ProtocolThreeBase driving a ConventionalRequestHandler.
    """
    handler = request.SmartServerRequestHandler(
        backing_transport, commands=request.request_handlers)
    return _ProtocolThreeBase(
        message.ConventionalRequestHandler(
            handler, ProtocolThreeResponder(write_func)))
690
class _ProtocolThreeBase(_StatefulDecoder):
692
response_marker = RESPONSE_VERSION_THREE
693
request_marker = REQUEST_VERSION_THREE
695
def __init__(self, message_handler):
    """Create a decoder that reports decoded parts to message_handler.

    :param message_handler: object receiving the decoded message events;
        stored as both ``message_handler`` and ``request_handler``.
    """
    _StatefulDecoder.__init__(self)
    self.message_handler = message_handler
    self.request_handler = message_handler
    # Decoding starts with the length-prefixed headers; a length prefix
    # alone is 4 bytes, so that is the first amount asked for.
    self.state_accept = self._state_accept_expecting_headers
    self._number_needed_bytes = 4
    self.has_dispatched = False
710
def accept_bytes(self, bytes):
# Feed newly received bytes to the decoder via _StatefulDecoder.accept_bytes,
# report any non-KeyboardInterrupt exception to the message handler as a
# protocol error, and emit a debug trace line through pr().
#
# NOTE(review): this method is incomplete as it appears here -- the
# embedded line numbers skip 711, 714, 716, 719, 721, 728, 731, 734 and
# 741-743. The points flagged below must be restored before it can run.
712
# if 'put_non_atomic' in bytes:
713
# import pdb; pdb.set_trace()
715
# NOTE(review): the following if/elif chain builds buf_summary and looks
# like the body of the summarise_buf() helper called further down; its
# "def" line, the body of the "is None" branch and the "else:" before the
# truncated-repr assignment are not visible.
if self._in_buffer is None:
717
elif len(self._in_buffer) <= 6:
718
buf_summary = repr(self._in_buffer)
720
buf_summary = repr(self._in_buffer[:3] + '...')
722
# Strip the 'Conventional' prefix and 'Handler' suffix from the handler's
# class name, for the trace line only.
handler_name = self.message_handler.__class__.__name__
723
handler_name = handler_name[len('Conventional'):-len('Handler')]
724
# Name of the current state method without its '_state_accept_' prefix
# (im_func is the Python 2 spelling of a bound method's function).
state_now = self.state_accept.im_func.__name__[len('_state_accept_'):]
725
buf_now = summarise_buf()
726
#from pprint import pprint; pprint([bytes, self.__dict__])
727
# Forget the previously requested byte count before decoding again.
self._number_needed_bytes = None
729
# NOTE(review): the "try:" that the except clauses below belong to is not
# visible, and the KeyboardInterrupt clause has no visible body.
_StatefulDecoder.accept_bytes(self, bytes)
730
except KeyboardInterrupt:
732
except Exception, exception:
733
log_exception_quietly()
735
self.message_handler.protocol_error(exception)
736
#self._send_response(request.FailedSmartServerResponse(
737
# ('error', str(exception))))
738
# NOTE(review): the format string has six placeholders but only five
# arguments are visible, and the call is never closed.
pr('%s in %s(%s), got %r --> %s(%s)' % (
739
handler_name, state_now, buf_now, bytes,
740
self.state_accept.im_func.__name__[len('_state_accept_'):],
744
def _extract_length_prefixed_bytes(self):
745
if len(self._in_buffer) < 4:
746
# A length prefix by itself is 4 bytes, and we don't even have that
748
raise _NeedMoreBytes(4)
749
(length,) = struct.unpack('!L', self._in_buffer[:4])
750
end_of_bytes = 4 + length
751
if len(self._in_buffer) < end_of_bytes:
752
# We haven't yet read as many bytes as the length-prefix says there
754
raise _NeedMoreBytes(end_of_bytes)
755
# Extract the bytes from the buffer.
756
bytes = self._in_buffer[4:end_of_bytes]
757
self._in_buffer = self._in_buffer[end_of_bytes:]
760
def _extract_prefixed_bencoded_data(self):
    """Remove one length-prefixed chunk from the buffer and bdecode it.

    :return: the decoded object.
    :raises errors.SmartProtocolError: if the chunk is not valid bencode.
    """
    prefixed_bytes = self._extract_length_prefixed_bytes()
    try:
        return bdecode(prefixed_bytes)
    except ValueError:
        # NOTE(review): assumes bdecode signals malformed input with
        # ValueError -- confirm against the bencode implementation in use.
        raise errors.SmartProtocolError(
            'Bytes %r not bencoded' % (prefixed_bytes,))
769
def _extract_single_byte(self):
770
if self._in_buffer == '':
771
# The buffer is empty
772
raise _NeedMoreBytes()
773
one_byte = self._in_buffer[0]
774
self._in_buffer = self._in_buffer[1:]
777
def _state_accept_expecting_headers(self, bytes):
778
self._in_buffer += bytes
779
decoded = self._extract_prefixed_bencoded_data()
780
if type(decoded) is not dict:
781
raise errors.SmartProtocolError(
782
'Header object %r is not a dict' % (decoded,))
783
self.message_handler.headers_received(decoded)
784
self.state_accept = self._state_accept_expecting_message_part
786
def _state_accept_expecting_message_part(self, bytes):
787
#import sys; print >> sys.stderr, 'msg part bytes:', repr(bytes)
788
self._in_buffer += bytes
789
message_part_kind = self._extract_single_byte()
790
if message_part_kind == 'o':
791
self.state_accept = self._state_accept_expecting_one_byte
792
elif message_part_kind == 's':
793
self.state_accept = self._state_accept_expecting_structure
794
elif message_part_kind == 'b':
795
self.state_accept = self._state_accept_expecting_bytes
796
elif message_part_kind == 'e':
799
raise errors.SmartProtocolError(
800
'Bad message kind byte: %r' % (message_part_kind,))
801
#import sys; print >> sys.stderr, 'state:', self.state_accept, '_in_buffer:', repr(self._in_buffer)
803
def _state_accept_expecting_one_byte(self, bytes):
804
self._in_buffer += bytes
805
byte = self._extract_single_byte()
806
self.message_handler.byte_part_received(byte)
807
self.state_accept = self._state_accept_expecting_message_part
809
def _state_accept_expecting_bytes(self, bytes):
810
# XXX: this should not buffer whole message part, but instead deliver
811
# the bytes as they arrive.
812
self._in_buffer += bytes
813
prefixed_bytes = self._extract_length_prefixed_bytes()
814
self.message_handler.bytes_part_received(prefixed_bytes)
815
self.state_accept = self._state_accept_expecting_message_part
817
def _state_accept_expecting_structure(self, bytes):
818
self._in_buffer += bytes
819
structure = self._extract_prefixed_bencoded_data()
820
self.message_handler.structure_part_received(structure)
821
self.state_accept = self._state_accept_expecting_message_part
824
#import sys; print >> sys.stderr, 'Done!', repr(self._in_buffer)
825
self.unused_data = self._in_buffer
826
self._in_buffer = None
827
self.state_accept = self._state_accept_reading_unused
828
self.message_handler.end_received()
830
def _state_accept_reading_unused(self, bytes):
831
self.unused_data += bytes
834
@property
def excess_buffer(self):
    """The bytes received after the end of the message (unused_data)."""
    # XXX: this property is a compatibility hack.  Really there should not
    # be both unused_data and excess_buffer.
    return self.unused_data
839
def next_read_size(self):
    """Return how many more bytes the decoder wants to be given.

    :return: 0 once the end of the message has been seen (the decoder is
        in the reading-unused state); otherwise the number of bytes still
        missing from the amount last requested.
    """
    if self.state_accept == self._state_accept_reading_unused:
        # The message is complete (and _in_buffer is None by now), so
        # nothing more needs to be read.
        return 0
    if self._number_needed_bytes is not None:
        return self._number_needed_bytes - len(self._in_buffer)
    # NOTE(review): no byte count is recorded here.  The visible code fell
    # through and returned None in this case; kept as is -- confirm
    # whether this should be an error instead.
    return None
849
class SmartServerRequestProtocolThree(_ProtocolThreeBase):
    """Server-side decoder for version three requests."""

    def _args_received(self, args):
        """Pass the request's argument sequence on to the request handler.

        :param args: the decoded argument sequence.
        :raises errors.SmartProtocolError: if args is empty.
        """
        if len(args) == 0:
            raise errors.SmartProtocolError('Empty argument sequence')
        # NOTE(review): _state_accept_expecting_body_kind is not defined in
        # the visible part of _ProtocolThreeBase -- confirm it exists.
        self.state_accept = self._state_accept_expecting_body_kind
        self.request_handler.args_received(args)
858
class SmartClientRequestProtocolThree(_ProtocolThreeBase, SmartClientRequestProtocolTwo):
860
response_marker = RESPONSE_VERSION_THREE
861
request_marker = REQUEST_VERSION_THREE
863
def __init__(self, client_medium_request):
    """Set up both base classes around a fresh MessageHandler.

    :param client_medium_request: passed to
        SmartClientRequestProtocolTwo.__init__.
    """
    from bzrlib.smart.message import MessageHandler
    handler = MessageHandler()
    _ProtocolThreeBase.__init__(self, handler)
    SmartClientRequestProtocolTwo.__init__(self, client_medium_request)
    # Re-assert the version-three start state after the version-two
    # initialiser has run (presumably it resets state_accept -- confirm).
    self.state_accept = self._state_accept_expecting_headers
    shared = self.message_handler
    self.request_handler = shared
    self.response_handler = shared
872
def _state_accept_expecting_response_status(self, bytes):
873
self._in_buffer += bytes
874
response_status = self._extract_single_byte()
875
if response_status not in ['S', 'F']:
876
raise errors.SmartProtocolError(
877
'Unknown response status: %r' % (response_status,))
878
self.successful_status = bool(response_status == 'S')
879
self.state_accept = self._state_accept_expecting_request_args
881
def _args_received(self, args):
882
if self.successful_status:
883
self.response_handler.args_received(args)
886
raise errors.SmartProtocolError('Empty error details')
887
self.response_handler.error_received(args)
891
# XXX: the encoding of requests and decoding responses are somewhat
892
# conflated into one class here. The protocol is half-duplex, so combining
893
# them just makes the code needlessly ugly.
895
def _write_prefixed_bencode(self, structure):
    """Bencode structure and send it preceded by its 4-byte '!L' length."""
    encoded = bencode(structure)
    self._request.accept_bytes(struct.pack('!L', len(encoded)))
    self._request.accept_bytes(encoded)
900
def _write_headers(self, headers=None):
902
headers = {'Software version': bzrlib.__version__}
903
self._write_prefixed_bencode(headers)
905
def _write_args(self, args):
906
self._request.accept_bytes('s')
907
self._write_prefixed_bencode(args)
909
def _write_end(self):
910
self._request.accept_bytes('e')
912
def _write_prefixed_body(self, bytes):
913
self._request.accept_bytes('b')
914
self._request.accept_bytes(struct.pack('!L', len(bytes)))
915
self._request.accept_bytes(bytes)
917
def _wait_for_request_end(self):
919
next_read_size = self.next_read_size()
920
if next_read_size == 0:
921
# a complete request has been read.
923
bytes = self._request.read_bytes(next_read_size)
925
# end of file encountered reading from server
926
raise errors.ConnectionReset(
927
"please check connectivity and permissions",
928
"(and try -Dhpss if further diagnosis is required)")
929
self.accept_bytes(bytes)
931
# these methods from SmartClientRequestProtocolOne/Two
932
def call(self, *args, **kw):
    """Make a remote call of args with no body.

    :param args: the call's arguments, sent as one structure part.
    :keyword headers: optional dict of message headers (default None).
    :raises TypeError: if any other keyword argument is given.
    """
    # XXX: ideally, signature would be call(self, *args, headers=None), but
    # python doesn't allow that.  So, we fake it.
    headers = kw.pop('headers', None)
    if kw:
        raise TypeError('Unexpected keyword arguments: %r' % (kw,))
    if 'hpss' in debug.debug_flags:
        mutter('hpss call: %s', repr(args)[1:-1])
        if getattr(self._request._medium, 'base', None) is not None:
            mutter(' (to %s)', self._request._medium.base)
    # Recorded unconditionally: SOURCE's indentation is lost, and always
    # setting it is safe for any reader of the attribute.
    self._request_start_time = time.time()
    self._write_protocol_version()
    self._write_headers(headers)
    self._write_args(args)
    # The decoder only finishes a message on the 'e' part.
    self._write_end()
    self._request.finished_writing()
951
def call_with_body_bytes(self, args, body, headers=None):
    """Make a remote call of args with body bytes 'body'.

    After calling this, call read_response_tuple to find the result out.

    :param args: the call's arguments, sent as one structure part.
    :param body: the body, sent as one length-prefixed bytes part.
    :param headers: optional dict of message headers.
    """
    if 'hpss' in debug.debug_flags:
        mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
        if getattr(self._request._medium, '_path', None) is not None:
            mutter(' (to %s)', self._request._medium._path)
        mutter(' %d bytes', len(body))
    # Recorded unconditionally: SOURCE's indentation is lost, and always
    # setting it is safe for any reader of the attribute.
    self._request_start_time = time.time()
    self._write_protocol_version()
    self._write_headers(headers)
    self._write_args(args)
    self._write_prefixed_body(body)
    # The decoder only finishes a message on the 'e' part.
    self._write_end()
    self._request.finished_writing()
969
def call_with_body_readv_array(self, args, body, headers=None):
    """Make a remote call with a readv array.

    The body is encoded with one line per readv offset pair. The numbers in
    each pair are separated by a comma, and no trailing \\n is emitted.

    :param args: the call's arguments, sent as one structure part.
    :param body: the readv offsets, serialised by _serialise_offsets.
    :param headers: optional dict of message headers.
    """
    if 'hpss' in debug.debug_flags:
        mutter('hpss call w/readv: %s', repr(args)[1:-1])
        if getattr(self._request._medium, '_path', None) is not None:
            mutter(' (to %s)', self._request._medium._path)
    # Recorded unconditionally: SOURCE's indentation is lost, and always
    # setting it is safe for any reader of the attribute.
    self._request_start_time = time.time()
    self._write_protocol_version()
    self._write_headers(headers)
    self._write_args(args)
    readv_bytes = self._serialise_offsets(body)
    self._write_prefixed_body(readv_bytes)
    # The decoder only finishes a message on the 'e' part.
    self._write_end()
    self._request.finished_writing()
    if 'hpss' in debug.debug_flags:
        mutter(' %d bytes in readv request', len(readv_bytes))
989
def cancel_read_body(self):
    """Do nothing: cancelling is not relevant to version 3 of the protocol."""
    return None
992
def read_response_tuple(self, expect_body=False):
    """Read a response tuple from the wire.

    The expect_body flag is ignored.

    :return: the response handler's args for a successful response. For
        an error response the error args are first passed to
        _translate_error, and returned only if that does not raise.
    """
    # XXX: warn if expect_body doesn't match the response?
    self._wait_for_request_end()
    error_args = self.response_handler.error_args
    if error_args is not None:
        # _translate_error requires the error tuple as its argument.
        self._translate_error(error_args)
        return error_args
    return self.response_handler.args
1004
def read_body_bytes(self, count=-1):
    """Read bytes from the body, decoding into a byte stream.

    We read all bytes at once to ensure we've checked the trailer for
    errors, and then feed the buffer back as read_body_bytes is called.

    :param count: passed to the buffered body's read().
    """
    # XXX: don't buffer the full request
    self._wait_for_request_end()
    body = self.response_handler.prefixed_body
    return body.read(count)
1014
def _translate_error(self, error_tuple):
# Raise the local exception that corresponds to an error tuple received
# from the server. error_tuple[0] is the error name and the remaining
# items are its arguments.
1015
# XXX: Hmm! Need state from the request. Hmm.
1016
error_name = error_tuple[0]
1017
error_args = error_tuple[1:]
1018
if error_name == 'LockContention':
1019
raise errors.LockContention('(remote lock)')
1020
elif error_name == 'LockFailed':
1021
# NOTE(review): a 'LockFailed' error is re-raised as LockContention here;
# this looks like it should be a LockFailed exception -- confirm.
raise errors.LockContention(*error_args[:2])
1024
# NOTE(review): the branch keyword before this raise and the rest of the
# statement (the format argument and closing parenthesis) are not visible.
raise errors.UnexpectedSmartServerResponse('Sucktitude: %r' %
1028
class _ProtocolThreeEncoder(object):
1030
def __init__(self, write_func):
    """Create an encoder that emits bytes through write_func.

    :param write_func: callable taking the bytes to send. It is wrapped so
        every write is also traced through pr().
    """
    def wf(bytes):
        # Debug trace of everything written, then the real write.
        pr('writing:', repr(bytes))
        return write_func(bytes)
    self._write_func = wf
1037
def _write_protocol_version(self):
    """Send the MESSAGE_VERSION_THREE marker."""
    marker = MESSAGE_VERSION_THREE
    self._write_func(marker)
1040
def _write_prefixed_bencode(self, structure):
    """Bencode structure and send it preceded by its 4-byte '!L' length."""
    encoded = bencode(structure)
    self._write_func(struct.pack('!L', len(encoded)))
    self._write_func(encoded)
1045
def _write_headers(self, headers=None):
1047
headers = {'Software version': bzrlib.__version__}
1048
self._write_prefixed_bencode(headers)
1050
def _write_structure(self, args):
    """Send args as a structure part: an 's' marker then the bencoding.

    Unicode items are encoded as UTF-8 first; all other items are sent
    unchanged.

    :param args: iterable of items to send.
    """
    self._write_func('s')
    utf8_args = []
    for arg in args:
        if type(arg) is unicode:
            utf8_args.append(arg.encode('utf8'))
        else:
            utf8_args.append(arg)
    self._write_prefixed_bencode(utf8_args)
1060
def _write_end(self):
1061
self._write_func('e')
1063
def _write_prefixed_body(self, bytes):
1064
self._write_func('b')
1065
self._write_func(struct.pack('!L', len(bytes)))
1066
self._write_func(bytes)
1068
def _write_error_status(self):
1069
self._write_func('oE')
1071
def _write_success_status(self):
1072
self._write_func('oS')
1075
class ProtocolThreeResponder(_ProtocolThreeEncoder):
    """Encodes a single version-three response and writes it out."""

    def __init__(self, write_func):
        """:param write_func: callable used to emit the encoded bytes."""
        _ProtocolThreeEncoder.__init__(self, write_func)
        # Set once a response (or error) has been written; only one may be.
        self.response_sent = False

    def _mark_response_sent(self):
        """Record that a response is being sent; refuse a second one."""
        if self.response_sent:
            # Explicit raise so the check survives python -O.
            raise AssertionError('a response has already been sent')
        self.response_sent = True

    def send_error(self, exception):
        """Send an error response describing exception.

        The response is the headers, the error status, the structure
        ('error', str(exception)), and the end marker.
        """
        self._mark_response_sent()
        self._write_headers()
        self._write_error_status()
        self._write_structure(('error', str(exception)))
        # The decoder only finishes a message on the 'e' part.
        self._write_end()

    def send_response(self, response):
        """Send response: headers, status, args, optional body, end marker.

        :param response: object providing is_successful(), args, body and
            body_stream. A non-None body is sent as one bytes part;
            otherwise each chunk of a non-None body_stream is sent as its
            own bytes part.
        """
        self._mark_response_sent()
        self._write_headers()
        if response.is_successful():
            self._write_success_status()
        else:
            self._write_error_status()
        self._write_structure(response.args)
        if response.body is not None:
            self._write_prefixed_body(response.body)
        elif response.body_stream is not None:
            for chunk in response.body_stream:
                self._write_prefixed_body(chunk)
        # The decoder only finishes a message on the 'e' part.
        self._write_end()
1108
class ProtocolThreeRequester(_ProtocolThreeEncoder):
1110
def __init__(self, medium_request):
    """Create a requester that writes through medium_request.

    :param medium_request: its accept_bytes method becomes the encoder's
        write function; the object is kept as ``_medium_request``.
    """
    writer = medium_request.accept_bytes
    _ProtocolThreeEncoder.__init__(self, writer)
    self._medium_request = medium_request
1114
# def _wait_for_request_end(self):
1117
# next_read_size = self.next_read_size()
1118
# if next_read_size == 0:
1119
# # a complete request has been read.
1121
# bytes = self._request.read_bytes(next_read_size)
1123
# # end of file encountered reading from server
1124
# raise errors.ConnectionReset(
1125
# "please check connectivity and permissions",
1126
# "(and try -Dhpss if further diagnosis is required)")
1127
# self.accept_bytes(bytes)
1129
# these methods from SmartClientRequestProtocolOne/Two
1130
def call(self, *args, **kw):
    """Make a remote call of args with no body.

    :param args: the call's arguments, sent as one structure part.
    :keyword headers: optional dict of message headers (default None).
    :raises TypeError: if any other keyword argument is given.
    """
    # XXX: ideally, signature would be call(self, *args, headers=None), but
    # python doesn't allow that.  So, we fake it.
    headers = kw.pop('headers', None)
    if kw:
        raise TypeError('Unexpected keyword arguments: %r' % (kw,))
    if 'hpss' in debug.debug_flags:
        mutter('hpss call: %s', repr(args)[1:-1])
        base = getattr(self._medium_request._medium, 'base', None)
        if base is not None:
            mutter(' (to %s)', base)
    # Recorded unconditionally: SOURCE's indentation is lost, and always
    # setting it is safe for any reader of the attribute.
    self._request_start_time = time.time()
    self._write_protocol_version()
    self._write_headers(headers)
    self._write_structure(args)
    # The decoder only finishes a message on the 'e' part.
    self._write_end()
    self._medium_request.finished_writing()
1150
def call_with_body_bytes(self, args, body, headers=None):
    """Make a remote call of args with body bytes 'body'.

    After calling this, call read_response_tuple to find the result out.

    :param args: the call's arguments, sent as one structure part.
    :param body: the body, sent as one length-prefixed bytes part.
    :param headers: optional dict of message headers.
    """
    if 'hpss' in debug.debug_flags:
        mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
        # This class keeps the medium request as _medium_request; it has
        # no _request attribute.
        path = getattr(self._medium_request._medium, '_path', None)
        if path is not None:
            mutter(' (to %s)', path)
        mutter(' %d bytes', len(body))
    # Recorded unconditionally: SOURCE's indentation is lost, and always
    # setting it is safe for any reader of the attribute.
    self._request_start_time = time.time()
    # Debug trace of the call being made.
    pr('call_with_body_bytes: %r, %r' % (args, body))
    self._write_protocol_version()
    self._write_headers(headers)
    self._write_structure(args)
    self._write_prefixed_body(body)
    # The decoder only finishes a message on the 'e' part.
    self._write_end()
    self._medium_request.finished_writing()
1169
def call_with_body_readv_array(self, args, body, headers=None):
    """Make a remote call with a readv array.

    The body is encoded with one line per readv offset pair. The numbers in
    each pair are separated by a comma, and no trailing \\n is emitted.

    :param args: the call's arguments, sent as one structure part.
    :param body: the readv offsets, serialised by _serialise_offsets.
    :param headers: optional dict of message headers.
    """
    if 'hpss' in debug.debug_flags:
        mutter('hpss call w/readv: %s', repr(args)[1:-1])
        # This class keeps the medium request as _medium_request; it has
        # no _request attribute.
        path = getattr(self._medium_request._medium, '_path', None)
        if path is not None:
            mutter(' (to %s)', path)
    # Recorded unconditionally: SOURCE's indentation is lost, and always
    # setting it is safe for any reader of the attribute.
    self._request_start_time = time.time()
    self._write_protocol_version()
    self._write_headers(headers)
    self._write_structure(args)
    # NOTE(review): _serialise_offsets is not defined in the visible part
    # of this class or of _ProtocolThreeEncoder -- confirm it is available.
    readv_bytes = self._serialise_offsets(body)
    self._write_prefixed_body(readv_bytes)
    # The decoder only finishes a message on the 'e' part.
    self._write_end()
    self._medium_request.finished_writing()
    if 'hpss' in debug.debug_flags:
        mutter(' %d bytes in readv request', len(readv_bytes))
1189
# def cancel_read_body(self):
1190
# """Ignored. Not relevant to version 3 of the protocol."""
1192
# def read_response_tuple(self, expect_body=False):
1193
# """Read a response tuple from the wire.
1195
# The expect_body flag is ignored.
1197
# # XXX: warn if expect_body doesn't match the response?
1198
# self._wait_for_request_end()
1199
# if self.response_handler.error_args is not None:
1200
# xxx_translate_error()
1201
# return self.response_handler.args
1203
# def read_body_bytes(self, count=-1):
1204
# """Read bytes from the body, decoding into a byte stream.
1206
# We read all bytes at once to ensure we've checked the trailer for
1207
# errors, and then feed the buffer back as read_body_bytes is called.
1209
# # XXX: don't buffer the full request
1210
# self._wait_for_request_end()
1211
# return self.response_handler.prefixed_body.read(count)
1214
from thread import get_ident
1217
print '%x' % get_ident(),