1
# Copyright (C) 2008 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
import collections
from cStringIO import StringIO

from bzrlib import errors
22
class MessageHandler(object):
    """Base class for objects that are notified of the parts of a message.

    Each ``*_part_received`` method handles one kind of message part.  The
    implementations here raise NotImplementedError, so a subclass overrides
    the part kinds it is prepared to accept.
    """

    def __init__(self):
        # Nothing has been received yet; headers_received() replaces this.
        self.headers = None

    def headers_received(self, headers):
        """Store the message headers on ``self.headers``.

        :param headers: the headers object to remember, stored unchanged.
        """
        self.headers = headers

    def byte_part_received(self, byte):
        """Handle a single-byte message part.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError(self.byte_part_received)

    def bytes_part_received(self, bytes):
        """Handle a bytes message part.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError(self.bytes_part_received)

    def structure_part_received(self, structure):
        """Handle a structure message part.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError(self.structure_part_received)

    def protocol_error(self, exception):
        """Called when there is a protocol decoding error."""
        # NOTE(review): only the docstring of this method is visible, so the
        # default is left as a no-op.  Confirm whether the base class is
        # meant to re-raise the error instead of ignoring it.

    def end_received(self):
        """Hook with no default behaviour.

        Presumably invoked once a whole message has been received -- TODO
        confirm against the code that drives this handler.
        """
        pass
48
class ConventionalRequestHandler(MessageHandler):
    """Feed the parts of a received request to a request handler.

    The first structure part is dispatched as the command and its arguments;
    a following bytes part is handed over as the complete request body.  As
    soon as the request handler reports that it has finished reading, its
    response is passed to the responder.
    """

    def __init__(self, request_handler, responder):
        """Create a handler.

        :param request_handler: object providing dispatch_command(),
            accept_body(), end_of_body(), finished_reading and response.
        :param responder: object providing send_response() and send_error().
        """
        MessageHandler.__init__(self)
        self.request_handler = request_handler
        self.responder = responder
        # Becomes True once the command structure has been dispatched.
        self.args_received = False

    def protocol_error(self, exception):
        """Report a protocol decoding error through the responder."""
        self.responder.send_error(exception)

    def byte_part_received(self, byte):
        """Reject single-byte parts.

        :raises errors.SmartProtocolError: always.
        """
        raise errors.SmartProtocolError(
            'Unexpected message part: byte(%r)' % (byte,))

    def structure_part_received(self, structure):
        """Dispatch the command carried by the first structure part.

        :param structure: sequence whose first item is passed to
            dispatch_command() as the command and whose remaining items are
            passed as its arguments.
        :raises errors.SmartProtocolError: if a structure part has already
            been received for this request.
        """
        if self.args_received:
            raise errors.SmartProtocolError(
                'Unexpected message part: structure(%r)' % (structure,))
        self.args_received = True
        self.request_handler.dispatch_command(structure[0], structure[1:])
        # A request that needs no body can be answered straight away.
        if self.request_handler.finished_reading:
            self.responder.send_response(self.request_handler.response)

    def bytes_part_received(self, bytes):
        """Pass a complete request body to the request handler and respond.

        :param bytes: the whole body; it is accepted and then immediately
            followed by end_of_body().
        :raises AssertionError: if the request handler has not finished
            reading once the body has been delivered.
        """
        # XXX: this API requires monolithic bodies to be buffered
        # XXX: how to distinguish between a monolithic body and a chunk
        #      stream?  Hmm, I guess the request handler knows which it is
        #      expecting (once the args have been received), so it should
        #      just deal?  We don't yet have requests that expect a stream
        #      anyway.  *Maybe* a one-byte 'c' or 'm' (chunk or monolithic)
        #      flag before the body bytes?
        self.request_handler.accept_body(bytes)
        self.request_handler.end_of_body()
        # Raised explicitly rather than with an assert statement so that the
        # check still runs when Python is started with -O.
        if not self.request_handler.finished_reading:
            raise AssertionError(
                'request handler has not finished reading after end_of_body()')
        self.responder.send_response(self.request_handler.response)

    def end_received(self):
        """Do nothing (XXX: no end-of-message handling yet)."""
        pass
94
class ConventionalResponseHandler(MessageHandler):
    """Collect the parts of a response and hand them out to a client.

    Parts arrive through the ``*_part_received`` methods.  The reading
    methods pull more data from the medium, via the protocol decoder, until
    the part they need has arrived.  setProtoAndMedium() must be called
    before any of the reading methods.
    """

    def __init__(self):
        MessageHandler.__init__(self)
        # Status byte of the response ('E' or 'S'); None until received.
        self.status = None
        # Response argument structure; None until received.
        self.args = None
        # Body parts received but not yet handed to a caller.
        self._bytes_parts = collections.deque()
        # File-like view of the whole body, created by read_body_bytes().
        self._body = None
        # Set once the decoder reports that nothing more needs reading.
        self.finished_reading = False

    def setProtoAndMedium(self, protocol_decoder, medium):
        """Set the decoder and medium that further data is read through.

        :param protocol_decoder: object providing next_read_size() and
            accept_bytes().
        :param medium: object providing read_bytes() and finished_reading().
        """
        self._protocol_decoder = protocol_decoder
        self._medium = medium

    def byte_part_received(self, byte):
        """Record the response status byte.

        :raises errors.SmartProtocolError: if a status has already been
            received, or if ``byte`` is neither 'E' nor 'S'.
        """
        if self.status is not None:
            raise errors.SmartProtocolError(
                'Unexpected byte part received: %r' % (byte,))
        if byte not in ('E', 'S'):
            raise errors.SmartProtocolError(
                'Unknown response status: %r' % (byte,))
        self.status = byte

    def bytes_part_received(self, bytes):
        """Queue one part of the response body."""
        self._bytes_parts.append(bytes)

    def structure_part_received(self, structure):
        """Record the response arguments.

        :raises errors.SmartProtocolError: if arguments have already been
            received.
        """
        if self.args is not None:
            raise errors.SmartProtocolError(
                'Unexpected structure received: %r (already got %r)'
                % (structure, self.args))
        self.args = structure

    def _wait_for_response_args(self):
        """Read until the arguments have arrived or the response has ended."""
        while self.args is None and not self.finished_reading:
            self._read_more()

    def _wait_for_response_end(self):
        """Read until the whole response has been received."""
        while not self.finished_reading:
            self._read_more()

    def _read_more(self):
        """Read the next run of bytes from the medium into the decoder.

        :raises errors.ConnectionReset: if the medium returns no data while
            the decoder still expects some.
        """
        next_read_size = self._protocol_decoder.next_read_size()
        if next_read_size == 0:
            # a complete request has been read.
            self.finished_reading = True
            self._medium.finished_reading()
            return
        data = self._medium.read_bytes(next_read_size)
        if not data:
            # end of file encountered reading from server
            raise errors.ConnectionReset(
                "please check connectivity and permissions",
                "(and try -Dhpss if further diagnosis is required)")
        self._protocol_decoder.accept_bytes(data)

    def read_response_tuple(self, expect_body=False):
        """Read a response tuple from the wire.

        The expect_body flag is ignored.

        :return: the response arguments as a tuple.
        :raises errors.SmartProtocolError: if the response ended without
            any arguments having been received.
        """
        self._wait_for_response_args()
        # The rest of the message is read here whatever expect_body says;
        # body parts stay queued for read_body_bytes()/read_streamed_body().
        self._wait_for_response_end()
        # XXX: an 'E' status is not translated into an exception yet.
        if self.args is None:
            # Previously this surfaced as a TypeError from tuple(None).
            raise errors.SmartProtocolError(
                'Response ended without a response tuple')
        return tuple(self.args)

    def read_body_bytes(self, count=-1):
        """Read bytes from the body, decoding into a byte stream.

        We read all bytes at once to ensure we've checked the trailer for
        errors, and then feed the buffer back as read_body_bytes is called.

        :param count: passed to the buffer's read(); the default of -1 reads
            everything that remains.
        """
        # XXX: don't buffer the full request
        if self._body is None:
            self._wait_for_response_end()
            self._body = StringIO(''.join(self._bytes_parts))
            # The parts now live in self._body; drop the queue.
            self._bytes_parts = None
        return self._body.read(count)

    def read_streamed_body(self):
        """Yield the parts of the body, reading more data as required.

        Parts that are already queued are yielded first, so nothing is lost
        if the whole response was read before iteration started.
        """
        # XXX: this doesn't implement error handling for interrupted streams.
        while True:
            while self._bytes_parts:
                yield self._bytes_parts.popleft()
            if self.finished_reading:
                break
            self._read_more()

    def cancel_read_body(self):
        """Read the remainder of the response so the medium is finished with.

        Body parts received while doing so are left in the queue.
        """
        self._wait_for_response_end()