~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

Merge in the protocol-v3 implementation so far, integrating with the protocol negotiation in bzrlib.smart.client.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
import collections
22
22
from cStringIO import StringIO
 
23
import struct
23
24
import time
24
25
 
 
26
import bzrlib
25
27
from bzrlib import debug
26
28
from bzrlib import errors
27
 
from bzrlib.smart import request
 
29
from bzrlib.smart import message, request
28
30
from bzrlib.trace import log_exception_quietly, mutter
 
31
from bzrlib.util.bencode import bdecode, bencode
29
32
 
30
33
 
31
34
# Protocol version strings.  These are sent as prefixes of bzr requests and
34
37
REQUEST_VERSION_TWO = 'bzr request 2\n'
35
38
RESPONSE_VERSION_TWO = 'bzr response 2\n'
36
39
 
 
40
MESSAGE_VERSION_THREE = 'bzr message 3 (bzr 1.3)\n'
 
41
RESPONSE_VERSION_THREE = REQUEST_VERSION_THREE = MESSAGE_VERSION_THREE
 
42
 
37
43
 
38
44
def _recv_tuple(from_file):
39
45
    req_line = from_file.readline()
184
190
    This prefixes responses with the value of RESPONSE_VERSION_TWO.
185
191
    """
186
192
 
 
193
    response_marker = RESPONSE_VERSION_TWO
 
194
    request_marker = REQUEST_VERSION_TWO
 
195
 
187
196
    def _write_success_or_failure_prefix(self, response):
188
197
        """Write the protocol specific success/failure prefix."""
189
198
        if response.is_successful():
196
205
        
197
206
        Version two sends the value of RESPONSE_VERSION_TWO.
198
207
        """
199
 
        self._write_func(RESPONSE_VERSION_TWO)
 
208
        self._write_func(self.response_marker)
200
209
 
201
210
    def _send_response(self, response):
202
211
        """Send a smart server response down the output stream."""
236
245
                % chunk)
237
246
 
238
247
 
 
248
class _NeedMoreBytes(Exception):
 
249
    """Raise this inside a _StatefulDecoder to stop decoding until more bytes
 
250
    have been received.
 
251
    """
 
252
 
 
253
    def __init__(self, count=None):
 
254
        self.count = count
 
255
 
 
256
 
239
257
class _StatefulDecoder(object):
240
258
 
241
259
    def __init__(self):
242
260
        self.finished_reading = False
243
261
        self.unused_data = ''
244
262
        self.bytes_left = None
 
263
        self._number_needed_bytes = None
245
264
 
246
265
    def accept_bytes(self, bytes):
247
266
        """Decode as much of bytes as possible.
254
273
        """
255
274
        # accept_bytes is allowed to change the state
256
275
        current_state = self.state_accept
257
 
        self.state_accept(bytes)
258
 
        while current_state != self.state_accept:
259
 
            current_state = self.state_accept
260
 
            self.state_accept('')
 
276
        self._number_needed_bytes = None
 
277
        try:
 
278
            pr('invoking state_accept %s' %
 
279
                    (self.state_accept.im_func.__name__[len('_state_accept_'):],))
 
280
            self.state_accept(bytes)
 
281
            while current_state != self.state_accept:
 
282
                current_state = self.state_accept
 
283
                pr('invoking state_accept %s' %
 
284
                        (self.state_accept.im_func.__name__[len('_state_accept_'):],))
 
285
                self.state_accept('')
 
286
        except _NeedMoreBytes, e:
 
287
            #print '(need more bytes: %r)' % e.count
 
288
            self._number_needed_bytes = e.count
261
289
 
262
290
 
263
291
class ChunkedBodyDecoder(_StatefulDecoder):
310
338
        if pos == -1:
311
339
            # We haven't read a complete length prefix yet, so there's nothing
312
340
            # to do.
313
 
            return None
 
341
            raise _NeedMoreBytes()
314
342
        line = self._in_buffer[:pos]
315
343
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
316
344
        self._in_buffer = self._in_buffer[pos+1:]
329
357
    def _state_accept_expecting_header(self, bytes):
330
358
        self._in_buffer += bytes
331
359
        prefix = self._extract_line()
332
 
        if prefix is None:
333
 
            # We haven't read a complete length prefix yet, so there's nothing
334
 
            # to do.
335
 
            return
336
 
        elif prefix == 'chunked':
 
360
        if prefix == 'chunked':
337
361
            self.state_accept = self._state_accept_expecting_length
338
362
        else:
339
363
            raise errors.SmartProtocolError(
342
366
    def _state_accept_expecting_length(self, bytes):
343
367
        self._in_buffer += bytes
344
368
        prefix = self._extract_line()
345
 
        if prefix is None:
346
 
            # We haven't read a complete length prefix yet, so there's nothing
347
 
            # to do.
348
 
            return
349
 
        elif prefix == 'ERR':
 
369
        if prefix == 'ERR':
350
370
            self.error = True
351
371
            self.error_in_progress = []
352
372
            self._state_accept_expecting_length('')
592
612
            return 1
593
613
        elif resp == ('ok', '2'):
594
614
            return 2
 
615
        elif resp == ('ok', '3'):
 
616
            return 3
595
617
        else:
596
618
            raise errors.SmartProtocolError("bad response %r" % (resp,))
597
619
 
613
635
    This prefixes the request with the value of REQUEST_VERSION_TWO.
614
636
    """
615
637
 
 
638
    response_marker = RESPONSE_VERSION_TWO
 
639
    request_marker = REQUEST_VERSION_TWO
 
640
 
616
641
    def read_response_tuple(self, expect_body=False):
617
642
        """Read a response tuple from the wire.
618
643
 
619
644
        This should only be called once.
620
645
        """
621
646
        version = self._request.read_line()
622
 
        if version != RESPONSE_VERSION_TWO:
 
647
        if version != self.response_marker:
623
648
            raise errors.SmartProtocolError('bad protocol marker %r' % version)
624
649
        response_status = self._recv_line()
625
650
        if response_status not in ('success\n', 'failed\n'):
633
658
        
634
659
        Version two sends the value of REQUEST_VERSION_TWO.
635
660
        """
636
 
        self._request.accept_bytes(REQUEST_VERSION_TWO)
 
661
        self._request.accept_bytes(self.request_marker)
637
662
 
638
663
    def read_streamed_body(self):
639
664
        """Read bytes from the body, decoding into a byte stream.
653
678
                yield body_bytes
654
679
        self._request.finished_reading()
655
680
 
 
681
 
 
682
def build_server_protocol_three(backing_transport, write_func):
 
683
    request_handler = request.SmartServerRequestHandler(
 
684
        backing_transport, commands=request.request_handlers)
 
685
    responder = ProtocolThreeResponder(write_func)
 
686
    message_handler = message.ConventionalRequestHandler(request_handler, responder)
 
687
    return _ProtocolThreeBase(message_handler)
 
688
 
 
689
 
 
690
class _ProtocolThreeBase(_StatefulDecoder):
 
691
 
 
692
    response_marker = RESPONSE_VERSION_THREE
 
693
    request_marker = REQUEST_VERSION_THREE
 
694
 
 
695
    def __init__(self, message_handler):
 
696
        _StatefulDecoder.__init__(self)
 
697
        self.has_dispatched = False
 
698
        # Initial state
 
699
        self._in_buffer = ''
 
700
        self._number_needed_bytes = 4
 
701
        self.state_accept = self._state_accept_expecting_headers
 
702
 
 
703
        self.request_handler = self.message_handler = message_handler
 
704
 
 
705
#        self.excess_buffer = ''
 
706
#        self._finished = False
 
707
#        self.has_dispatched = False
 
708
#        self._body_decoder = None
 
709
 
 
710
    def accept_bytes(self, bytes):
 
711
        pr('......')
 
712
#        if 'put_non_atomic' in bytes:
 
713
#            import pdb; pdb.set_trace()
 
714
        def summarise_buf():
 
715
            if self._in_buffer is None:
 
716
                buf_summary = 'None'
 
717
            elif len(self._in_buffer) <= 6:
 
718
                buf_summary = repr(self._in_buffer)
 
719
            else:
 
720
                buf_summary = repr(self._in_buffer[:3] + '...')
 
721
            return buf_summary
 
722
        handler_name = self.message_handler.__class__.__name__
 
723
        handler_name = handler_name[len('Conventional'):-len('Handler')]
 
724
        state_now = self.state_accept.im_func.__name__[len('_state_accept_'):]
 
725
        buf_now = summarise_buf()
 
726
        #from pprint import pprint; pprint([bytes, self.__dict__])
 
727
        self._number_needed_bytes = None
 
728
        try:
 
729
            _StatefulDecoder.accept_bytes(self, bytes)
 
730
        except KeyboardInterrupt:
 
731
            raise
 
732
        except Exception, exception:
 
733
            log_exception_quietly()
 
734
            # XXX
 
735
            self.message_handler.protocol_error(exception)
 
736
            #self._send_response(request.FailedSmartServerResponse(
 
737
            #    ('error', str(exception))))
 
738
        pr('%s in %s(%s), got %r --> %s(%s)' % (
 
739
            handler_name, state_now, buf_now, bytes,
 
740
            self.state_accept.im_func.__name__[len('_state_accept_'):],
 
741
            summarise_buf()))
 
742
        pr('~~~~~~')
 
743
 
 
744
    def _extract_length_prefixed_bytes(self):
 
745
        if len(self._in_buffer) < 4:
 
746
            # A length prefix by itself is 4 bytes, and we don't even have that
 
747
            # many yet.
 
748
            raise _NeedMoreBytes(4)
 
749
        (length,) = struct.unpack('!L', self._in_buffer[:4])
 
750
        end_of_bytes = 4 + length
 
751
        if len(self._in_buffer) < end_of_bytes:
 
752
            # We haven't yet read as many bytes as the length-prefix says there
 
753
            # are.
 
754
            raise _NeedMoreBytes(end_of_bytes)
 
755
        # Extract the bytes from the buffer.
 
756
        bytes = self._in_buffer[4:end_of_bytes]
 
757
        self._in_buffer = self._in_buffer[end_of_bytes:]
 
758
        return bytes
 
759
 
 
760
    def _extract_prefixed_bencoded_data(self):
 
761
        prefixed_bytes = self._extract_length_prefixed_bytes()
 
762
        try:
 
763
            decoded = bdecode(prefixed_bytes)
 
764
        except ValueError:
 
765
            raise errors.SmartProtocolError(
 
766
                'Bytes %r not bencoded' % (prefixed_bytes,))
 
767
        return decoded
 
768
 
 
769
    def _extract_single_byte(self):
 
770
        if self._in_buffer == '':
 
771
            # The buffer is empty
 
772
            raise _NeedMoreBytes()
 
773
        one_byte = self._in_buffer[0]
 
774
        self._in_buffer = self._in_buffer[1:]
 
775
        return one_byte
 
776
 
 
777
    def _state_accept_expecting_headers(self, bytes):
 
778
        self._in_buffer += bytes
 
779
        decoded = self._extract_prefixed_bencoded_data()
 
780
        if type(decoded) is not dict:
 
781
            raise errors.SmartProtocolError(
 
782
                'Header object %r is not a dict' % (decoded,))
 
783
        self.message_handler.headers_received(decoded)
 
784
        self.state_accept = self._state_accept_expecting_message_part
 
785
    
 
786
    def _state_accept_expecting_message_part(self, bytes):
 
787
        #import sys; print >> sys.stderr, 'msg part bytes:', repr(bytes)
 
788
        self._in_buffer += bytes
 
789
        message_part_kind = self._extract_single_byte()
 
790
        if message_part_kind == 'o':
 
791
            self.state_accept = self._state_accept_expecting_one_byte
 
792
        elif message_part_kind == 's':
 
793
            self.state_accept = self._state_accept_expecting_structure
 
794
        elif message_part_kind == 'b':
 
795
            self.state_accept = self._state_accept_expecting_bytes
 
796
        elif message_part_kind == 'e':
 
797
            self.done()
 
798
        else:
 
799
            raise errors.SmartProtocolError(
 
800
                'Bad message kind byte: %r' % (message_part_kind,))
 
801
        #import sys; print >> sys.stderr, 'state:', self.state_accept, '_in_buffer:', repr(self._in_buffer)
 
802
 
 
803
    def _state_accept_expecting_one_byte(self, bytes):
 
804
        self._in_buffer += bytes
 
805
        byte = self._extract_single_byte()
 
806
        self.message_handler.byte_part_received(byte)
 
807
        self.state_accept = self._state_accept_expecting_message_part
 
808
 
 
809
    def _state_accept_expecting_bytes(self, bytes):
 
810
        # XXX: this should not buffer whole message part, but instead deliver
 
811
        # the bytes as they arrive.
 
812
        self._in_buffer += bytes
 
813
        prefixed_bytes = self._extract_length_prefixed_bytes()
 
814
        self.message_handler.bytes_part_received(prefixed_bytes)
 
815
        self.state_accept = self._state_accept_expecting_message_part
 
816
 
 
817
    def _state_accept_expecting_structure(self, bytes):
 
818
        self._in_buffer += bytes
 
819
        structure = self._extract_prefixed_bencoded_data()
 
820
        self.message_handler.structure_part_received(structure)
 
821
        self.state_accept = self._state_accept_expecting_message_part
 
822
 
 
823
    def done(self):
 
824
        #import sys; print >> sys.stderr, 'Done!', repr(self._in_buffer)
 
825
        self.unused_data = self._in_buffer
 
826
        self._in_buffer = None
 
827
        self.state_accept = self._state_accept_reading_unused
 
828
        self.message_handler.end_received()
 
829
 
 
830
    def _state_accept_reading_unused(self, bytes):
 
831
        self.unused_data += bytes
 
832
 
 
833
    @property
 
834
    def excess_buffer(self):
 
835
        # XXX: this property is a compatibility hack.  Really there should not
 
836
        # be both unused_data and excess_buffer.
 
837
        return self.unused_data
 
838
    
 
839
    def next_read_size(self):
 
840
        if self.state_accept == self._state_accept_reading_unused:
 
841
            return 0
 
842
        else:
 
843
            if self._number_needed_bytes is not None:
 
844
                return self._number_needed_bytes - len(self._in_buffer)
 
845
            else:
 
846
                return 1 # XXX !!!
 
847
 
 
848
 
 
849
class SmartServerRequestProtocolThree(_ProtocolThreeBase):
 
850
 
 
851
    def _args_received(self, args):
 
852
        if len(args) < 1:
 
853
            raise errors.SmartProtocolError('Empty argument sequence')
 
854
        self.state_accept = self._state_accept_expecting_body_kind
 
855
        self.request_handler.args_received(args)
 
856
 
 
857
 
 
858
class SmartClientRequestProtocolThree(_ProtocolThreeBase, SmartClientRequestProtocolTwo):
 
859
 
 
860
    response_marker = RESPONSE_VERSION_THREE
 
861
    request_marker = REQUEST_VERSION_THREE
 
862
 
 
863
    def __init__(self, client_medium_request):
 
864
        from bzrlib.smart.message import MessageHandler
 
865
        _ProtocolThreeBase.__init__(self, MessageHandler())
 
866
        SmartClientRequestProtocolTwo.__init__(self, client_medium_request)
 
867
        # Initial state
 
868
        self._in_buffer = ''
 
869
        self.state_accept = self._state_accept_expecting_headers
 
870
        self.response_handler = self.request_handler = self.message_handler
 
871
 
 
872
    def _state_accept_expecting_response_status(self, bytes):
 
873
        self._in_buffer += bytes
 
874
        response_status = self._extract_single_byte()
 
875
        if response_status not in ['S', 'F']:
 
876
            raise errors.SmartProtocolError(
 
877
                'Unknown response status: %r' % (response_status,))
 
878
        self.successful_status = bool(response_status == 'S')
 
879
        self.state_accept = self._state_accept_expecting_request_args
 
880
 
 
881
    def _args_received(self, args):
 
882
        if self.successful_status:
 
883
            self.response_handler.args_received(args)
 
884
        else:
 
885
            if len(args) < 1:
 
886
                raise errors.SmartProtocolError('Empty error details')
 
887
            self.response_handler.error_received(args)
 
888
        self.done()
 
889
 
 
890
 
 
891
    # XXX: the encoding of requests and decoding responses are somewhat
 
892
    # conflated into one class here.  The protocol is half-duplex, so combining
 
893
    # them just makes the code needlessly ugly.
 
894
 
 
895
    def _write_prefixed_bencode(self, structure):
 
896
        bytes = bencode(structure)
 
897
        self._request.accept_bytes(struct.pack('!L', len(bytes)))
 
898
        self._request.accept_bytes(bytes)
 
899
 
 
900
    def _write_headers(self, headers=None):
 
901
        if headers is None:
 
902
            headers = {'Software version': bzrlib.__version__}
 
903
        self._write_prefixed_bencode(headers)
 
904
 
 
905
    def _write_args(self, args):
 
906
        self._request.accept_bytes('s')
 
907
        self._write_prefixed_bencode(args)
 
908
 
 
909
    def _write_end(self):
 
910
        self._request.accept_bytes('e')
 
911
 
 
912
    def _write_prefixed_body(self, bytes):
 
913
        self._request.accept_bytes('b')
 
914
        self._request.accept_bytes(struct.pack('!L', len(bytes)))
 
915
        self._request.accept_bytes(bytes)
 
916
 
 
917
    def _wait_for_request_end(self):
 
918
        while True:
 
919
            next_read_size = self.next_read_size() 
 
920
            if next_read_size == 0:
 
921
                # a complete request has been read.
 
922
                break
 
923
            bytes = self._request.read_bytes(next_read_size)
 
924
            if bytes == '':
 
925
                # end of file encountered reading from server
 
926
                raise errors.ConnectionReset(
 
927
                    "please check connectivity and permissions",
 
928
                    "(and try -Dhpss if further diagnosis is required)")
 
929
            self.accept_bytes(bytes)
 
930
 
 
931
    # these methods from SmartClientRequestProtocolOne/Two
 
932
    def call(self, *args, **kw):
 
933
        # XXX: ideally, signature would be call(self, *args, headers=None), but
 
934
        # python doesn't allow that.  So, we fake it.
 
935
        headers = None
 
936
        if 'headers' in kw:
 
937
            headers = kw.pop('headers')
 
938
        if kw != {}:
 
939
            raise TypeError('Unexpected keyword arguments: %r' % (kw,))
 
940
        if 'hpss' in debug.debug_flags:
 
941
            mutter('hpss call:   %s', repr(args)[1:-1])
 
942
            if getattr(self._request._medium, 'base', None) is not None:
 
943
                mutter('             (to %s)', self._request._medium.base)
 
944
            self._request_start_time = time.time()
 
945
        self._write_protocol_version()
 
946
        self._write_headers(headers)
 
947
        self._write_args(args)
 
948
        self._write_end()
 
949
        self._request.finished_writing()
 
950
 
 
951
    def call_with_body_bytes(self, args, body, headers=None):
 
952
        """Make a remote call of args with body bytes 'body'.
 
953
 
 
954
        After calling this, call read_response_tuple to find the result out.
 
955
        """
 
956
        if 'hpss' in debug.debug_flags:
 
957
            mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
 
958
            if getattr(self._request._medium, '_path', None) is not None:
 
959
                mutter('                  (to %s)', self._request._medium._path)
 
960
            mutter('              %d bytes', len(body))
 
961
            self._request_start_time = time.time()
 
962
        self._write_protocol_version()
 
963
        self._write_headers(headers)
 
964
        self._write_args(args)
 
965
        self._write_prefixed_body(body)
 
966
        self._write_end()
 
967
        self._request.finished_writing()
 
968
 
 
969
    def call_with_body_readv_array(self, args, body, headers=None):
 
970
        """Make a remote call with a readv array.
 
971
 
 
972
        The body is encoded with one line per readv offset pair. The numbers in
 
973
        each pair are separated by a comma, and no trailing \n is emitted.
 
974
        """
 
975
        if 'hpss' in debug.debug_flags:
 
976
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
 
977
            if getattr(self._request._medium, '_path', None) is not None:
 
978
                mutter('                  (to %s)', self._request._medium._path)
 
979
            self._request_start_time = time.time()
 
980
        self._write_protocol_version()
 
981
        self._write_headers(headers)
 
982
        self._write_args(args)
 
983
        readv_bytes = self._serialise_offsets(body)
 
984
        self._write_prefixed_body(readv_bytes)
 
985
        self._request.finished_writing()
 
986
        if 'hpss' in debug.debug_flags:
 
987
            mutter('              %d bytes in readv request', len(readv_bytes))
 
988
 
 
989
    def cancel_read_body(self):
 
990
        """Ignored.  Not relevant to version 3 of the protocol."""
 
991
 
 
992
    def read_response_tuple(self, expect_body=False):
 
993
        """Read a response tuple from the wire.
 
994
 
 
995
        The expect_body flag is ignored.
 
996
        """
 
997
        # XXX: warn if expect_body doesn't match the response?
 
998
        self._wait_for_request_end()
 
999
        if self.response_handler.error_args is not None:
 
1000
            self._translate_error()
 
1001
            return self.response_handler.error_args
 
1002
        return self.response_handler.args
 
1003
 
 
1004
    def read_body_bytes(self, count=-1):
 
1005
        """Read bytes from the body, decoding into a byte stream.
 
1006
        
 
1007
        We read all bytes at once to ensure we've checked the trailer for 
 
1008
        errors, and then feed the buffer back as read_body_bytes is called.
 
1009
        """
 
1010
        # XXX: don't buffer the full request
 
1011
        self._wait_for_request_end()
 
1012
        return self.response_handler.prefixed_body.read(count)
 
1013
 
 
1014
    def _translate_error(self, error_tuple):
 
1015
        # XXX: Hmm!  Need state from the request.  Hmm.
 
1016
        error_name = error_tuple[0]
 
1017
        error_args = error_tuple[1:]
 
1018
        if error_name == 'LockContention':
 
1019
            raise errors.LockContention('(remote lock)')
 
1020
        elif error_name == 'LockFailed':
 
1021
            raise errors.LockContention(*error_args[:2])
 
1022
        else:
 
1023
            return # XXX
 
1024
            raise errors.UnexpectedSmartServerResponse('Sucktitude: %r' %
 
1025
                    (error_tuple,))
 
1026
 
 
1027
 
 
1028
class _ProtocolThreeEncoder(object):
 
1029
 
 
1030
    def __init__(self, write_func):
 
1031
        import sys
 
1032
        def wf(bytes):
 
1033
            pr('writing:', repr(bytes))
 
1034
            return write_func(bytes)
 
1035
        self._write_func = wf
 
1036
 
 
1037
    def _write_protocol_version(self):
 
1038
        self._write_func(MESSAGE_VERSION_THREE)
 
1039
 
 
1040
    def _write_prefixed_bencode(self, structure):
 
1041
        bytes = bencode(structure)
 
1042
        self._write_func(struct.pack('!L', len(bytes)))
 
1043
        self._write_func(bytes)
 
1044
 
 
1045
    def _write_headers(self, headers=None):
 
1046
        if headers is None:
 
1047
            headers = {'Software version': bzrlib.__version__}
 
1048
        self._write_prefixed_bencode(headers)
 
1049
 
 
1050
    def _write_structure(self, args):
 
1051
        self._write_func('s')
 
1052
        utf8_args = []
 
1053
        for arg in args:
 
1054
            if type(arg) is unicode:
 
1055
                utf8_args.append(arg.encode('utf8'))
 
1056
            else:
 
1057
                utf8_args.append(arg)
 
1058
        self._write_prefixed_bencode(utf8_args)
 
1059
 
 
1060
    def _write_end(self):
 
1061
        self._write_func('e')
 
1062
 
 
1063
    def _write_prefixed_body(self, bytes):
 
1064
        self._write_func('b')
 
1065
        self._write_func(struct.pack('!L', len(bytes)))
 
1066
        self._write_func(bytes)
 
1067
 
 
1068
    def _write_error_status(self):
 
1069
        self._write_func('oE')
 
1070
 
 
1071
    def _write_success_status(self):
 
1072
        self._write_func('oS')
 
1073
 
 
1074
 
 
1075
class ProtocolThreeResponder(_ProtocolThreeEncoder):
 
1076
 
 
1077
    def __init__(self, write_func):
 
1078
        _ProtocolThreeEncoder.__init__(self, write_func)
 
1079
        self.response_sent = False
 
1080
 
 
1081
    def send_error(self, exception):
 
1082
        #import sys; print >> sys.stderr, 'exc:', str(exception); return #XXX
 
1083
        assert not self.response_sent
 
1084
        self.response_sent = True
 
1085
        self._write_headers()
 
1086
        self._write_error_status()
 
1087
        self._write_structure(('error', str(exception)))
 
1088
        self._write_end()
 
1089
 
 
1090
    def send_response(self, response):
 
1091
        #import sys; print >> sys.stderr, 'rsp:', str(response)
 
1092
        assert not self.response_sent
 
1093
        self.response_sent = True
 
1094
        self._write_headers()
 
1095
        if response.is_successful():
 
1096
            self._write_success_status()
 
1097
        else:
 
1098
            self._write_error_status()
 
1099
        self._write_structure(response.args)
 
1100
        if response.body is not None:
 
1101
            self._write_prefixed_body(response.body)
 
1102
        elif response.body_stream is not None:
 
1103
            for chunk in response.body_stream:
 
1104
                self._write_prefixed_body(chunk)
 
1105
        self._write_end()
 
1106
        
 
1107
 
 
1108
class ProtocolThreeRequester(_ProtocolThreeEncoder):
 
1109
 
 
1110
    def __init__(self, medium_request):
 
1111
        _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
 
1112
        self._medium_request = medium_request
 
1113
 
 
1114
#    def _wait_for_request_end(self):
 
1115
#        XXX # XXX
 
1116
#        while True:
 
1117
#            next_read_size = self.next_read_size() 
 
1118
#            if next_read_size == 0:
 
1119
#                # a complete request has been read.
 
1120
#                break
 
1121
#            bytes = self._request.read_bytes(next_read_size)
 
1122
#            if bytes == '':
 
1123
#                # end of file encountered reading from server
 
1124
#                raise errors.ConnectionReset(
 
1125
#                    "please check connectivity and permissions",
 
1126
#                    "(and try -Dhpss if further diagnosis is required)")
 
1127
#            self.accept_bytes(bytes)
 
1128
 
 
1129
    # these methods from SmartClientRequestProtocolOne/Two
 
1130
    def call(self, *args, **kw):
 
1131
        # XXX: ideally, signature would be call(self, *args, headers=None), but
 
1132
        # python doesn't allow that.  So, we fake it.
 
1133
        headers = None
 
1134
        if 'headers' in kw:
 
1135
            headers = kw.pop('headers')
 
1136
        if kw != {}:
 
1137
            raise TypeError('Unexpected keyword arguments: %r' % (kw,))
 
1138
        if 'hpss' in debug.debug_flags:
 
1139
            mutter('hpss call:   %s', repr(args)[1:-1])
 
1140
            base = getattr(self._medium_request._medium, 'base', None)
 
1141
            if base is not None:
 
1142
                mutter('             (to %s)', base)
 
1143
            self._request_start_time = time.time()
 
1144
        self._write_protocol_version()
 
1145
        self._write_headers(headers)
 
1146
        self._write_structure(args)
 
1147
        self._write_end()
 
1148
        self._medium_request.finished_writing()
 
1149
 
 
1150
    def call_with_body_bytes(self, args, body, headers=None):
 
1151
        """Make a remote call of args with body bytes 'body'.
 
1152
 
 
1153
        After calling this, call read_response_tuple to find the result out.
 
1154
        """
 
1155
        if 'hpss' in debug.debug_flags:
 
1156
            mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
 
1157
            if getattr(self._request._medium, '_path', None) is not None:
 
1158
                mutter('                  (to %s)', self._request._medium._path)
 
1159
            mutter('              %d bytes', len(body))
 
1160
            self._request_start_time = time.time()
 
1161
        pr('call_with_body_bytes: %r, %r' % (args, body))
 
1162
        self._write_protocol_version()
 
1163
        self._write_headers(headers)
 
1164
        self._write_structure(args)
 
1165
        self._write_prefixed_body(body)
 
1166
        self._write_end()
 
1167
        self._medium_request.finished_writing()
 
1168
 
 
1169
    def call_with_body_readv_array(self, args, body, headers=None):
 
1170
        """Make a remote call with a readv array.
 
1171
 
 
1172
        The body is encoded with one line per readv offset pair. The numbers in
 
1173
        each pair are separated by a comma, and no trailing \n is emitted.
 
1174
        """
 
1175
        if 'hpss' in debug.debug_flags:
 
1176
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
 
1177
            if getattr(self._request._medium, '_path', None) is not None:
 
1178
                mutter('                  (to %s)', self._request._medium._path)
 
1179
            self._request_start_time = time.time()
 
1180
        self._write_protocol_version()
 
1181
        self._write_headers(headers)
 
1182
        self._write_structure(args)
 
1183
        readv_bytes = self._serialise_offsets(body)
 
1184
        self._write_prefixed_body(readv_bytes)
 
1185
        self._request.finished_writing()
 
1186
        if 'hpss' in debug.debug_flags:
 
1187
            mutter('              %d bytes in readv request', len(readv_bytes))
 
1188
 
 
1189
#    def cancel_read_body(self):
 
1190
#        """Ignored.  Not relevant to version 3 of the protocol."""
 
1191
#
 
1192
#    def read_response_tuple(self, expect_body=False):
 
1193
#        """Read a response tuple from the wire.
 
1194
#
 
1195
#        The expect_body flag is ignored.
 
1196
#        """
 
1197
#        # XXX: warn if expect_body doesn't match the response?
 
1198
#        self._wait_for_request_end()
 
1199
#        if self.response_handler.error_args is not None:
 
1200
#            xxx_translate_error()
 
1201
#        return self.response_handler.args
 
1202
#
 
1203
#    def read_body_bytes(self, count=-1):
 
1204
#        """Read bytes from the body, decoding into a byte stream.
 
1205
#        
 
1206
#        We read all bytes at once to ensure we've checked the trailer for 
 
1207
#        errors, and then feed the buffer back as read_body_bytes is called.
 
1208
#        """
 
1209
#        # XXX: don't buffer the full request
 
1210
#        self._wait_for_request_end()
 
1211
#        return self.response_handler.prefixed_body.read(count)
 
1212
 
 
1213
 
 
1214
from thread import get_ident
 
1215
def pr(*args):
 
1216
    return
 
1217
    print '%x' % get_ident(),
 
1218
    for arg in args:
 
1219
        print arg,
 
1220
    print