215
216
def _send_chunks(stream, write_func):
216
217
for chunk in stream:
217
assert isinstance(chunk, str), 'body must be a str'
219
# Skip empty chunks, as they would prematurely signal
220
# end-of-stream, and they're redundant anyway.
222
bytes = "%x\n%s" % (len(chunk), chunk)
218
if isinstance(chunk, str):
220
# Skip empty chunks, as they would prematurely signal
221
# end-of-stream, and they're redundant anyway.
223
bytes = "%x\n%s" % (len(chunk), chunk)
225
elif isinstance(chunk, request.FailedSmartServerResponse):
226
write_func('ERR\n' + _encode_tuple(chunk.args))
230
'Chunks must be str or FailedSmartServerResponse, got %r'
224
232
write_func('0\n')
254
262
This is very similar the HTTP's chunked encoding::
256
BODY := CHUNKS TERMINATOR
264
STREAMED_BODY := CHUNKS TERMINATOR
257
265
CHUNKS := CHUNK [CHUNKS]
258
266
CHUNK := CHUNK_LENGTH CHUNK_CONTENT
259
267
CHUNK_LENGTH := HEX_DIGITS NEWLINE
260
268
CHUNK_CONTENT := bytes
261
TERMINATOR := '0' NEWLINE
270
TERMINATOR := SUCCESS_TERMINATOR | ERROR_TERMINATOR
271
SUCCESS_TERMINATOR := '0' NEWLINE
272
ERROR_TERMINATOR := 'ERR' NEWLINE ARGS
273
ARGS := (see bzrlib/smart/__init__.py)
263
275
That is, the body consists of a series of chunks. Each chunk starts with a
264
276
length prefix in hexadecimal digits, followed by an ASCII newline byte.
265
277
The end of the body is signaled by a zero the zero-length chunk, i.e.
278
'0\\n', or by 'ERR\\n' followed by a response args tuple of the error.
269
281
def __init__(self):
270
282
_StatefulDecoder.__init__(self)
271
283
self.state_accept = self._state_accept_expecting_length
272
284
self._in_buffer = ''
273
self._content_bytes = ''
285
self.chunk_in_progress = None
286
self.chunks = collections.deque()
275
288
def next_read_size(self):
276
289
# Note: the shortest possible chunk is 2 bytes: '0\n'.
287
300
# We're in the middle of reading a chunk length. So there's at
288
301
# least one byte left, the '\n' that terminates the length.
303
elif self.state_accept == self._state_accept_reading_error:
304
# We're reading an error tuple. There's at least one byte left,
290
307
elif self.state_accept == self._state_accept_reading_unused:
293
310
raise AssertionError("Impossible state: %r" % (self.state_accept,))
295
def read_pending_data(self):
296
bytes = self._content_bytes
297
self._content_bytes = ''
312
def read_next_chunk(self):
314
return self.chunks.popleft()
300
def _state_accept_expecting_length(self, bytes):
301
self._in_buffer += bytes
318
def _extract_line(self):
302
319
pos = self._in_buffer.find('\n')
304
321
# We haven't read a complete length prefix yet, so there's nothing
307
self.bytes_left = int(self._in_buffer[:pos], 16)
308
# Trim the length and '\n' delimiter from the _in_buffer.
324
line = self._in_buffer[:pos]
325
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
309
326
self._in_buffer = self._in_buffer[pos+1:]
330
self.unused_data = self._in_buffer
331
self._in_buffer = None
332
self.state_accept = self._state_accept_reading_unused
333
self.finished_reading = True
335
def _state_accept_expecting_length(self, bytes):
336
self._in_buffer += bytes
337
prefix = self._extract_line()
339
# We haven't read a complete length prefix yet, so there's nothing
342
elif prefix == 'ERR':
343
self.state_accept = self._state_accept_reading_error
344
self.state_accept('')
346
self.bytes_left = int(prefix, 16)
310
347
if self.bytes_left == 0:
348
# We've read the end-of-body marker.
311
349
# Any further bytes are unused data, including the bytes left in
312
350
# the _in_buffer.
313
self.unused_data = self._in_buffer
314
self._in_buffer = None
315
self.state_accept = self._state_accept_reading_unused
316
self.finished_reading = True
353
self.chunk_in_progress = ''
318
354
self.state_accept = self._state_accept_reading_chunk
320
356
def _state_accept_reading_chunk(self, bytes):
321
357
self._in_buffer += bytes
322
358
in_buffer_len = len(self._in_buffer)
323
self._content_bytes += self._in_buffer[:self.bytes_left]
359
self.chunk_in_progress += self._in_buffer[:self.bytes_left]
324
360
self._in_buffer = self._in_buffer[self.bytes_left:]
325
361
self.bytes_left -= in_buffer_len
326
362
if self.bytes_left <= 0:
327
363
# Finished with chunk
328
364
self.bytes_left = None
365
self.chunks.append(self.chunk_in_progress)
366
self.chunk_in_progress = None
329
367
self.state_accept = self._state_accept_expecting_length
369
def _state_accept_reading_error(self, bytes):
370
self._in_buffer += bytes
371
line = self._extract_line()
374
error_args = _decode_tuple(line + '\n')
375
self.chunks.append(request.FailedSmartServerResponse(error_args))
331
378
def _state_accept_reading_unused(self, bytes):
332
379
self.unused_data += bytes
578
625
bytes_wanted = _body_decoder.next_read_size()
579
626
bytes = self._request.read_bytes(bytes_wanted)
580
627
_body_decoder.accept_bytes(bytes)
581
body_bytes = _body_decoder.read_pending_data()
628
for body_bytes in iter(_body_decoder.read_next_chunk, None):
583
629
if 'hpss' in debug.debug_flags:
584
mutter(' %d streamed body bytes read',
630
mutter(' %d byte chunk read',
587
633
self._request.finished_reading()