13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Wire-level encoding and decoding of requests and responses for the smart
22
22
from cStringIO import StringIO
28
from bzrlib import debug
29
from bzrlib import errors
35
30
from bzrlib.smart import message, request
36
31
from bzrlib.trace import log_exception_quietly, mutter
37
from bzrlib.bencode import bdecode_as_tuple, bencode
32
from bzrlib.util.bencode import bdecode_as_tuple, bencode
40
35
# Protocol version strings. These are sent as prefixes of bzr requests and
119
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
120
115
"""Server-side encoding and decoding logic for smart version 1."""
122
def __init__(self, backing_transport, write_func, root_client_path='/',
117
def __init__(self, backing_transport, write_func, root_client_path='/'):
124
118
self._backing_transport = backing_transport
125
119
self._root_client_path = root_client_path
126
self._jail_root = jail_root
127
120
self.unused_data = ''
128
121
self._finished = False
129
122
self.in_buffer = ''
151
144
req_args = _decode_tuple(first_line)
152
145
self.request = request.SmartServerRequestHandler(
153
146
self._backing_transport, commands=request.request_handlers,
154
root_client_path=self._root_client_path,
155
jail_root=self._jail_root)
156
self.request.args_received(req_args)
147
root_client_path=self._root_client_path)
148
self.request.dispatch_command(req_args[0], req_args[1:])
157
149
if self.request.finished_reading:
158
150
# trivial request
159
151
self.unused_data = self.in_buffer
620
612
mutter('hpss call: %s', repr(args)[1:-1])
621
613
if getattr(self._request._medium, 'base', None) is not None:
622
614
mutter(' (to %s)', self._request._medium.base)
623
self._request_start_time = osutils.timer_func()
615
self._request_start_time = time.time()
624
616
self._write_args(args)
625
617
self._request.finished_writing()
626
618
self._last_verb = args[0]
635
627
if getattr(self._request._medium, '_path', None) is not None:
636
628
mutter(' (to %s)', self._request._medium._path)
637
629
mutter(' %d bytes', len(body))
638
self._request_start_time = osutils.timer_func()
630
self._request_start_time = time.time()
639
631
if 'hpssdetail' in debug.debug_flags:
640
632
mutter('hpss body content: %s', body)
641
633
self._write_args(args)
654
646
mutter('hpss call w/readv: %s', repr(args)[1:-1])
655
647
if getattr(self._request._medium, '_path', None) is not None:
656
648
mutter(' (to %s)', self._request._medium._path)
657
self._request_start_time = osutils.timer_func()
649
self._request_start_time = time.time()
658
650
self._write_args(args)
659
651
readv_bytes = self._serialise_offsets(body)
660
652
bytes = self._encode_bulk_data(readv_bytes)
868
860
def build_server_protocol_three(backing_transport, write_func,
869
root_client_path, jail_root=None):
870
862
request_handler = request.SmartServerRequestHandler(
871
863
backing_transport, commands=request.request_handlers,
872
root_client_path=root_client_path, jail_root=jail_root)
864
root_client_path=root_client_path)
873
865
responder = ProtocolThreeResponder(write_func)
874
866
message_handler = message.ConventionalRequestHandler(request_handler, responder)
875
867
return ProtocolThreeDecoder(message_handler)
905
897
# We do *not* set self.decoding_failed here. The message handler
906
898
# has raised an error, but the decoder is still able to parse bytes
907
899
# and determine when this message ends.
908
if not isinstance(exception.exc_value, errors.UnknownSmartMethod):
909
log_exception_quietly()
900
log_exception_quietly()
910
901
self.message_handler.protocol_error(exception.exc_value)
911
902
# The state machine is ready to continue decoding, but the
912
903
# exception has interrupted the loop that runs the state machine.
1067
1058
class _ProtocolThreeEncoder(object):
1069
1060
response_marker = request_marker = MESSAGE_VERSION_THREE
1070
BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1072
1062
def __init__(self, write_func):
1075
1064
self._real_write_func = write_func
1077
1066
def _write_func(self, bytes):
1078
# TODO: It is probably more appropriate to use sum(map(len, _buf))
1079
# for total number of bytes to write, rather than buffer based on
1080
# the number of write() calls
1081
# TODO: Another possibility would be to turn this into an async model.
1082
# Where we let another thread know that we have some bytes if
1083
# they want it, but we don't actually block for it
1084
# Note that osutils.send_all always sends 64kB chunks anyway, so
1085
# we might just push out smaller bits at a time?
1086
self._buf.append(bytes)
1087
self._buf_len += len(bytes)
1088
if self._buf_len > self.BUFFER_SIZE:
1091
1069
def flush(self):
1093
self._real_write_func(''.join(self._buf))
1071
self._real_write_func(self._buf)
1097
1074
def _serialise_offsets(self, offsets):
1098
1075
"""Serialise a readv offset list."""
1147
1124
_ProtocolThreeEncoder.__init__(self, write_func)
1148
1125
self.response_sent = False
1149
1126
self._headers = {'Software version': bzrlib.__version__}
1150
if 'hpss' in debug.debug_flags:
1151
self._thread_id = thread.get_ident()
1152
self._response_start_time = None
1154
def _trace(self, action, message, extra_bytes=None, include_time=False):
1155
if self._response_start_time is None:
1156
self._response_start_time = osutils.timer_func()
1158
t = '%5.3fs ' % (time.clock() - self._response_start_time)
1161
if extra_bytes is None:
1164
extra = ' ' + repr(extra_bytes[:40])
1166
extra = extra[:29] + extra[-1] + '...'
1167
mutter('%12s: [%s] %s%s%s'
1168
% (action, self._thread_id, t, message, extra))
1170
1128
def send_error(self, exception):
1171
1129
if self.response_sent:
1198
1154
self._write_success_status()
1200
1156
self._write_error_status()
1201
if 'hpss' in debug.debug_flags:
1202
self._trace('response', repr(response.args))
1203
1157
self._write_structure(response.args)
1204
1158
if response.body is not None:
1205
1159
self._write_prefixed_body(response.body)
1206
if 'hpss' in debug.debug_flags:
1207
self._trace('body', '%d bytes' % (len(response.body),),
1208
response.body, include_time=True)
1209
1160
elif response.body_stream is not None:
1210
count = num_bytes = 0
1212
for exc_info, chunk in _iter_with_errors(response.body_stream):
1214
if exc_info is not None:
1215
self._write_error_status()
1216
error_struct = request._translate_error(exc_info[1])
1217
self._write_structure(error_struct)
1220
if isinstance(chunk, request.FailedSmartServerResponse):
1221
self._write_error_status()
1222
self._write_structure(chunk.args)
1224
num_bytes += len(chunk)
1225
if first_chunk is None:
1227
self._write_prefixed_body(chunk)
1228
if 'hpssdetail' in debug.debug_flags:
1229
# Not worth timing separately, as _write_func is
1231
self._trace('body chunk',
1232
'%d bytes' % (len(chunk),),
1233
chunk, suppress_time=True)
1234
if 'hpss' in debug.debug_flags:
1235
self._trace('body stream',
1236
'%d bytes %d chunks' % (num_bytes, count),
1161
for chunk in response.body_stream:
1162
self._write_prefixed_body(chunk)
1238
1164
self._write_end()
1239
if 'hpss' in debug.debug_flags:
1240
self._trace('response end', '', include_time=True)
1243
def _iter_with_errors(iterable):
1244
"""Handle errors from iterable.next().
1248
for exc_info, value in _iter_with_errors(iterable):
1251
This is a safer alternative to::
1254
for value in iterable:
1259
Because the latter will catch errors from the for-loop body, not just
1262
If an error occurs, exc_info will be an exc_info tuple, and the generator
1263
will terminate. Otherwise exc_info will be None, and value will be the
1264
value from iterable.next(). Note that KeyboardInterrupt and SystemExit
1265
will not be intercepted.
1267
iterator = iter(iterable)
1270
yield None, iterator.next()
1271
except StopIteration:
1273
except (KeyboardInterrupt, SystemExit):
1276
mutter('_iter_with_errors caught error')
1277
log_exception_quietly()
1278
yield sys.exc_info(), None
1282
1167
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1357
1242
# have finished sending the stream. We would notice at the end
1358
1243
# anyway, but if the medium can deliver it early then it's good
1359
1244
# to short-circuit the whole request...
1360
for exc_info, part in _iter_with_errors(stream):
1361
if exc_info is not None:
1362
# Iterating the stream failed. Cleanly abort the request.
1363
self._write_error_status()
1364
# Currently the client unconditionally sends ('error',) as the
1366
self._write_structure(('error',))
1368
self._medium_request.finished_writing()
1369
raise exc_info[0], exc_info[1], exc_info[2]
1371
1247
self._write_prefixed_body(part)
1250
exc_info = sys.exc_info()
1251
# Iterating the stream failed. Cleanly abort the request.
1252
self._write_error_status()
1253
# Currently the client unconditionally sends ('error',) as the
1255
self._write_structure(('error',))
1257
self._medium_request.finished_writing()
1258
raise exc_info[0], exc_info[1], exc_info[2]
1373
1259
self._write_end()
1374
1260
self._medium_request.finished_writing()