~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

Merge latest chunking protocol, including support for errors, fixing a test failure.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
client and server.
19
19
"""
20
20
 
 
21
import collections
21
22
from cStringIO import StringIO
22
23
import time
23
24
 
209
210
            bytes = self._encode_bulk_data(response.body)
210
211
            self._write_func(bytes)
211
212
        elif response.body_stream is not None:
212
 
            for chunk in response.body_stream:
213
 
                assert isinstance(chunk, str), 'body must be a str'
214
 
                if chunk == '':
215
 
                    # Skip empty chunks, as they would prematurely signal
216
 
                    # end-of-stream, and they're redundant anyway.
217
 
                    continue
218
 
                bytes = "%x\n%s" % (len(chunk), chunk)
219
 
                self._write_func(bytes)
220
 
            self._write_func('0\n')
 
213
            _send_stream(response.body_stream, self._write_func)
 
214
 
 
215
 
 
216
def _send_stream(stream, write_func):
 
217
    _send_chunks(stream, write_func)
 
218
    write_func('END\n')
 
219
 
 
220
 
 
221
def _send_chunks(stream, write_func):
 
222
    for chunk in stream:
 
223
        if isinstance(chunk, str):
 
224
            bytes = "%x\n%s" % (len(chunk), chunk)
 
225
            write_func(bytes)
 
226
        elif isinstance(chunk, request.FailedSmartServerResponse):
 
227
            write_func('ERR\n')
 
228
            _send_chunks(chunk.args, write_func)
 
229
            return
 
230
        else:
 
231
            raise BzrError(
 
232
                'Chunks must be str or FailedSmartServerResponse, got %r'
 
233
                % chunks)
221
234
 
222
235
 
223
236
class _StatefulDecoder(object):
247
260
class ChunkedBodyDecoder(_StatefulDecoder):
248
261
    """Decoder for chunked body data.
249
262
 
250
 
    This is very similar the HTTP's chunked encoding::
251
 
 
252
 
        BODY := CHUNKS TERMINATOR
253
 
        CHUNKS := CHUNK [CHUNKS]
254
 
        CHUNK := CHUNK_LENGTH CHUNK_CONTENT
255
 
        CHUNK_LENGTH := HEX_DIGITS NEWLINE
256
 
        CHUNK_CONTENT := bytes
257
 
        TERMINATOR := '0' NEWLINE
258
 
 
259
 
    That is, the body consists of a series of chunks.  Each chunk starts with a
260
 
    length prefix in hexadecimal digits, followed by an ASCII newline byte.
261
 
    The end of the body is signaled by a zero the zero-length chunk, i.e.
262
 
    '0\\n'.
 
263
    This is very similar the HTTP's chunked encoding.  See the description of
 
264
    streamed body data in `doc/developers/network-protocol.txt` for details.
263
265
    """
264
266
 
265
267
    def __init__(self):
266
268
        _StatefulDecoder.__init__(self)
267
269
        self.state_accept = self._state_accept_expecting_length
268
270
        self._in_buffer = ''
269
 
        self._content_bytes = ''
 
271
        self.chunk_in_progress = None
 
272
        self.chunks = collections.deque()
 
273
        self.error = False
 
274
        self.error_in_progress = None
270
275
    
271
276
    def next_read_size(self):
272
 
        # Note: the shortest possible chunk is 2 bytes: '0\n'.
 
277
        # Note: the shortest possible chunk is 2 bytes: '0\n', and the
 
278
        # end-of-body marker is 4 bytes: 'END\n'.
273
279
        if self.state_accept == self._state_accept_reading_chunk:
274
280
            # We're expecting more chunk content.  So we're expecting at least
275
 
            # the rest of this chunk plus another chunk header.
276
 
            return self.bytes_left + 2
 
281
            # the rest of this chunk plus an END chunk.
 
282
            return self.bytes_left + 4
277
283
        elif self.state_accept == self._state_accept_expecting_length:
278
284
            if self._in_buffer == '':
279
285
                # We're expecting a chunk length.  There's at least two bytes
288
294
        else:
289
295
            raise AssertionError("Impossible state: %r" % (self.state_accept,))
290
296
 
291
 
    def read_pending_data(self):
292
 
        bytes = self._content_bytes
293
 
        self._content_bytes = ''
294
 
        return bytes
 
297
    def read_next_chunk(self):
 
298
        try:
 
299
            return self.chunks.popleft()
 
300
        except IndexError:
 
301
            return None
295
302
 
296
 
    def _state_accept_expecting_length(self, bytes):
297
 
        self._in_buffer += bytes
 
303
    def _extract_line(self):
298
304
        pos = self._in_buffer.find('\n')
299
305
        if pos == -1:
300
306
            # We haven't read a complete length prefix yet, so there's nothing
301
307
            # to do.
302
 
            return
303
 
        self.bytes_left = int(self._in_buffer[:pos], 16)
304
 
        # Trim the length and '\n' delimiter from the _in_buffer.
 
308
            return None
 
309
        line = self._in_buffer[:pos]
 
310
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
305
311
        self._in_buffer = self._in_buffer[pos+1:]
306
 
        if self.bytes_left == 0:
 
312
        return line
 
313
 
 
314
    def _finished(self):
 
315
        self.unused_data = self._in_buffer
 
316
        self._in_buffer = None
 
317
        self.state_accept = self._state_accept_reading_unused
 
318
        if self.error:
 
319
            error_args = tuple(self.error_in_progress)
 
320
            self.chunks.append(request.FailedSmartServerResponse(error_args))
 
321
            self.error_in_progress = None
 
322
        self.finished_reading = True
 
323
 
 
324
    def _state_accept_expecting_length(self, bytes):
 
325
        self._in_buffer += bytes
 
326
        prefix = self._extract_line()
 
327
        if prefix is None:
 
328
            # We haven't read a complete length prefix yet, so there's nothing
 
329
            # to do.
 
330
            return
 
331
        elif prefix == 'ERR':
 
332
            self.error = True
 
333
            self.error_in_progress = []
 
334
            self._state_accept_expecting_length('')
 
335
            return
 
336
        elif prefix == 'END':
 
337
            # We've read the end-of-body marker.
307
338
            # Any further bytes are unused data, including the bytes left in
308
339
            # the _in_buffer.
309
 
            self.unused_data = self._in_buffer
310
 
            self._in_buffer = None
311
 
            self.state_accept = self._state_accept_reading_unused
312
 
            self.finished_reading = True
 
340
            self._finished()
313
341
            return
314
 
        self.state_accept = self._state_accept_reading_chunk
 
342
        else:
 
343
            self.bytes_left = int(prefix, 16)
 
344
            self.chunk_in_progress = ''
 
345
            self.state_accept = self._state_accept_reading_chunk
315
346
 
316
347
    def _state_accept_reading_chunk(self, bytes):
317
348
        self._in_buffer += bytes
318
349
        in_buffer_len = len(self._in_buffer)
319
 
        self._content_bytes += self._in_buffer[:self.bytes_left]
 
350
        self.chunk_in_progress += self._in_buffer[:self.bytes_left]
320
351
        self._in_buffer = self._in_buffer[self.bytes_left:]
321
352
        self.bytes_left -= in_buffer_len
322
353
        if self.bytes_left <= 0:
323
354
            # Finished with chunk
324
355
            self.bytes_left = None
 
356
            if self.error:
 
357
                self.error_in_progress.append(self.chunk_in_progress)
 
358
            else:
 
359
                self.chunks.append(self.chunk_in_progress)
 
360
            self.chunk_in_progress = None
325
361
            self.state_accept = self._state_accept_expecting_length
326
362
        
327
363
    def _state_accept_reading_unused(self, bytes):
560
596
        return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
561
597
 
562
598
    def _write_protocol_version(self):
563
 
        r"""Write any prefixes this protocol requires.
 
599
        """Write any prefixes this protocol requires.
564
600
        
565
601
        Version two sends the value of REQUEST_VERSION_TWO.
566
602
        """
574
610
            bytes_wanted = _body_decoder.next_read_size()
575
611
            bytes = self._request.read_bytes(bytes_wanted)
576
612
            _body_decoder.accept_bytes(bytes)
577
 
            body_bytes = _body_decoder.read_pending_data()
578
 
            if body_bytes != '':
 
613
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
579
614
                if 'hpss' in debug.debug_flags:
580
 
                    mutter('              %d streamed body bytes read',
 
615
                    mutter('              %d byte chunk read',
581
616
                           len(body_bytes))
582
617
                yield body_bytes
583
618
        self._request.finished_reading()