150
235
return self._body_decoder.next_read_size()
153
class LengthPrefixedBodyDecoder(object):
154
"""Decodes the length-prefixed bulk data."""
238
class SmartServerRequestProtocolTwo(SmartServerRequestProtocolOne):
239
r"""Version two of the server side of the smart protocol.
241
This prefixes responses with the value of RESPONSE_VERSION_TWO.
244
response_marker = RESPONSE_VERSION_TWO
245
request_marker = REQUEST_VERSION_TWO
247
def _write_success_or_failure_prefix(self, response):
248
"""Write the protocol specific success/failure prefix."""
249
if response.is_successful():
250
self._write_func('success\n')
252
self._write_func('failed\n')
254
def _write_protocol_version(self):
255
r"""Write any prefixes this protocol requires.
257
Version two sends the value of RESPONSE_VERSION_TWO.
259
self._write_func(self.response_marker)
261
def _send_response(self, response):
262
"""Send a smart server response down the output stream."""
264
raise AssertionError('response already sent')
265
self._finished = True
266
self._write_protocol_version()
267
self._write_success_or_failure_prefix(response)
268
self._write_func(_encode_tuple(response.args))
269
if response.body is not None:
270
if not isinstance(response.body, str):
271
raise AssertionError('body must be a str')
272
if not (response.body_stream is None):
273
raise AssertionError(
274
'body_stream and body cannot both be set')
275
bytes = self._encode_bulk_data(response.body)
276
self._write_func(bytes)
277
elif response.body_stream is not None:
278
_send_stream(response.body_stream, self._write_func)
281
def _send_stream(stream, write_func):
282
write_func('chunked\n')
283
_send_chunks(stream, write_func)
287
def _send_chunks(stream, write_func):
289
if isinstance(chunk, str):
290
bytes = "%x\n%s" % (len(chunk), chunk)
292
elif isinstance(chunk, request.FailedSmartServerResponse):
294
_send_chunks(chunk.args, write_func)
297
raise errors.BzrError(
298
'Chunks must be str or FailedSmartServerResponse, got %r'
302
class _NeedMoreBytes(Exception):
303
"""Raise this inside a _StatefulDecoder to stop decoding until more bytes
307
def __init__(self, count=None):
310
:param count: the total number of bytes needed by the current state.
311
May be None if the number of bytes needed is unknown.
316
class _StatefulDecoder(object):
317
"""Base class for writing state machines to decode byte streams.
319
Subclasses should provide a self.state_accept attribute that accepts bytes
320
and, if appropriate, updates self.state_accept to a different function.
321
accept_bytes will call state_accept as often as necessary to make sure the
322
state machine has progressed as far as possible before it returns.
324
See ProtocolThreeDecoder for an example subclass.
156
327
def __init__(self):
328
self.finished_reading = False
329
self._in_buffer_list = []
330
self._in_buffer_len = 0
331
self.unused_data = ''
157
332
self.bytes_left = None
158
self.finished_reading = False
159
self.unused_data = ''
160
self.state_accept = self._state_accept_expecting_length
161
self.state_read = self._state_read_no_data
163
self._trailer_buffer = ''
333
self._number_needed_bytes = None
335
def _get_in_buffer(self):
336
if len(self._in_buffer_list) == 1:
337
return self._in_buffer_list[0]
338
in_buffer = ''.join(self._in_buffer_list)
339
if len(in_buffer) != self._in_buffer_len:
340
raise AssertionError(
341
"Length of buffer did not match expected value: %s != %s"
342
% self._in_buffer_len, len(in_buffer))
343
self._in_buffer_list = [in_buffer]
346
def _get_in_bytes(self, count):
347
"""Grab X bytes from the input_buffer.
349
Callers should have already checked that self._in_buffer_len is >
350
count. Note, this does not consume the bytes from the buffer. The
351
caller will still need to call _get_in_buffer() and then
352
_set_in_buffer() if they actually need to consume the bytes.
354
# check if we can yield the bytes from just the first entry in our list
355
if len(self._in_buffer_list) == 0:
356
raise AssertionError('Callers must be sure we have buffered bytes'
357
' before calling _get_in_bytes')
358
if len(self._in_buffer_list[0]) > count:
359
return self._in_buffer_list[0][:count]
360
# We can't yield it from the first buffer, so collapse all buffers, and
362
in_buf = self._get_in_buffer()
363
return in_buf[:count]
365
def _set_in_buffer(self, new_buf):
366
if new_buf is not None:
367
self._in_buffer_list = [new_buf]
368
self._in_buffer_len = len(new_buf)
370
self._in_buffer_list = []
371
self._in_buffer_len = 0
165
373
def accept_bytes(self, bytes):
166
374
"""Decode as much of bytes as possible.
172
380
data will be appended to self.unused_data.
174
382
# accept_bytes is allowed to change the state
175
current_state = self.state_accept
176
self.state_accept(bytes)
177
while current_state != self.state_accept:
383
self._number_needed_bytes = None
384
# lsprof puts a very large amount of time on this specific call for
386
self._in_buffer_list.append(bytes)
387
self._in_buffer_len += len(bytes)
389
# Run the function for the current state.
178
390
current_state = self.state_accept
179
self.state_accept('')
392
while current_state != self.state_accept:
393
# The current state has changed. Run the function for the new
394
# current state, so that it can:
395
# - decode any unconsumed bytes left in a buffer, and
396
# - signal how many more bytes are expected (via raising
398
current_state = self.state_accept
400
except _NeedMoreBytes, e:
401
self._number_needed_bytes = e.count
404
class ChunkedBodyDecoder(_StatefulDecoder):
405
"""Decoder for chunked body data.
407
This is very similar the HTTP's chunked encoding. See the description of
408
streamed body data in `doc/developers/network-protocol.txt` for details.
412
_StatefulDecoder.__init__(self)
413
self.state_accept = self._state_accept_expecting_header
414
self.chunk_in_progress = None
415
self.chunks = collections.deque()
417
self.error_in_progress = None
419
def next_read_size(self):
420
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
421
# end-of-body marker is 4 bytes: 'END\n'.
422
if self.state_accept == self._state_accept_reading_chunk:
423
# We're expecting more chunk content. So we're expecting at least
424
# the rest of this chunk plus an END chunk.
425
return self.bytes_left + 4
426
elif self.state_accept == self._state_accept_expecting_length:
427
if self._in_buffer_len == 0:
428
# We're expecting a chunk length. There's at least two bytes
429
# left: a digit plus '\n'.
432
# We're in the middle of reading a chunk length. So there's at
433
# least one byte left, the '\n' that terminates the length.
435
elif self.state_accept == self._state_accept_reading_unused:
437
elif self.state_accept == self._state_accept_expecting_header:
438
return max(0, len('chunked\n') - self._in_buffer_len)
440
raise AssertionError("Impossible state: %r" % (self.state_accept,))
442
def read_next_chunk(self):
444
return self.chunks.popleft()
448
def _extract_line(self):
449
in_buf = self._get_in_buffer()
450
pos = in_buf.find('\n')
452
# We haven't read a complete line yet, so request more bytes before
454
raise _NeedMoreBytes(1)
456
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
457
self._set_in_buffer(in_buf[pos+1:])
461
self.unused_data = self._get_in_buffer()
462
self._in_buffer_list = []
463
self._in_buffer_len = 0
464
self.state_accept = self._state_accept_reading_unused
466
error_args = tuple(self.error_in_progress)
467
self.chunks.append(request.FailedSmartServerResponse(error_args))
468
self.error_in_progress = None
469
self.finished_reading = True
471
def _state_accept_expecting_header(self):
472
prefix = self._extract_line()
473
if prefix == 'chunked':
474
self.state_accept = self._state_accept_expecting_length
476
raise errors.SmartProtocolError(
477
'Bad chunked body header: "%s"' % (prefix,))
479
def _state_accept_expecting_length(self):
480
prefix = self._extract_line()
483
self.error_in_progress = []
484
self._state_accept_expecting_length()
486
elif prefix == 'END':
487
# We've read the end-of-body marker.
488
# Any further bytes are unused data, including the bytes left in
493
self.bytes_left = int(prefix, 16)
494
self.chunk_in_progress = ''
495
self.state_accept = self._state_accept_reading_chunk
497
def _state_accept_reading_chunk(self):
498
in_buf = self._get_in_buffer()
499
in_buffer_len = len(in_buf)
500
self.chunk_in_progress += in_buf[:self.bytes_left]
501
self._set_in_buffer(in_buf[self.bytes_left:])
502
self.bytes_left -= in_buffer_len
503
if self.bytes_left <= 0:
504
# Finished with chunk
505
self.bytes_left = None
507
self.error_in_progress.append(self.chunk_in_progress)
509
self.chunks.append(self.chunk_in_progress)
510
self.chunk_in_progress = None
511
self.state_accept = self._state_accept_expecting_length
513
def _state_accept_reading_unused(self):
514
self.unused_data += self._get_in_buffer()
515
self._in_buffer_list = []
518
class LengthPrefixedBodyDecoder(_StatefulDecoder):
519
"""Decodes the length-prefixed bulk data."""
522
_StatefulDecoder.__init__(self)
523
self.state_accept = self._state_accept_expecting_length
524
self.state_read = self._state_read_no_data
526
self._trailer_buffer = ''
181
528
def next_read_size(self):
182
529
if self.bytes_left is not None:
336
781
resp = self.read_response_tuple()
337
782
if resp == ('ok', '1'):
784
elif resp == ('ok', '2'):
340
787
raise errors.SmartProtocolError("bad response %r" % (resp,))
789
def _write_args(self, args):
790
self._write_protocol_version()
791
bytes = _encode_tuple(args)
792
self._request.accept_bytes(bytes)
794
def _write_protocol_version(self):
795
"""Write any prefixes this protocol requires.
797
Version one doesn't send protocol versions.
801
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
802
"""Version two of the client side of the smart protocol.
804
This prefixes the request with the value of REQUEST_VERSION_TWO.
807
response_marker = RESPONSE_VERSION_TWO
808
request_marker = REQUEST_VERSION_TWO
810
def read_response_tuple(self, expect_body=False):
811
"""Read a response tuple from the wire.
813
This should only be called once.
815
version = self._request.read_line()
816
if version != self.response_marker:
817
self._request.finished_reading()
818
raise errors.UnexpectedProtocolVersionMarker(version)
819
response_status = self._request.read_line()
820
result = SmartClientRequestProtocolOne._read_response_tuple(self)
821
self._response_is_unknown_method(result)
822
if response_status == 'success\n':
823
self.response_status = True
825
self._request.finished_reading()
827
elif response_status == 'failed\n':
828
self.response_status = False
829
self._request.finished_reading()
830
raise errors.ErrorFromSmartServer(result)
832
raise errors.SmartProtocolError(
833
'bad protocol status %r' % response_status)
835
def _write_protocol_version(self):
836
"""Write any prefixes this protocol requires.
838
Version two sends the value of REQUEST_VERSION_TWO.
840
self._request.accept_bytes(self.request_marker)
842
def read_streamed_body(self):
843
"""Read bytes from the body, decoding into a byte stream.
845
# Read no more than 64k at a time so that we don't risk error 10055 (no
846
# buffer space available) on Windows.
847
_body_decoder = ChunkedBodyDecoder()
848
while not _body_decoder.finished_reading:
849
bytes = self._request.read_bytes(_body_decoder.next_read_size())
851
# end of file encountered reading from server
852
raise errors.ConnectionReset(
853
"Connection lost while reading streamed body.")
854
_body_decoder.accept_bytes(bytes)
855
for body_bytes in iter(_body_decoder.read_next_chunk, None):
856
if 'hpss' in debug.debug_flags and type(body_bytes) is str:
857
mutter(' %d byte chunk read',
860
self._request.finished_reading()
863
def build_server_protocol_three(backing_transport, write_func,
864
root_client_path, jail_root=None):
865
request_handler = request.SmartServerRequestHandler(
866
backing_transport, commands=request.request_handlers,
867
root_client_path=root_client_path, jail_root=jail_root)
868
responder = ProtocolThreeResponder(write_func)
869
message_handler = message.ConventionalRequestHandler(request_handler, responder)
870
return ProtocolThreeDecoder(message_handler)
873
class ProtocolThreeDecoder(_StatefulDecoder):
875
response_marker = RESPONSE_VERSION_THREE
876
request_marker = REQUEST_VERSION_THREE
878
def __init__(self, message_handler, expect_version_marker=False):
879
_StatefulDecoder.__init__(self)
880
self._has_dispatched = False
882
if expect_version_marker:
883
self.state_accept = self._state_accept_expecting_protocol_version
884
# We're expecting at least the protocol version marker + some
886
self._number_needed_bytes = len(MESSAGE_VERSION_THREE) + 4
888
self.state_accept = self._state_accept_expecting_headers
889
self._number_needed_bytes = 4
890
self.decoding_failed = False
891
self.request_handler = self.message_handler = message_handler
893
def accept_bytes(self, bytes):
894
self._number_needed_bytes = None
896
_StatefulDecoder.accept_bytes(self, bytes)
897
except KeyboardInterrupt:
899
except errors.SmartMessageHandlerError, exception:
900
# We do *not* set self.decoding_failed here. The message handler
901
# has raised an error, but the decoder is still able to parse bytes
902
# and determine when this message ends.
903
if not isinstance(exception.exc_value, errors.UnknownSmartMethod):
904
log_exception_quietly()
905
self.message_handler.protocol_error(exception.exc_value)
906
# The state machine is ready to continue decoding, but the
907
# exception has interrupted the loop that runs the state machine.
908
# So we call accept_bytes again to restart it.
909
self.accept_bytes('')
910
except Exception, exception:
911
# The decoder itself has raised an exception. We cannot continue
913
self.decoding_failed = True
914
if isinstance(exception, errors.UnexpectedProtocolVersionMarker):
915
# This happens during normal operation when the client tries a
916
# protocol version the server doesn't understand, so no need to
917
# log a traceback every time.
918
# Note that this can only happen when
919
# expect_version_marker=True, which is only the case on the
923
log_exception_quietly()
924
self.message_handler.protocol_error(exception)
926
def _extract_length_prefixed_bytes(self):
927
if self._in_buffer_len < 4:
928
# A length prefix by itself is 4 bytes, and we don't even have that
930
raise _NeedMoreBytes(4)
931
(length,) = struct.unpack('!L', self._get_in_bytes(4))
932
end_of_bytes = 4 + length
933
if self._in_buffer_len < end_of_bytes:
934
# We haven't yet read as many bytes as the length-prefix says there
936
raise _NeedMoreBytes(end_of_bytes)
937
# Extract the bytes from the buffer.
938
in_buf = self._get_in_buffer()
939
bytes = in_buf[4:end_of_bytes]
940
self._set_in_buffer(in_buf[end_of_bytes:])
943
def _extract_prefixed_bencoded_data(self):
944
prefixed_bytes = self._extract_length_prefixed_bytes()
946
decoded = bdecode_as_tuple(prefixed_bytes)
948
raise errors.SmartProtocolError(
949
'Bytes %r not bencoded' % (prefixed_bytes,))
952
def _extract_single_byte(self):
953
if self._in_buffer_len == 0:
954
# The buffer is empty
955
raise _NeedMoreBytes(1)
956
in_buf = self._get_in_buffer()
958
self._set_in_buffer(in_buf[1:])
961
def _state_accept_expecting_protocol_version(self):
962
needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
963
in_buf = self._get_in_buffer()
965
# We don't have enough bytes to check if the protocol version
966
# marker is right. But we can check if it is already wrong by
967
# checking that the start of MESSAGE_VERSION_THREE matches what
969
# [In fact, if the remote end isn't bzr we might never receive
970
# len(MESSAGE_VERSION_THREE) bytes. So if the bytes we have so far
971
# are wrong then we should just raise immediately rather than
973
if not MESSAGE_VERSION_THREE.startswith(in_buf):
974
# We have enough bytes to know the protocol version is wrong
975
raise errors.UnexpectedProtocolVersionMarker(in_buf)
976
raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
977
if not in_buf.startswith(MESSAGE_VERSION_THREE):
978
raise errors.UnexpectedProtocolVersionMarker(in_buf)
979
self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
980
self.state_accept = self._state_accept_expecting_headers
982
def _state_accept_expecting_headers(self):
983
decoded = self._extract_prefixed_bencoded_data()
984
if type(decoded) is not dict:
985
raise errors.SmartProtocolError(
986
'Header object %r is not a dict' % (decoded,))
987
self.state_accept = self._state_accept_expecting_message_part
989
self.message_handler.headers_received(decoded)
991
raise errors.SmartMessageHandlerError(sys.exc_info())
993
def _state_accept_expecting_message_part(self):
994
message_part_kind = self._extract_single_byte()
995
if message_part_kind == 'o':
996
self.state_accept = self._state_accept_expecting_one_byte
997
elif message_part_kind == 's':
998
self.state_accept = self._state_accept_expecting_structure
999
elif message_part_kind == 'b':
1000
self.state_accept = self._state_accept_expecting_bytes
1001
elif message_part_kind == 'e':
1004
raise errors.SmartProtocolError(
1005
'Bad message kind byte: %r' % (message_part_kind,))
1007
def _state_accept_expecting_one_byte(self):
1008
byte = self._extract_single_byte()
1009
self.state_accept = self._state_accept_expecting_message_part
1011
self.message_handler.byte_part_received(byte)
1013
raise errors.SmartMessageHandlerError(sys.exc_info())
1015
def _state_accept_expecting_bytes(self):
1016
# XXX: this should not buffer whole message part, but instead deliver
1017
# the bytes as they arrive.
1018
prefixed_bytes = self._extract_length_prefixed_bytes()
1019
self.state_accept = self._state_accept_expecting_message_part
1021
self.message_handler.bytes_part_received(prefixed_bytes)
1023
raise errors.SmartMessageHandlerError(sys.exc_info())
1025
def _state_accept_expecting_structure(self):
1026
structure = self._extract_prefixed_bencoded_data()
1027
self.state_accept = self._state_accept_expecting_message_part
1029
self.message_handler.structure_part_received(structure)
1031
raise errors.SmartMessageHandlerError(sys.exc_info())
1034
self.unused_data = self._get_in_buffer()
1035
self._set_in_buffer(None)
1036
self.state_accept = self._state_accept_reading_unused
1038
self.message_handler.end_received()
1040
raise errors.SmartMessageHandlerError(sys.exc_info())
1042
def _state_accept_reading_unused(self):
1043
self.unused_data += self._get_in_buffer()
1044
self._set_in_buffer(None)
1046
def next_read_size(self):
1047
if self.state_accept == self._state_accept_reading_unused:
1049
elif self.decoding_failed:
1050
# An exception occured while processing this message, probably from
1051
# self.message_handler. We're not sure that this state machine is
1052
# in a consistent state, so just signal that we're done (i.e. give
1056
if self._number_needed_bytes is not None:
1057
return self._number_needed_bytes - self._in_buffer_len
1059
raise AssertionError("don't know how many bytes are expected!")
1062
class _ProtocolThreeEncoder(object):
1064
response_marker = request_marker = MESSAGE_VERSION_THREE
1066
def __init__(self, write_func):
1068
self._real_write_func = write_func
1070
def _write_func(self, bytes):
1071
self._buf.append(bytes)
1072
if len(self._buf) > 100:
1077
self._real_write_func(''.join(self._buf))
1080
def _serialise_offsets(self, offsets):
1081
"""Serialise a readv offset list."""
1083
for start, length in offsets:
1084
txt.append('%d,%d' % (start, length))
1085
return '\n'.join(txt)
1087
def _write_protocol_version(self):
1088
self._write_func(MESSAGE_VERSION_THREE)
1090
def _write_prefixed_bencode(self, structure):
1091
bytes = bencode(structure)
1092
self._write_func(struct.pack('!L', len(bytes)))
1093
self._write_func(bytes)
1095
def _write_headers(self, headers):
1096
self._write_prefixed_bencode(headers)
1098
def _write_structure(self, args):
1099
self._write_func('s')
1102
if type(arg) is unicode:
1103
utf8_args.append(arg.encode('utf8'))
1105
utf8_args.append(arg)
1106
self._write_prefixed_bencode(utf8_args)
1108
def _write_end(self):
1109
self._write_func('e')
1112
def _write_prefixed_body(self, bytes):
1113
self._write_func('b')
1114
self._write_func(struct.pack('!L', len(bytes)))
1115
self._write_func(bytes)
1117
def _write_chunked_body_start(self):
1118
self._write_func('oC')
1120
def _write_error_status(self):
1121
self._write_func('oE')
1123
def _write_success_status(self):
1124
self._write_func('oS')
1127
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1129
def __init__(self, write_func):
1130
_ProtocolThreeEncoder.__init__(self, write_func)
1131
self.response_sent = False
1132
self._headers = {'Software version': bzrlib.__version__}
1134
def send_error(self, exception):
1135
if self.response_sent:
1136
raise AssertionError(
1137
"send_error(%s) called, but response already sent."
1139
if isinstance(exception, errors.UnknownSmartMethod):
1140
failure = request.FailedSmartServerResponse(
1141
('UnknownMethod', exception.verb))
1142
self.send_response(failure)
1144
self.response_sent = True
1145
self._write_protocol_version()
1146
self._write_headers(self._headers)
1147
self._write_error_status()
1148
self._write_structure(('error', str(exception)))
1151
def send_response(self, response):
1152
if self.response_sent:
1153
raise AssertionError(
1154
"send_response(%r) called, but response already sent."
1156
self.response_sent = True
1157
self._write_protocol_version()
1158
self._write_headers(self._headers)
1159
if response.is_successful():
1160
self._write_success_status()
1162
self._write_error_status()
1163
self._write_structure(response.args)
1164
if response.body is not None:
1165
self._write_prefixed_body(response.body)
1166
elif response.body_stream is not None:
1167
for exc_info, chunk in _iter_with_errors(response.body_stream):
1168
if exc_info is not None:
1169
self._write_error_status()
1170
error_struct = request._translate_error(exc_info[1])
1171
self._write_structure(error_struct)
1174
if isinstance(chunk, request.FailedSmartServerResponse):
1175
self._write_error_status()
1176
self._write_structure(chunk.args)
1178
self._write_prefixed_body(chunk)
1182
def _iter_with_errors(iterable):
1183
"""Handle errors from iterable.next().
1187
for exc_info, value in _iter_with_errors(iterable):
1190
This is a safer alternative to::
1193
for value in iterable:
1198
Because the latter will catch errors from the for-loop body, not just
1201
If an error occurs, exc_info will be a exc_info tuple, and the generator
1202
will terminate. Otherwise exc_info will be None, and value will be the
1203
value from iterable.next(). Note that KeyboardInterrupt and SystemExit
1204
will not be itercepted.
1206
iterator = iter(iterable)
1209
yield None, iterator.next()
1210
except StopIteration:
1212
except (KeyboardInterrupt, SystemExit):
1215
mutter('_iter_with_errors caught error')
1216
log_exception_quietly()
1217
yield sys.exc_info(), None
1221
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1223
def __init__(self, medium_request):
1224
_ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1225
self._medium_request = medium_request
1228
def set_headers(self, headers):
1229
self._headers = headers.copy()
1231
def call(self, *args):
1232
if 'hpss' in debug.debug_flags:
1233
mutter('hpss call: %s', repr(args)[1:-1])
1234
base = getattr(self._medium_request._medium, 'base', None)
1235
if base is not None:
1236
mutter(' (to %s)', base)
1237
self._request_start_time = time.time()
1238
self._write_protocol_version()
1239
self._write_headers(self._headers)
1240
self._write_structure(args)
1242
self._medium_request.finished_writing()
1244
def call_with_body_bytes(self, args, body):
1245
"""Make a remote call of args with body bytes 'body'.
1247
After calling this, call read_response_tuple to find the result out.
1249
if 'hpss' in debug.debug_flags:
1250
mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
1251
path = getattr(self._medium_request._medium, '_path', None)
1252
if path is not None:
1253
mutter(' (to %s)', path)
1254
mutter(' %d bytes', len(body))
1255
self._request_start_time = time.time()
1256
self._write_protocol_version()
1257
self._write_headers(self._headers)
1258
self._write_structure(args)
1259
self._write_prefixed_body(body)
1261
self._medium_request.finished_writing()
1263
def call_with_body_readv_array(self, args, body):
1264
"""Make a remote call with a readv array.
1266
The body is encoded with one line per readv offset pair. The numbers in
1267
each pair are separated by a comma, and no trailing \n is emitted.
1269
if 'hpss' in debug.debug_flags:
1270
mutter('hpss call w/readv: %s', repr(args)[1:-1])
1271
path = getattr(self._medium_request._medium, '_path', None)
1272
if path is not None:
1273
mutter(' (to %s)', path)
1274
self._request_start_time = time.time()
1275
self._write_protocol_version()
1276
self._write_headers(self._headers)
1277
self._write_structure(args)
1278
readv_bytes = self._serialise_offsets(body)
1279
if 'hpss' in debug.debug_flags:
1280
mutter(' %d bytes in readv request', len(readv_bytes))
1281
self._write_prefixed_body(readv_bytes)
1283
self._medium_request.finished_writing()
1285
def call_with_body_stream(self, args, stream):
1286
if 'hpss' in debug.debug_flags:
1287
mutter('hpss call w/body stream: %r', args)
1288
path = getattr(self._medium_request._medium, '_path', None)
1289
if path is not None:
1290
mutter(' (to %s)', path)
1291
self._request_start_time = time.time()
1292
self._write_protocol_version()
1293
self._write_headers(self._headers)
1294
self._write_structure(args)
1295
# TODO: notice if the server has sent an early error reply before we
1296
# have finished sending the stream. We would notice at the end
1297
# anyway, but if the medium can deliver it early then it's good
1298
# to short-circuit the whole request...
1299
for exc_info, part in _iter_with_errors(stream):
1300
if exc_info is not None:
1301
# Iterating the stream failed. Cleanly abort the request.
1302
self._write_error_status()
1303
# Currently the client unconditionally sends ('error',) as the
1305
self._write_structure(('error',))
1307
self._medium_request.finished_writing()
1308
raise exc_info[0], exc_info[1], exc_info[2]
1310
self._write_prefixed_body(part)
1313
self._medium_request.finished_writing()