# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
from cStringIO import StringIO
19
from bzrlib import errors
21
class MessageHandler(object):
    """Base class for objects that receive the decoded parts of a message.

    Each ``*_received`` method is a callback for one kind of message part
    (headers, a single byte, a byte string, or a structure).  The part
    callbacks defined here reject everything with NotImplementedError, so
    subclasses override the ones they support.
    """

    def __init__(self):
        # Defined up front so ``headers`` can be read before any headers
        # have arrived; subclasses call MessageHandler.__init__ explicitly.
        self.headers = None

    def headers_received(self, headers):
        """Store the message headers on ``self.headers``."""
        self.headers = headers

    def byte_part_received(self, byte):
        """Handle a single-byte message part.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError(self.byte_part_received)

    def bytes_part_received(self, bytes):
        """Handle a byte-string message part.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError(self.bytes_part_received)

    def structure_part_received(self, structure):
        """Handle a structure message part.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError(self.structure_part_received)

    def protocol_error(self, exception):
        """Called when there is a protocol decoding error."""

    def end_received(self):
        """Hook for the end of a message; does nothing by default."""
        # NOTE(review): no body for this method is visible in the reviewed
        # source, so it is treated as a no-op hook -- confirm.
        pass
47
class ConventionalRequestHandler(MessageHandler):
    """Message handler that drives a request handler from message parts.

    The first structure part is split into a command name and its arguments
    and dispatched to ``request_handler``; a bytes part is handed to it as
    the request body.  Responses and protocol errors are reported through
    ``responder``.
    """

    def __init__(self, request_handler, responder):
        """Create a handler.

        :param request_handler: object providing ``dispatch_command``,
            ``accept_body``, ``end_of_body``, ``finished_reading`` and
            ``response``.
        :param responder: object providing ``send_response`` and
            ``send_error``.
        """
        MessageHandler.__init__(self)
        self.request_handler = request_handler
        self.responder = responder
        # Only one structure part (the command and its args) is allowed.
        self.args_received = False

    def protocol_error(self, exception):
        """Report a protocol decoding error through the responder."""
        self.responder.send_error(exception)

    def byte_part_received(self, byte):
        """Reject single-byte parts.

        :raises errors.SmartProtocolError: always.
        """
        raise errors.SmartProtocolError(
            'Unexpected message part: byte(%r)' % (byte,))

    def structure_part_received(self, structure):
        """Dispatch the request described by ``structure``.

        ``structure[0]`` is the command and ``structure[1:]`` its arguments.
        If the request handler has finished reading once the command has
        been dispatched, its response is sent immediately.

        :raises errors.SmartProtocolError: if a structure part has already
            been received for this message.
        """
        if self.args_received:
            raise errors.SmartProtocolError(
                'Unexpected message part: structure(%r)' % (structure,))
        self.args_received = True
        self.request_handler.dispatch_command(structure[0], structure[1:])
        if self.request_handler.finished_reading:
            self.responder.send_response(self.request_handler.response)

    def bytes_part_received(self, bytes):
        """Pass ``bytes`` to the request handler as the complete body."""
        # XXX: this API requires monolithic bodies to be buffered
        # XXX: how to distinguish between a monolithic body and a chunk
        # stream?  Hmm, I guess the request handler knows which it is
        # expecting (once the args have been received), so it should just
        # deal?  We don't yet have requests that expect a stream anyway.
        # *Maybe* a one-byte 'c' or 'm' (chunk or monolithic) flag should
        # precede the body?
        #
        # The handler is stored as ``self.request_handler``; no
        # ``self.request`` attribute is ever assigned.
        # NOTE(review): unlike structure_part_received, no response is sent
        # from here once the body is complete -- confirm that is intended.
        self.request_handler.accept_body(bytes)
        self.request_handler.end_of_body()

    def end_received(self):
        """Hook for the end of a message; does nothing."""
        # NOTE(review): no body for this method is visible in the reviewed
        # source, so it is treated as a no-op hook -- confirm.
        pass
91
class ConventionalResponseHandler(MessageHandler):
    """Message handler that collects the parts of a response.

    The handler records the status byte, the argument structure and any
    body byte strings as they are received, and pulls more data from the
    medium into the protocol decoder whenever a caller asks for something
    that has not arrived yet.
    """

    def __init__(self):
        MessageHandler.__init__(self)
        # status/args stay None until the corresponding part is received;
        # the *_received methods rely on that to detect duplicates.
        self.status = None
        self.args = None
        self._bytes_parts = []
        # Built lazily by read_body_bytes from the buffered byte parts.
        self._body = None
        self.finished_reading = False

    def setProtoAndMedium(self, protocol_decoder, medium):
        """Set the decoder to feed and the medium to read bytes from."""
        self._protocol_decoder = protocol_decoder
        self._medium = medium

    def byte_part_received(self, byte):
        """Record the response status byte ('E' or 'S').

        :raises errors.SmartProtocolError: if a status byte was already
            received, or ``byte`` is not a known status.
        """
        if self.status is not None:
            raise errors.SmartProtocolError(
                'Unexpected byte part received: %r' % (byte,))
        if byte not in ['E', 'S']:
            raise errors.SmartProtocolError(
                'Unknown response status: %r' % (byte,))
        # Remember the status so read_response_tuple can check it and a
        # second status byte is rejected.
        self.status = byte

    def bytes_part_received(self, bytes):
        """Buffer one byte-string part of the response body."""
        self._bytes_parts.append(bytes)

    def structure_part_received(self, structure):
        """Record the response arguments.

        :raises errors.SmartProtocolError: if arguments were already
            received.
        """
        if self.args is not None:
            raise errors.SmartProtocolError(
                'Unexpected structure received: %r (already got %r)'
                % (structure, self.args))
        self.args = structure

    def _wait_for_response_args(self):
        """Read until the args have arrived or the response is complete."""
        while self.args is None and not self.finished_reading:
            self._read_more()

    def _wait_for_response_end(self):
        """Read until the whole response has been read."""
        while not self.finished_reading:
            self._read_more()

    def _read_more(self):
        """Feed the decoder with the next chunk of bytes from the medium.

        :raises errors.ConnectionReset: if the medium returns no bytes
            while the decoder still expects some.
        """
        next_read_size = self._protocol_decoder.next_read_size()
        if next_read_size == 0:
            # a complete request has been read.
            self.finished_reading = True
            self._medium.finished_reading()
            return
        data = self._medium.read_bytes(next_read_size)
        if not data:
            # end of file encountered reading from server
            raise errors.ConnectionReset(
                "please check connectivity and permissions",
                "(and try -Dhpss if further diagnosis is required)")
        self._protocol_decoder.accept_bytes(data)

    def read_response_tuple(self, expect_body=False):
        """Read a response tuple from the wire.

        The expect_body flag is ignored.

        :return: the response arguments as a tuple.
        """
        self._wait_for_response_args()
        if self.status == 'E':
            # NOTE(review): xxx_translate_error is not defined anywhere in
            # the reviewed source; it looks like a placeholder for turning
            # an error response into an exception -- confirm.
            xxx_translate_error()
        return tuple(self.args)

    def read_body_bytes(self, count=-1):
        """Read bytes from the body, decoding into a byte stream.

        We read all bytes at once to ensure we've checked the trailer for
        errors, and then feed the buffer back as read_body_bytes is called.

        :param count: maximum number of bytes to return; -1 (the default)
            returns everything that remains.
        """
        # XXX: don't buffer the full request
        self._wait_for_response_end()
        if self._body is None:
            self._body = StringIO(''.join(self._bytes_parts))
            # The parts now live in the buffer; drop the list.
            self._bytes_parts = None
        return self._body.read(count)

    def cancel_read_body(self):
        """Discard the body by reading the response through to its end."""
        self._wait_for_response_end()