13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Wire-level encoding and decoding of requests and responses for the smart
29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
from bzrlib.bencode import bdecode_as_tuple, bencode
32
from bzrlib.util.bencode import bdecode, bencode
35
35
# Protocol version strings. These are sent as prefixes of bzr requests and
109
109
for start, length in offsets:
110
110
txt.append('%d,%d' % (start, length))
111
111
return '\n'.join(txt)
114
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
115
"""Server-side encoding and decoding logic for smart version 1."""
117
117
def __init__(self, backing_transport, write_func, root_client_path='/'):
118
118
self._backing_transport = backing_transport
119
119
self._root_client_path = root_client_path
128
128
def accept_bytes(self, bytes):
129
129
"""Take bytes, and advance the internal state machine appropriately.
131
131
:param bytes: must be a byte string
133
133
if not isinstance(bytes, str):
412
412
self.chunks = collections.deque()
413
413
self.error = False
414
414
self.error_in_progress = None
416
416
def next_read_size(self):
417
417
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
418
# end-of-body marker is 4 bytes: 'END\n'.
506
506
self.chunks.append(self.chunk_in_progress)
507
507
self.chunk_in_progress = None
508
508
self.state_accept = self._state_accept_expecting_length
510
510
def _state_accept_reading_unused(self):
511
511
self.unused_data += self._get_in_buffer()
512
512
self._in_buffer_list = []
515
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
516
516
"""Decodes the length-prefixed bulk data."""
518
518
def __init__(self):
519
519
_StatefulDecoder.__init__(self)
520
520
self.state_accept = self._state_accept_expecting_length
521
521
self.state_read = self._state_read_no_data
523
523
self._trailer_buffer = ''
525
525
def next_read_size(self):
526
526
if self.bytes_left is not None:
527
527
# Ideally we want to read all the remainder of the body and the
564
564
self._body = self._body[:self.bytes_left]
565
565
self.bytes_left = None
566
566
self.state_accept = self._state_accept_reading_trailer
568
568
def _state_accept_reading_trailer(self):
569
569
self._trailer_buffer += self._get_in_buffer()
570
570
self._set_in_buffer(None)
574
574
self.unused_data = self._trailer_buffer[len('done\n'):]
575
575
self.state_accept = self._state_accept_reading_unused
576
576
self.finished_reading = True
578
578
def _state_accept_reading_unused(self):
579
579
self.unused_data += self._get_in_buffer()
580
580
self._set_in_buffer(None)
656
656
mutter(' %d bytes in readv request', len(readv_bytes))
657
657
self._last_verb = args[0]
659
def call_with_body_stream(self, args, stream):
660
# Protocols v1 and v2 don't support body streams. So it's safe to
661
# assume that a v1/v2 server doesn't support whatever method we're
662
# trying to call with a body stream.
663
self._request.finished_writing()
664
self._request.finished_reading()
665
raise errors.UnknownSmartMethod(args[0])
667
659
def cancel_read_body(self):
668
660
"""After expecting a body, a response code may indicate one otherwise.
729
721
def _response_is_unknown_method(self, result_tuple):
730
722
"""Raise UnexpectedSmartServerResponse if the response is an 'unknown
731
723
method' response to the request.
733
725
:param response: The response from a smart client call_expecting_body
735
727
:param verb: The verb used in that call.
742
734
# The response will have no body, so we've finished reading.
743
735
self._request.finished_reading()
744
736
raise errors.UnknownSmartMethod(self._last_verb)
746
738
def read_body_bytes(self, count=-1):
747
739
"""Read bytes from the body, decoding into a byte stream.
749
We read all bytes at once to ensure we've checked the trailer for
741
We read all bytes at once to ensure we've checked the trailer for
750
742
errors, and then feed the buffer back as read_body_bytes is called.
752
744
if self._body_buffer is not None:
791
783
def _write_protocol_version(self):
792
784
"""Write any prefixes this protocol requires.
794
786
Version one doesn't send protocol versions.
798
790
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
799
791
"""Version two of the client side of the smart protocol.
801
793
This prefixes the request with the value of REQUEST_VERSION_TWO.
939
931
def _extract_prefixed_bencoded_data(self):
940
932
prefixed_bytes = self._extract_length_prefixed_bytes()
942
decoded = bdecode_as_tuple(prefixed_bytes)
934
decoded = bdecode(prefixed_bytes)
943
935
except ValueError:
944
936
raise errors.SmartProtocolError(
945
937
'Bytes %r not bencoded' % (prefixed_bytes,))
985
977
self.message_handler.headers_received(decoded)
987
979
raise errors.SmartMessageHandlerError(sys.exc_info())
989
981
def _state_accept_expecting_message_part(self):
990
982
message_part_kind = self._extract_single_byte()
991
983
if message_part_kind == 'o':
1036
1028
raise errors.SmartMessageHandlerError(sys.exc_info())
1038
1030
def _state_accept_reading_unused(self):
1039
self.unused_data += self._get_in_buffer()
1031
self.unused_data = self._get_in_buffer()
1040
1032
self._set_in_buffer(None)
1042
1034
def next_read_size(self):
1060
1052
response_marker = request_marker = MESSAGE_VERSION_THREE
1062
1054
def __init__(self, write_func):
1064
1056
self._real_write_func = write_func
1066
1058
def _write_func(self, bytes):
1067
self._buf.append(bytes)
1068
if len(self._buf) > 100:
1071
1061
def flush(self):
1073
self._real_write_func(''.join(self._buf))
1063
self._real_write_func(self._buf)
1076
1066
def _serialise_offsets(self, offsets):
1077
1067
"""Serialise a readv offset list."""
1079
1069
for start, length in offsets:
1080
1070
txt.append('%d,%d' % (start, length))
1081
1071
return '\n'.join(txt)
1083
1073
def _write_protocol_version(self):
1084
1074
self._write_func(MESSAGE_VERSION_THREE)
1160
1147
if response.body is not None:
1161
1148
self._write_prefixed_body(response.body)
1162
1149
elif response.body_stream is not None:
1163
for exc_info, chunk in _iter_with_errors(response.body_stream):
1164
if exc_info is not None:
1165
self._write_error_status()
1166
error_struct = request._translate_error(exc_info[1])
1167
self._write_structure(error_struct)
1170
if isinstance(chunk, request.FailedSmartServerResponse):
1171
self._write_error_status()
1172
self._write_structure(chunk.args)
1174
self._write_prefixed_body(chunk)
1150
for chunk in response.body_stream:
1151
self._write_prefixed_body(chunk)
1175
1153
self._write_end()
1178
def _iter_with_errors(iterable):
1179
"""Handle errors from iterable.next().
1183
for exc_info, value in _iter_with_errors(iterable):
1186
This is a safer alternative to::
1189
for value in iterable:
1194
Because the latter will catch errors from the for-loop body, not just
1197
If an error occurs, exc_info will be an exc_info tuple, and the generator
1198
will terminate. Otherwise exc_info will be None, and value will be the
1199
value from iterable.next(). Note that KeyboardInterrupt and SystemExit
1200
will not be intercepted.
1202
iterator = iter(iterable)
1205
yield None, iterator.next()
1206
except StopIteration:
1208
except (KeyboardInterrupt, SystemExit):
1211
mutter('_iter_with_errors caught error')
1212
log_exception_quietly()
1213
yield sys.exc_info(), None
1217
1156
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1224
1163
def set_headers(self, headers):
1225
1164
self._headers = headers.copy()
1227
1166
def call(self, *args):
1228
1167
if 'hpss' in debug.debug_flags:
1229
1168
mutter('hpss call: %s', repr(args)[1:-1])
1278
1217
self._write_end()
1279
1218
self._medium_request.finished_writing()
1281
def call_with_body_stream(self, args, stream):
1282
if 'hpss' in debug.debug_flags:
1283
mutter('hpss call w/body stream: %r', args)
1284
path = getattr(self._medium_request._medium, '_path', None)
1285
if path is not None:
1286
mutter(' (to %s)', path)
1287
self._request_start_time = time.time()
1288
self._write_protocol_version()
1289
self._write_headers(self._headers)
1290
self._write_structure(args)
1291
# TODO: notice if the server has sent an early error reply before we
1292
# have finished sending the stream. We would notice at the end
1293
# anyway, but if the medium can deliver it early then it's good
1294
# to short-circuit the whole request...
1295
for exc_info, part in _iter_with_errors(stream):
1296
if exc_info is not None:
1297
# Iterating the stream failed. Cleanly abort the request.
1298
self._write_error_status()
1299
# Currently the client unconditionally sends ('error',) as the
1301
self._write_structure(('error',))
1303
self._medium_request.finished_writing()
1304
raise exc_info[0], exc_info[1], exc_info[2]
1306
self._write_prefixed_body(part)
1309
self._medium_request.finished_writing()