209
210
bytes = self._encode_bulk_data(response.body)
210
211
self._write_func(bytes)
211
212
elif response.body_stream is not None:
212
for chunk in response.body_stream:
213
assert isinstance(chunk, str), 'body must be a str'
215
# Skip empty chunks, as they would prematurely signal
216
# end-of-stream, and they're redundant anyway.
218
bytes = "%x\n%s" % (len(chunk), chunk)
219
self._write_func(bytes)
220
self._write_func('0\n')
213
_send_stream(response.body_stream, self._write_func)
216
def _send_stream(stream, write_func):
217
_send_chunks(stream, write_func)
221
def _send_chunks(stream, write_func):
223
if isinstance(chunk, str):
224
bytes = "%x\n%s" % (len(chunk), chunk)
226
elif isinstance(chunk, request.FailedSmartServerResponse):
228
_send_chunks(chunk.args, write_func)
232
'Chunks must be str or FailedSmartServerResponse, got %r'
223
236
class _StatefulDecoder(object):
247
260
class ChunkedBodyDecoder(_StatefulDecoder):
248
261
"""Decoder for chunked body data.
250
This is very similar to HTTP's chunked encoding::
252
BODY := CHUNKS TERMINATOR
253
CHUNKS := CHUNK [CHUNKS]
254
CHUNK := CHUNK_LENGTH CHUNK_CONTENT
255
CHUNK_LENGTH := HEX_DIGITS NEWLINE
256
CHUNK_CONTENT := bytes
257
TERMINATOR := '0' NEWLINE
259
That is, the body consists of a series of chunks. Each chunk starts with a
260
length prefix in hexadecimal digits, followed by an ASCII newline byte.
261
The end of the body is signaled by the zero-length chunk, i.e.
263
This is very similar to HTTP's chunked encoding. See the description of
264
streamed body data in `doc/developers/network-protocol.txt` for details.
265
267
def __init__(self):
266
268
_StatefulDecoder.__init__(self)
267
269
self.state_accept = self._state_accept_expecting_length
268
270
self._in_buffer = ''
269
self._content_bytes = ''
271
self.chunk_in_progress = None
272
self.chunks = collections.deque()
274
self.error_in_progress = None
271
276
def next_read_size(self):
272
# Note: the shortest possible chunk is 2 bytes: '0\n'.
277
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
278
# end-of-body marker is 4 bytes: 'END\n'.
273
279
if self.state_accept == self._state_accept_reading_chunk:
274
280
# We're expecting more chunk content. So we're expecting at least
275
# the rest of this chunk plus another chunk header.
276
return self.bytes_left + 2
281
# the rest of this chunk plus an END chunk.
282
return self.bytes_left + 4
277
283
elif self.state_accept == self._state_accept_expecting_length:
278
284
if self._in_buffer == '':
279
285
# We're expecting a chunk length. There's at least two bytes
289
295
raise AssertionError("Impossible state: %r" % (self.state_accept,))
291
def read_pending_data(self):
292
bytes = self._content_bytes
293
self._content_bytes = ''
297
def read_next_chunk(self):
299
return self.chunks.popleft()
296
def _state_accept_expecting_length(self, bytes):
297
self._in_buffer += bytes
303
def _extract_line(self):
298
304
pos = self._in_buffer.find('\n')
300
306
# We haven't read a complete length prefix yet, so there's nothing
303
self.bytes_left = int(self._in_buffer[:pos], 16)
304
# Trim the length and '\n' delimiter from the _in_buffer.
309
line = self._in_buffer[:pos]
310
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
305
311
self._in_buffer = self._in_buffer[pos+1:]
306
if self.bytes_left == 0:
315
self.unused_data = self._in_buffer
316
self._in_buffer = None
317
self.state_accept = self._state_accept_reading_unused
319
error_args = tuple(self.error_in_progress)
320
self.chunks.append(request.FailedSmartServerResponse(error_args))
321
self.error_in_progress = None
322
self.finished_reading = True
324
def _state_accept_expecting_length(self, bytes):
325
self._in_buffer += bytes
326
prefix = self._extract_line()
328
# We haven't read a complete length prefix yet, so there's nothing
331
elif prefix == 'ERR':
333
self.error_in_progress = []
334
self._state_accept_expecting_length('')
336
elif prefix == 'END':
337
# We've read the end-of-body marker.
307
338
# Any further bytes are unused data, including the bytes left in
308
339
# the _in_buffer.
309
self.unused_data = self._in_buffer
310
self._in_buffer = None
311
self.state_accept = self._state_accept_reading_unused
312
self.finished_reading = True
314
self.state_accept = self._state_accept_reading_chunk
343
self.bytes_left = int(prefix, 16)
344
self.chunk_in_progress = ''
345
self.state_accept = self._state_accept_reading_chunk
316
347
def _state_accept_reading_chunk(self, bytes):
317
348
self._in_buffer += bytes
318
349
in_buffer_len = len(self._in_buffer)
319
self._content_bytes += self._in_buffer[:self.bytes_left]
350
self.chunk_in_progress += self._in_buffer[:self.bytes_left]
320
351
self._in_buffer = self._in_buffer[self.bytes_left:]
321
352
self.bytes_left -= in_buffer_len
322
353
if self.bytes_left <= 0:
323
354
# Finished with chunk
324
355
self.bytes_left = None
357
self.error_in_progress.append(self.chunk_in_progress)
359
self.chunks.append(self.chunk_in_progress)
360
self.chunk_in_progress = None
325
361
self.state_accept = self._state_accept_expecting_length
327
363
def _state_accept_reading_unused(self, bytes):
574
610
bytes_wanted = _body_decoder.next_read_size()
575
611
bytes = self._request.read_bytes(bytes_wanted)
576
612
_body_decoder.accept_bytes(bytes)
577
body_bytes = _body_decoder.read_pending_data()
613
for body_bytes in iter(_body_decoder.read_next_chunk, None):
579
614
if 'hpss' in debug.debug_flags:
580
mutter(' %d streamed body bytes read',
615
mutter(' %d byte chunk read',
583
618
self._request.finished_reading()