# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Wire-level encoding and decoding of requests and responses for the smart
21
from __future__ import absolute_import
24
22
from cStringIO import StringIO

from bzrlib import debug
from bzrlib import errors
from bzrlib.smart import message, request
from bzrlib.trace import log_exception_quietly, mutter
from bzrlib.bencode import bdecode_as_tuple, bencode


# Protocol version strings. These are sent as prefixes of bzr requests and
# responses to identify the protocol version being used.


def _encode_tuple(args):
    """Encode the tuple args to a bytestream."""
    joined = '\x01'.join(args) + '\n'
    if type(joined) is unicode:
        # XXX: We should fix things so this never happens! -AJB, 20100304
        mutter('response args contain unicode, should be only bytes: %r',
               joined)
        joined = joined.encode('ascii')
    return joined
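
# Illustrative example (not part of the original module): the v1/v2 wire
# encoding produced by _encode_tuple joins the argument strings with '\x01'
# and terminates the record with '\n', so:
#
#   _encode_tuple(('get', 'foo/bar')) == 'get\x01foo/bar\n'
#
# The receiving side splits the arguments back apart on '\x01' after stripping
# the trailing newline.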


class Requester(object):

        for start, length in offsets:
            txt.append('%d,%d' % (start, length))
        return '\n'.join(txt)
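
        # Example (illustrative, not in the original): a readv offset list
        # such as [(0, 10), (30, 5)] is serialised as '0,10\n30,5' -- one
        # 'start,length' pair per line, with no trailing newline.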


class SmartServerRequestProtocolOne(SmartProtocolBase):
    """Server-side encoding and decoding logic for smart version 1."""

    def __init__(self, backing_transport, write_func, root_client_path='/',
            jail_root=None):
        self._backing_transport = backing_transport
        self._root_client_path = root_client_path
        self._jail_root = jail_root
        self.unused_data = ''
        self._finished = False
        self.in_buffer = ''

        req_args = _decode_tuple(first_line)
        self.request = request.SmartServerRequestHandler(
            self._backing_transport, commands=request.request_handlers,
            root_client_path=self._root_client_path,
            jail_root=self._jail_root)
        self.request.args_received(req_args)
        if self.request.finished_reading:
            # trivial request
            self.unused_data = self.in_buffer

    def __init__(self):
        self.finished_reading = False
        self._in_buffer_list = []
        self._in_buffer_len = 0
        self.unused_data = ''
        self.bytes_left = None
        self._number_needed_bytes = None

    def _get_in_buffer(self):
        if len(self._in_buffer_list) == 1:
            return self._in_buffer_list[0]
        in_buffer = ''.join(self._in_buffer_list)
        if len(in_buffer) != self._in_buffer_len:
            raise AssertionError(
                "Length of buffer did not match expected value: %s != %s"
                % (self._in_buffer_len, len(in_buffer)))
        self._in_buffer_list = [in_buffer]
        return in_buffer

    def _get_in_bytes(self, count):
        """Grab X bytes from the input_buffer.

        Callers should have already checked that self._in_buffer_len is >
        count. Note, this does not consume the bytes from the buffer. The
        caller will still need to call _get_in_buffer() and then
        _set_in_buffer() if they actually need to consume the bytes.
        """
        # check if we can yield the bytes from just the first entry in our list
        if len(self._in_buffer_list) == 0:
            raise AssertionError('Callers must be sure we have buffered bytes'
                ' before calling _get_in_bytes')
        if len(self._in_buffer_list[0]) > count:
            return self._in_buffer_list[0][:count]
        # We can't yield it from the first buffer, so collapse all buffers,
        # and yield it from that.
        in_buf = self._get_in_buffer()
        return in_buf[:count]
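
    # Usage sketch (illustrative, not in the original): _get_in_bytes(4) only
    # peeks at the first four buffered bytes.  To actually consume them a
    # caller does something like:
    #
    #   prefix = decoder._get_in_bytes(4)      # 'decoder' is hypothetical
    #   in_buf = decoder._get_in_buffer()
    #   decoder._set_in_buffer(in_buf[4:])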

    def _set_in_buffer(self, new_buf):
        if new_buf is not None:
            self._in_buffer_list = [new_buf]
            self._in_buffer_len = len(new_buf)
        else:
            self._in_buffer_list = []
            self._in_buffer_len = 0

    def accept_bytes(self, bytes):
        """Decode as much of bytes as possible.

        If 'bytes' contains too much data it will be appended to
        self.unused_data.

        finished_reading will be set when no more data is required. Further
        data will be appended to self.unused_data.
        """
        # accept_bytes is allowed to change the state
        self._number_needed_bytes = None
        # lsprof puts a very large amount of time on this specific call for
        # large readv arrays
        self._in_buffer_list.append(bytes)
        self._in_buffer_len += len(bytes)
        # Run the function for the current state.
        current_state = self.state_accept
        self.state_accept()
        while current_state != self.state_accept:
            # The current state has changed. Run the function for the new
            # current state, so that it can decode any pending bytes and
            # possibly change state again.
            current_state = self.state_accept
            self.state_accept()
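
    # Note (illustrative, not in the original): each _state_accept_* method
    # either consumes bytes from the buffer and points self.state_accept at
    # the next state, or raises _NeedMoreBytes to pause decoding until
    # accept_bytes() is called again with more data.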

    def _extract_line(self):
        in_buf = self._get_in_buffer()
        pos = in_buf.find('\n')
        if pos == -1:
            # We haven't read a complete line yet, so request more bytes before
            # we continue.
            raise _NeedMoreBytes(1)
        line = in_buf[:pos]
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
        self._set_in_buffer(in_buf[pos+1:])
        return line

    def _finished(self):
        self.unused_data = self._get_in_buffer()
        self._in_buffer_list = []
        self._in_buffer_len = 0
        self.state_accept = self._state_accept_reading_unused
        if self.error:
            error_args = tuple(self.error_in_progress)

            self.state_accept = self._state_accept_reading_chunk

    def _state_accept_reading_chunk(self):
        in_buf = self._get_in_buffer()
        in_buffer_len = len(in_buf)
        self.chunk_in_progress += in_buf[:self.bytes_left]
        self._set_in_buffer(in_buf[self.bytes_left:])
        self.bytes_left -= in_buffer_len
        if self.bytes_left <= 0:
            # Finished with chunk
            self.chunks.append(self.chunk_in_progress)
            self.chunk_in_progress = None
            self.state_accept = self._state_accept_expecting_length

    def _state_accept_reading_unused(self):
        self.unused_data += self._get_in_buffer()
        self._in_buffer_list = []


class LengthPrefixedBodyDecoder(_StatefulDecoder):
    """Decodes the length-prefixed bulk data."""

    def __init__(self):
        _StatefulDecoder.__init__(self)
        self.state_accept = self._state_accept_expecting_length
        self.state_read = self._state_read_no_data
        self._body = ''
        self._trailer_buffer = ''

    def next_read_size(self):
        if self.bytes_left is not None:
            # Ideally we want to read all the remainder of the body and the

            # Reading excess data. Either way, 1 byte at a time is fine.

    def read_pending_data(self):
        """Return any pending data that has been decoded."""
        return self.state_read()

    def _state_accept_expecting_length(self):
        in_buf = self._get_in_buffer()
        pos = in_buf.find('\n')
        if pos == -1:
            return
        self.bytes_left = int(in_buf[:pos])
        self._set_in_buffer(in_buf[pos+1:])
        self.state_accept = self._state_accept_reading_body
        self.state_read = self._state_read_body_buffer

    def _state_accept_reading_body(self):
        in_buf = self._get_in_buffer()
        self._body += in_buf
        self.bytes_left -= len(in_buf)
        self._set_in_buffer(None)
        if self.bytes_left <= 0:
            # Finished with body
            if self.bytes_left != 0:
                self._trailer_buffer = self._body[self.bytes_left:]
                self._body = self._body[:self.bytes_left]
            self.bytes_left = None
            self.state_accept = self._state_accept_reading_trailer

    def _state_accept_reading_trailer(self):
        self._trailer_buffer += self._get_in_buffer()
        self._set_in_buffer(None)
        # TODO: what if the trailer does not match "done\n"? Should this raise
        # a ProtocolViolation exception?
        if self._trailer_buffer.startswith('done\n'):
            self.unused_data = self._trailer_buffer[len('done\n'):]
            self.state_accept = self._state_accept_reading_unused
            self.finished_reading = True
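
    # Illustrative example (not in the original): a length-prefixed body is a
    # decimal byte count, a newline, the body bytes, and then a 'done\n'
    # trailer, so a five byte body 'hello' arrives as:
    #
    #   '5\nhellodone\n'
    #
    # which this decoder splits back into bytes_left=5, a body of 'hello', and
    # a matching trailer.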

    def _state_accept_reading_unused(self):
        self.unused_data += self._get_in_buffer()
        self._set_in_buffer(None)

    def _state_read_no_data(self):
        return ''

            mutter('hpss call: %s', repr(args)[1:-1])
            if getattr(self._request._medium, 'base', None) is not None:
                mutter(' (to %s)', self._request._medium.base)
            self._request_start_time = osutils.timer_func()
        self._write_args(args)
        self._request.finished_writing()
        self._last_verb = args[0]

            if getattr(self._request._medium, '_path', None) is not None:
                mutter(' (to %s)', self._request._medium._path)
            mutter(' %d bytes', len(body))
            self._request_start_time = osutils.timer_func()
            if 'hpssdetail' in debug.debug_flags:
                mutter('hpss body content: %s', body)
        self._write_args(args)

        """Make a remote call with a readv array.

        The body is encoded with one line per readv offset pair. The numbers in
        each pair are separated by a comma, and no trailing \\n is emitted.
        """
        if 'hpss' in debug.debug_flags:
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
            if getattr(self._request._medium, '_path', None) is not None:
                mutter(' (to %s)', self._request._medium._path)
            self._request_start_time = osutils.timer_func()
        self._write_args(args)
        readv_bytes = self._serialise_offsets(body)
        bytes = self._encode_bulk_data(readv_bytes)
        if 'hpss' in debug.debug_flags:
            mutter(' %d bytes in readv request', len(readv_bytes))
        self._last_verb = args[0]

    def call_with_body_stream(self, args, stream):
        # Protocols v1 and v2 don't support body streams. So it's safe to
        # assume that a v1/v2 server doesn't support whatever method we're
        # trying to call with a body stream.
        self._request.finished_writing()
        self._request.finished_reading()
        raise errors.UnknownSmartMethod(args[0])

    def cancel_read_body(self):
        """After expecting a body, a response code may indicate one otherwise.
        """

            # The response will have no body, so we've finished reading.
            self._request.finished_reading()
            raise errors.UnknownSmartMethod(self._last_verb)

    def read_body_bytes(self, count=-1):
        """Read bytes from the body, decoding into a byte stream.

        We read all bytes at once to ensure we've checked the trailer for
        errors, and then feed the buffer back as read_body_bytes is called.
        """
        if self._body_buffer is not None:
            return self._body_buffer.read(count)
        _body_decoder = LengthPrefixedBodyDecoder()
        while not _body_decoder.finished_reading:
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
            if bytes == '':
                # end of file encountered reading from server
                raise errors.ConnectionReset(

    def _recv_tuple(self):
        """Receive a tuple from the medium request."""
        return _decode_tuple(self._request.read_line())

    def query_version(self):
        """Return protocol version number of the server."""

        if version != self.response_marker:
            self._request.finished_reading()
            raise errors.UnexpectedProtocolVersionMarker(version)
        response_status = self._request.read_line()
        result = SmartClientRequestProtocolOne._read_response_tuple(self)
        self._response_is_unknown_method(result)
        if response_status == 'success\n':

        # Read no more than 64k at a time so that we don't risk error 10055 (no
        # buffer space available) on Windows.
        _body_decoder = ChunkedBodyDecoder()
        while not _body_decoder.finished_reading:
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
            if bytes == '':
                # end of file encountered reading from server
                raise errors.ConnectionReset(


def build_server_protocol_three(backing_transport, write_func,
                                root_client_path, jail_root=None):
    request_handler = request.SmartServerRequestHandler(
        backing_transport, commands=request.request_handlers,
        root_client_path=root_client_path, jail_root=jail_root)
    responder = ProtocolThreeResponder(write_func)
    message_handler = message.ConventionalRequestHandler(request_handler, responder)
    return ProtocolThreeDecoder(message_handler)
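
# Usage sketch (illustrative, not from the original file): the server side
# feeds raw bytes from the medium into the decoder built above, e.g.
#
#   proto = build_server_protocol_three(transport, write_func, '/')
#   proto.accept_bytes(bytes_from_client)
#
# where 'transport', 'write_func' and 'bytes_from_client' are placeholders
# supplied by the surrounding server code.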

            # We do *not* set self.decoding_failed here. The message handler
            # has raised an error, but the decoder is still able to parse bytes
            # and determine when this message ends.
            if not isinstance(exception.exc_value, errors.UnknownSmartMethod):
                log_exception_quietly()
            self.message_handler.protocol_error(exception.exc_value)
            # The state machine is ready to continue decoding, but the
            # exception has interrupted the loop that runs the state machine.

            self.message_handler.protocol_error(exception)

    def _extract_length_prefixed_bytes(self):
        if self._in_buffer_len < 4:
            # A length prefix by itself is 4 bytes, and we don't even have that
            # many yet.
            raise _NeedMoreBytes(4)
        (length,) = struct.unpack('!L', self._get_in_bytes(4))
        end_of_bytes = 4 + length
        if self._in_buffer_len < end_of_bytes:
            # We haven't yet read as many bytes as the length-prefix says there
            # are.
            raise _NeedMoreBytes(end_of_bytes)
        # Extract the bytes from the buffer.
        in_buf = self._get_in_buffer()
        bytes = in_buf[4:end_of_bytes]
        self._set_in_buffer(in_buf[end_of_bytes:])
        return bytes
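
    # Illustrative example (not in the original): a v3 length-prefixed item is
    # a 4-byte big-endian count followed by that many bytes, so the string
    # 'hello' travels as struct.pack('!L', 5) + 'hello', i.e.
    # '\x00\x00\x00\x05hello'.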

    def _extract_prefixed_bencoded_data(self):
        prefixed_bytes = self._extract_length_prefixed_bytes()
        try:
            decoded = bdecode_as_tuple(prefixed_bytes)
        except ValueError:
            raise errors.SmartProtocolError(
                'Bytes %r not bencoded' % (prefixed_bytes,))
        return decoded
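
    # Illustrative example (not in the original): bdecode_as_tuple turns a
    # bencoded list of strings back into a tuple, e.g. the argument structure
    # ('hello', '1') is carried on the wire as 'l5:hello1:1e'.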

    def _extract_single_byte(self):
        if self._in_buffer_len == 0:
            # The buffer is empty
            raise _NeedMoreBytes(1)
        in_buf = self._get_in_buffer()
        one_byte = in_buf[0]
        self._set_in_buffer(in_buf[1:])
        return one_byte

    def _state_accept_expecting_protocol_version(self):
        needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
        in_buf = self._get_in_buffer()
        if needed_bytes > 0:
            # We don't have enough bytes to check if the protocol version
            # marker is right. But we can check if it is already wrong by
            # len(MESSAGE_VERSION_THREE) bytes. So if the bytes we have so far
            # are wrong then we should just raise immediately rather than
            if not MESSAGE_VERSION_THREE.startswith(in_buf):
                # We have enough bytes to know the protocol version is wrong
                raise errors.UnexpectedProtocolVersionMarker(in_buf)
            raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
        if not in_buf.startswith(MESSAGE_VERSION_THREE):
            raise errors.UnexpectedProtocolVersionMarker(in_buf)
        self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
        self.state_accept = self._state_accept_expecting_headers

    def _state_accept_expecting_headers(self):


class _ProtocolThreeEncoder(object):

    response_marker = request_marker = MESSAGE_VERSION_THREE
    BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing

    def __init__(self, write_func):
        self._buf = []
        self._buf_len = 0
        self._real_write_func = write_func

    def _write_func(self, bytes):
        # TODO: Another possibility would be to turn this into an async model.
        #       Where we let another thread know that we have some bytes if
        #       they want it, but we don't actually block for it
        # Note that osutils.send_all always sends 64kB chunks anyway, so
        # we might just push out smaller bits at a time?
        self._buf.append(bytes)
        self._buf_len += len(bytes)
        if self._buf_len > self.BUFFER_SIZE:
            self.flush()

    def flush(self):
        if self._buf:
            self._real_write_func(''.join(self._buf))
            self._buf = []
            self._buf_len = 0

    def _serialise_offsets(self, offsets):
        """Serialise a readv offset list."""


class ProtocolThreeResponder(_ProtocolThreeEncoder):

    def __init__(self, write_func):
        _ProtocolThreeEncoder.__init__(self, write_func)
        self.response_sent = False
        self._headers = {'Software version': bzrlib.__version__}
        if 'hpss' in debug.debug_flags:
            self._thread_id = thread.get_ident()
            self._response_start_time = None

    def _trace(self, action, message, extra_bytes=None, include_time=False):
        if self._response_start_time is None:
            self._response_start_time = osutils.timer_func()
        if include_time:
            t = '%5.3fs ' % (time.clock() - self._response_start_time)
        else:
            t = ''
        if extra_bytes is None:
            extra = ''
        else:
            extra = ' ' + repr(extra_bytes[:40])
            if len(extra) > 33:
                extra = extra[:29] + extra[-1] + '...'
        mutter('%12s: [%s] %s%s%s'
               % (action, self._thread_id, t, message, extra))

    def send_error(self, exception):
        if self.response_sent:

        if response.is_successful():
            self._write_success_status()
        else:
            self._write_error_status()
        if 'hpss' in debug.debug_flags:
            self._trace('response', repr(response.args))
        self._write_structure(response.args)
        if response.body is not None:
            self._write_prefixed_body(response.body)
            if 'hpss' in debug.debug_flags:
                self._trace('body', '%d bytes' % (len(response.body),),
                    response.body, include_time=True)
        elif response.body_stream is not None:
            count = num_bytes = 0
            first_chunk = None
            for exc_info, chunk in _iter_with_errors(response.body_stream):
                count += 1
                if exc_info is not None:
                    self._write_error_status()
                    error_struct = request._translate_error(exc_info[1])
                    self._write_structure(error_struct)
                    break
                else:
                    if isinstance(chunk, request.FailedSmartServerResponse):
                        self._write_error_status()
                        self._write_structure(chunk.args)
                        break
                    num_bytes += len(chunk)
                    if first_chunk is None:
                        first_chunk = chunk
                    self._write_prefixed_body(chunk)
                    if 'hpssdetail' in debug.debug_flags:
                        # Not worth timing separately, as _write_func is
                        self._trace('body chunk',
                            '%d bytes' % (len(chunk),),
                            chunk, include_time=False)
            if 'hpss' in debug.debug_flags:
                self._trace('body stream',
                    '%d bytes %d chunks' % (num_bytes, count),
                    include_time=True)
        self._write_end()
        if 'hpss' in debug.debug_flags:
            self._trace('response end', '', include_time=True)


def _iter_with_errors(iterable):
    """Handle errors from iterable.next().

    Use like::

        for exc_info, value in _iter_with_errors(iterable):
            ...

    This is a safer alternative to::

        for value in iterable:
            ...

    Because the latter will catch errors from the for-loop body, not just
    iterable.next().

    If an error occurs, exc_info will be an exc_info tuple, and the generator
    will terminate. Otherwise exc_info will be None, and value will be the
    value from iterable.next(). Note that KeyboardInterrupt and SystemExit
    will not be intercepted.
    """
    iterator = iter(iterable)
    while True:
        try:
            yield None, iterator.next()
        except StopIteration:
            return
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            mutter('_iter_with_errors caught error')
            log_exception_quietly()
            yield sys.exc_info(), None
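
# Example (illustrative, not in the original): wrapping a stream that fails
# partway through yields the good values first, then one (exc_info, None)
# pair, e.g.
#
#   def chunks():
#       yield 'a'
#       raise IOError('boom')
#
#   pairs = list(_iter_with_errors(chunks()))
#   # pairs[0] == (None, 'a'); pairs[1][0] is the IOError exc_info tuple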


class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):

    def __init__(self, medium_request):
        _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
        self._medium_request = medium_request
        self._headers = {}
        self.body_stream_started = None

    def set_headers(self, headers):
        self._headers = headers.copy()

    def call(self, *args):
        if 'hpss' in debug.debug_flags:
            mutter('hpss call: %s', repr(args)[1:-1])
            base = getattr(self._medium_request._medium, 'base', None)
            if base is not None:
                mutter(' (to %s)', base)
            self._request_start_time = osutils.timer_func()
        self._write_protocol_version()
        self._write_headers(self._headers)
        self._write_structure(args)

        """Make a remote call with a readv array.

        The body is encoded with one line per readv offset pair. The numbers in
        each pair are separated by a comma, and no trailing \\n is emitted.
        """
        if 'hpss' in debug.debug_flags:
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
            path = getattr(self._medium_request._medium, '_path', None)
            if path is not None:
                mutter(' (to %s)', path)
            self._request_start_time = osutils.timer_func()
        self._write_protocol_version()
        self._write_headers(self._headers)
        self._write_structure(args)

        self._write_end()
        self._medium_request.finished_writing()

    def call_with_body_stream(self, args, stream):
        if 'hpss' in debug.debug_flags:
            mutter('hpss call w/body stream: %r', args)
            path = getattr(self._medium_request._medium, '_path', None)
            if path is not None:
                mutter(' (to %s)', path)
            self._request_start_time = osutils.timer_func()
        self.body_stream_started = False
        self._write_protocol_version()
        self._write_headers(self._headers)
        self._write_structure(args)
        # TODO: notice if the server has sent an early error reply before we
        #       have finished sending the stream. We would notice at the end
        #       anyway, but if the medium can deliver it early then it's good
        #       to short-circuit the whole request...
        # Provoke any ConnectionReset failures before we start the body stream.
        self.flush()
        self.body_stream_started = True
        for exc_info, part in _iter_with_errors(stream):
            if exc_info is not None:
                # Iterating the stream failed. Cleanly abort the request.
                self._write_error_status()
                # Currently the client unconditionally sends ('error',) as the
                # error args.
                self._write_structure(('error',))
                self._write_end()
                self._medium_request.finished_writing()
                raise exc_info[0], exc_info[1], exc_info[2]
            else:
                self._write_prefixed_body(part)
        self._write_end()
        self._medium_request.finished_writing()
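
# Usage sketch (illustrative, not from the original file): a v3 client streams
# a request body as an iterable of byte chunks, e.g.
#
#   requester.call_with_body_stream(('some-verb', 'some/path'), chunk_iter)
#
# where 'requester', 'some-verb', 'some/path' and 'chunk_iter' are
# placeholders; each chunk is written as a prefixed body part, and an error
# from the iterator cleanly aborts the request.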