~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Andrew Bennetts
  • Date: 2007-08-30 14:24:58 UTC
  • mfrom: (2748.4.2 hpss-streaming)
  • mto: This revision was merged to the branch mainline in revision 3174.
  • Revision ID: andrew.bennetts@canonical.com-20070830142458-as28m7ho1sd4yeko
Merge from hpss-streaming.

Show diffs side-by-side

added added

removed removed

Lines of Context:
197
197
        """
198
198
        self._write_func(RESPONSE_VERSION_TWO)
199
199
 
200
 
 
201
 
class LengthPrefixedBodyDecoder(object):
202
 
    """Decodes the length-prefixed bulk data."""
203
 
    
 
200
    def _send_response(self, response):
 
201
        """Send a smart server response down the output stream."""
 
202
        assert not self._finished, 'response already sent'
 
203
        self._finished = True
 
204
        self._write_protocol_version()
 
205
        self._write_success_or_failure_prefix(response)
 
206
        self._write_func(_encode_tuple(response.args))
 
207
        if response.body is not None:
 
208
            assert isinstance(response.body, str), 'body must be a str'
 
209
            bytes = self._encode_bulk_data(response.body)
 
210
            self._write_func(bytes)
 
211
        elif response.body_stream is not None:
 
212
            for chunk in response.body_stream:
 
213
                assert isinstance(chunk, str), 'body must be a str'
 
214
                if chunk == '':
 
215
                    # Skip empty chunks, as they would prematurely signal
 
216
                    # end-of-stream, and they're redundant anyway.
 
217
                    continue
 
218
                bytes = "%x\n%s" % (len(chunk), chunk)
 
219
                self._write_func(bytes)
 
220
            self._write_func('0\n')
 
221
 
 
222
 
 
223
class _StatefulDecoder(object):
 
224
 
204
225
    def __init__(self):
205
 
        self.bytes_left = None
206
226
        self.finished_reading = False
207
227
        self.unused_data = ''
208
 
        self.state_accept = self._state_accept_expecting_length
209
 
        self.state_read = self._state_read_no_data
210
 
        self._in_buffer = ''
211
 
        self._trailer_buffer = ''
212
 
    
 
228
        self.bytes_left = None
 
229
 
213
230
    def accept_bytes(self, bytes):
214
231
        """Decode as much of bytes as possible.
215
232
 
226
243
            current_state = self.state_accept
227
244
            self.state_accept('')
228
245
 
 
246
 
 
247
class ChunkedBodyDecoder(_StatefulDecoder):
 
248
    """Decoder for chunked body data.
 
249
 
 
250
    This is very similar the HTTP's chunked encoding::
 
251
 
 
252
        BODY := CHUNKS TERMINATOR
 
253
        CHUNKS := CHUNK [CHUNKS]
 
254
        CHUNK := CHUNK_LENGTH CHUNK_CONTENT
 
255
        CHUNK_LENGTH := HEX_DIGITS NEWLINE
 
256
        CHUNK_CONTENT := bytes
 
257
        TERMINATOR := '0' NEWLINE
 
258
 
 
259
    That is, the body consists of a series of chunks.  Each chunk starts with a
 
260
    length prefix in hexadecimal digits, followed by an ASCII newline byte.
 
261
    The end of the body is signaled by a zero the zero-length chunk, i.e.
 
262
    '0\\n'.
 
263
    """
 
264
 
 
265
    def __init__(self):
 
266
        _StatefulDecoder.__init__(self)
 
267
        self.state_accept = self._state_accept_expecting_length
 
268
        self._in_buffer = ''
 
269
        self._content_bytes = ''
 
270
    
 
271
    def next_read_size(self):
 
272
        # Note: the shortest possible chunk is 2 bytes: '0\n'.
 
273
        if self.state_accept == self._state_accept_reading_chunk:
 
274
            # We're expecting more chunk content.  So we're expecting at least
 
275
            # the rest of this chunk plus another chunk header.
 
276
            return self.bytes_left + 2
 
277
        elif self.state_accept == self._state_accept_expecting_length:
 
278
            if self._in_buffer == '':
 
279
                # We're expecting a chunk length.  There's at least two bytes
 
280
                # left: a digit plus '\n'.
 
281
                return 2
 
282
            else:
 
283
                # We're in the middle of reading a chunk length.  So there's at
 
284
                # least one byte left, the '\n' that terminates the length.
 
285
                return 1
 
286
        elif self.state_accept == self._state_accept_reading_unused:
 
287
            return 1
 
288
        else:
 
289
            raise AssertionError("Impossible state: %r" % (self.state_accept,))
 
290
 
 
291
    def read_pending_data(self):
 
292
        bytes = self._content_bytes
 
293
        self._content_bytes = ''
 
294
        return bytes
 
295
 
 
296
    def _state_accept_expecting_length(self, bytes):
 
297
        self._in_buffer += bytes
 
298
        pos = self._in_buffer.find('\n')
 
299
        if pos == -1:
 
300
            # We haven't read a complete length prefix yet, so there's nothing
 
301
            # to do.
 
302
            return
 
303
        self.bytes_left = int(self._in_buffer[:pos], 16)
 
304
        # Trim the length and '\n' delimiter from the _in_buffer.
 
305
        self._in_buffer = self._in_buffer[pos+1:]
 
306
        if self.bytes_left == 0:
 
307
            # Any further bytes are unused data, including the bytes left in
 
308
            # the _in_buffer.
 
309
            self.unused_data = self._in_buffer
 
310
            self._in_buffer = None
 
311
            self.state_accept = self._state_accept_reading_unused
 
312
            self.finished_reading = True
 
313
            return
 
314
        self.state_accept = self._state_accept_reading_chunk
 
315
 
 
316
    def _state_accept_reading_chunk(self, bytes):
 
317
        self._in_buffer += bytes
 
318
        in_buffer_len = len(self._in_buffer)
 
319
        self._content_bytes += self._in_buffer[:self.bytes_left]
 
320
        self._in_buffer = self._in_buffer[self.bytes_left:]
 
321
        self.bytes_left -= in_buffer_len
 
322
        if self.bytes_left <= 0:
 
323
            # Finished with chunk
 
324
            self.bytes_left = None
 
325
            self.state_accept = self._state_accept_expecting_length
 
326
        
 
327
    def _state_accept_reading_unused(self, bytes):
 
328
        self.unused_data += bytes
 
329
 
 
330
 
 
331
class LengthPrefixedBodyDecoder(_StatefulDecoder):
 
332
    """Decodes the length-prefixed bulk data."""
 
333
    
 
334
    def __init__(self):
 
335
        _StatefulDecoder.__init__(self)
 
336
        self.state_accept = self._state_accept_expecting_length
 
337
        self.state_read = self._state_read_no_data
 
338
        self._in_buffer = ''
 
339
        self._trailer_buffer = ''
 
340
    
229
341
    def next_read_size(self):
230
342
        if self.bytes_left is not None:
231
343
            # Ideally we want to read all the remainder of the body and the
454
566
        """
455
567
        self._request.accept_bytes(REQUEST_VERSION_TWO)
456
568
 
 
569
    def read_streamed_body(self):
 
570
        """Read bytes from the body, decoding into a byte stream.
 
571
        """
 
572
        _body_decoder = ChunkedBodyDecoder()
 
573
        while not _body_decoder.finished_reading:
 
574
            bytes_wanted = _body_decoder.next_read_size()
 
575
            bytes = self._request.read_bytes(bytes_wanted)
 
576
            mutter('wire bytes: %r',  bytes)
 
577
            _body_decoder.accept_bytes(bytes)
 
578
            body_bytes = _body_decoder.read_pending_data()
 
579
            mutter('body bytes: %r',  body_bytes)
 
580
            mutter('in: %r',  bytes)
 
581
            if 'hpss' in debug.debug_flags:
 
582
                mutter('              %d streamed body bytes read',
 
583
                       len(body_bytes))
 
584
            if body_bytes != '':
 
585
                yield body_bytes
 
586
        self._request.finished_reading()
 
587