199
195
self._write_func(RESPONSE_VERSION_TWO)
201
def _send_response(self, response):
    """Write a complete smart server response to the output stream.

    The response is emitted in this order: whatever
    ``_write_protocol_version`` writes, the prefix produced by
    ``_write_success_or_failure_prefix``, the encoded ``response.args``
    tuple, and finally the body.  A plain ``response.body`` is passed
    through ``_encode_bulk_data``; otherwise a ``response.body_stream``,
    if present, is handed to ``_send_stream``.

    :param response: the response object to send.  Its ``args``, ``body``
        and ``body_stream`` attributes are read; ``body`` and
        ``body_stream`` must not both be set.
    """
    # A response may be sent only once per instance.
    assert not self._finished, 'response already sent'
    self._finished = True

    # Fixed preamble: version marker, status prefix, then the arguments.
    self._write_protocol_version()
    self._write_success_or_failure_prefix(response)
    self._write_func(_encode_tuple(response.args))

    payload = response.body
    if payload is None:
        # No bulk body; fall back to a streamed body when one was given.
        if response.body_stream is not None:
            _send_stream(response.body_stream, self._write_func)
        return

    assert isinstance(payload, str), 'body must be a str'
    assert response.body_stream is None, (
        'body_stream and body cannot both be set')
    encoded_body = self._encode_bulk_data(payload)
    self._write_func(encoded_body)
218
def _send_stream(stream, write_func):
219
write_func('chunked\n')
220
_send_chunks(stream, write_func)
224
def _send_chunks(stream, write_func):
226
if isinstance(chunk, str):
227
bytes = "%x\n%s" % (len(chunk), chunk)
229
elif isinstance(chunk, request.FailedSmartServerResponse):
231
_send_chunks(chunk.args, write_func)
234
raise errors.BzrError(
235
'Chunks must be str or FailedSmartServerResponse, got %r'
239
class _StatefulDecoder(object):
198
class LengthPrefixedBodyDecoder(object):
199
"""Decodes the length-prefixed bulk data."""
241
201
def __init__(self):
202
self.bytes_left = None
242
203
self.finished_reading = False
243
204
self.unused_data = ''
244
self.bytes_left = None
205
self.state_accept = self._state_accept_expecting_length
206
self.state_read = self._state_read_no_data
208
self._trailer_buffer = ''
246
210
def accept_bytes(self, bytes):
247
211
"""Decode as much of bytes as possible.
259
223
current_state = self.state_accept
260
224
self.state_accept('')
263
class ChunkedBodyDecoder(_StatefulDecoder):
264
"""Decoder for chunked body data.
266
This is very similar to HTTP's chunked encoding.  See the description of
267
streamed body data in `doc/developers/network-protocol.txt` for details.
271
_StatefulDecoder.__init__(self)
272
self.state_accept = self._state_accept_expecting_header
274
self.chunk_in_progress = None
275
self.chunks = collections.deque()
277
self.error_in_progress = None
279
def next_read_size(self):
280
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
281
# end-of-body marker is 4 bytes: 'END\n'.
282
if self.state_accept == self._state_accept_reading_chunk:
283
# We're expecting more chunk content. So we're expecting at least
284
# the rest of this chunk plus an END chunk.
285
return self.bytes_left + 4
286
elif self.state_accept == self._state_accept_expecting_length:
287
if self._in_buffer == '':
288
# We're expecting a chunk length. There's at least two bytes
289
# left: a digit plus '\n'.
292
# We're in the middle of reading a chunk length. So there's at
293
# least one byte left, the '\n' that terminates the length.
295
elif self.state_accept == self._state_accept_reading_unused:
297
elif self.state_accept == self._state_accept_expecting_header:
298
return max(0, len('chunked\n') - len(self._in_buffer))
300
raise AssertionError("Impossible state: %r" % (self.state_accept,))
302
def read_next_chunk(self):
304
return self.chunks.popleft()
308
def _extract_line(self):
309
pos = self._in_buffer.find('\n')
311
# We haven't read a complete length prefix yet, so there's nothing
314
line = self._in_buffer[:pos]
315
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
316
self._in_buffer = self._in_buffer[pos+1:]
320
self.unused_data = self._in_buffer
321
self._in_buffer = None
322
self.state_accept = self._state_accept_reading_unused
324
error_args = tuple(self.error_in_progress)
325
self.chunks.append(request.FailedSmartServerResponse(error_args))
326
self.error_in_progress = None
327
self.finished_reading = True
329
def _state_accept_expecting_header(self, bytes):
330
self._in_buffer += bytes
331
prefix = self._extract_line()
333
# We haven't read a complete length prefix yet, so there's nothing
336
elif prefix == 'chunked':
337
self.state_accept = self._state_accept_expecting_length
339
raise errors.SmartProtocolError(
340
'Bad chunked body header: "%s"' % (prefix,))
342
def _state_accept_expecting_length(self, bytes):
343
self._in_buffer += bytes
344
prefix = self._extract_line()
346
# We haven't read a complete length prefix yet, so there's nothing
349
elif prefix == 'ERR':
351
self.error_in_progress = []
352
self._state_accept_expecting_length('')
354
elif prefix == 'END':
355
# We've read the end-of-body marker.
356
# Any further bytes are unused data, including the bytes left in
361
self.bytes_left = int(prefix, 16)
362
self.chunk_in_progress = ''
363
self.state_accept = self._state_accept_reading_chunk
365
def _state_accept_reading_chunk(self, bytes):
366
self._in_buffer += bytes
367
in_buffer_len = len(self._in_buffer)
368
self.chunk_in_progress += self._in_buffer[:self.bytes_left]
369
self._in_buffer = self._in_buffer[self.bytes_left:]
370
self.bytes_left -= in_buffer_len
371
if self.bytes_left <= 0:
372
# Finished with chunk
373
self.bytes_left = None
375
self.error_in_progress.append(self.chunk_in_progress)
377
self.chunks.append(self.chunk_in_progress)
378
self.chunk_in_progress = None
379
self.state_accept = self._state_accept_expecting_length
381
def _state_accept_reading_unused(self, bytes):
382
self.unused_data += bytes
385
class LengthPrefixedBodyDecoder(_StatefulDecoder):
386
"""Decodes the length-prefixed bulk data."""
389
_StatefulDecoder.__init__(self)
390
self.state_accept = self._state_accept_expecting_length
391
self.state_read = self._state_read_no_data
393
self._trailer_buffer = ''
395
226
def next_read_size(self):
396
227
if self.bytes_left is not None:
397
228
# Ideally we want to read all the remainder of the body and the
495
318
The body is encoded with one line per readv offset pair. The numbers in
496
319
each pair are separated by a comma, and no trailing \n is emitted.
498
if 'hpss' in debug.debug_flags:
499
mutter('hpss call w/readv: %s', repr(args)[1:-1])
500
self._request_start_time = time.time()
501
321
self._write_args(args)
502
322
readv_bytes = self._serialise_offsets(body)
503
323
bytes = self._encode_bulk_data(readv_bytes)
504
324
self._request.accept_bytes(bytes)
505
325
self._request.finished_writing()
506
if 'hpss' in debug.debug_flags:
507
mutter(' %d bytes in readv request', len(readv_bytes))
509
327
def cancel_read_body(self):
510
328
"""After expecting a body, a response code may indicate one otherwise.
618
421
return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
620
423
def _write_protocol_version(self):
621
"""Write any prefixes this protocol requires.
424
r"""Write any prefixes this protocol requires.
623
426
Version two sends the value of REQUEST_VERSION_TWO.
625
428
self._request.accept_bytes(REQUEST_VERSION_TWO)
627
def read_streamed_body(self):
628
"""Read bytes from the body, decoding into a byte stream.
630
_body_decoder = ChunkedBodyDecoder()
631
while not _body_decoder.finished_reading:
632
bytes_wanted = _body_decoder.next_read_size()
633
bytes = self._request.read_bytes(bytes_wanted)
634
_body_decoder.accept_bytes(bytes)
635
for body_bytes in iter(_body_decoder.read_next_chunk, None):
636
if 'hpss' in debug.debug_flags:
637
mutter(' %d byte chunk read',
640
self._request.finished_reading()