13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Wire-level encoding and decoding of requests and responses for the smart
29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
from bzrlib.bencode import bdecode_as_tuple, bencode
32
from bzrlib.util.bencode import bdecode, bencode
35
35
# Protocol version strings. These are sent as prefixes of bzr requests and
109
109
for start, length in offsets:
110
110
txt.append('%d,%d' % (start, length))
111
111
return '\n'.join(txt)
114
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
115
"""Server-side encoding and decoding logic for smart version 1."""
117
117
def __init__(self, backing_transport, write_func, root_client_path='/'):
118
118
self._backing_transport = backing_transport
119
119
self._root_client_path = root_client_path
128
128
def accept_bytes(self, bytes):
129
129
"""Take bytes, and advance the internal state machine appropriately.
131
131
:param bytes: must be a byte string
133
133
if not isinstance(bytes, str):
412
412
self.chunks = collections.deque()
413
413
self.error = False
414
414
self.error_in_progress = None
416
416
def next_read_size(self):
417
417
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
418
# end-of-body marker is 4 bytes: 'END\n'.
506
507
self.chunks.append(self.chunk_in_progress)
507
508
self.chunk_in_progress = None
508
509
self.state_accept = self._state_accept_expecting_length
510
511
def _state_accept_reading_unused(self):
511
512
self.unused_data += self._get_in_buffer()
512
513
self._in_buffer_list = []
515
516
class LengthPrefixedBodyDecoder(_StatefulDecoder):
516
517
"""Decodes the length-prefixed bulk data."""
518
519
def __init__(self):
519
520
_StatefulDecoder.__init__(self)
520
521
self.state_accept = self._state_accept_expecting_length
521
522
self.state_read = self._state_read_no_data
523
524
self._trailer_buffer = ''
525
526
def next_read_size(self):
526
527
if self.bytes_left is not None:
527
528
# Ideally we want to read all the remainder of the body and the
564
565
self._body = self._body[:self.bytes_left]
565
566
self.bytes_left = None
566
567
self.state_accept = self._state_accept_reading_trailer
568
569
def _state_accept_reading_trailer(self):
569
570
self._trailer_buffer += self._get_in_buffer()
570
571
self._set_in_buffer(None)
574
575
self.unused_data = self._trailer_buffer[len('done\n'):]
575
576
self.state_accept = self._state_accept_reading_unused
576
577
self.finished_reading = True
578
579
def _state_accept_reading_unused(self):
579
580
self.unused_data += self._get_in_buffer()
580
581
self._set_in_buffer(None)
656
657
mutter(' %d bytes in readv request', len(readv_bytes))
657
658
self._last_verb = args[0]
659
def call_with_body_stream(self, args, stream):
660
# Protocols v1 and v2 don't support body streams. So it's safe to
661
# assume that a v1/v2 server doesn't support whatever method we're
662
# trying to call with a body stream.
663
self._request.finished_writing()
664
self._request.finished_reading()
665
raise errors.UnknownSmartMethod(args[0])
667
660
def cancel_read_body(self):
668
661
"""After expecting a body, a response code may indicate one otherwise.
729
722
def _response_is_unknown_method(self, result_tuple):
730
723
"""Raise UnexpectedSmartServerResponse if the response is an 'unknown
731
724
method' response to the request.
733
726
:param result_tuple: The response from a smart client call_expecting_body
735
728
:param verb: The verb used in that call.
742
735
# The response will have no body, so we've finished reading.
743
736
self._request.finished_reading()
744
737
raise errors.UnknownSmartMethod(self._last_verb)
746
739
def read_body_bytes(self, count=-1):
747
740
"""Read bytes from the body, decoding into a byte stream.
749
We read all bytes at once to ensure we've checked the trailer for
742
We read all bytes at once to ensure we've checked the trailer for
750
743
errors, and then feed the buffer back as read_body_bytes is called.
752
745
if self._body_buffer is not None:
791
784
def _write_protocol_version(self):
792
785
"""Write any prefixes this protocol requires.
794
787
Version one doesn't send protocol versions.
798
791
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
799
792
"""Version two of the client side of the smart protocol.
801
794
This prefixes the request with the value of REQUEST_VERSION_TWO.
897
890
# We do *not* set self.decoding_failed here. The message handler
898
891
# has raised an error, but the decoder is still able to parse bytes
899
892
# and determine when this message ends.
900
if not isinstance(exception.exc_value, errors.UnknownSmartMethod):
901
log_exception_quietly()
893
log_exception_quietly()
902
894
self.message_handler.protocol_error(exception.exc_value)
903
895
# The state machine is ready to continue decoding, but the
904
896
# exception has interrupted the loop that runs the state machine.
940
932
def _extract_prefixed_bencoded_data(self):
941
933
prefixed_bytes = self._extract_length_prefixed_bytes()
943
decoded = bdecode_as_tuple(prefixed_bytes)
935
decoded = bdecode(prefixed_bytes)
944
936
except ValueError:
945
937
raise errors.SmartProtocolError(
946
938
'Bytes %r not bencoded' % (prefixed_bytes,))
986
978
self.message_handler.headers_received(decoded)
988
980
raise errors.SmartMessageHandlerError(sys.exc_info())
990
982
def _state_accept_expecting_message_part(self):
991
983
message_part_kind = self._extract_single_byte()
992
984
if message_part_kind == 'o':
1037
1029
raise errors.SmartMessageHandlerError(sys.exc_info())
1039
1031
def _state_accept_reading_unused(self):
1040
self.unused_data += self._get_in_buffer()
1032
self.unused_data = self._get_in_buffer()
1041
1033
self._set_in_buffer(None)
1043
1035
def next_read_size(self):
1061
1053
response_marker = request_marker = MESSAGE_VERSION_THREE
1063
1055
def __init__(self, write_func):
1065
1057
self._real_write_func = write_func
1067
1059
def _write_func(self, bytes):
1068
self._buf.append(bytes)
1069
if len(self._buf) > 100:
1072
1062
def flush(self):
1074
self._real_write_func(''.join(self._buf))
1064
self._real_write_func(self._buf)
1077
1067
def _serialise_offsets(self, offsets):
1078
1068
"""Serialise a readv offset list."""
1080
1070
for start, length in offsets:
1081
1071
txt.append('%d,%d' % (start, length))
1082
1072
return '\n'.join(txt)
1084
1074
def _write_protocol_version(self):
1085
1075
self._write_func(MESSAGE_VERSION_THREE)
1161
1148
if response.body is not None:
1162
1149
self._write_prefixed_body(response.body)
1163
1150
elif response.body_stream is not None:
1164
for exc_info, chunk in _iter_with_errors(response.body_stream):
1165
if exc_info is not None:
1166
self._write_error_status()
1167
error_struct = request._translate_error(exc_info[1])
1168
self._write_structure(error_struct)
1171
if isinstance(chunk, request.FailedSmartServerResponse):
1172
self._write_error_status()
1173
self._write_structure(chunk.args)
1175
self._write_prefixed_body(chunk)
1151
for chunk in response.body_stream:
1152
self._write_prefixed_body(chunk)
1176
1154
self._write_end()
1179
def _iter_with_errors(iterable):
1180
"""Handle errors from iterable.next().
1184
for exc_info, value in _iter_with_errors(iterable):
1187
This is a safer alternative to::
1190
for value in iterable:
1195
Because the latter will catch errors from the for-loop body, not just
1198
If an error occurs, exc_info will be an exc_info tuple, and the generator
1199
will terminate. Otherwise exc_info will be None, and value will be the
1200
value from iterable.next(). Note that KeyboardInterrupt and SystemExit
1201
will not be intercepted.
1203
iterator = iter(iterable)
1206
yield None, iterator.next()
1207
except StopIteration:
1209
except (KeyboardInterrupt, SystemExit):
1212
yield sys.exc_info(), None
1216
1157
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1223
1164
def set_headers(self, headers):
1224
1165
self._headers = headers.copy()
1226
1167
def call(self, *args):
1227
1168
if 'hpss' in debug.debug_flags:
1228
1169
mutter('hpss call: %s', repr(args)[1:-1])
1277
1218
self._write_end()
1278
1219
self._medium_request.finished_writing()
1280
def call_with_body_stream(self, args, stream):
1281
if 'hpss' in debug.debug_flags:
1282
mutter('hpss call w/body stream: %r', args)
1283
path = getattr(self._medium_request._medium, '_path', None)
1284
if path is not None:
1285
mutter(' (to %s)', path)
1286
self._request_start_time = time.time()
1287
self._write_protocol_version()
1288
self._write_headers(self._headers)
1289
self._write_structure(args)
1290
# TODO: notice if the server has sent an early error reply before we
1291
# have finished sending the stream. We would notice at the end
1292
# anyway, but if the medium can deliver it early then it's good
1293
# to short-circuit the whole request...
1294
for exc_info, part in _iter_with_errors(stream):
1295
if exc_info is not None:
1296
# Iterating the stream failed. Cleanly abort the request.
1297
self._write_error_status()
1298
# Currently the client unconditionally sends ('error',) as the
1300
self._write_structure(('error',))
1302
self._medium_request.finished_writing()
1303
raise exc_info[0], exc_info[1], exc_info[2]
1305
self._write_prefixed_body(part)
1308
self._medium_request.finished_writing()