~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Andrew Bennetts
  • Date: 2007-08-31 15:30:02 UTC
  • mto: (2535.4.5 streaming-smart-fetch)
  • mto: This revision was merged to the branch mainline in revision 2985.
  • Revision ID: andrew.bennetts@canonical.com-20070831153002-2e21vf67prklfdey
Allow an error to interrupt (and terminate) a streamed response body.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
client and server.
19
19
"""
20
20
 
 
21
import collections
21
22
from cStringIO import StringIO
22
23
import time
23
24
 
214
215
 
215
216
def _send_chunks(stream, write_func):
216
217
    for chunk in stream:
217
 
        assert isinstance(chunk, str), 'body must be a str'
218
 
        if chunk == '':
219
 
            # Skip empty chunks, as they would prematurely signal
220
 
            # end-of-stream, and they're redundant anyway.
221
 
            continue
222
 
        bytes = "%x\n%s" % (len(chunk), chunk)
223
 
        write_func(bytes)
 
218
        if isinstance(chunk, str):
 
219
            if chunk == '':
 
220
                # Skip empty chunks, as they would prematurely signal
 
221
                # end-of-stream, and they're redundant anyway.
 
222
                continue
 
223
            bytes = "%x\n%s" % (len(chunk), chunk)
 
224
            write_func(bytes)
 
225
        elif isinstance(chunk, request.FailedSmartServerResponse):
 
226
            write_func('ERR\n' + _encode_tuple(chunk.args))
 
227
            return
 
228
        else:
 
229
            raise BzrError(
 
230
                'Chunks must be str or FailedSmartServerResponse, got %r'
 
231
                % chunks)
224
232
    write_func('0\n')
225
233
 
226
234
 
253
261
 
254
262
    This is very similar the HTTP's chunked encoding::
255
263
 
256
 
        BODY := CHUNKS TERMINATOR
 
264
        STREAMED_BODY := CHUNKS TERMINATOR
257
265
        CHUNKS := CHUNK [CHUNKS]
258
266
        CHUNK := CHUNK_LENGTH CHUNK_CONTENT
259
267
        CHUNK_LENGTH := HEX_DIGITS NEWLINE
260
268
        CHUNK_CONTENT := bytes
261
 
        TERMINATOR := '0' NEWLINE
 
269
 
 
270
        TERMINATOR := SUCCESS_TERMINATOR | ERROR_TERMINATOR
 
271
        SUCCESS_TERMINATOR := '0' NEWLINE
 
272
        ERROR_TERMINATOR := 'ERR' NEWLINE ARGS
 
273
        ARGS := (see bzrlib/smart/__init__.py)
262
274
 
263
275
    That is, the body consists of a series of chunks.  Each chunk starts with a
264
276
    length prefix in hexadecimal digits, followed by an ASCII newline byte.
265
277
    The end of the body is signaled by a zero the zero-length chunk, i.e.
266
 
    '0\\n'.
 
278
    '0\\n', or by 'ERR\\n' followed by a response args tuple of the error.
267
279
    """
268
280
 
269
281
    def __init__(self):
270
282
        _StatefulDecoder.__init__(self)
271
283
        self.state_accept = self._state_accept_expecting_length
272
284
        self._in_buffer = ''
273
 
        self._content_bytes = ''
 
285
        self.chunk_in_progress = None
 
286
        self.chunks = collections.deque()
274
287
    
275
288
    def next_read_size(self):
276
289
        # Note: the shortest possible chunk is 2 bytes: '0\n'.
287
300
                # We're in the middle of reading a chunk length.  So there's at
288
301
                # least one byte left, the '\n' that terminates the length.
289
302
                return 1
 
303
        elif self.state_accept == self._state_accept_reading_error:
 
304
            # We're reading an error tuple.  There's at least one byte left,
 
305
            # '\n'.
 
306
            return 1
290
307
        elif self.state_accept == self._state_accept_reading_unused:
291
308
            return 1
292
309
        else:
293
310
            raise AssertionError("Impossible state: %r" % (self.state_accept,))
294
311
 
295
 
    def read_pending_data(self):
296
 
        bytes = self._content_bytes
297
 
        self._content_bytes = ''
298
 
        return bytes
 
312
    def read_next_chunk(self):
 
313
        try:
 
314
            return self.chunks.popleft()
 
315
        except IndexError:
 
316
            return None
299
317
 
300
 
    def _state_accept_expecting_length(self, bytes):
301
 
        self._in_buffer += bytes
 
318
    def _extract_line(self):
302
319
        pos = self._in_buffer.find('\n')
303
320
        if pos == -1:
304
321
            # We haven't read a complete length prefix yet, so there's nothing
305
322
            # to do.
306
 
            return
307
 
        self.bytes_left = int(self._in_buffer[:pos], 16)
308
 
        # Trim the length and '\n' delimiter from the _in_buffer.
 
323
            return None
 
324
        line = self._in_buffer[:pos]
 
325
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
309
326
        self._in_buffer = self._in_buffer[pos+1:]
 
327
        return line
 
328
 
 
329
    def _finished(self):
 
330
        self.unused_data = self._in_buffer
 
331
        self._in_buffer = None
 
332
        self.state_accept = self._state_accept_reading_unused
 
333
        self.finished_reading = True
 
334
 
 
335
    def _state_accept_expecting_length(self, bytes):
 
336
        self._in_buffer += bytes
 
337
        prefix = self._extract_line()
 
338
        if prefix is None:
 
339
            # We haven't read a complete length prefix yet, so there's nothing
 
340
            # to do.
 
341
            return
 
342
        elif prefix == 'ERR':
 
343
            self.state_accept = self._state_accept_reading_error
 
344
            self.state_accept('')
 
345
            return
 
346
        self.bytes_left = int(prefix, 16)
310
347
        if self.bytes_left == 0:
 
348
            # We've read the end-of-body marker.
311
349
            # Any further bytes are unused data, including the bytes left in
312
350
            # the _in_buffer.
313
 
            self.unused_data = self._in_buffer
314
 
            self._in_buffer = None
315
 
            self.state_accept = self._state_accept_reading_unused
316
 
            self.finished_reading = True
 
351
            self._finished()
317
352
            return
 
353
        self.chunk_in_progress = ''
318
354
        self.state_accept = self._state_accept_reading_chunk
319
355
 
320
356
    def _state_accept_reading_chunk(self, bytes):
321
357
        self._in_buffer += bytes
322
358
        in_buffer_len = len(self._in_buffer)
323
 
        self._content_bytes += self._in_buffer[:self.bytes_left]
 
359
        self.chunk_in_progress += self._in_buffer[:self.bytes_left]
324
360
        self._in_buffer = self._in_buffer[self.bytes_left:]
325
361
        self.bytes_left -= in_buffer_len
326
362
        if self.bytes_left <= 0:
327
363
            # Finished with chunk
328
364
            self.bytes_left = None
 
365
            self.chunks.append(self.chunk_in_progress)
 
366
            self.chunk_in_progress = None
329
367
            self.state_accept = self._state_accept_expecting_length
330
368
        
 
369
    def _state_accept_reading_error(self, bytes):
 
370
        self._in_buffer += bytes
 
371
        line = self._extract_line()
 
372
        if line is None:
 
373
            return
 
374
        error_args = _decode_tuple(line + '\n')
 
375
        self.chunks.append(request.FailedSmartServerResponse(error_args))
 
376
        self._finished()
 
377
 
331
378
    def _state_accept_reading_unused(self, bytes):
332
379
        self.unused_data += bytes
333
380
 
564
611
        return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
565
612
 
566
613
    def _write_protocol_version(self):
567
 
        r"""Write any prefixes this protocol requires.
 
614
        """Write any prefixes this protocol requires.
568
615
        
569
616
        Version two sends the value of REQUEST_VERSION_TWO.
570
617
        """
578
625
            bytes_wanted = _body_decoder.next_read_size()
579
626
            bytes = self._request.read_bytes(bytes_wanted)
580
627
            _body_decoder.accept_bytes(bytes)
581
 
            body_bytes = _body_decoder.read_pending_data()
582
 
            if body_bytes != '':
 
628
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
583
629
                if 'hpss' in debug.debug_flags:
584
 
                    mutter('              %d streamed body bytes read',
 
630
                    mutter('              %d byte chunk read',
585
631
                           len(body_bytes))
586
632
                yield body_bytes
587
633
        self._request.finished_reading()