251
192
def _write_protocol_version(self):
252
193
r"""Write any prefixes this protocol requires.
254
195
Version two sends the value of RESPONSE_VERSION_TWO.
256
self._write_func(self.response_marker)
258
def _send_response(self, response):
259
"""Send a smart server response down the output stream."""
261
raise AssertionError('response already sent')
262
self._finished = True
263
self._write_protocol_version()
264
self._write_success_or_failure_prefix(response)
265
self._write_func(_encode_tuple(response.args))
266
if response.body is not None:
267
if not isinstance(response.body, str):
268
raise AssertionError('body must be a str')
269
if not (response.body_stream is None):
270
raise AssertionError(
271
'body_stream and body cannot both be set')
272
bytes = self._encode_bulk_data(response.body)
273
self._write_func(bytes)
274
elif response.body_stream is not None:
275
_send_stream(response.body_stream, self._write_func)
278
def _send_stream(stream, write_func):
279
write_func('chunked\n')
280
_send_chunks(stream, write_func)
284
def _send_chunks(stream, write_func):
286
if isinstance(chunk, str):
287
bytes = "%x\n%s" % (len(chunk), chunk)
289
elif isinstance(chunk, request.FailedSmartServerResponse):
291
_send_chunks(chunk.args, write_func)
294
raise errors.BzrError(
295
'Chunks must be str or FailedSmartServerResponse, got %r'
299
class _NeedMoreBytes(Exception):
300
"""Raise this inside a _StatefulDecoder to stop decoding until more bytes
304
def __init__(self, count=None):
307
:param count: the total number of bytes needed by the current state.
308
May be None if the number of bytes needed is unknown.
313
class _StatefulDecoder(object):
314
"""Base class for writing state machines to decode byte streams.
316
Subclasses should provide a self.state_accept attribute that accepts bytes
317
and, if appropriate, updates self.state_accept to a different function.
318
accept_bytes will call state_accept as often as necessary to make sure the
319
state machine has progressed as far as possible before it returns.
321
See ProtocolThreeDecoder for an example subclass.
197
self._write_func(RESPONSE_VERSION_TWO)
200
class LengthPrefixedBodyDecoder(object):
201
"""Decodes the length-prefixed bulk data."""
324
203
def __init__(self):
204
self.bytes_left = None
325
205
self.finished_reading = False
326
self._in_buffer_list = []
327
self._in_buffer_len = 0
328
206
self.unused_data = ''
329
self.bytes_left = None
330
self._number_needed_bytes = None
332
def _get_in_buffer(self):
333
if len(self._in_buffer_list) == 1:
334
return self._in_buffer_list[0]
335
in_buffer = ''.join(self._in_buffer_list)
336
if len(in_buffer) != self._in_buffer_len:
337
raise AssertionError(
338
"Length of buffer did not match expected value: %s != %s"
339
% self._in_buffer_len, len(in_buffer))
340
self._in_buffer_list = [in_buffer]
343
def _get_in_bytes(self, count):
344
"""Grab X bytes from the input_buffer.
346
Callers should have already checked that self._in_buffer_len is >
347
count. Note, this does not consume the bytes from the buffer. The
348
caller will still need to call _get_in_buffer() and then
349
_set_in_buffer() if they actually need to consume the bytes.
351
# check if we can yield the bytes from just the first entry in our list
352
if len(self._in_buffer_list) == 0:
353
raise AssertionError('Callers must be sure we have buffered bytes'
354
' before calling _get_in_bytes')
355
if len(self._in_buffer_list[0]) > count:
356
return self._in_buffer_list[0][:count]
357
# We can't yield it from the first buffer, so collapse all buffers, and
359
in_buf = self._get_in_buffer()
360
return in_buf[:count]
362
def _set_in_buffer(self, new_buf):
363
if new_buf is not None:
364
self._in_buffer_list = [new_buf]
365
self._in_buffer_len = len(new_buf)
367
self._in_buffer_list = []
368
self._in_buffer_len = 0
207
self.state_accept = self._state_accept_expecting_length
208
self.state_read = self._state_read_no_data
210
self._trailer_buffer = ''
370
212
def accept_bytes(self, bytes):
371
213
"""Decode as much of bytes as possible.
377
219
data will be appended to self.unused_data.
379
221
# accept_bytes is allowed to change the state
380
self._number_needed_bytes = None
381
# lsprof puts a very large amount of time on this specific call for
383
self._in_buffer_list.append(bytes)
384
self._in_buffer_len += len(bytes)
386
# Run the function for the current state.
222
current_state = self.state_accept
223
self.state_accept(bytes)
224
while current_state != self.state_accept:
387
225
current_state = self.state_accept
389
while current_state != self.state_accept:
390
# The current state has changed. Run the function for the new
391
# current state, so that it can:
392
# - decode any unconsumed bytes left in a buffer, and
393
# - signal how many more bytes are expected (via raising
395
current_state = self.state_accept
397
except _NeedMoreBytes, e:
398
self._number_needed_bytes = e.count
401
class ChunkedBodyDecoder(_StatefulDecoder):
402
"""Decoder for chunked body data.
404
This is very similar to HTTP's chunked encoding. See the description of
405
streamed body data in `doc/developers/network-protocol.txt` for details.
409
_StatefulDecoder.__init__(self)
410
self.state_accept = self._state_accept_expecting_header
411
self.chunk_in_progress = None
412
self.chunks = collections.deque()
414
self.error_in_progress = None
416
def next_read_size(self):
417
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
# end-of-body marker is 4 bytes: 'END\n'.
419
if self.state_accept == self._state_accept_reading_chunk:
420
# We're expecting more chunk content. So we're expecting at least
421
# the rest of this chunk plus an END chunk.
422
return self.bytes_left + 4
423
elif self.state_accept == self._state_accept_expecting_length:
424
if self._in_buffer_len == 0:
425
# We're expecting a chunk length. There's at least two bytes
426
# left: a digit plus '\n'.
429
# We're in the middle of reading a chunk length. So there's at
430
# least one byte left, the '\n' that terminates the length.
432
elif self.state_accept == self._state_accept_reading_unused:
434
elif self.state_accept == self._state_accept_expecting_header:
435
return max(0, len('chunked\n') - self._in_buffer_len)
437
raise AssertionError("Impossible state: %r" % (self.state_accept,))
439
def read_next_chunk(self):
441
return self.chunks.popleft()
445
def _extract_line(self):
446
in_buf = self._get_in_buffer()
447
pos = in_buf.find('\n')
449
# We haven't read a complete line yet, so request more bytes before
451
raise _NeedMoreBytes(1)
453
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
454
self._set_in_buffer(in_buf[pos+1:])
458
self.unused_data = self._get_in_buffer()
459
self._in_buffer_list = []
460
self._in_buffer_len = 0
461
self.state_accept = self._state_accept_reading_unused
463
error_args = tuple(self.error_in_progress)
464
self.chunks.append(request.FailedSmartServerResponse(error_args))
465
self.error_in_progress = None
466
self.finished_reading = True
468
def _state_accept_expecting_header(self):
469
prefix = self._extract_line()
470
if prefix == 'chunked':
471
self.state_accept = self._state_accept_expecting_length
473
raise errors.SmartProtocolError(
474
'Bad chunked body header: "%s"' % (prefix,))
476
def _state_accept_expecting_length(self):
477
prefix = self._extract_line()
480
self.error_in_progress = []
481
self._state_accept_expecting_length()
483
elif prefix == 'END':
484
# We've read the end-of-body marker.
485
# Any further bytes are unused data, including the bytes left in
490
self.bytes_left = int(prefix, 16)
491
self.chunk_in_progress = ''
492
self.state_accept = self._state_accept_reading_chunk
494
def _state_accept_reading_chunk(self):
495
in_buf = self._get_in_buffer()
496
in_buffer_len = len(in_buf)
497
self.chunk_in_progress += in_buf[:self.bytes_left]
498
self._set_in_buffer(in_buf[self.bytes_left:])
499
self.bytes_left -= in_buffer_len
500
if self.bytes_left <= 0:
501
# Finished with chunk
502
self.bytes_left = None
504
self.error_in_progress.append(self.chunk_in_progress)
506
self.chunks.append(self.chunk_in_progress)
507
self.chunk_in_progress = None
508
self.state_accept = self._state_accept_expecting_length
510
def _state_accept_reading_unused(self):
511
self.unused_data += self._get_in_buffer()
512
self._in_buffer_list = []
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
516
"""Decodes the length-prefixed bulk data."""
519
_StatefulDecoder.__init__(self)
520
self.state_accept = self._state_accept_expecting_length
521
self.state_read = self._state_read_no_data
523
self._trailer_buffer = ''
226
self.state_accept('')
525
228
def next_read_size(self):
526
229
if self.bytes_left is not None:
791
405
def _write_protocol_version(self):
792
406
"""Write any prefixes this protocol requires.
794
408
Version one doesn't send protocol versions.
798
412
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
799
413
"""Version two of the client side of the smart protocol.
801
415
This prefixes the request with the value of REQUEST_VERSION_TWO.
804
response_marker = RESPONSE_VERSION_TWO
805
request_marker = REQUEST_VERSION_TWO
807
418
def read_response_tuple(self, expect_body=False):
808
419
"""Read a response tuple from the wire.
810
421
This should only be called once.
812
423
version = self._request.read_line()
813
if version != self.response_marker:
814
self._request.finished_reading()
815
raise errors.UnexpectedProtocolVersionMarker(version)
816
response_status = self._request.read_line()
817
result = SmartClientRequestProtocolOne._read_response_tuple(self)
818
self._response_is_unknown_method(result)
819
if response_status == 'success\n':
820
self.response_status = True
822
self._request.finished_reading()
824
elif response_status == 'failed\n':
825
self.response_status = False
826
self._request.finished_reading()
827
raise errors.ErrorFromSmartServer(result)
424
if version != RESPONSE_VERSION_TWO:
425
raise errors.SmartProtocolError('bad protocol marker %r' % version)
426
response_status = self._recv_line()
427
if response_status not in ('success\n', 'failed\n'):
829
428
raise errors.SmartProtocolError(
830
429
'bad protocol status %r' % response_status)
430
self.response_status = response_status == 'success\n'
431
return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
832
433
def _write_protocol_version(self):
833
"""Write any prefixes this protocol requires.
434
r"""Write any prefixes this protocol requires.
835
436
Version two sends the value of REQUEST_VERSION_TWO.
837
self._request.accept_bytes(self.request_marker)
839
def read_streamed_body(self):
840
"""Read bytes from the body, decoding into a byte stream.
842
# Read no more than 64k at a time so that we don't risk error 10055 (no
843
# buffer space available) on Windows.
844
_body_decoder = ChunkedBodyDecoder()
845
while not _body_decoder.finished_reading:
846
bytes = self._request.read_bytes(_body_decoder.next_read_size())
848
# end of file encountered reading from server
849
raise errors.ConnectionReset(
850
"Connection lost while reading streamed body.")
851
_body_decoder.accept_bytes(bytes)
852
for body_bytes in iter(_body_decoder.read_next_chunk, None):
853
if 'hpss' in debug.debug_flags and type(body_bytes) is str:
854
mutter(' %d byte chunk read',
857
self._request.finished_reading()
860
def build_server_protocol_three(backing_transport, write_func,
862
request_handler = request.SmartServerRequestHandler(
863
backing_transport, commands=request.request_handlers,
864
root_client_path=root_client_path)
865
responder = ProtocolThreeResponder(write_func)
866
message_handler = message.ConventionalRequestHandler(request_handler, responder)
867
return ProtocolThreeDecoder(message_handler)
870
class ProtocolThreeDecoder(_StatefulDecoder):
872
response_marker = RESPONSE_VERSION_THREE
873
request_marker = REQUEST_VERSION_THREE
875
def __init__(self, message_handler, expect_version_marker=False):
876
_StatefulDecoder.__init__(self)
877
self._has_dispatched = False
879
if expect_version_marker:
880
self.state_accept = self._state_accept_expecting_protocol_version
881
# We're expecting at least the protocol version marker + some
883
self._number_needed_bytes = len(MESSAGE_VERSION_THREE) + 4
885
self.state_accept = self._state_accept_expecting_headers
886
self._number_needed_bytes = 4
887
self.decoding_failed = False
888
self.request_handler = self.message_handler = message_handler
890
def accept_bytes(self, bytes):
891
self._number_needed_bytes = None
893
_StatefulDecoder.accept_bytes(self, bytes)
894
except KeyboardInterrupt:
896
except errors.SmartMessageHandlerError, exception:
897
# We do *not* set self.decoding_failed here. The message handler
898
# has raised an error, but the decoder is still able to parse bytes
899
# and determine when this message ends.
900
if not isinstance(exception.exc_value, errors.UnknownSmartMethod):
901
log_exception_quietly()
902
self.message_handler.protocol_error(exception.exc_value)
903
# The state machine is ready to continue decoding, but the
904
# exception has interrupted the loop that runs the state machine.
905
# So we call accept_bytes again to restart it.
906
self.accept_bytes('')
907
except Exception, exception:
908
# The decoder itself has raised an exception. We cannot continue
910
self.decoding_failed = True
911
if isinstance(exception, errors.UnexpectedProtocolVersionMarker):
912
# This happens during normal operation when the client tries a
913
# protocol version the server doesn't understand, so no need to
914
# log a traceback every time.
915
# Note that this can only happen when
916
# expect_version_marker=True, which is only the case on the
920
log_exception_quietly()
921
self.message_handler.protocol_error(exception)
923
def _extract_length_prefixed_bytes(self):
924
if self._in_buffer_len < 4:
925
# A length prefix by itself is 4 bytes, and we don't even have that
927
raise _NeedMoreBytes(4)
928
(length,) = struct.unpack('!L', self._get_in_bytes(4))
929
end_of_bytes = 4 + length
930
if self._in_buffer_len < end_of_bytes:
931
# We haven't yet read as many bytes as the length-prefix says there
933
raise _NeedMoreBytes(end_of_bytes)
934
# Extract the bytes from the buffer.
935
in_buf = self._get_in_buffer()
936
bytes = in_buf[4:end_of_bytes]
937
self._set_in_buffer(in_buf[end_of_bytes:])
940
def _extract_prefixed_bencoded_data(self):
941
prefixed_bytes = self._extract_length_prefixed_bytes()
943
decoded = bdecode_as_tuple(prefixed_bytes)
945
raise errors.SmartProtocolError(
946
'Bytes %r not bencoded' % (prefixed_bytes,))
949
def _extract_single_byte(self):
950
if self._in_buffer_len == 0:
951
# The buffer is empty
952
raise _NeedMoreBytes(1)
953
in_buf = self._get_in_buffer()
955
self._set_in_buffer(in_buf[1:])
958
def _state_accept_expecting_protocol_version(self):
959
needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
960
in_buf = self._get_in_buffer()
962
# We don't have enough bytes to check if the protocol version
963
# marker is right. But we can check if it is already wrong by
964
# checking that the start of MESSAGE_VERSION_THREE matches what
966
# [In fact, if the remote end isn't bzr we might never receive
967
# len(MESSAGE_VERSION_THREE) bytes. So if the bytes we have so far
968
# are wrong then we should just raise immediately rather than
970
if not MESSAGE_VERSION_THREE.startswith(in_buf):
971
# We have enough bytes to know the protocol version is wrong
972
raise errors.UnexpectedProtocolVersionMarker(in_buf)
973
raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
974
if not in_buf.startswith(MESSAGE_VERSION_THREE):
975
raise errors.UnexpectedProtocolVersionMarker(in_buf)
976
self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
977
self.state_accept = self._state_accept_expecting_headers
979
def _state_accept_expecting_headers(self):
980
decoded = self._extract_prefixed_bencoded_data()
981
if type(decoded) is not dict:
982
raise errors.SmartProtocolError(
983
'Header object %r is not a dict' % (decoded,))
984
self.state_accept = self._state_accept_expecting_message_part
986
self.message_handler.headers_received(decoded)
988
raise errors.SmartMessageHandlerError(sys.exc_info())
990
def _state_accept_expecting_message_part(self):
991
message_part_kind = self._extract_single_byte()
992
if message_part_kind == 'o':
993
self.state_accept = self._state_accept_expecting_one_byte
994
elif message_part_kind == 's':
995
self.state_accept = self._state_accept_expecting_structure
996
elif message_part_kind == 'b':
997
self.state_accept = self._state_accept_expecting_bytes
998
elif message_part_kind == 'e':
1001
raise errors.SmartProtocolError(
1002
'Bad message kind byte: %r' % (message_part_kind,))
1004
def _state_accept_expecting_one_byte(self):
1005
byte = self._extract_single_byte()
1006
self.state_accept = self._state_accept_expecting_message_part
1008
self.message_handler.byte_part_received(byte)
1010
raise errors.SmartMessageHandlerError(sys.exc_info())
1012
def _state_accept_expecting_bytes(self):
1013
# XXX: this should not buffer whole message part, but instead deliver
1014
# the bytes as they arrive.
1015
prefixed_bytes = self._extract_length_prefixed_bytes()
1016
self.state_accept = self._state_accept_expecting_message_part
1018
self.message_handler.bytes_part_received(prefixed_bytes)
1020
raise errors.SmartMessageHandlerError(sys.exc_info())
1022
def _state_accept_expecting_structure(self):
1023
structure = self._extract_prefixed_bencoded_data()
1024
self.state_accept = self._state_accept_expecting_message_part
1026
self.message_handler.structure_part_received(structure)
1028
raise errors.SmartMessageHandlerError(sys.exc_info())
1031
self.unused_data = self._get_in_buffer()
1032
self._set_in_buffer(None)
1033
self.state_accept = self._state_accept_reading_unused
1035
self.message_handler.end_received()
1037
raise errors.SmartMessageHandlerError(sys.exc_info())
1039
def _state_accept_reading_unused(self):
1040
self.unused_data += self._get_in_buffer()
1041
self._set_in_buffer(None)
1043
def next_read_size(self):
1044
if self.state_accept == self._state_accept_reading_unused:
1046
elif self.decoding_failed:
1047
# An exception occurred while processing this message, probably from
1048
# self.message_handler. We're not sure that this state machine is
1049
# in a consistent state, so just signal that we're done (i.e. give
1053
if self._number_needed_bytes is not None:
1054
return self._number_needed_bytes - self._in_buffer_len
1056
raise AssertionError("don't know how many bytes are expected!")
1059
class _ProtocolThreeEncoder(object):
1061
response_marker = request_marker = MESSAGE_VERSION_THREE
1063
def __init__(self, write_func):
1065
self._real_write_func = write_func
1067
def _write_func(self, bytes):
1068
self._buf.append(bytes)
1069
if len(self._buf) > 100:
1074
self._real_write_func(''.join(self._buf))
1077
def _serialise_offsets(self, offsets):
1078
"""Serialise a readv offset list."""
1080
for start, length in offsets:
1081
txt.append('%d,%d' % (start, length))
1082
return '\n'.join(txt)
1084
def _write_protocol_version(self):
    """Write the MESSAGE_VERSION_THREE marker via self._write_func."""
    marker = MESSAGE_VERSION_THREE
    self._write_func(marker)
1087
def _write_prefixed_bencode(self, structure):
    """Write `structure` bencoded, preceded by its length.

    The length is written as a 4-byte network-order unsigned integer,
    followed by the bencoded bytes themselves.
    """
    encoded = bencode(structure)
    length_prefix = struct.pack('!L', len(encoded))
    self._write_func(length_prefix)
    self._write_func(encoded)
1092
def _write_headers(self, headers):
1093
self._write_prefixed_bencode(headers)
1095
def _write_structure(self, args):
1096
self._write_func('s')
1099
if type(arg) is unicode:
1100
utf8_args.append(arg.encode('utf8'))
1102
utf8_args.append(arg)
1103
self._write_prefixed_bencode(utf8_args)
1105
def _write_end(self):
1106
self._write_func('e')
1109
def _write_prefixed_body(self, bytes):
1110
self._write_func('b')
1111
self._write_func(struct.pack('!L', len(bytes)))
1112
self._write_func(bytes)
1114
def _write_chunked_body_start(self):
1115
self._write_func('oC')
1117
def _write_error_status(self):
1118
self._write_func('oE')
1120
def _write_success_status(self):
1121
self._write_func('oS')
1124
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1126
def __init__(self, write_func):
    """Create a responder that writes through `write_func`.

    :param write_func: callable handed on to _ProtocolThreeEncoder.
    """
    _ProtocolThreeEncoder.__init__(self, write_func)
    # Headers written at the start of every response.
    self._headers = {'Software version': bzrlib.__version__}
    # send_error and send_response refuse to run once this is True.
    self.response_sent = False
1131
def send_error(self, exception):
1132
if self.response_sent:
1133
raise AssertionError(
1134
"send_error(%s) called, but response already sent."
1136
if isinstance(exception, errors.UnknownSmartMethod):
1137
failure = request.FailedSmartServerResponse(
1138
('UnknownMethod', exception.verb))
1139
self.send_response(failure)
1141
self.response_sent = True
1142
self._write_protocol_version()
1143
self._write_headers(self._headers)
1144
self._write_error_status()
1145
self._write_structure(('error', str(exception)))
1148
def send_response(self, response):
1149
if self.response_sent:
1150
raise AssertionError(
1151
"send_response(%r) called, but response already sent."
1153
self.response_sent = True
1154
self._write_protocol_version()
1155
self._write_headers(self._headers)
1156
if response.is_successful():
1157
self._write_success_status()
1159
self._write_error_status()
1160
self._write_structure(response.args)
1161
if response.body is not None:
1162
self._write_prefixed_body(response.body)
1163
elif response.body_stream is not None:
1164
for exc_info, chunk in _iter_with_errors(response.body_stream):
1165
if exc_info is not None:
1166
self._write_error_status()
1167
error_struct = request._translate_error(exc_info[1])
1168
self._write_structure(error_struct)
1171
if isinstance(chunk, request.FailedSmartServerResponse):
1172
self._write_error_status()
1173
self._write_structure(chunk.args)
1175
self._write_prefixed_body(chunk)
1179
def _iter_with_errors(iterable):
1180
"""Handle errors from iterable.next().
1184
for exc_info, value in _iter_with_errors(iterable):
1187
This is a safer alternative to::
1190
for value in iterable:
1195
Because the latter will catch errors from the for-loop body, not just
1198
If an error occurs, exc_info will be an exc_info tuple, and the generator
1199
will terminate. Otherwise exc_info will be None, and value will be the
1200
value from iterable.next(). Note that KeyboardInterrupt and SystemExit
1201
will not be intercepted.
1203
iterator = iter(iterable)
1206
yield None, iterator.next()
1207
except StopIteration:
1209
except (KeyboardInterrupt, SystemExit):
1212
yield sys.exc_info(), None
1216
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1218
def __init__(self, medium_request):
1219
_ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1220
self._medium_request = medium_request
1223
def set_headers(self, headers):
    """Store a copy of `headers` for use by subsequent calls.

    A copy is kept so that later changes to the caller's object do not
    affect the stored headers.
    """
    own_headers = headers.copy()
    self._headers = own_headers
1226
def call(self, *args):
1227
if 'hpss' in debug.debug_flags:
1228
mutter('hpss call: %s', repr(args)[1:-1])
1229
base = getattr(self._medium_request._medium, 'base', None)
1230
if base is not None:
1231
mutter(' (to %s)', base)
1232
self._request_start_time = time.time()
1233
self._write_protocol_version()
1234
self._write_headers(self._headers)
1235
self._write_structure(args)
1237
self._medium_request.finished_writing()
1239
def call_with_body_bytes(self, args, body):
1240
"""Make a remote call of args with body bytes 'body'.
1242
After calling this, call read_response_tuple to find the result out.
1244
if 'hpss' in debug.debug_flags:
1245
mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
1246
path = getattr(self._medium_request._medium, '_path', None)
1247
if path is not None:
1248
mutter(' (to %s)', path)
1249
mutter(' %d bytes', len(body))
1250
self._request_start_time = time.time()
1251
self._write_protocol_version()
1252
self._write_headers(self._headers)
1253
self._write_structure(args)
1254
self._write_prefixed_body(body)
1256
self._medium_request.finished_writing()
1258
def call_with_body_readv_array(self, args, body):
1259
"""Make a remote call with a readv array.
1261
The body is encoded with one line per readv offset pair. The numbers in
1262
each pair are separated by a comma, and no trailing \n is emitted.
1264
if 'hpss' in debug.debug_flags:
1265
mutter('hpss call w/readv: %s', repr(args)[1:-1])
1266
path = getattr(self._medium_request._medium, '_path', None)
1267
if path is not None:
1268
mutter(' (to %s)', path)
1269
self._request_start_time = time.time()
1270
self._write_protocol_version()
1271
self._write_headers(self._headers)
1272
self._write_structure(args)
1273
readv_bytes = self._serialise_offsets(body)
1274
if 'hpss' in debug.debug_flags:
1275
mutter(' %d bytes in readv request', len(readv_bytes))
1276
self._write_prefixed_body(readv_bytes)
1278
self._medium_request.finished_writing()
1280
def call_with_body_stream(self, args, stream):
1281
if 'hpss' in debug.debug_flags:
1282
mutter('hpss call w/body stream: %r', args)
1283
path = getattr(self._medium_request._medium, '_path', None)
1284
if path is not None:
1285
mutter(' (to %s)', path)
1286
self._request_start_time = time.time()
1287
self._write_protocol_version()
1288
self._write_headers(self._headers)
1289
self._write_structure(args)
1290
# TODO: notice if the server has sent an early error reply before we
1291
# have finished sending the stream. We would notice at the end
1292
# anyway, but if the medium can deliver it early then it's good
1293
# to short-circuit the whole request...
1294
for exc_info, part in _iter_with_errors(stream):
1295
if exc_info is not None:
1296
# Iterating the stream failed. Cleanly abort the request.
1297
self._write_error_status()
1298
# Currently the client unconditionally sends ('error',) as the
1300
self._write_structure(('error',))
1302
self._medium_request.finished_writing()
1303
raise exc_info[0], exc_info[1], exc_info[2]
1305
self._write_prefixed_body(part)
1308
self._medium_request.finished_writing()
438
self._request.accept_bytes(REQUEST_VERSION_TWO)