150
232
return self._body_decoder.next_read_size()
153
class LengthPrefixedBodyDecoder(object):
154
"""Decodes the length-prefixed bulk data."""
235
class SmartServerRequestProtocolTwo(SmartServerRequestProtocolOne):
236
r"""Version two of the server side of the smart protocol.
238
This prefixes responses with the value of RESPONSE_VERSION_TWO.
241
response_marker = RESPONSE_VERSION_TWO
242
request_marker = REQUEST_VERSION_TWO
244
def _write_success_or_failure_prefix(self, response):
245
"""Write the protocol specific success/failure prefix."""
246
if response.is_successful():
247
self._write_func('success\n')
249
self._write_func('failed\n')
251
def _write_protocol_version(self):
252
r"""Write any prefixes this protocol requires.
254
Version two sends the value of RESPONSE_VERSION_TWO.
256
self._write_func(self.response_marker)
258
def _send_response(self, response):
259
"""Send a smart server response down the output stream."""
261
raise AssertionError('response already sent')
262
self._finished = True
263
self._write_protocol_version()
264
self._write_success_or_failure_prefix(response)
265
self._write_func(_encode_tuple(response.args))
266
if response.body is not None:
267
if not isinstance(response.body, str):
268
raise AssertionError('body must be a str')
269
if not (response.body_stream is None):
270
raise AssertionError(
271
'body_stream and body cannot both be set')
272
bytes = self._encode_bulk_data(response.body)
273
self._write_func(bytes)
274
elif response.body_stream is not None:
275
_send_stream(response.body_stream, self._write_func)
278
def _send_stream(stream, write_func):
279
write_func('chunked\n')
280
_send_chunks(stream, write_func)
284
def _send_chunks(stream, write_func):
286
if isinstance(chunk, str):
287
bytes = "%x\n%s" % (len(chunk), chunk)
289
elif isinstance(chunk, request.FailedSmartServerResponse):
291
_send_chunks(chunk.args, write_func)
294
raise errors.BzrError(
295
'Chunks must be str or FailedSmartServerResponse, got %r'
299
class _NeedMoreBytes(Exception):
300
"""Raise this inside a _StatefulDecoder to stop decoding until more bytes
304
def __init__(self, count=None):
307
:param count: the total number of bytes needed by the current state.
308
May be None if the number of bytes needed is unknown.
313
class _StatefulDecoder(object):
314
"""Base class for writing state machines to decode byte streams.
316
Subclasses should provide a self.state_accept attribute that accepts bytes
317
and, if appropriate, updates self.state_accept to a different function.
318
accept_bytes will call state_accept as often as necessary to make sure the
319
state machine has progressed as far as possible before it returns.
321
See ProtocolThreeDecoder for an example subclass.
156
324
def __init__(self):
325
self.finished_reading = False
326
self._in_buffer_list = []
327
self._in_buffer_len = 0
328
self.unused_data = ''
157
329
self.bytes_left = None
158
self.finished_reading = False
159
self.unused_data = ''
160
self.state_accept = self._state_accept_expecting_length
161
self.state_read = self._state_read_no_data
163
self._trailer_buffer = ''
330
self._number_needed_bytes = None
332
def _get_in_buffer(self):
333
if len(self._in_buffer_list) == 1:
334
return self._in_buffer_list[0]
335
in_buffer = ''.join(self._in_buffer_list)
336
if len(in_buffer) != self._in_buffer_len:
337
raise AssertionError(
338
"Length of buffer did not match expected value: %s != %s"
339
% self._in_buffer_len, len(in_buffer))
340
self._in_buffer_list = [in_buffer]
343
def _get_in_bytes(self, count):
344
"""Grab X bytes from the input_buffer.
346
Callers should have already checked that self._in_buffer_len is >
347
count. Note, this does not consume the bytes from the buffer. The
348
caller will still need to call _get_in_buffer() and then
349
_set_in_buffer() if they actually need to consume the bytes.
351
# check if we can yield the bytes from just the first entry in our list
352
if len(self._in_buffer_list) == 0:
353
raise AssertionError('Callers must be sure we have buffered bytes'
354
' before calling _get_in_bytes')
355
if len(self._in_buffer_list[0]) > count:
356
return self._in_buffer_list[0][:count]
357
# We can't yield it from the first buffer, so collapse all buffers, and
359
in_buf = self._get_in_buffer()
360
return in_buf[:count]
362
def _set_in_buffer(self, new_buf):
363
if new_buf is not None:
364
self._in_buffer_list = [new_buf]
365
self._in_buffer_len = len(new_buf)
367
self._in_buffer_list = []
368
self._in_buffer_len = 0
165
370
def accept_bytes(self, bytes):
166
371
"""Decode as much of bytes as possible.
172
377
data will be appended to self.unused_data.
174
379
# accept_bytes is allowed to change the state
175
current_state = self.state_accept
176
self.state_accept(bytes)
177
while current_state != self.state_accept:
380
self._number_needed_bytes = None
381
# lsprof puts a very large amount of time on this specific call for
383
self._in_buffer_list.append(bytes)
384
self._in_buffer_len += len(bytes)
386
# Run the function for the current state.
178
387
current_state = self.state_accept
179
self.state_accept('')
389
while current_state != self.state_accept:
390
# The current state has changed. Run the function for the new
391
# current state, so that it can:
392
# - decode any unconsumed bytes left in a buffer, and
393
# - signal how many more bytes are expected (via raising
395
current_state = self.state_accept
397
except _NeedMoreBytes, e:
398
self._number_needed_bytes = e.count
401
class ChunkedBodyDecoder(_StatefulDecoder):
402
"""Decoder for chunked body data.
404
This is very similar the HTTP's chunked encoding. See the description of
405
streamed body data in `doc/developers/network-protocol.txt` for details.
409
_StatefulDecoder.__init__(self)
410
self.state_accept = self._state_accept_expecting_header
411
self.chunk_in_progress = None
412
self.chunks = collections.deque()
414
self.error_in_progress = None
416
def next_read_size(self):
417
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
# end-of-body marker is 4 bytes: 'END\n'.
419
if self.state_accept == self._state_accept_reading_chunk:
420
# We're expecting more chunk content. So we're expecting at least
421
# the rest of this chunk plus an END chunk.
422
return self.bytes_left + 4
423
elif self.state_accept == self._state_accept_expecting_length:
424
if self._in_buffer_len == 0:
425
# We're expecting a chunk length. There's at least two bytes
426
# left: a digit plus '\n'.
429
# We're in the middle of reading a chunk length. So there's at
430
# least one byte left, the '\n' that terminates the length.
432
elif self.state_accept == self._state_accept_reading_unused:
434
elif self.state_accept == self._state_accept_expecting_header:
435
return max(0, len('chunked\n') - self._in_buffer_len)
437
raise AssertionError("Impossible state: %r" % (self.state_accept,))
439
def read_next_chunk(self):
441
return self.chunks.popleft()
445
def _extract_line(self):
446
in_buf = self._get_in_buffer()
447
pos = in_buf.find('\n')
449
# We haven't read a complete line yet, so request more bytes before
451
raise _NeedMoreBytes(1)
453
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
454
self._set_in_buffer(in_buf[pos+1:])
458
self.unused_data = self._get_in_buffer()
459
self._in_buffer_list = []
460
self._in_buffer_len = 0
461
self.state_accept = self._state_accept_reading_unused
463
error_args = tuple(self.error_in_progress)
464
self.chunks.append(request.FailedSmartServerResponse(error_args))
465
self.error_in_progress = None
466
self.finished_reading = True
468
def _state_accept_expecting_header(self):
469
prefix = self._extract_line()
470
if prefix == 'chunked':
471
self.state_accept = self._state_accept_expecting_length
473
raise errors.SmartProtocolError(
474
'Bad chunked body header: "%s"' % (prefix,))
476
def _state_accept_expecting_length(self):
477
prefix = self._extract_line()
480
self.error_in_progress = []
481
self._state_accept_expecting_length()
483
elif prefix == 'END':
484
# We've read the end-of-body marker.
485
# Any further bytes are unused data, including the bytes left in
490
self.bytes_left = int(prefix, 16)
491
self.chunk_in_progress = ''
492
self.state_accept = self._state_accept_reading_chunk
494
def _state_accept_reading_chunk(self):
495
in_buf = self._get_in_buffer()
496
in_buffer_len = len(in_buf)
497
self.chunk_in_progress += in_buf[:self.bytes_left]
498
self._set_in_buffer(in_buf[self.bytes_left:])
499
self.bytes_left -= in_buffer_len
500
if self.bytes_left <= 0:
501
# Finished with chunk
502
self.bytes_left = None
504
self.error_in_progress.append(self.chunk_in_progress)
506
self.chunks.append(self.chunk_in_progress)
507
self.chunk_in_progress = None
508
self.state_accept = self._state_accept_expecting_length
510
def _state_accept_reading_unused(self):
511
self.unused_data += self._get_in_buffer()
512
self._in_buffer_list = []
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
516
"""Decodes the length-prefixed bulk data."""
519
_StatefulDecoder.__init__(self)
520
self.state_accept = self._state_accept_expecting_length
521
self.state_read = self._state_read_no_data
523
self._trailer_buffer = ''
181
525
def next_read_size(self):
182
526
if self.bytes_left is not None:
336
778
resp = self.read_response_tuple()
337
779
if resp == ('ok', '1'):
781
elif resp == ('ok', '2'):
340
784
raise errors.SmartProtocolError("bad response %r" % (resp,))
786
def _write_args(self, args):
787
self._write_protocol_version()
788
bytes = _encode_tuple(args)
789
self._request.accept_bytes(bytes)
791
def _write_protocol_version(self):
792
"""Write any prefixes this protocol requires.
794
Version one doesn't send protocol versions.
798
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
799
"""Version two of the client side of the smart protocol.
801
This prefixes the request with the value of REQUEST_VERSION_TWO.
804
response_marker = RESPONSE_VERSION_TWO
805
request_marker = REQUEST_VERSION_TWO
807
def read_response_tuple(self, expect_body=False):
808
"""Read a response tuple from the wire.
810
This should only be called once.
812
version = self._request.read_line()
813
if version != self.response_marker:
814
self._request.finished_reading()
815
raise errors.UnexpectedProtocolVersionMarker(version)
816
response_status = self._request.read_line()
817
result = SmartClientRequestProtocolOne._read_response_tuple(self)
818
self._response_is_unknown_method(result)
819
if response_status == 'success\n':
820
self.response_status = True
822
self._request.finished_reading()
824
elif response_status == 'failed\n':
825
self.response_status = False
826
self._request.finished_reading()
827
raise errors.ErrorFromSmartServer(result)
829
raise errors.SmartProtocolError(
830
'bad protocol status %r' % response_status)
832
def _write_protocol_version(self):
833
"""Write any prefixes this protocol requires.
835
Version two sends the value of REQUEST_VERSION_TWO.
837
self._request.accept_bytes(self.request_marker)
839
def read_streamed_body(self):
840
"""Read bytes from the body, decoding into a byte stream.
842
# Read no more than 64k at a time so that we don't risk error 10055 (no
843
# buffer space available) on Windows.
844
_body_decoder = ChunkedBodyDecoder()
845
while not _body_decoder.finished_reading:
846
bytes = self._request.read_bytes(_body_decoder.next_read_size())
848
# end of file encountered reading from server
849
raise errors.ConnectionReset(
850
"Connection lost while reading streamed body.")
851
_body_decoder.accept_bytes(bytes)
852
for body_bytes in iter(_body_decoder.read_next_chunk, None):
853
if 'hpss' in debug.debug_flags and type(body_bytes) is str:
854
mutter(' %d byte chunk read',
857
self._request.finished_reading()
860
def build_server_protocol_three(backing_transport, write_func,
862
request_handler = request.SmartServerRequestHandler(
863
backing_transport, commands=request.request_handlers,
864
root_client_path=root_client_path)
865
responder = ProtocolThreeResponder(write_func)
866
message_handler = message.ConventionalRequestHandler(request_handler, responder)
867
return ProtocolThreeDecoder(message_handler)
870
class ProtocolThreeDecoder(_StatefulDecoder):
872
response_marker = RESPONSE_VERSION_THREE
873
request_marker = REQUEST_VERSION_THREE
875
def __init__(self, message_handler, expect_version_marker=False):
876
_StatefulDecoder.__init__(self)
877
self._has_dispatched = False
879
if expect_version_marker:
880
self.state_accept = self._state_accept_expecting_protocol_version
881
# We're expecting at least the protocol version marker + some
883
self._number_needed_bytes = len(MESSAGE_VERSION_THREE) + 4
885
self.state_accept = self._state_accept_expecting_headers
886
self._number_needed_bytes = 4
887
self.decoding_failed = False
888
self.request_handler = self.message_handler = message_handler
890
def accept_bytes(self, bytes):
891
self._number_needed_bytes = None
893
_StatefulDecoder.accept_bytes(self, bytes)
894
except KeyboardInterrupt:
896
except errors.SmartMessageHandlerError, exception:
897
# We do *not* set self.decoding_failed here. The message handler
898
# has raised an error, but the decoder is still able to parse bytes
899
# and determine when this message ends.
900
log_exception_quietly()
901
self.message_handler.protocol_error(exception.exc_value)
902
# The state machine is ready to continue decoding, but the
903
# exception has interrupted the loop that runs the state machine.
904
# So we call accept_bytes again to restart it.
905
self.accept_bytes('')
906
except Exception, exception:
907
# The decoder itself has raised an exception. We cannot continue
909
self.decoding_failed = True
910
if isinstance(exception, errors.UnexpectedProtocolVersionMarker):
911
# This happens during normal operation when the client tries a
912
# protocol version the server doesn't understand, so no need to
913
# log a traceback every time.
914
# Note that this can only happen when
915
# expect_version_marker=True, which is only the case on the
919
log_exception_quietly()
920
self.message_handler.protocol_error(exception)
922
def _extract_length_prefixed_bytes(self):
923
if self._in_buffer_len < 4:
924
# A length prefix by itself is 4 bytes, and we don't even have that
926
raise _NeedMoreBytes(4)
927
(length,) = struct.unpack('!L', self._get_in_bytes(4))
928
end_of_bytes = 4 + length
929
if self._in_buffer_len < end_of_bytes:
930
# We haven't yet read as many bytes as the length-prefix says there
932
raise _NeedMoreBytes(end_of_bytes)
933
# Extract the bytes from the buffer.
934
in_buf = self._get_in_buffer()
935
bytes = in_buf[4:end_of_bytes]
936
self._set_in_buffer(in_buf[end_of_bytes:])
939
def _extract_prefixed_bencoded_data(self):
940
prefixed_bytes = self._extract_length_prefixed_bytes()
942
decoded = bdecode_as_tuple(prefixed_bytes)
944
raise errors.SmartProtocolError(
945
'Bytes %r not bencoded' % (prefixed_bytes,))
948
def _extract_single_byte(self):
949
if self._in_buffer_len == 0:
950
# The buffer is empty
951
raise _NeedMoreBytes(1)
952
in_buf = self._get_in_buffer()
954
self._set_in_buffer(in_buf[1:])
957
def _state_accept_expecting_protocol_version(self):
958
needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
959
in_buf = self._get_in_buffer()
961
# We don't have enough bytes to check if the protocol version
962
# marker is right. But we can check if it is already wrong by
963
# checking that the start of MESSAGE_VERSION_THREE matches what
965
# [In fact, if the remote end isn't bzr we might never receive
966
# len(MESSAGE_VERSION_THREE) bytes. So if the bytes we have so far
967
# are wrong then we should just raise immediately rather than
969
if not MESSAGE_VERSION_THREE.startswith(in_buf):
970
# We have enough bytes to know the protocol version is wrong
971
raise errors.UnexpectedProtocolVersionMarker(in_buf)
972
raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
973
if not in_buf.startswith(MESSAGE_VERSION_THREE):
974
raise errors.UnexpectedProtocolVersionMarker(in_buf)
975
self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
976
self.state_accept = self._state_accept_expecting_headers
978
def _state_accept_expecting_headers(self):
979
decoded = self._extract_prefixed_bencoded_data()
980
if type(decoded) is not dict:
981
raise errors.SmartProtocolError(
982
'Header object %r is not a dict' % (decoded,))
983
self.state_accept = self._state_accept_expecting_message_part
985
self.message_handler.headers_received(decoded)
987
raise errors.SmartMessageHandlerError(sys.exc_info())
989
def _state_accept_expecting_message_part(self):
990
message_part_kind = self._extract_single_byte()
991
if message_part_kind == 'o':
992
self.state_accept = self._state_accept_expecting_one_byte
993
elif message_part_kind == 's':
994
self.state_accept = self._state_accept_expecting_structure
995
elif message_part_kind == 'b':
996
self.state_accept = self._state_accept_expecting_bytes
997
elif message_part_kind == 'e':
1000
raise errors.SmartProtocolError(
1001
'Bad message kind byte: %r' % (message_part_kind,))
1003
def _state_accept_expecting_one_byte(self):
1004
byte = self._extract_single_byte()
1005
self.state_accept = self._state_accept_expecting_message_part
1007
self.message_handler.byte_part_received(byte)
1009
raise errors.SmartMessageHandlerError(sys.exc_info())
1011
def _state_accept_expecting_bytes(self):
1012
# XXX: this should not buffer whole message part, but instead deliver
1013
# the bytes as they arrive.
1014
prefixed_bytes = self._extract_length_prefixed_bytes()
1015
self.state_accept = self._state_accept_expecting_message_part
1017
self.message_handler.bytes_part_received(prefixed_bytes)
1019
raise errors.SmartMessageHandlerError(sys.exc_info())
1021
def _state_accept_expecting_structure(self):
1022
structure = self._extract_prefixed_bencoded_data()
1023
self.state_accept = self._state_accept_expecting_message_part
1025
self.message_handler.structure_part_received(structure)
1027
raise errors.SmartMessageHandlerError(sys.exc_info())
1030
self.unused_data = self._get_in_buffer()
1031
self._set_in_buffer(None)
1032
self.state_accept = self._state_accept_reading_unused
1034
self.message_handler.end_received()
1036
raise errors.SmartMessageHandlerError(sys.exc_info())
1038
def _state_accept_reading_unused(self):
1039
self.unused_data = self._get_in_buffer()
1040
self._set_in_buffer(None)
1042
def next_read_size(self):
1043
if self.state_accept == self._state_accept_reading_unused:
1045
elif self.decoding_failed:
1046
# An exception occured while processing this message, probably from
1047
# self.message_handler. We're not sure that this state machine is
1048
# in a consistent state, so just signal that we're done (i.e. give
1052
if self._number_needed_bytes is not None:
1053
return self._number_needed_bytes - self._in_buffer_len
1055
raise AssertionError("don't know how many bytes are expected!")
1058
class _ProtocolThreeEncoder(object):
1060
response_marker = request_marker = MESSAGE_VERSION_THREE
1062
def __init__(self, write_func):
1064
self._real_write_func = write_func
1066
def _write_func(self, bytes):
1067
self._buf.append(bytes)
1068
if len(self._buf) > 100:
1073
self._real_write_func(''.join(self._buf))
1076
def _serialise_offsets(self, offsets):
1077
"""Serialise a readv offset list."""
1079
for start, length in offsets:
1080
txt.append('%d,%d' % (start, length))
1081
return '\n'.join(txt)
1083
def _write_protocol_version(self):
1084
self._write_func(MESSAGE_VERSION_THREE)
1086
def _write_prefixed_bencode(self, structure):
1087
bytes = bencode(structure)
1088
self._write_func(struct.pack('!L', len(bytes)))
1089
self._write_func(bytes)
1091
def _write_headers(self, headers):
1092
self._write_prefixed_bencode(headers)
1094
def _write_structure(self, args):
1095
self._write_func('s')
1098
if type(arg) is unicode:
1099
utf8_args.append(arg.encode('utf8'))
1101
utf8_args.append(arg)
1102
self._write_prefixed_bencode(utf8_args)
1104
def _write_end(self):
1105
self._write_func('e')
1108
def _write_prefixed_body(self, bytes):
1109
self._write_func('b')
1110
self._write_func(struct.pack('!L', len(bytes)))
1111
self._write_func(bytes)
1113
def _write_chunked_body_start(self):
1114
self._write_func('oC')
1116
def _write_error_status(self):
1117
self._write_func('oE')
1119
def _write_success_status(self):
1120
self._write_func('oS')
1123
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1125
def __init__(self, write_func):
1126
_ProtocolThreeEncoder.__init__(self, write_func)
1127
self.response_sent = False
1128
self._headers = {'Software version': bzrlib.__version__}
1130
def send_error(self, exception):
1131
if self.response_sent:
1132
raise AssertionError(
1133
"send_error(%s) called, but response already sent."
1135
if isinstance(exception, errors.UnknownSmartMethod):
1136
failure = request.FailedSmartServerResponse(
1137
('UnknownMethod', exception.verb))
1138
self.send_response(failure)
1140
self.response_sent = True
1141
self._write_protocol_version()
1142
self._write_headers(self._headers)
1143
self._write_error_status()
1144
self._write_structure(('error', str(exception)))
1147
def send_response(self, response):
1148
if self.response_sent:
1149
raise AssertionError(
1150
"send_response(%r) called, but response already sent."
1152
self.response_sent = True
1153
self._write_protocol_version()
1154
self._write_headers(self._headers)
1155
if response.is_successful():
1156
self._write_success_status()
1158
self._write_error_status()
1159
self._write_structure(response.args)
1160
if response.body is not None:
1161
self._write_prefixed_body(response.body)
1162
elif response.body_stream is not None:
1163
for exc_info, chunk in _iter_with_errors(response.body_stream):
1164
if exc_info is not None:
1165
self._write_error_status()
1166
error_struct = request._translate_error(exc_info[1])
1167
self._write_structure(error_struct)
1170
if isinstance(chunk, request.FailedSmartServerResponse):
1171
self._write_error_status()
1172
self._write_structure(chunk.args)
1174
self._write_prefixed_body(chunk)
1178
def _iter_with_errors(iterable):
1179
"""Handle errors from iterable.next().
1183
for exc_info, value in _iter_with_errors(iterable):
1186
This is a safer alternative to::
1189
for value in iterable:
1194
Because the latter will catch errors from the for-loop body, not just
1197
If an error occurs, exc_info will be a exc_info tuple, and the generator
1198
will terminate. Otherwise exc_info will be None, and value will be the
1199
value from iterable.next(). Note that KeyboardInterrupt and SystemExit
1200
will not be itercepted.
1202
iterator = iter(iterable)
1205
yield None, iterator.next()
1206
except StopIteration:
1208
except (KeyboardInterrupt, SystemExit):
1211
yield sys.exc_info(), None
1215
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1217
def __init__(self, medium_request):
1218
_ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1219
self._medium_request = medium_request
1222
def set_headers(self, headers):
1223
self._headers = headers.copy()
1225
def call(self, *args):
1226
if 'hpss' in debug.debug_flags:
1227
mutter('hpss call: %s', repr(args)[1:-1])
1228
base = getattr(self._medium_request._medium, 'base', None)
1229
if base is not None:
1230
mutter(' (to %s)', base)
1231
self._request_start_time = time.time()
1232
self._write_protocol_version()
1233
self._write_headers(self._headers)
1234
self._write_structure(args)
1236
self._medium_request.finished_writing()
1238
def call_with_body_bytes(self, args, body):
1239
"""Make a remote call of args with body bytes 'body'.
1241
After calling this, call read_response_tuple to find the result out.
1243
if 'hpss' in debug.debug_flags:
1244
mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
1245
path = getattr(self._medium_request._medium, '_path', None)
1246
if path is not None:
1247
mutter(' (to %s)', path)
1248
mutter(' %d bytes', len(body))
1249
self._request_start_time = time.time()
1250
self._write_protocol_version()
1251
self._write_headers(self._headers)
1252
self._write_structure(args)
1253
self._write_prefixed_body(body)
1255
self._medium_request.finished_writing()
1257
def call_with_body_readv_array(self, args, body):
1258
"""Make a remote call with a readv array.
1260
The body is encoded with one line per readv offset pair. The numbers in
1261
each pair are separated by a comma, and no trailing \n is emitted.
1263
if 'hpss' in debug.debug_flags:
1264
mutter('hpss call w/readv: %s', repr(args)[1:-1])
1265
path = getattr(self._medium_request._medium, '_path', None)
1266
if path is not None:
1267
mutter(' (to %s)', path)
1268
self._request_start_time = time.time()
1269
self._write_protocol_version()
1270
self._write_headers(self._headers)
1271
self._write_structure(args)
1272
readv_bytes = self._serialise_offsets(body)
1273
if 'hpss' in debug.debug_flags:
1274
mutter(' %d bytes in readv request', len(readv_bytes))
1275
self._write_prefixed_body(readv_bytes)
1277
self._medium_request.finished_writing()
1279
def call_with_body_stream(self, args, stream):
1280
if 'hpss' in debug.debug_flags:
1281
mutter('hpss call w/body stream: %r', args)
1282
path = getattr(self._medium_request._medium, '_path', None)
1283
if path is not None:
1284
mutter(' (to %s)', path)
1285
self._request_start_time = time.time()
1286
self._write_protocol_version()
1287
self._write_headers(self._headers)
1288
self._write_structure(args)
1289
# TODO: notice if the server has sent an early error reply before we
1290
# have finished sending the stream. We would notice at the end
1291
# anyway, but if the medium can deliver it early then it's good
1292
# to short-circuit the whole request...
1293
for exc_info, part in _iter_with_errors(stream):
1294
if exc_info is not None:
1295
# Iterating the stream failed. Cleanly abort the request.
1296
self._write_error_status()
1297
# Currently the client unconditionally sends ('error',) as the
1299
self._write_structure(('error',))
1301
self._medium_request.finished_writing()
1302
raise exc_info[0], exc_info[1], exc_info[2]
1304
self._write_prefixed_body(part)
1307
self._medium_request.finished_writing()