198
199
self._write_func(RESPONSE_VERSION_TWO)
201
class LengthPrefixedBodyDecoder(object):
202
"""Decodes the length-prefixed bulk data."""
201
def _send_response(self, response):
202
"""Send a smart server response down the output stream."""
203
assert not self._finished, 'response already sent'
204
self._finished = True
205
self._write_protocol_version()
206
self._write_success_or_failure_prefix(response)
207
self._write_func(_encode_tuple(response.args))
208
if response.body is not None:
209
assert isinstance(response.body, str), 'body must be a str'
210
assert response.body_stream is None, (
211
'body_stream and body cannot both be set')
212
bytes = self._encode_bulk_data(response.body)
213
self._write_func(bytes)
214
elif response.body_stream is not None:
215
_send_stream(response.body_stream, self._write_func)
218
def _send_stream(stream, write_func):
219
write_func('chunked\n')
220
_send_chunks(stream, write_func)
224
def _send_chunks(stream, write_func):
226
if isinstance(chunk, str):
227
bytes = "%x\n%s" % (len(chunk), chunk)
229
elif isinstance(chunk, request.FailedSmartServerResponse):
231
_send_chunks(chunk.args, write_func)
234
raise errors.BzrError(
235
'Chunks must be str or FailedSmartServerResponse, got %r'
239
class _StatefulDecoder(object):
204
241
def __init__(self):
205
self.bytes_left = None
206
242
self.finished_reading = False
207
243
self.unused_data = ''
208
self.state_accept = self._state_accept_expecting_length
209
self.state_read = self._state_read_no_data
211
self._trailer_buffer = ''
244
self.bytes_left = None
213
246
def accept_bytes(self, bytes):
214
247
"""Decode as much of bytes as possible.
226
259
current_state = self.state_accept
227
260
self.state_accept('')
263
class ChunkedBodyDecoder(_StatefulDecoder):
264
"""Decoder for chunked body data.
266
This is very similar the HTTP's chunked encoding. See the description of
267
streamed body data in `doc/developers/network-protocol.txt` for details.
271
_StatefulDecoder.__init__(self)
272
self.state_accept = self._state_accept_expecting_header
274
self.chunk_in_progress = None
275
self.chunks = collections.deque()
277
self.error_in_progress = None
279
def next_read_size(self):
280
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
281
# end-of-body marker is 4 bytes: 'END\n'.
282
if self.state_accept == self._state_accept_reading_chunk:
283
# We're expecting more chunk content. So we're expecting at least
284
# the rest of this chunk plus an END chunk.
285
return self.bytes_left + 4
286
elif self.state_accept == self._state_accept_expecting_length:
287
if self._in_buffer == '':
288
# We're expecting a chunk length. There's at least two bytes
289
# left: a digit plus '\n'.
292
# We're in the middle of reading a chunk length. So there's at
293
# least one byte left, the '\n' that terminates the length.
295
elif self.state_accept == self._state_accept_reading_unused:
297
elif self.state_accept == self._state_accept_expecting_header:
298
return max(0, len('chunked\n') - len(self._in_buffer))
300
raise AssertionError("Impossible state: %r" % (self.state_accept,))
302
def read_next_chunk(self):
304
return self.chunks.popleft()
308
def _extract_line(self):
309
pos = self._in_buffer.find('\n')
311
# We haven't read a complete length prefix yet, so there's nothing
314
line = self._in_buffer[:pos]
315
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
316
self._in_buffer = self._in_buffer[pos+1:]
320
self.unused_data = self._in_buffer
321
self._in_buffer = None
322
self.state_accept = self._state_accept_reading_unused
324
error_args = tuple(self.error_in_progress)
325
self.chunks.append(request.FailedSmartServerResponse(error_args))
326
self.error_in_progress = None
327
self.finished_reading = True
329
def _state_accept_expecting_header(self, bytes):
330
self._in_buffer += bytes
331
prefix = self._extract_line()
333
# We haven't read a complete length prefix yet, so there's nothing
336
elif prefix == 'chunked':
337
self.state_accept = self._state_accept_expecting_length
339
raise errors.SmartProtocolError(
340
'Bad chunked body header: "%s"' % (prefix,))
342
def _state_accept_expecting_length(self, bytes):
343
self._in_buffer += bytes
344
prefix = self._extract_line()
346
# We haven't read a complete length prefix yet, so there's nothing
349
elif prefix == 'ERR':
351
self.error_in_progress = []
352
self._state_accept_expecting_length('')
354
elif prefix == 'END':
355
# We've read the end-of-body marker.
356
# Any further bytes are unused data, including the bytes left in
361
self.bytes_left = int(prefix, 16)
362
self.chunk_in_progress = ''
363
self.state_accept = self._state_accept_reading_chunk
365
def _state_accept_reading_chunk(self, bytes):
366
self._in_buffer += bytes
367
in_buffer_len = len(self._in_buffer)
368
self.chunk_in_progress += self._in_buffer[:self.bytes_left]
369
self._in_buffer = self._in_buffer[self.bytes_left:]
370
self.bytes_left -= in_buffer_len
371
if self.bytes_left <= 0:
372
# Finished with chunk
373
self.bytes_left = None
375
self.error_in_progress.append(self.chunk_in_progress)
377
self.chunks.append(self.chunk_in_progress)
378
self.chunk_in_progress = None
379
self.state_accept = self._state_accept_expecting_length
381
def _state_accept_reading_unused(self, bytes):
382
self.unused_data += bytes
385
class LengthPrefixedBodyDecoder(_StatefulDecoder):
386
"""Decodes the length-prefixed bulk data."""
389
_StatefulDecoder.__init__(self)
390
self.state_accept = self._state_accept_expecting_length
391
self.state_read = self._state_read_no_data
393
self._trailer_buffer = ''
229
395
def next_read_size(self):
230
396
if self.bytes_left is not None:
231
397
# Ideally we want to read all the remainder of the body and the
452
618
return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
454
620
def _write_protocol_version(self):
455
r"""Write any prefixes this protocol requires.
621
"""Write any prefixes this protocol requires.
457
623
Version two sends the value of REQUEST_VERSION_TWO.
459
625
self._request.accept_bytes(REQUEST_VERSION_TWO)
627
def read_streamed_body(self):
628
"""Read bytes from the body, decoding into a byte stream.
630
_body_decoder = ChunkedBodyDecoder()
631
while not _body_decoder.finished_reading:
632
bytes_wanted = _body_decoder.next_read_size()
633
bytes = self._request.read_bytes(bytes_wanted)
634
_body_decoder.accept_bytes(bytes)
635
for body_bytes in iter(_body_decoder.read_next_chunk, None):
636
if 'hpss' in debug.debug_flags:
637
mutter(' %d byte chunk read',
640
self._request.finished_reading()