1
# Copyright (C) 2008 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from cStringIO import StringIO
24
from bzrlib.trace import mutter
27
class MessageHandler(object):
    """Base class for handling messages received via the smart protocol.

    As parts of a message are received, the corresponding PART_received method
    will be called.  The part hooks defined here all raise
    NotImplementedError, so a subclass overrides the ones for the part kinds
    it accepts.
    """

    def __init__(self):
        # No headers are known until headers_received() stores them.
        self.headers = None

    def headers_received(self, headers):
        """Called when message headers are received.

        This default implementation just stores them in self.headers.

        :param headers: the headers of the message being received.
        """
        self.headers = headers

    def byte_part_received(self, byte):
        """Called when a 'byte' part is received.

        Note that a 'byte' part is a message part consisting of exactly one
        byte.

        :param byte: the byte that was received.
        :raises NotImplementedError: always, in this base implementation.
        """
        # Pass the hook itself as the argument so the error names the method
        # that was not overridden.
        raise NotImplementedError(self.byte_part_received)

    def bytes_part_received(self, bytes):
        """Called when a 'bytes' part is received.

        A 'bytes' message part can contain any number of bytes.  It should not
        be confused with a 'byte' part, which is always a single byte.

        :param bytes: the bytes that were received.
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(self.bytes_part_received)

    def structure_part_received(self, structure):
        """Called when a 'structure' part is received.

        :param structure: some structured data, which will be some combination
            of list, dict, int, and str objects.
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(self.structure_part_received)

    def protocol_error(self, exception):
        """Called when there is a protocol decoding error.

        The default implementation just re-raises the exception.

        :param exception: the decoding error that occurred.
        """
        # A bare raise re-raises the exception currently being handled, so
        # this assumes the caller invokes it from inside an except block.
        raise

    def end_received(self):
        """Called when the end of the message is received."""
        # No-op by default.
        pass
81
class ConventionalRequestHandler(MessageHandler):
    """Message handler for "conventional" requests.

    "Conventional" is used in the sense described in
    doc/developers/network-protocol.txt: a simple message with arguments and an
    optional body.

    Message parts are forwarded to ``request_handler``; once that handler
    reports it has finished reading, its response is sent via ``responder``.
    """

    def __init__(self, request_handler, responder):
        """Remember the collaborators used to run and answer the request.

        :param request_handler: object that dispatches the command and accepts
            the body.
        :param responder: object used to send the response or an error.
        """
        MessageHandler.__init__(self)
        self.responder = responder
        self.request_handler = request_handler
        # Becomes True once the argument structure has been dispatched.
        self.args_received = False

    def protocol_error(self, exception):
        """Report a protocol error through the responder, at most once."""
        # We can only send one response to a request, no matter how many
        # errors happen while processing it.
        if not self.responder.response_sent:
            self.responder.send_error(exception)

    def byte_part_received(self, byte):
        """Reject 'byte' parts: this handler never accepts one."""
        complaint = 'Unexpected message part: byte(%r)' % (byte,)
        raise errors.SmartProtocolError(complaint)

    def structure_part_received(self, structure):
        """Dispatch the request arguments; respond if no body is awaited.

        :param structure: sequence whose first item is the command and whose
            remaining items are its arguments.
        :raises errors.SmartProtocolError: if arguments were already received.
        """
        if self.args_received:
            complaint = (
                'Unexpected message part: structure(%r)' % (structure,))
            raise errors.SmartProtocolError(complaint)
        self.args_received = True
        handler = self.request_handler
        handler.dispatch_command(structure[0], structure[1:])
        if handler.finished_reading:
            self.responder.send_response(handler.response)

    def bytes_part_received(self, bytes):
        """Hand the request body to the request handler and send the response.

        :raises errors.SmartProtocolError: if the request handler still has
            not finished reading after the body was delivered.
        """
        # Note that there's no intrinsic way to distinguish a monolithic body
        # from a chunk stream.  A request handler knows which it is expecting
        # (once the args have been received), so it should be able to do the
        # right thing.
        handler = self.request_handler
        handler.accept_body(bytes)
        handler.end_of_body()
        if handler.finished_reading:
            self.responder.send_response(handler.response)
            return
        raise errors.SmartProtocolError(
            "Conventional request body was received, but request handler "
            "has not finished reading.")
129
class ResponseHandler(object):
    """Abstract interface for objects that handle a smart response.

    Every method here raises NotImplementedError; concrete handlers provide
    the real behaviour.
    """

    def read_response_tuple(self, expect_body=False):
        """Read the response tuple for the current request and return it.

        :keyword expect_body: whether a body is expected in the response.
            Some protocol versions need this to know when a response is
            finished.  When False (the default), read_body_bytes should
            *not* be called afterwards.
        :returns: tuple of response arguments.
        """
        raise NotImplementedError(self.read_response_tuple)

    def read_body_bytes(self, count=-1):
        """Read some bytes from the response body and return them.

        :param count: upper bound on the number of bytes to read.  The
            default reads the entire body.
        :returns: str of bytes from the response body.
        """
        raise NotImplementedError(self.read_body_bytes)

    def read_streamed_body(self):
        """Return an iterable that reads and yields a series of body chunks."""
        raise NotImplementedError(self.read_streamed_body)

    def cancel_read_body(self):
        """Stop expecting a body for this response.

        When expect_body was passed to read_response_tuple, this withdraws
        that expectation, which finishes reading the response and allows a
        new request to be issued.  It is useful when a response turns out to
        be an error rather than a normal result with a body.
        """
        raise NotImplementedError(self.cancel_read_body)
168
class ConventionalResponseHandler(MessageHandler, ResponseHandler):
    """Collects the parts of a conventional response as they are decoded.

    The status byte, the argument tuple and the body parts are recorded by
    the ``*_part_received`` hooks; the ``read_*`` methods pull more data from
    the medium request until the piece they need has arrived.
    """

    def __init__(self):
        MessageHandler.__init__(self)
        # Status byte ('S' or 'E') and argument tuple of the response.
        self.status = None
        self.args = None
        # Body parts received so far, oldest first.
        self._bytes_parts = collections.deque()
        self._body_started = False
        # Status byte received after the body started, if any.
        self._body_stream_status = None
        # File-like view of the whole body, built by read_body_bytes.
        self._body = None
        # Error tuple received after a body that ended with status 'E'.
        self._body_error_args = None
        self.finished_reading = False

    def setProtoAndMediumRequest(self, protocol_decoder, medium_request):
        """Attach the decoder to feed and the medium request to read from."""
        self._medium_request = medium_request
        self._protocol_decoder = protocol_decoder

    def byte_part_received(self, byte):
        """Record a status byte, for the response or for its body stream.

        :raises errors.SmartProtocolError: if the byte is not 'E' or 'S', or
            if a status was already recorded for the current phase.
        """
        if byte not in ('E', 'S'):
            raise errors.SmartProtocolError(
                'Unknown response status: %r' % (byte,))
        in_body = self._body_started
        if in_body:
            previous = self._body_stream_status
        else:
            previous = self.status
        # Only one status byte is allowed before the body and one after it
        # has started.
        if previous is not None:
            raise errors.SmartProtocolError(
                'Unexpected byte part received: %r' % (byte,))
        if in_body:
            self._body_stream_status = byte
        else:
            self.status = byte

    def bytes_part_received(self, bytes):
        """Queue a body part and note that the body has started."""
        self._body_started = True
        self._bytes_parts.append(bytes)

    def structure_part_received(self, structure):
        """Record the response args, or the error args that follow a body.

        :raises errors.SmartProtocolError: if structure is not a list, if the
            args were already received, or if a structure arrives after the
            body without a preceding 'E' body status.
        """
        if type(structure) is not list:
            raise errors.SmartProtocolError(
                'Args structure is not a sequence: %r' % (structure,))
        as_tuple = tuple(structure)
        if self._body_started:
            # After the body, a structure is only valid as error details.
            if self._body_stream_status != 'E':
                raise errors.SmartProtocolError(
                    'Unexpected structure received after body: %r'
                    % (as_tuple,))
            self._body_error_args = as_tuple
            return
        if self.args is not None:
            raise errors.SmartProtocolError(
                'Unexpected structure received: %r (already got %r)'
                % (as_tuple, self.args))
        self.args = as_tuple

    def _wait_for_response_args(self):
        """Read until the args have arrived or the response has ended."""
        while not (self.args is not None or self.finished_reading):
            self._read_more()

    def _wait_for_response_end(self):
        """Read until the whole response has been consumed."""
        while True:
            if self.finished_reading:
                return
            self._read_more()

    def _read_more(self):
        """Read the next piece the decoder asks for and feed it to it.

        :raises errors.ConnectionReset: if the medium returns no bytes while
            the decoder still expects some.
        """
        decoder = self._protocol_decoder
        wanted = decoder.next_read_size()
        if wanted == 0:
            # a complete request has been read.
            self.finished_reading = True
            self._medium_request.finished_reading()
            return
        data = self._medium_request.read_bytes(wanted)
        if data == '':
            # end of file encountered reading from server
            if 'hpss' in debug.debug_flags:
                mutter(
                    'decoder state: buf[:10]=%r, state_accept=%s',
                    decoder._in_buffer[:10],
                    decoder.state_accept.__name__)
            raise errors.ConnectionReset(
                "please check connectivity and permissions",
                "(and try -Dhpss if further diagnosis is required)")
        decoder.accept_bytes(data)

    def protocol_error(self, exception):
        """Mark the request as finished, then re-raise the active error."""
        # Whatever the error is, we're done with this request.
        self.finished_reading = True
        self._medium_request.finished_reading()
        raise

    def read_response_tuple(self, expect_body=False):
        """Read a response tuple from the wire."""
        self._wait_for_response_args()
        if not expect_body:
            self._wait_for_response_end()
        if 'hpss' in debug.debug_flags:
            mutter(' result: %r', self.args)
        if self.status == 'E':
            # Consume the rest of the error response before raising.
            self._wait_for_response_end()
            _translate_error(self.args)
        return tuple(self.args)

    def read_body_bytes(self, count=-1):
        """Read bytes from the body, decoding into a byte stream.

        We read all bytes at once to ensure we've checked the trailer for
        errors, and then feed the buffer back as read_body_bytes is called.

        Like the builtin file.read in Python, a count of -1 (the default) means
        read the entire body.
        """
        # TODO: we don't necessarily need to buffer the full request if count
        # != -1.  (2008/04/30, Andrew Bennetts)
        if self._body is None:
            self._wait_for_response_end()
            whole_body = ''.join(self._bytes_parts)
            if 'hpss' in debug.debug_flags:
                mutter(' %d body bytes read', len(whole_body))
            self._body = StringIO(whole_body)
            # The parts now live in self._body.
            self._bytes_parts = None
        return self._body.read(count)

    def read_streamed_body(self):
        """Yield body parts as they arrive, then raise any trailing error."""
        while not self.finished_reading:
            # Hand out everything already queued before reading more.
            while self._bytes_parts:
                chunk = self._bytes_parts.popleft()
                if 'hpss' in debug.debug_flags:
                    mutter(' %d byte part read', len(chunk))
                yield chunk
            self._read_more()
        if self._body_stream_status == 'E':
            _translate_error(self._body_error_args)

    def cancel_read_body(self):
        """Finish reading the response without using its body."""
        self._wait_for_response_end()
303
def _translate_error(error_tuple):
    """Raise the exception that corresponds to an error response tuple.

    :param error_tuple: sequence whose first item is the error name and whose
        remaining items are its arguments.
    :raises errors.UnknownSmartMethod: for 'UnknownMethod'.
    :raises errors.LockContention: for 'LockContention'.
    :raises errors.LockFailed: for 'LockFailed'.
    :raises errors.ErrorFromSmartServer: for any other error name.
    """
    # Many exceptions need some state from the requestor to be properly
    # translated (e.g. they need a branch object).  So this only translates a
    # few errors, and the rest are turned into a generic ErrorFromSmartServer.
    kind, details = error_tuple[0], error_tuple[1:]
    if kind == 'UnknownMethod':
        raise errors.UnknownSmartMethod(details[0])
    if kind == 'LockContention':
        raise errors.LockContention('(remote lock)')
    if kind == 'LockFailed':
        raise errors.LockFailed(*details[:2])
    raise errors.ErrorFromSmartServer(error_tuple)