~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/message.py

  • Committer: John Arbash Meinel
  • Date: 2008-08-18 22:34:21 UTC
  • mto: (3606.5.6 1.6)
  • mto: This revision was merged to the branch mainline in revision 3641.
  • Revision ID: john@arbash-meinel.com-20080818223421-todjny24vj4faj4t
Add tests for the fetching behavior.

The proper parameter passed is 'unordered' add an assert for it, and
fix callers that were passing 'unsorted' instead.
Add tests that we make the right get_record_stream call based
on the value of _fetch_uses_deltas.
Fix the fetch request for signatures.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
import collections
 
18
from cStringIO import StringIO
 
19
 
 
20
from bzrlib import (
 
21
    debug,
 
22
    errors,
 
23
    )
 
24
from bzrlib.trace import mutter
 
25
 
 
26
 
 
27
class MessageHandler(object):
 
28
    """Base class for handling messages received via the smart protocol.
 
29
 
 
30
    As parts of a message are received, the corresponding PART_received method
 
31
    will be called.
 
32
    """
 
33
 
 
34
    def __init__(self):
 
35
        self.headers = None
 
36
 
 
37
    def headers_received(self, headers):
 
38
        """Called when message headers are received.
 
39
        
 
40
        This default implementation just stores them in self.headers.
 
41
        """
 
42
        self.headers = headers
 
43
 
 
44
    def byte_part_received(self, byte):
 
45
        """Called when a 'byte' part is received.
 
46
 
 
47
        Note that a 'byte' part is a message part consisting of exactly one
 
48
        byte.
 
49
        """
 
50
        raise NotImplementedError(self.byte_received)
 
51
 
 
52
    def bytes_part_received(self, bytes):
 
53
        """Called when a 'bytes' part is received.
 
54
 
 
55
        A 'bytes' message part can contain any number of bytes.  It should not
 
56
        be confused with a 'byte' part, which is always a single byte.
 
57
        """
 
58
        raise NotImplementedError(self.bytes_received)
 
59
 
 
60
    def structure_part_received(self, structure):
 
61
        """Called when a 'structure' part is received.
 
62
 
 
63
        :param structure: some structured data, which will be some combination
 
64
            of list, dict, int, and str objects.
 
65
        """
 
66
        raise NotImplementedError(self.bytes_received)
 
67
 
 
68
    def protocol_error(self, exception):
 
69
        """Called when there is a protocol decoding error.
 
70
        
 
71
        The default implementation just re-raises the exception.
 
72
        """
 
73
        raise
 
74
    
 
75
    def end_received(self):
 
76
        """Called when the end of the message is received."""
 
77
        # No-op by default.
 
78
        pass
 
79
 
 
80
 
 
81
class ConventionalRequestHandler(MessageHandler):
 
82
    """A message handler for "conventional" requests.
 
83
 
 
84
    "Conventional" is used in the sense described in
 
85
    doc/developers/network-protocol.txt: a simple message with arguments and an
 
86
    optional body.
 
87
    """
 
88
 
 
89
    def __init__(self, request_handler, responder):
 
90
        MessageHandler.__init__(self)
 
91
        self.request_handler = request_handler
 
92
        self.responder = responder
 
93
        self.args_received = False
 
94
 
 
95
    def protocol_error(self, exception):
 
96
        if self.responder.response_sent:
 
97
            # We can only send one response to a request, no matter how many
 
98
            # errors happen while processing it.
 
99
            return
 
100
        self.responder.send_error(exception)
 
101
 
 
102
    def byte_part_received(self, byte):
 
103
        raise errors.SmartProtocolError(
 
104
            'Unexpected message part: byte(%r)' % (byte,))
 
105
 
 
106
    def structure_part_received(self, structure):
 
107
        if self.args_received:
 
108
            raise errors.SmartProtocolError(
 
109
                'Unexpected message part: structure(%r)' % (structure,))
 
110
        self.args_received = True
 
111
        self.request_handler.dispatch_command(structure[0], structure[1:])
 
112
        if self.request_handler.finished_reading:
 
113
            self.responder.send_response(self.request_handler.response)
 
114
 
 
115
    def bytes_part_received(self, bytes):
 
116
        # Note that there's no intrinsic way to distinguish a monolithic body
 
117
        # from a chunk stream.  A request handler knows which it is expecting
 
118
        # (once the args have been received), so it should be able to do the
 
119
        # right thing.
 
120
        self.request_handler.accept_body(bytes)
 
121
        self.request_handler.end_of_body()
 
122
        if not self.request_handler.finished_reading:
 
123
            raise SmartProtocolError(
 
124
                "Conventional request body was received, but request handler "
 
125
                "has not finished reading.")
 
126
        self.responder.send_response(self.request_handler.response)
 
127
 
 
128
 
 
129
class ResponseHandler(object):
 
130
    """Abstract base class for an object that handles a smart response."""
 
131
 
 
132
    def read_response_tuple(self, expect_body=False):
 
133
        """Reads and returns the response tuple for the current request.
 
134
        
 
135
        :keyword expect_body: a boolean indicating if a body is expected in the
 
136
            response.  Some protocol versions needs this information to know
 
137
            when a response is finished.  If False, read_body_bytes should
 
138
            *not* be called afterwards.  Defaults to False.
 
139
        :returns: tuple of response arguments.
 
140
        """
 
141
        raise NotImplementedError(self.read_response_tuple)
 
142
 
 
143
    def read_body_bytes(self, count=-1):
 
144
        """Read and return some bytes from the body.
 
145
 
 
146
        :param count: if specified, read up to this many bytes.  By default,
 
147
            reads the entire body.
 
148
        :returns: str of bytes from the response body.
 
149
        """
 
150
        raise NotImplementedError(self.read_body_bytes)
 
151
 
 
152
    def read_streamed_body(self):
 
153
        """Returns an iterable that reads and returns a series of body chunks.
 
154
        """
 
155
        raise NotImplementedError(self.read_streamed_body)
 
156
 
 
157
    def cancel_read_body(self):
 
158
        """Stop expecting a body for this response.
 
159
 
 
160
        If expect_body was passed to read_response_tuple, this cancels that
 
161
        expectation (and thus finishes reading the response, allowing a new
 
162
        request to be issued).  This is useful if a response turns out to be an
 
163
        error rather than a normal result with a body.
 
164
        """
 
165
        raise NotImplementedError(self.cancel_read_body)
 
166
 
 
167
 
 
168
class ConventionalResponseHandler(MessageHandler, ResponseHandler):
 
169
 
 
170
    def __init__(self):
 
171
        MessageHandler.__init__(self)
 
172
        self.status = None
 
173
        self.args = None
 
174
        self._bytes_parts = collections.deque()
 
175
        self._body_started = False
 
176
        self._body_stream_status = None
 
177
        self._body = None
 
178
        self._body_error_args = None
 
179
        self.finished_reading = False
 
180
 
 
181
    def setProtoAndMediumRequest(self, protocol_decoder, medium_request):
 
182
        self._protocol_decoder = protocol_decoder
 
183
        self._medium_request = medium_request
 
184
 
 
185
    def byte_part_received(self, byte):
 
186
        if byte not in ['E', 'S']:
 
187
            raise errors.SmartProtocolError(
 
188
                'Unknown response status: %r' % (byte,))
 
189
        if self._body_started:
 
190
            if self._body_stream_status is not None:
 
191
                raise errors.SmartProtocolError(
 
192
                    'Unexpected byte part received: %r' % (byte,))
 
193
            self._body_stream_status = byte
 
194
        else:
 
195
            if self.status is not None:
 
196
                raise errors.SmartProtocolError(
 
197
                    'Unexpected byte part received: %r' % (byte,))
 
198
            self.status = byte
 
199
 
 
200
    def bytes_part_received(self, bytes):
 
201
        self._body_started = True
 
202
        self._bytes_parts.append(bytes)
 
203
 
 
204
    def structure_part_received(self, structure):
 
205
        if type(structure) is not list:
 
206
            raise errors.SmartProtocolError(
 
207
                'Args structure is not a sequence: %r' % (structure,))
 
208
        structure = tuple(structure)
 
209
        if not self._body_started:
 
210
            if self.args is not None:
 
211
                raise errors.SmartProtocolError(
 
212
                    'Unexpected structure received: %r (already got %r)'
 
213
                    % (structure, self.args))
 
214
            self.args = structure
 
215
        else:
 
216
            if self._body_stream_status != 'E':
 
217
                raise errors.SmartProtocolError(
 
218
                    'Unexpected structure received after body: %r'
 
219
                    % (structure,))
 
220
            self._body_error_args = structure
 
221
 
 
222
    def _wait_for_response_args(self):
 
223
        while self.args is None and not self.finished_reading:
 
224
            self._read_more()
 
225
 
 
226
    def _wait_for_response_end(self):
 
227
        while not self.finished_reading:
 
228
            self._read_more()
 
229
 
 
230
    def _read_more(self):
 
231
        next_read_size = self._protocol_decoder.next_read_size()
 
232
        if next_read_size == 0:
 
233
            # a complete request has been read.
 
234
            self.finished_reading = True
 
235
            self._medium_request.finished_reading()
 
236
            return
 
237
        bytes = self._medium_request.read_bytes(next_read_size)
 
238
        if bytes == '':
 
239
            # end of file encountered reading from server
 
240
            if 'hpss' in debug.debug_flags:
 
241
                mutter(
 
242
                    'decoder state: buf[:10]=%r, state_accept=%s',
 
243
                    self._protocol_decoder._in_buffer[:10],
 
244
                    self._protocol_decoder.state_accept.__name__)
 
245
            raise errors.ConnectionReset(
 
246
                "please check connectivity and permissions",
 
247
                "(and try -Dhpss if further diagnosis is required)")
 
248
        self._protocol_decoder.accept_bytes(bytes)
 
249
 
 
250
    def protocol_error(self, exception):
 
251
        # Whatever the error is, we're done with this request.
 
252
        self.finished_reading = True
 
253
        self._medium_request.finished_reading()
 
254
        raise
 
255
        
 
256
    def read_response_tuple(self, expect_body=False):
 
257
        """Read a response tuple from the wire."""
 
258
        self._wait_for_response_args()
 
259
        if not expect_body:
 
260
            self._wait_for_response_end()
 
261
        if 'hpss' in debug.debug_flags:
 
262
            mutter('   result:   %r', self.args)
 
263
        if self.status == 'E':
 
264
            self._wait_for_response_end()
 
265
            _translate_error(self.args)
 
266
        return tuple(self.args)
 
267
 
 
268
    def read_body_bytes(self, count=-1):
 
269
        """Read bytes from the body, decoding into a byte stream.
 
270
        
 
271
        We read all bytes at once to ensure we've checked the trailer for 
 
272
        errors, and then feed the buffer back as read_body_bytes is called.
 
273
 
 
274
        Like the builtin file.read in Python, a count of -1 (the default) means
 
275
        read the entire body.
 
276
        """
 
277
        # TODO: we don't necessarily need to buffer the full request if count
 
278
        # != -1.  (2008/04/30, Andrew Bennetts)
 
279
        if self._body is None:
 
280
            self._wait_for_response_end()
 
281
            body_bytes = ''.join(self._bytes_parts)
 
282
            if 'hpss' in debug.debug_flags:
 
283
                mutter('              %d body bytes read', len(body_bytes))
 
284
            self._body = StringIO(body_bytes)
 
285
            self._bytes_parts = None
 
286
        return self._body.read(count)
 
287
 
 
288
    def read_streamed_body(self):
 
289
        while not self.finished_reading:
 
290
            while self._bytes_parts:
 
291
                bytes_part = self._bytes_parts.popleft()
 
292
                if 'hpss' in debug.debug_flags:
 
293
                    mutter('              %d byte part read', len(bytes_part))
 
294
                yield bytes_part
 
295
            self._read_more()
 
296
        if self._body_stream_status == 'E':
 
297
            _translate_error(self._body_error_args)
 
298
 
 
299
    def cancel_read_body(self):
 
300
        self._wait_for_response_end()
 
301
 
 
302
 
 
303
def _translate_error(error_tuple):
 
304
    # Many exceptions need some state from the requestor to be properly
 
305
    # translated (e.g. they need a branch object).  So this only translates a
 
306
    # few errors, and the rest are turned into a generic ErrorFromSmartServer.
 
307
    error_name = error_tuple[0]
 
308
    error_args = error_tuple[1:]
 
309
    if error_name == 'UnknownMethod':
 
310
        raise errors.UnknownSmartMethod(error_args[0])
 
311
    if error_name == 'LockContention':
 
312
        raise errors.LockContention('(remote lock)')
 
313
    elif error_name == 'LockFailed':
 
314
        raise errors.LockFailed(*error_args[:2])
 
315
    else:
 
316
        raise errors.ErrorFromSmartServer(error_tuple)