~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Andrew Bennetts
  • Date: 2007-08-30 11:32:12 UTC
  • mto: (2535.4.1 streaming-smart-fetch)
  • mto: This revision was merged to the branch mainline in revision 2985.
  • Revision ID: andrew.bennetts@canonical.com-20070830113212-j6zhw555ma5x21yi
Add protocol (version two) support for streaming bodies (using chunking) in responses.

Show diffs side-by-side

added added

removed removed

Lines of Context:
197
197
        """
198
198
        self._write_func(RESPONSE_VERSION_TWO)
199
199
 
 
200
    def _send_response(self, response):
 
201
        """Send a smart server response down the output stream."""
 
202
        assert not self._finished, 'response already sent'
 
203
        self._finished = True
 
204
        self._write_protocol_version()
 
205
        self._write_success_or_failure_prefix(response)
 
206
        self._write_func(_encode_tuple(response.args))
 
207
        if response.body is not None:
 
208
            assert isinstance(response.body, str), 'body must be a str'
 
209
            bytes = self._encode_bulk_data(response.body)
 
210
            self._write_func(bytes)
 
211
        elif response.body_stream is not None:
 
212
            for chunk in response.body_stream:
 
213
                assert isinstance(chunk, str), 'body must be a str'
 
214
                if chunk == '':
 
215
                    # Skip empty chunks, as they would prematurely signal
 
216
                    # end-of-stream, and they're redundant anyway.
 
217
                    continue
 
218
                bytes = "%x\n%s" % (len(chunk), chunk)
 
219
                self._write_func(bytes)
 
220
            self._write_func('0\n')
 
221
 
200
222
 
201
223
class _StatefulDecoder(object):
202
224
 
267
289
            raise AssertionError("Impossible state: %r" % (self.state_accept,))
268
290
 
269
291
    def read_pending_data(self):
270
 
        return self._content_bytes
 
292
        bytes = self._content_bytes
 
293
        self._content_bytes = ''
 
294
        return bytes
271
295
 
272
296
    def _state_accept_expecting_length(self, bytes):
273
297
        self._in_buffer += bytes
542
566
        """
543
567
        self._request.accept_bytes(REQUEST_VERSION_TWO)
544
568
 
 
569
    def read_streamed_body(self):
 
570
        """Read bytes from the body, decoding into a byte stream.
 
571
        """
 
572
        _body_decoder = ChunkedBodyDecoder()
 
573
        while not _body_decoder.finished_reading:
 
574
            bytes_wanted = _body_decoder.next_read_size()
 
575
            bytes = self._request.read_bytes(bytes_wanted)
 
576
            mutter('wire bytes: %r',  bytes)
 
577
            _body_decoder.accept_bytes(bytes)
 
578
            body_bytes = _body_decoder.read_pending_data()
 
579
            mutter('body bytes: %r',  body_bytes)
 
580
            mutter('in: %r',  bytes)
 
581
            if 'hpss' in debug.debug_flags:
 
582
                mutter('              %d streamed body bytes read',
 
583
                       len(body_bytes))
 
584
            if body_bytes != '':
 
585
                yield body_bytes
 
586
        self._request.finished_reading()
 
587