~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/message.py

Merge in the protocol-v3 implementation so far, integrating with the protocol negotiation in bzrlib.smart.client.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 8 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
import collections
 
18
from cStringIO import StringIO
 
19
 
 
20
from bzrlib import errors
 
21
 
 
22
class MessageHandler(object):
 
23
 
 
24
    def __init__(self):
 
25
        self.headers = None
 
26
 
 
27
    def headers_received(self, headers):
 
28
        self.headers = headers
 
29
 
 
30
    def byte_part_received(self, byte):
 
31
        raise NotImplementedError(self.byte_received)
 
32
 
 
33
    def bytes_part_received(self, bytes):
 
34
        raise NotImplementedError(self.bytes_received)
 
35
 
 
36
    def structure_part_received(self, structure):
 
37
        raise NotImplementedError(self.bytes_received)
 
38
 
 
39
    def protocol_error(self, exception):
 
40
        """Called when there is a protocol decoding error."""
 
41
        raise
 
42
    
 
43
    def end_received(self):
 
44
        # XXX
 
45
        pass
 
46
 
 
47
 
 
48
class ConventionalRequestHandler(MessageHandler):
 
49
 
 
50
    def __init__(self, request_handler, responder):
 
51
        MessageHandler.__init__(self)
 
52
        self.request_handler = request_handler
 
53
        self.responder = responder
 
54
        self.args_received = False
 
55
#        self.args = None
 
56
#        self.error = None
 
57
#        self.prefixed_body = None
 
58
#        self.body_stream = None
 
59
 
 
60
    def protocol_error(self, exception):
 
61
        self.responder.send_error(exception)
 
62
 
 
63
    def byte_part_received(self, byte):
 
64
        raise errors.SmartProtocolError(
 
65
            'Unexpected message part: byte(%r)' % (byte,))
 
66
 
 
67
    def structure_part_received(self, structure):
 
68
        if self.args_received:
 
69
            raise errors.SmartProtocolError(
 
70
                'Unexpected message part: structure(%r)' % (structure,))
 
71
        self.args_received = True
 
72
        self.request_handler.dispatch_command(structure[0], structure[1:])
 
73
        if self.request_handler.finished_reading:
 
74
            self.responder.send_response(self.request_handler.response)
 
75
 
 
76
    def bytes_part_received(self, bytes):
 
77
        # XXX: this API requires monolithic bodies to be buffered
 
78
        # XXX: how to distinguish between a monolithic body and a chunk stream?
 
79
        #      Hmm, I guess the request handler knows which it is expecting
 
80
        #      (once the args have been received), so it should just deal?  We
 
81
        #      don't yet have requests that expect a stream anyway.
 
82
        #      *Maybe* a one-byte 'c' or 'm' (chunk or monolithic) flag before
 
83
        #      first bytes part?
 
84
        self.request_handler.accept_body(bytes)
 
85
        self.request_handler.end_of_body()
 
86
        assert self.request_handler.finished_reading
 
87
        self.responder.send_response(self.request_handler.response)
 
88
 
 
89
    def end_received(self):
 
90
        # XXX
 
91
        pass
 
92
 
 
93
 
 
94
class ConventionalResponseHandler(MessageHandler):
 
95
 
 
96
    def __init__(self):
 
97
        MessageHandler.__init__(self)
 
98
        self.status = None
 
99
        self.args = None
 
100
        self._bytes_parts = collections.deque()
 
101
        self._body = None
 
102
        self.finished_reading = False
 
103
 
 
104
    def setProtoAndMedium(self, protocol_decoder, medium):
 
105
        self._protocol_decoder = protocol_decoder
 
106
        self._medium = medium
 
107
 
 
108
    def byte_part_received(self, byte):
 
109
        if self.status is not None:
 
110
            raise errors.SmartProtocolError(
 
111
                'Unexpected byte part received: %r' % (byte,))
 
112
        if byte not in ['E', 'S']:
 
113
            raise errors.SmartProtocolError(
 
114
                'Unknown response status: %r' % (byte,))
 
115
        self.status = byte
 
116
 
 
117
    def bytes_part_received(self, bytes):
 
118
        self._bytes_parts.append(bytes)
 
119
 
 
120
    def structure_part_received(self, structure):
 
121
        if self.args is not None:
 
122
            raise errors.SmartProtocolError(
 
123
                'Unexpected structure received: %r (already got %r)'
 
124
                % (structure, self.args))
 
125
        self.args = structure
 
126
 
 
127
    def _wait_for_response_args(self):
 
128
        while self.args is None and not self.finished_reading:
 
129
            self._read_more()
 
130
 
 
131
    def _wait_for_response_end(self):
 
132
        while not self.finished_reading:
 
133
            self._read_more()
 
134
 
 
135
    def _read_more(self):
 
136
        next_read_size = self._protocol_decoder.next_read_size()
 
137
        if next_read_size == 0:
 
138
            # a complete request has been read.
 
139
            self.finished_reading = True
 
140
            self._medium.finished_reading()
 
141
            return
 
142
        bytes = self._medium.read_bytes(next_read_size)
 
143
        if bytes == '':
 
144
            # end of file encountered reading from server
 
145
            raise errors.ConnectionReset(
 
146
                "please check connectivity and permissions",
 
147
                "(and try -Dhpss if further diagnosis is required)")
 
148
        self._protocol_decoder.accept_bytes(bytes)
 
149
 
 
150
    def read_response_tuple(self, expect_body=False):
 
151
        """Read a response tuple from the wire.
 
152
 
 
153
        The expect_body flag is ignored.
 
154
        """
 
155
        self._wait_for_response_args()
 
156
        if not expect_body:
 
157
            self._wait_for_response_end()
 
158
        #if self.status == 'E':
 
159
        #    xxx_translate_error() # XXX
 
160
        return tuple(self.args)
 
161
 
 
162
    def read_body_bytes(self, count=-1):
 
163
        """Read bytes from the body, decoding into a byte stream.
 
164
        
 
165
        We read all bytes at once to ensure we've checked the trailer for 
 
166
        errors, and then feed the buffer back as read_body_bytes is called.
 
167
        """
 
168
        # XXX: don't buffer the full request
 
169
        if self._body is None:
 
170
            self._wait_for_response_end()
 
171
            self._body = StringIO(''.join(self._bytes_parts))
 
172
            self._bytes_parts = None
 
173
        return self._body.read(count)
 
174
 
 
175
    def read_streamed_body(self):
 
176
        # XXX: this doesn't implement error handling for interrupted streams.
 
177
        while not self.finished_reading:
 
178
            while self._bytes_parts:
 
179
                yield self._bytes_parts.popleft()
 
180
            self._read_more()
 
181
 
 
182
    def cancel_read_body(self):
 
183
        self._wait_for_response_end()