~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-02-04 12:56:11 UTC
  • mfrom: (3842.3.22 call_with_body_stream)
  • Revision ID: pqm@pqm.ubuntu.com-20090204125611-m7kqmwruvndk7yrv
Add client and server APIs for streamed request bodies.

Show diffs side-by-side

added added

removed removed

Lines of Context:
29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
 
from bzrlib.util.bencode import bdecode, bencode
 
32
from bzrlib.util.bencode import bdecode_as_tuple, bencode
33
33
 
34
34
 
35
35
# Protocol version strings.  These are sent as prefixes of bzr requests and
655
655
        if 'hpss' in debug.debug_flags:
656
656
            mutter('              %d bytes in readv request', len(readv_bytes))
657
657
        self._last_verb = args[0]
 
658
    
 
659
    def call_with_body_stream(self, args, stream):
 
660
        # Protocols v1 and v2 don't support body streams.  So it's safe to
 
661
        # assume that a v1/v2 server doesn't support whatever method we're
 
662
        # trying to call with a body stream.
 
663
        self._request.finished_writing()
 
664
        self._request.finished_reading()
 
665
        raise errors.UnknownSmartMethod(args[0])
658
666
 
659
667
    def cancel_read_body(self):
660
668
        """After expecting a body, a response code may indicate one otherwise.
931
939
    def _extract_prefixed_bencoded_data(self):
932
940
        prefixed_bytes = self._extract_length_prefixed_bytes()
933
941
        try:
934
 
            decoded = bdecode(prefixed_bytes)
 
942
            decoded = bdecode_as_tuple(prefixed_bytes)
935
943
        except ValueError:
936
944
            raise errors.SmartProtocolError(
937
945
                'Bytes %r not bencoded' % (prefixed_bytes,))
1100
1108
        self._write_func(struct.pack('!L', len(bytes)))
1101
1109
        self._write_func(bytes)
1102
1110
 
 
1111
    def _write_chunked_body_start(self):
 
1112
        self._write_func('oC')
 
1113
 
1103
1114
    def _write_error_status(self):
1104
1115
        self._write_func('oE')
1105
1116
 
1217
1228
        self._write_end()
1218
1229
        self._medium_request.finished_writing()
1219
1230
 
 
1231
    def call_with_body_stream(self, args, stream):
 
1232
        if 'hpss' in debug.debug_flags:
 
1233
            mutter('hpss call w/body stream: %r', args)
 
1234
            path = getattr(self._medium_request._medium, '_path', None)
 
1235
            if path is not None:
 
1236
                mutter('                  (to %s)', path)
 
1237
            self._request_start_time = time.time()
 
1238
        self._write_protocol_version()
 
1239
        self._write_headers(self._headers)
 
1240
        self._write_structure(args)
 
1241
        # TODO: notice if the server has sent an early error reply before we
 
1242
        #       have finished sending the stream.  We would notice at the end
 
1243
        #       anyway, but if the medium can deliver it early then it's good
 
1244
        #       to short-circuit the whole request...
 
1245
        try:
 
1246
            for part in stream:
 
1247
                self._write_prefixed_body(part)
 
1248
                self.flush()
 
1249
        except Exception:
 
1250
            # Iterating the stream failed.  Cleanly abort the request.
 
1251
            self._write_error_status()
 
1252
            # Currently the client unconditionally sends ('error',) as the
 
1253
            # error args.
 
1254
            self._write_structure(('error',))
 
1255
        self._write_end()
 
1256
        self._medium_request.finished_writing()
 
1257