193
259
def _write_protocol_version(self):
194
260
r"""Write any prefixes this protocol requires.
196
262
Version two sends the value of RESPONSE_VERSION_TWO.
198
self._write_func(RESPONSE_VERSION_TWO)
201
class LengthPrefixedBodyDecoder(object):
202
"""Decodes the length-prefixed bulk data."""
264
self._write_func(self.response_marker)
266
def _send_response(self, response):
267
"""Send a smart server response down the output stream."""
269
raise AssertionError('response already sent')
270
self._finished = True
271
self._write_protocol_version()
272
self._write_success_or_failure_prefix(response)
273
self._write_func(_encode_tuple(response.args))
274
if response.body is not None:
275
if not isinstance(response.body, str):
276
raise AssertionError('body must be a str')
277
if not (response.body_stream is None):
278
raise AssertionError(
279
'body_stream and body cannot both be set')
280
bytes = self._encode_bulk_data(response.body)
281
self._write_func(bytes)
282
elif response.body_stream is not None:
283
_send_stream(response.body_stream, self._write_func)
286
def _send_stream(stream, write_func):
287
write_func('chunked\n')
288
_send_chunks(stream, write_func)
292
def _send_chunks(stream, write_func):
294
if isinstance(chunk, str):
295
bytes = "%x\n%s" % (len(chunk), chunk)
297
elif isinstance(chunk, request.FailedSmartServerResponse):
299
_send_chunks(chunk.args, write_func)
302
raise errors.BzrError(
303
'Chunks must be str or FailedSmartServerResponse, got %r'
307
class _NeedMoreBytes(Exception):
308
"""Raise this inside a _StatefulDecoder to stop decoding until more bytes
312
def __init__(self, count=None):
315
:param count: the total number of bytes needed by the current state.
316
May be None if the number of bytes needed is unknown.
321
class _StatefulDecoder(object):
322
"""Base class for writing state machines to decode byte streams.
324
Subclasses should provide a self.state_accept attribute that accepts bytes
325
and, if appropriate, updates self.state_accept to a different function.
326
accept_bytes will call state_accept as often as necessary to make sure the
327
state machine has progressed as far as possible before it returns.
329
See ProtocolThreeDecoder for an example subclass.
204
332
def __init__(self):
333
self.finished_reading = False
334
self._in_buffer_list = []
335
self._in_buffer_len = 0
336
self.unused_data = ''
205
337
self.bytes_left = None
206
self.finished_reading = False
207
self.unused_data = ''
208
self.state_accept = self._state_accept_expecting_length
209
self.state_read = self._state_read_no_data
211
self._trailer_buffer = ''
338
self._number_needed_bytes = None
340
def _get_in_buffer(self):
341
if len(self._in_buffer_list) == 1:
342
return self._in_buffer_list[0]
343
in_buffer = ''.join(self._in_buffer_list)
344
if len(in_buffer) != self._in_buffer_len:
345
raise AssertionError(
346
"Length of buffer did not match expected value: %s != %s"
347
% self._in_buffer_len, len(in_buffer))
348
self._in_buffer_list = [in_buffer]
351
def _get_in_bytes(self, count):
352
"""Grab X bytes from the input_buffer.
354
Callers should have already checked that self._in_buffer_len is >
355
count. Note, this does not consume the bytes from the buffer. The
356
caller will still need to call _get_in_buffer() and then
357
_set_in_buffer() if they actually need to consume the bytes.
359
# check if we can yield the bytes from just the first entry in our list
360
if len(self._in_buffer_list) == 0:
361
raise AssertionError('Callers must be sure we have buffered bytes'
362
' before calling _get_in_bytes')
363
if len(self._in_buffer_list[0]) > count:
364
return self._in_buffer_list[0][:count]
365
# We can't yield it from the first buffer, so collapse all buffers, and
367
in_buf = self._get_in_buffer()
368
return in_buf[:count]
370
def _set_in_buffer(self, new_buf):
371
if new_buf is not None:
372
self._in_buffer_list = [new_buf]
373
self._in_buffer_len = len(new_buf)
375
self._in_buffer_list = []
376
self._in_buffer_len = 0
213
378
def accept_bytes(self, bytes):
214
379
"""Decode as much of bytes as possible.
220
385
data will be appended to self.unused_data.
222
387
# accept_bytes is allowed to change the state
223
current_state = self.state_accept
224
self.state_accept(bytes)
225
while current_state != self.state_accept:
388
self._number_needed_bytes = None
389
# lsprof puts a very large amount of time on this specific call for
391
self._in_buffer_list.append(bytes)
392
self._in_buffer_len += len(bytes)
394
# Run the function for the current state.
226
395
current_state = self.state_accept
227
self.state_accept('')
397
while current_state != self.state_accept:
398
# The current state has changed. Run the function for the new
399
# current state, so that it can:
400
# - decode any unconsumed bytes left in a buffer, and
401
# - signal how many more bytes are expected (via raising
403
current_state = self.state_accept
405
except _NeedMoreBytes, e:
406
self._number_needed_bytes = e.count
409
class ChunkedBodyDecoder(_StatefulDecoder):
410
"""Decoder for chunked body data.
412
This is very similar the HTTP's chunked encoding. See the description of
413
streamed body data in `doc/developers/network-protocol.txt` for details.
417
_StatefulDecoder.__init__(self)
418
self.state_accept = self._state_accept_expecting_header
419
self.chunk_in_progress = None
420
self.chunks = collections.deque()
422
self.error_in_progress = None
424
def next_read_size(self):
425
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
426
# end-of-body marker is 4 bytes: 'END\n'.
427
if self.state_accept == self._state_accept_reading_chunk:
428
# We're expecting more chunk content. So we're expecting at least
429
# the rest of this chunk plus an END chunk.
430
return self.bytes_left + 4
431
elif self.state_accept == self._state_accept_expecting_length:
432
if self._in_buffer_len == 0:
433
# We're expecting a chunk length. There's at least two bytes
434
# left: a digit plus '\n'.
437
# We're in the middle of reading a chunk length. So there's at
438
# least one byte left, the '\n' that terminates the length.
440
elif self.state_accept == self._state_accept_reading_unused:
442
elif self.state_accept == self._state_accept_expecting_header:
443
return max(0, len('chunked\n') - self._in_buffer_len)
445
raise AssertionError("Impossible state: %r" % (self.state_accept,))
447
def read_next_chunk(self):
449
return self.chunks.popleft()
453
def _extract_line(self):
454
in_buf = self._get_in_buffer()
455
pos = in_buf.find('\n')
457
# We haven't read a complete line yet, so request more bytes before
459
raise _NeedMoreBytes(1)
461
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
462
self._set_in_buffer(in_buf[pos+1:])
466
self.unused_data = self._get_in_buffer()
467
self._in_buffer_list = []
468
self._in_buffer_len = 0
469
self.state_accept = self._state_accept_reading_unused
471
error_args = tuple(self.error_in_progress)
472
self.chunks.append(request.FailedSmartServerResponse(error_args))
473
self.error_in_progress = None
474
self.finished_reading = True
476
def _state_accept_expecting_header(self):
477
prefix = self._extract_line()
478
if prefix == 'chunked':
479
self.state_accept = self._state_accept_expecting_length
481
raise errors.SmartProtocolError(
482
'Bad chunked body header: "%s"' % (prefix,))
484
def _state_accept_expecting_length(self):
485
prefix = self._extract_line()
488
self.error_in_progress = []
489
self._state_accept_expecting_length()
491
elif prefix == 'END':
492
# We've read the end-of-body marker.
493
# Any further bytes are unused data, including the bytes left in
498
self.bytes_left = int(prefix, 16)
499
self.chunk_in_progress = ''
500
self.state_accept = self._state_accept_reading_chunk
502
def _state_accept_reading_chunk(self):
503
in_buf = self._get_in_buffer()
504
in_buffer_len = len(in_buf)
505
self.chunk_in_progress += in_buf[:self.bytes_left]
506
self._set_in_buffer(in_buf[self.bytes_left:])
507
self.bytes_left -= in_buffer_len
508
if self.bytes_left <= 0:
509
# Finished with chunk
510
self.bytes_left = None
512
self.error_in_progress.append(self.chunk_in_progress)
514
self.chunks.append(self.chunk_in_progress)
515
self.chunk_in_progress = None
516
self.state_accept = self._state_accept_expecting_length
518
def _state_accept_reading_unused(self):
519
self.unused_data += self._get_in_buffer()
520
self._in_buffer_list = []
523
class LengthPrefixedBodyDecoder(_StatefulDecoder):
524
"""Decodes the length-prefixed bulk data."""
527
_StatefulDecoder.__init__(self)
528
self.state_accept = self._state_accept_expecting_length
529
self.state_read = self._state_read_no_data
531
self._trailer_buffer = ''
229
533
def next_read_size(self):
230
534
if self.bytes_left is not None:
426
799
def _write_protocol_version(self):
427
800
"""Write any prefixes this protocol requires.
429
802
Version one doesn't send protocol versions.
433
806
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
434
807
"""Version two of the client side of the smart protocol.
436
809
This prefixes the request with the value of REQUEST_VERSION_TWO.
812
response_marker = RESPONSE_VERSION_TWO
813
request_marker = REQUEST_VERSION_TWO
439
815
def read_response_tuple(self, expect_body=False):
440
816
"""Read a response tuple from the wire.
442
818
This should only be called once.
444
820
version = self._request.read_line()
445
if version != RESPONSE_VERSION_TWO:
446
raise errors.SmartProtocolError('bad protocol marker %r' % version)
447
response_status = self._recv_line()
448
if response_status not in ('success\n', 'failed\n'):
821
if version != self.response_marker:
822
self._request.finished_reading()
823
raise errors.UnexpectedProtocolVersionMarker(version)
824
response_status = self._request.read_line()
825
result = SmartClientRequestProtocolOne._read_response_tuple(self)
826
self._response_is_unknown_method(result)
827
if response_status == 'success\n':
828
self.response_status = True
830
self._request.finished_reading()
832
elif response_status == 'failed\n':
833
self.response_status = False
834
self._request.finished_reading()
835
raise errors.ErrorFromSmartServer(result)
449
837
raise errors.SmartProtocolError(
450
838
'bad protocol status %r' % response_status)
451
self.response_status = response_status == 'success\n'
452
return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
454
840
def _write_protocol_version(self):
455
r"""Write any prefixes this protocol requires.
841
"""Write any prefixes this protocol requires.
457
843
Version two sends the value of REQUEST_VERSION_TWO.
459
self._request.accept_bytes(REQUEST_VERSION_TWO)
845
self._request.accept_bytes(self.request_marker)
847
def read_streamed_body(self):
848
"""Read bytes from the body, decoding into a byte stream.
850
# Read no more than 64k at a time so that we don't risk error 10055 (no
851
# buffer space available) on Windows.
852
_body_decoder = ChunkedBodyDecoder()
853
while not _body_decoder.finished_reading:
854
bytes = self._request.read_bytes(_body_decoder.next_read_size())
856
# end of file encountered reading from server
857
raise errors.ConnectionReset(
858
"Connection lost while reading streamed body.")
859
_body_decoder.accept_bytes(bytes)
860
for body_bytes in iter(_body_decoder.read_next_chunk, None):
861
if 'hpss' in debug.debug_flags and type(body_bytes) is str:
862
mutter(' %d byte chunk read',
865
self._request.finished_reading()
868
def build_server_protocol_three(backing_transport, write_func,
869
root_client_path, jail_root=None):
870
request_handler = request.SmartServerRequestHandler(
871
backing_transport, commands=request.request_handlers,
872
root_client_path=root_client_path, jail_root=jail_root)
873
responder = ProtocolThreeResponder(write_func)
874
message_handler = message.ConventionalRequestHandler(request_handler, responder)
875
return ProtocolThreeDecoder(message_handler)
878
class ProtocolThreeDecoder(_StatefulDecoder):
880
response_marker = RESPONSE_VERSION_THREE
881
request_marker = REQUEST_VERSION_THREE
883
def __init__(self, message_handler, expect_version_marker=False):
884
_StatefulDecoder.__init__(self)
885
self._has_dispatched = False
887
if expect_version_marker:
888
self.state_accept = self._state_accept_expecting_protocol_version
889
# We're expecting at least the protocol version marker + some
891
self._number_needed_bytes = len(MESSAGE_VERSION_THREE) + 4
893
self.state_accept = self._state_accept_expecting_headers
894
self._number_needed_bytes = 4
895
self.decoding_failed = False
896
self.request_handler = self.message_handler = message_handler
898
def accept_bytes(self, bytes):
899
self._number_needed_bytes = None
901
_StatefulDecoder.accept_bytes(self, bytes)
902
except KeyboardInterrupt:
904
except errors.SmartMessageHandlerError, exception:
905
# We do *not* set self.decoding_failed here. The message handler
906
# has raised an error, but the decoder is still able to parse bytes
907
# and determine when this message ends.
908
if not isinstance(exception.exc_value, errors.UnknownSmartMethod):
909
log_exception_quietly()
910
self.message_handler.protocol_error(exception.exc_value)
911
# The state machine is ready to continue decoding, but the
912
# exception has interrupted the loop that runs the state machine.
913
# So we call accept_bytes again to restart it.
914
self.accept_bytes('')
915
except Exception, exception:
916
# The decoder itself has raised an exception. We cannot continue
918
self.decoding_failed = True
919
if isinstance(exception, errors.UnexpectedProtocolVersionMarker):
920
# This happens during normal operation when the client tries a
921
# protocol version the server doesn't understand, so no need to
922
# log a traceback every time.
923
# Note that this can only happen when
924
# expect_version_marker=True, which is only the case on the
928
log_exception_quietly()
929
self.message_handler.protocol_error(exception)
931
def _extract_length_prefixed_bytes(self):
932
if self._in_buffer_len < 4:
933
# A length prefix by itself is 4 bytes, and we don't even have that
935
raise _NeedMoreBytes(4)
936
(length,) = struct.unpack('!L', self._get_in_bytes(4))
937
end_of_bytes = 4 + length
938
if self._in_buffer_len < end_of_bytes:
939
# We haven't yet read as many bytes as the length-prefix says there
941
raise _NeedMoreBytes(end_of_bytes)
942
# Extract the bytes from the buffer.
943
in_buf = self._get_in_buffer()
944
bytes = in_buf[4:end_of_bytes]
945
self._set_in_buffer(in_buf[end_of_bytes:])
948
def _extract_prefixed_bencoded_data(self):
949
prefixed_bytes = self._extract_length_prefixed_bytes()
951
decoded = bdecode_as_tuple(prefixed_bytes)
953
raise errors.SmartProtocolError(
954
'Bytes %r not bencoded' % (prefixed_bytes,))
957
def _extract_single_byte(self):
958
if self._in_buffer_len == 0:
959
# The buffer is empty
960
raise _NeedMoreBytes(1)
961
in_buf = self._get_in_buffer()
963
self._set_in_buffer(in_buf[1:])
966
def _state_accept_expecting_protocol_version(self):
967
needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
968
in_buf = self._get_in_buffer()
970
# We don't have enough bytes to check if the protocol version
971
# marker is right. But we can check if it is already wrong by
972
# checking that the start of MESSAGE_VERSION_THREE matches what
974
# [In fact, if the remote end isn't bzr we might never receive
975
# len(MESSAGE_VERSION_THREE) bytes. So if the bytes we have so far
976
# are wrong then we should just raise immediately rather than
978
if not MESSAGE_VERSION_THREE.startswith(in_buf):
979
# We have enough bytes to know the protocol version is wrong
980
raise errors.UnexpectedProtocolVersionMarker(in_buf)
981
raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
982
if not in_buf.startswith(MESSAGE_VERSION_THREE):
983
raise errors.UnexpectedProtocolVersionMarker(in_buf)
984
self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
985
self.state_accept = self._state_accept_expecting_headers
987
def _state_accept_expecting_headers(self):
988
decoded = self._extract_prefixed_bencoded_data()
989
if type(decoded) is not dict:
990
raise errors.SmartProtocolError(
991
'Header object %r is not a dict' % (decoded,))
992
self.state_accept = self._state_accept_expecting_message_part
994
self.message_handler.headers_received(decoded)
996
raise errors.SmartMessageHandlerError(sys.exc_info())
998
def _state_accept_expecting_message_part(self):
999
message_part_kind = self._extract_single_byte()
1000
if message_part_kind == 'o':
1001
self.state_accept = self._state_accept_expecting_one_byte
1002
elif message_part_kind == 's':
1003
self.state_accept = self._state_accept_expecting_structure
1004
elif message_part_kind == 'b':
1005
self.state_accept = self._state_accept_expecting_bytes
1006
elif message_part_kind == 'e':
1009
raise errors.SmartProtocolError(
1010
'Bad message kind byte: %r' % (message_part_kind,))
1012
def _state_accept_expecting_one_byte(self):
1013
byte = self._extract_single_byte()
1014
self.state_accept = self._state_accept_expecting_message_part
1016
self.message_handler.byte_part_received(byte)
1018
raise errors.SmartMessageHandlerError(sys.exc_info())
1020
def _state_accept_expecting_bytes(self):
1021
# XXX: this should not buffer whole message part, but instead deliver
1022
# the bytes as they arrive.
1023
prefixed_bytes = self._extract_length_prefixed_bytes()
1024
self.state_accept = self._state_accept_expecting_message_part
1026
self.message_handler.bytes_part_received(prefixed_bytes)
1028
raise errors.SmartMessageHandlerError(sys.exc_info())
1030
def _state_accept_expecting_structure(self):
1031
structure = self._extract_prefixed_bencoded_data()
1032
self.state_accept = self._state_accept_expecting_message_part
1034
self.message_handler.structure_part_received(structure)
1036
raise errors.SmartMessageHandlerError(sys.exc_info())
1039
self.unused_data = self._get_in_buffer()
1040
self._set_in_buffer(None)
1041
self.state_accept = self._state_accept_reading_unused
1043
self.message_handler.end_received()
1045
raise errors.SmartMessageHandlerError(sys.exc_info())
1047
def _state_accept_reading_unused(self):
1048
self.unused_data += self._get_in_buffer()
1049
self._set_in_buffer(None)
1051
def next_read_size(self):
1052
if self.state_accept == self._state_accept_reading_unused:
1054
elif self.decoding_failed:
1055
# An exception occured while processing this message, probably from
1056
# self.message_handler. We're not sure that this state machine is
1057
# in a consistent state, so just signal that we're done (i.e. give
1061
if self._number_needed_bytes is not None:
1062
return self._number_needed_bytes - self._in_buffer_len
1064
raise AssertionError("don't know how many bytes are expected!")
1067
class _ProtocolThreeEncoder(object):
1069
response_marker = request_marker = MESSAGE_VERSION_THREE
1070
BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1072
def __init__(self, write_func):
1075
self._real_write_func = write_func
1077
def _write_func(self, bytes):
1078
# TODO: It is probably more appropriate to use sum(map(len, _buf))
1079
# for total number of bytes to write, rather than buffer based on
1080
# the number of write() calls
1081
# TODO: Another possibility would be to turn this into an async model.
1082
# Where we let another thread know that we have some bytes if
1083
# they want it, but we don't actually block for it
1084
# Note that osutils.send_all always sends 64kB chunks anyway, so
1085
# we might just push out smaller bits at a time?
1086
self._buf.append(bytes)
1087
self._buf_len += len(bytes)
1088
if self._buf_len > self.BUFFER_SIZE:
1093
self._real_write_func(''.join(self._buf))
1097
def _serialise_offsets(self, offsets):
1098
"""Serialise a readv offset list."""
1100
for start, length in offsets:
1101
txt.append('%d,%d' % (start, length))
1102
return '\n'.join(txt)
1104
def _write_protocol_version(self):
1105
self._write_func(MESSAGE_VERSION_THREE)
1107
def _write_prefixed_bencode(self, structure):
1108
bytes = bencode(structure)
1109
self._write_func(struct.pack('!L', len(bytes)))
1110
self._write_func(bytes)
1112
def _write_headers(self, headers):
1113
self._write_prefixed_bencode(headers)
1115
def _write_structure(self, args):
1116
self._write_func('s')
1119
if type(arg) is unicode:
1120
utf8_args.append(arg.encode('utf8'))
1122
utf8_args.append(arg)
1123
self._write_prefixed_bencode(utf8_args)
1125
def _write_end(self):
1126
self._write_func('e')
1129
def _write_prefixed_body(self, bytes):
1130
self._write_func('b')
1131
self._write_func(struct.pack('!L', len(bytes)))
1132
self._write_func(bytes)
1134
def _write_chunked_body_start(self):
1135
self._write_func('oC')
1137
def _write_error_status(self):
1138
self._write_func('oE')
1140
def _write_success_status(self):
1141
self._write_func('oS')
1144
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1146
def __init__(self, write_func):
1147
_ProtocolThreeEncoder.__init__(self, write_func)
1148
self.response_sent = False
1149
self._headers = {'Software version': bzrlib.__version__}
1150
if 'hpss' in debug.debug_flags:
1151
self._thread_id = thread.get_ident()
1152
self._response_start_time = None
1154
def _trace(self, action, message, extra_bytes=None, include_time=False):
1155
if self._response_start_time is None:
1156
self._response_start_time = osutils.timer_func()
1158
t = '%5.3fs ' % (time.clock() - self._response_start_time)
1161
if extra_bytes is None:
1164
extra = ' ' + repr(extra_bytes[:40])
1166
extra = extra[:29] + extra[-1] + '...'
1167
mutter('%12s: [%s] %s%s%s'
1168
% (action, self._thread_id, t, message, extra))
1170
def send_error(self, exception):
1171
if self.response_sent:
1172
raise AssertionError(
1173
"send_error(%s) called, but response already sent."
1175
if isinstance(exception, errors.UnknownSmartMethod):
1176
failure = request.FailedSmartServerResponse(
1177
('UnknownMethod', exception.verb))
1178
self.send_response(failure)
1180
if 'hpss' in debug.debug_flags:
1181
self._trace('error', str(exception))
1182
self.response_sent = True
1183
self._write_protocol_version()
1184
self._write_headers(self._headers)
1185
self._write_error_status()
1186
self._write_structure(('error', str(exception)))
1189
def send_response(self, response):
1190
if self.response_sent:
1191
raise AssertionError(
1192
"send_response(%r) called, but response already sent."
1194
self.response_sent = True
1195
self._write_protocol_version()
1196
self._write_headers(self._headers)
1197
if response.is_successful():
1198
self._write_success_status()
1200
self._write_error_status()
1201
if 'hpss' in debug.debug_flags:
1202
self._trace('response', repr(response.args))
1203
self._write_structure(response.args)
1204
if response.body is not None:
1205
self._write_prefixed_body(response.body)
1206
if 'hpss' in debug.debug_flags:
1207
self._trace('body', '%d bytes' % (len(response.body),),
1208
response.body, include_time=True)
1209
elif response.body_stream is not None:
1210
count = num_bytes = 0
1212
for exc_info, chunk in _iter_with_errors(response.body_stream):
1214
if exc_info is not None:
1215
self._write_error_status()
1216
error_struct = request._translate_error(exc_info[1])
1217
self._write_structure(error_struct)
1220
if isinstance(chunk, request.FailedSmartServerResponse):
1221
self._write_error_status()
1222
self._write_structure(chunk.args)
1224
num_bytes += len(chunk)
1225
if first_chunk is None:
1227
self._write_prefixed_body(chunk)
1228
if 'hpssdetail' in debug.debug_flags:
1229
# Not worth timing separately, as _write_func is
1231
self._trace('body chunk',
1232
'%d bytes' % (len(chunk),),
1233
chunk, suppress_time=True)
1234
if 'hpss' in debug.debug_flags:
1235
self._trace('body stream',
1236
'%d bytes %d chunks' % (num_bytes, count),
1239
if 'hpss' in debug.debug_flags:
1240
self._trace('response end', '', include_time=True)
1243
def _iter_with_errors(iterable):
1244
"""Handle errors from iterable.next().
1248
for exc_info, value in _iter_with_errors(iterable):
1251
This is a safer alternative to::
1254
for value in iterable:
1259
Because the latter will catch errors from the for-loop body, not just
1262
If an error occurs, exc_info will be a exc_info tuple, and the generator
1263
will terminate. Otherwise exc_info will be None, and value will be the
1264
value from iterable.next(). Note that KeyboardInterrupt and SystemExit
1265
will not be itercepted.
1267
iterator = iter(iterable)
1270
yield None, iterator.next()
1271
except StopIteration:
1273
except (KeyboardInterrupt, SystemExit):
1276
mutter('_iter_with_errors caught error')
1277
log_exception_quietly()
1278
yield sys.exc_info(), None
1282
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1284
def __init__(self, medium_request):
1285
_ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1286
self._medium_request = medium_request
1289
def set_headers(self, headers):
1290
self._headers = headers.copy()
1292
def call(self, *args):
1293
if 'hpss' in debug.debug_flags:
1294
mutter('hpss call: %s', repr(args)[1:-1])
1295
base = getattr(self._medium_request._medium, 'base', None)
1296
if base is not None:
1297
mutter(' (to %s)', base)
1298
self._request_start_time = osutils.timer_func()
1299
self._write_protocol_version()
1300
self._write_headers(self._headers)
1301
self._write_structure(args)
1303
self._medium_request.finished_writing()
1305
def call_with_body_bytes(self, args, body):
1306
"""Make a remote call of args with body bytes 'body'.
1308
After calling this, call read_response_tuple to find the result out.
1310
if 'hpss' in debug.debug_flags:
1311
mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
1312
path = getattr(self._medium_request._medium, '_path', None)
1313
if path is not None:
1314
mutter(' (to %s)', path)
1315
mutter(' %d bytes', len(body))
1316
self._request_start_time = osutils.timer_func()
1317
self._write_protocol_version()
1318
self._write_headers(self._headers)
1319
self._write_structure(args)
1320
self._write_prefixed_body(body)
1322
self._medium_request.finished_writing()
1324
def call_with_body_readv_array(self, args, body):
1325
"""Make a remote call with a readv array.
1327
The body is encoded with one line per readv offset pair. The numbers in
1328
each pair are separated by a comma, and no trailing \n is emitted.
1330
if 'hpss' in debug.debug_flags:
1331
mutter('hpss call w/readv: %s', repr(args)[1:-1])
1332
path = getattr(self._medium_request._medium, '_path', None)
1333
if path is not None:
1334
mutter(' (to %s)', path)
1335
self._request_start_time = osutils.timer_func()
1336
self._write_protocol_version()
1337
self._write_headers(self._headers)
1338
self._write_structure(args)
1339
readv_bytes = self._serialise_offsets(body)
1340
if 'hpss' in debug.debug_flags:
1341
mutter(' %d bytes in readv request', len(readv_bytes))
1342
self._write_prefixed_body(readv_bytes)
1344
self._medium_request.finished_writing()
1346
def call_with_body_stream(self, args, stream):
1347
if 'hpss' in debug.debug_flags:
1348
mutter('hpss call w/body stream: %r', args)
1349
path = getattr(self._medium_request._medium, '_path', None)
1350
if path is not None:
1351
mutter(' (to %s)', path)
1352
self._request_start_time = osutils.timer_func()
1353
self._write_protocol_version()
1354
self._write_headers(self._headers)
1355
self._write_structure(args)
1356
# TODO: notice if the server has sent an early error reply before we
1357
# have finished sending the stream. We would notice at the end
1358
# anyway, but if the medium can deliver it early then it's good
1359
# to short-circuit the whole request...
1360
for exc_info, part in _iter_with_errors(stream):
1361
if exc_info is not None:
1362
# Iterating the stream failed. Cleanly abort the request.
1363
self._write_error_status()
1364
# Currently the client unconditionally sends ('error',) as the
1366
self._write_structure(('error',))
1368
self._medium_request.finished_writing()
1369
raise exc_info[0], exc_info[1], exc_info[2]
1371
self._write_prefixed_body(part)
1374
self._medium_request.finished_writing()