13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Wire-level encoding and decoding of requests and responses for the smart
29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
from bzrlib.util.bencode import bdecode, bencode
32
from bzrlib.bencode import bdecode_as_tuple, bencode
35
35
# Protocol version strings. These are sent as prefixes of bzr requests and
109
109
for start, length in offsets:
110
110
txt.append('%d,%d' % (start, length))
111
111
return '\n'.join(txt)
114
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
115
"""Server-side encoding and decoding logic for smart version 1."""
117
117
def __init__(self, backing_transport, write_func, root_client_path='/'):
118
118
self._backing_transport = backing_transport
119
119
self._root_client_path = root_client_path
128
128
def accept_bytes(self, bytes):
129
129
"""Take bytes, and advance the internal state machine appropriately.
131
131
:param bytes: must be a byte string
133
133
if not isinstance(bytes, str):
412
412
self.chunks = collections.deque()
413
413
self.error = False
414
414
self.error_in_progress = None
416
416
def next_read_size(self):
417
417
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
418
# end-of-body marker is 4 bytes: 'END\n'.
507
506
self.chunks.append(self.chunk_in_progress)
508
507
self.chunk_in_progress = None
509
508
self.state_accept = self._state_accept_expecting_length
511
510
def _state_accept_reading_unused(self):
512
511
self.unused_data += self._get_in_buffer()
513
512
self._in_buffer_list = []
516
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
517
516
"""Decodes the length-prefixed bulk data."""
519
518
def __init__(self):
520
519
_StatefulDecoder.__init__(self)
521
520
self.state_accept = self._state_accept_expecting_length
522
521
self.state_read = self._state_read_no_data
524
523
self._trailer_buffer = ''
526
525
def next_read_size(self):
527
526
if self.bytes_left is not None:
528
527
# Ideally we want to read all the remainder of the body and the
565
564
self._body = self._body[:self.bytes_left]
566
565
self.bytes_left = None
567
566
self.state_accept = self._state_accept_reading_trailer
569
568
def _state_accept_reading_trailer(self):
570
569
self._trailer_buffer += self._get_in_buffer()
571
570
self._set_in_buffer(None)
575
574
self.unused_data = self._trailer_buffer[len('done\n'):]
576
575
self.state_accept = self._state_accept_reading_unused
577
576
self.finished_reading = True
579
578
def _state_accept_reading_unused(self):
580
579
self.unused_data += self._get_in_buffer()
581
580
self._set_in_buffer(None)
657
656
mutter(' %d bytes in readv request', len(readv_bytes))
658
657
self._last_verb = args[0]
659
def call_with_body_stream(self, args, stream):
660
# Protocols v1 and v2 don't support body streams. So it's safe to
661
# assume that a v1/v2 server doesn't support whatever method we're
662
# trying to call with a body stream.
663
self._request.finished_writing()
664
self._request.finished_reading()
665
raise errors.UnknownSmartMethod(args[0])
660
667
def cancel_read_body(self):
661
668
"""After expecting a body, a response code may indicate one otherwise.
722
729
def _response_is_unknown_method(self, result_tuple):
723
730
"""Raise UnexpectedSmartServerResponse if the response is an 'unknonwn
724
731
method' response to the request.
726
733
:param response: The response from a smart client call_expecting_body
728
735
:param verb: The verb used in that call.
735
742
# The response will have no body, so we've finished reading.
736
743
self._request.finished_reading()
737
744
raise errors.UnknownSmartMethod(self._last_verb)
739
746
def read_body_bytes(self, count=-1):
740
747
"""Read bytes from the body, decoding into a byte stream.
742
We read all bytes at once to ensure we've checked the trailer for
749
We read all bytes at once to ensure we've checked the trailer for
743
750
errors, and then feed the buffer back as read_body_bytes is called.
745
752
if self._body_buffer is not None:
784
791
def _write_protocol_version(self):
785
792
"""Write any prefixes this protocol requires.
787
794
Version one doesn't send protocol versions.
791
798
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
792
799
"""Version two of the client side of the smart protocol.
794
801
This prefixes the request with the value of REQUEST_VERSION_TWO.
890
897
# We do *not* set self.decoding_failed here. The message handler
891
898
# has raised an error, but the decoder is still able to parse bytes
892
899
# and determine when this message ends.
893
log_exception_quietly()
900
if not isinstance(exception.exc_value, errors.UnknownSmartMethod):
901
log_exception_quietly()
894
902
self.message_handler.protocol_error(exception.exc_value)
895
903
# The state machine is ready to continue decoding, but the
896
904
# exception has interrupted the loop that runs the state machine.
932
940
def _extract_prefixed_bencoded_data(self):
933
941
prefixed_bytes = self._extract_length_prefixed_bytes()
935
decoded = bdecode(prefixed_bytes)
943
decoded = bdecode_as_tuple(prefixed_bytes)
936
944
except ValueError:
937
945
raise errors.SmartProtocolError(
938
946
'Bytes %r not bencoded' % (prefixed_bytes,))
978
986
self.message_handler.headers_received(decoded)
980
988
raise errors.SmartMessageHandlerError(sys.exc_info())
982
990
def _state_accept_expecting_message_part(self):
983
991
message_part_kind = self._extract_single_byte()
984
992
if message_part_kind == 'o':
1029
1037
raise errors.SmartMessageHandlerError(sys.exc_info())
1031
1039
def _state_accept_reading_unused(self):
1032
self.unused_data = self._get_in_buffer()
1040
self.unused_data += self._get_in_buffer()
1033
1041
self._set_in_buffer(None)
1035
1043
def next_read_size(self):
1053
1061
response_marker = request_marker = MESSAGE_VERSION_THREE
1055
1063
def __init__(self, write_func):
1057
1065
self._real_write_func = write_func
1059
1067
def _write_func(self, bytes):
1068
self._buf.append(bytes)
1069
if len(self._buf) > 100:
1062
1072
def flush(self):
1064
self._real_write_func(self._buf)
1074
self._real_write_func(''.join(self._buf))
1067
1077
def _serialise_offsets(self, offsets):
1068
1078
"""Serialise a readv offset list."""
1070
1080
for start, length in offsets:
1071
1081
txt.append('%d,%d' % (start, length))
1072
1082
return '\n'.join(txt)
1074
1084
def _write_protocol_version(self):
1075
1085
self._write_func(MESSAGE_VERSION_THREE)
1148
1161
if response.body is not None:
1149
1162
self._write_prefixed_body(response.body)
1150
1163
elif response.body_stream is not None:
1151
for chunk in response.body_stream:
1152
self._write_prefixed_body(chunk)
1164
for exc_info, chunk in _iter_with_errors(response.body_stream):
1165
if exc_info is not None:
1166
self._write_error_status()
1167
error_struct = request._translate_error(exc_info[1])
1168
self._write_structure(error_struct)
1171
if isinstance(chunk, request.FailedSmartServerResponse):
1172
self._write_error_status()
1173
self._write_structure(chunk.args)
1175
self._write_prefixed_body(chunk)
1154
1176
self._write_end()
1179
def _iter_with_errors(iterable):
1180
"""Handle errors from iterable.next().
1184
for exc_info, value in _iter_with_errors(iterable):
1187
This is a safer alternative to::
1190
for value in iterable:
1195
Because the latter will catch errors from the for-loop body, not just
1198
If an error occurs, exc_info will be a exc_info tuple, and the generator
1199
will terminate. Otherwise exc_info will be None, and value will be the
1200
value from iterable.next(). Note that KeyboardInterrupt and SystemExit
1201
will not be itercepted.
1203
iterator = iter(iterable)
1206
yield None, iterator.next()
1207
except StopIteration:
1209
except (KeyboardInterrupt, SystemExit):
1212
mutter('_iter_with_errors caught error')
1213
log_exception_quietly()
1214
yield sys.exc_info(), None
1157
1218
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1164
1225
def set_headers(self, headers):
1165
1226
self._headers = headers.copy()
1167
1228
def call(self, *args):
1168
1229
if 'hpss' in debug.debug_flags:
1169
1230
mutter('hpss call: %s', repr(args)[1:-1])
1218
1279
self._write_end()
1219
1280
self._medium_request.finished_writing()
1282
def call_with_body_stream(self, args, stream):
1283
if 'hpss' in debug.debug_flags:
1284
mutter('hpss call w/body stream: %r', args)
1285
path = getattr(self._medium_request._medium, '_path', None)
1286
if path is not None:
1287
mutter(' (to %s)', path)
1288
self._request_start_time = time.time()
1289
self._write_protocol_version()
1290
self._write_headers(self._headers)
1291
self._write_structure(args)
1292
# TODO: notice if the server has sent an early error reply before we
1293
# have finished sending the stream. We would notice at the end
1294
# anyway, but if the medium can deliver it early then it's good
1295
# to short-circuit the whole request...
1296
for exc_info, part in _iter_with_errors(stream):
1297
if exc_info is not None:
1298
# Iterating the stream failed. Cleanly abort the request.
1299
self._write_error_status()
1300
# Currently the client unconditionally sends ('error',) as the
1302
self._write_structure(('error',))
1304
self._medium_request.finished_writing()
1305
raise exc_info[0], exc_info[1], exc_info[2]
1307
self._write_prefixed_body(part)
1310
self._medium_request.finished_writing()