198
198
self._write_func(RESPONSE_VERSION_TWO)
201
class LengthPrefixedBodyDecoder(object):
202
"""Decodes the length-prefixed bulk data."""
200
def _send_response(self, response):
    """Send a smart server response down the output stream.

    Writes, in order: the protocol version marker, the success/failure
    prefix, the encoded argument tuple, and then the body (if any).

    A ``response.body`` is written as a single bulk-data blob.  Otherwise a
    ``response.body_stream`` is written as a series of chunks, each prefixed
    with its length in hexadecimal followed by a newline, and terminated by
    a zero-length chunk (``'0\\n'``).

    :param response: an object providing ``args``, ``body`` and
        ``body_stream`` attributes.
    :raises AssertionError: if a response has already been sent, or if the
        body (or any streamed chunk) is not a str.
    """
    assert not self._finished, 'response already sent'
    self._finished = True
    self._write_protocol_version()
    self._write_success_or_failure_prefix(response)
    self._write_func(_encode_tuple(response.args))
    if response.body is not None:
        assert isinstance(response.body, str), 'body must be a str'
        self._write_func(self._encode_bulk_data(response.body))
    elif response.body_stream is not None:
        for chunk in response.body_stream:
            assert isinstance(chunk, str), 'body must be a str'
            if not chunk:
                # Skip empty chunks, as they would prematurely signal
                # end-of-stream (an empty chunk encodes to '0\n', which is
                # the terminator), and they're redundant anyway.
                continue
            self._write_func("%x\n%s" % (len(chunk), chunk))
        # The zero-length chunk marks the end of the streamed body.
        self._write_func('0\n')
223
class _StatefulDecoder(object):
204
225
def __init__(self):
205
self.bytes_left = None
206
226
self.finished_reading = False
207
227
self.unused_data = ''
208
self.state_accept = self._state_accept_expecting_length
209
self.state_read = self._state_read_no_data
211
self._trailer_buffer = ''
228
self.bytes_left = None
213
230
def accept_bytes(self, bytes):
214
231
"""Decode as much of bytes as possible.
226
243
current_state = self.state_accept
227
244
self.state_accept('')
247
class ChunkedBodyDecoder(_StatefulDecoder):
248
"""Decoder for chunked body data.
250
This is very similar to HTTP's chunked encoding::
252
BODY := CHUNKS TERMINATOR
253
CHUNKS := CHUNK [CHUNKS]
254
CHUNK := CHUNK_LENGTH CHUNK_CONTENT
255
CHUNK_LENGTH := HEX_DIGITS NEWLINE
256
CHUNK_CONTENT := bytes
257
TERMINATOR := '0' NEWLINE
259
That is, the body consists of a series of chunks. Each chunk starts with a
260
length prefix in hexadecimal digits, followed by an ASCII newline byte.
261
The end of the body is signaled by a zero-length chunk, i.e.
266
_StatefulDecoder.__init__(self)
267
self.state_accept = self._state_accept_expecting_length
269
self._content_bytes = ''
271
def next_read_size(self):
272
# Note: the shortest possible chunk is 2 bytes: '0\n'.
273
if self.state_accept == self._state_accept_reading_chunk:
274
# We're expecting more chunk content. So we're expecting at least
275
# the rest of this chunk plus another chunk header.
276
return self.bytes_left + 2
277
elif self.state_accept == self._state_accept_expecting_length:
278
if self._in_buffer == '':
279
# We're expecting a chunk length. There's at least two bytes
280
# left: a digit plus '\n'.
283
# We're in the middle of reading a chunk length. So there's at
284
# least one byte left, the '\n' that terminates the length.
286
elif self.state_accept == self._state_accept_reading_unused:
289
raise AssertionError("Impossible state: %r" % (self.state_accept,))
291
def read_pending_data(self):
    """Return the chunk content decoded so far and clear the buffer.

    Each call hands back only the content accumulated since the previous
    call; an empty string is returned when nothing new has been decoded.

    :return: the pending decoded body bytes.
    """
    pending = self._content_bytes
    self._content_bytes = ''
    # Callers consume the drained data, so it must be handed back.
    return pending
296
def _state_accept_expecting_length(self, bytes):
297
self._in_buffer += bytes
298
pos = self._in_buffer.find('\n')
300
# We haven't read a complete length prefix yet, so there's nothing
303
self.bytes_left = int(self._in_buffer[:pos], 16)
304
# Trim the length and '\n' delimiter from the _in_buffer.
305
self._in_buffer = self._in_buffer[pos+1:]
306
if self.bytes_left == 0:
307
# Any further bytes are unused data, including the bytes left in
309
self.unused_data = self._in_buffer
310
self._in_buffer = None
311
self.state_accept = self._state_accept_reading_unused
312
self.finished_reading = True
314
self.state_accept = self._state_accept_reading_chunk
316
def _state_accept_reading_chunk(self, bytes):
317
self._in_buffer += bytes
318
in_buffer_len = len(self._in_buffer)
319
self._content_bytes += self._in_buffer[:self.bytes_left]
320
self._in_buffer = self._in_buffer[self.bytes_left:]
321
self.bytes_left -= in_buffer_len
322
if self.bytes_left <= 0:
323
# Finished with chunk
324
self.bytes_left = None
325
self.state_accept = self._state_accept_expecting_length
327
def _state_accept_reading_unused(self, bytes):
328
self.unused_data += bytes
331
class LengthPrefixedBodyDecoder(_StatefulDecoder):
332
"""Decodes the length-prefixed bulk data."""
335
_StatefulDecoder.__init__(self)
336
self.state_accept = self._state_accept_expecting_length
337
self.state_read = self._state_read_no_data
339
self._trailer_buffer = ''
229
341
def next_read_size(self):
230
342
if self.bytes_left is not None:
231
343
# Ideally we want to read all the remainder of the body and the
455
567
self._request.accept_bytes(REQUEST_VERSION_TWO)
569
def read_streamed_body(self):
570
"""Read bytes from the body, decoding into a byte stream.
572
_body_decoder = ChunkedBodyDecoder()
573
while not _body_decoder.finished_reading:
574
bytes_wanted = _body_decoder.next_read_size()
575
bytes = self._request.read_bytes(bytes_wanted)
576
mutter('wire bytes: %r', bytes)
577
_body_decoder.accept_bytes(bytes)
578
body_bytes = _body_decoder.read_pending_data()
579
mutter('body bytes: %r', body_bytes)
580
mutter('in: %r', bytes)
581
if 'hpss' in debug.debug_flags:
582
mutter(' %d streamed body bytes read',
586
self._request.finished_reading()