~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Jelmer Vernooij
  • Date: 2009-10-27 21:54:26 UTC
  • mfrom: (4771 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4833.
  • Revision ID: jelmer@samba.org-20091027215426-72164bkd4mq9dsd4
merge bzr.dev.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
22
22
from cStringIO import StringIO
23
23
import struct
24
24
import sys
25
 
import thread
26
 
import threading
27
25
import time
28
26
 
29
27
import bzrlib
30
 
from bzrlib import (
31
 
    debug,
32
 
    errors,
33
 
    osutils,
34
 
    )
 
28
from bzrlib import debug
 
29
from bzrlib import errors
35
30
from bzrlib.smart import message, request
36
31
from bzrlib.trace import log_exception_quietly, mutter
37
32
from bzrlib.bencode import bdecode_as_tuple, bencode
620
615
            mutter('hpss call:   %s', repr(args)[1:-1])
621
616
            if getattr(self._request._medium, 'base', None) is not None:
622
617
                mutter('             (to %s)', self._request._medium.base)
623
 
            self._request_start_time = osutils.timer_func()
 
618
            self._request_start_time = time.time()
624
619
        self._write_args(args)
625
620
        self._request.finished_writing()
626
621
        self._last_verb = args[0]
635
630
            if getattr(self._request._medium, '_path', None) is not None:
636
631
                mutter('                  (to %s)', self._request._medium._path)
637
632
            mutter('              %d bytes', len(body))
638
 
            self._request_start_time = osutils.timer_func()
 
633
            self._request_start_time = time.time()
639
634
            if 'hpssdetail' in debug.debug_flags:
640
635
                mutter('hpss body content: %s', body)
641
636
        self._write_args(args)
654
649
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
655
650
            if getattr(self._request._medium, '_path', None) is not None:
656
651
                mutter('                  (to %s)', self._request._medium._path)
657
 
            self._request_start_time = osutils.timer_func()
 
652
            self._request_start_time = time.time()
658
653
        self._write_args(args)
659
654
        readv_bytes = self._serialise_offsets(body)
660
655
        bytes = self._encode_bulk_data(readv_bytes)
686
681
        if 'hpss' in debug.debug_flags:
687
682
            if self._request_start_time is not None:
688
683
                mutter('   result:   %6.3fs  %s',
689
 
                       osutils.timer_func() - self._request_start_time,
 
684
                       time.time() - self._request_start_time,
690
685
                       repr(result)[1:-1])
691
686
                self._request_start_time = None
692
687
            else:
1067
1062
class _ProtocolThreeEncoder(object):
1068
1063
 
1069
1064
    response_marker = request_marker = MESSAGE_VERSION_THREE
1070
 
    BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1071
1065
 
1072
1066
    def __init__(self, write_func):
1073
1067
        self._buf = []
1074
 
        self._buf_len = 0
1075
1068
        self._real_write_func = write_func
1076
1069
 
1077
1070
    def _write_func(self, bytes):
1078
 
        # TODO: It is probably more appropriate to use sum(map(len, _buf))
1079
 
        #       for total number of bytes to write, rather than buffer based on
1080
 
        #       the number of write() calls
1081
 
        # TODO: Another possibility would be to turn this into an async model.
1082
 
        #       Where we let another thread know that we have some bytes if
1083
 
        #       they want it, but we don't actually block for it
1084
 
        #       Note that osutils.send_all always sends 64kB chunks anyway, so
1085
 
        #       we might just push out smaller bits at a time?
1086
1071
        self._buf.append(bytes)
1087
 
        self._buf_len += len(bytes)
1088
 
        if self._buf_len > self.BUFFER_SIZE:
 
1072
        if len(self._buf) > 100:
1089
1073
            self.flush()
1090
1074
 
1091
1075
    def flush(self):
1092
1076
        if self._buf:
1093
1077
            self._real_write_func(''.join(self._buf))
1094
1078
            del self._buf[:]
1095
 
            self._buf_len = 0
1096
1079
 
1097
1080
    def _serialise_offsets(self, offsets):
1098
1081
        """Serialise a readv offset list."""
1147
1130
        _ProtocolThreeEncoder.__init__(self, write_func)
1148
1131
        self.response_sent = False
1149
1132
        self._headers = {'Software version': bzrlib.__version__}
1150
 
        if 'hpss' in debug.debug_flags:
1151
 
            self._thread_id = thread.get_ident()
1152
 
            self._response_start_time = None
1153
 
 
1154
 
    def _trace(self, action, message, extra_bytes=None, include_time=False):
1155
 
        if self._response_start_time is None:
1156
 
            self._response_start_time = osutils.timer_func()
1157
 
        if include_time:
1158
 
            t = '%5.3fs ' % (time.clock() - self._response_start_time)
1159
 
        else:
1160
 
            t = ''
1161
 
        if extra_bytes is None:
1162
 
            extra = ''
1163
 
        else:
1164
 
            extra = ' ' + repr(extra_bytes[:40])
1165
 
            if len(extra) > 33:
1166
 
                extra = extra[:29] + extra[-1] + '...'
1167
 
        mutter('%12s: [%s] %s%s%s'
1168
 
               % (action, self._thread_id, t, message, extra))
1169
1133
 
1170
1134
    def send_error(self, exception):
1171
1135
        if self.response_sent:
1177
1141
                ('UnknownMethod', exception.verb))
1178
1142
            self.send_response(failure)
1179
1143
            return
1180
 
        if 'hpss' in debug.debug_flags:
1181
 
            self._trace('error', str(exception))
1182
1144
        self.response_sent = True
1183
1145
        self._write_protocol_version()
1184
1146
        self._write_headers(self._headers)
1198
1160
            self._write_success_status()
1199
1161
        else:
1200
1162
            self._write_error_status()
1201
 
        if 'hpss' in debug.debug_flags:
1202
 
            self._trace('response', repr(response.args))
1203
1163
        self._write_structure(response.args)
1204
1164
        if response.body is not None:
1205
1165
            self._write_prefixed_body(response.body)
1206
 
            if 'hpss' in debug.debug_flags:
1207
 
                self._trace('body', '%d bytes' % (len(response.body),),
1208
 
                            response.body, include_time=True)
1209
1166
        elif response.body_stream is not None:
1210
 
            count = num_bytes = 0
1211
 
            first_chunk = None
1212
1167
            for exc_info, chunk in _iter_with_errors(response.body_stream):
1213
 
                count += 1
1214
1168
                if exc_info is not None:
1215
1169
                    self._write_error_status()
1216
1170
                    error_struct = request._translate_error(exc_info[1])
1221
1175
                        self._write_error_status()
1222
1176
                        self._write_structure(chunk.args)
1223
1177
                        break
1224
 
                    num_bytes += len(chunk)
1225
 
                    if first_chunk is None:
1226
 
                        first_chunk = chunk
1227
1178
                    self._write_prefixed_body(chunk)
1228
 
                    if 'hpssdetail' in debug.debug_flags:
1229
 
                        # Not worth timing separately, as _write_func is
1230
 
                        # actually buffered
1231
 
                        self._trace('body chunk',
1232
 
                                    '%d bytes' % (len(chunk),),
1233
 
                                    chunk, suppress_time=True)
1234
 
            if 'hpss' in debug.debug_flags:
1235
 
                self._trace('body stream',
1236
 
                            '%d bytes %d chunks' % (num_bytes, count),
1237
 
                            first_chunk)
1238
1179
        self._write_end()
1239
 
        if 'hpss' in debug.debug_flags:
1240
 
            self._trace('response end', '', include_time=True)
1241
1180
 
1242
1181
 
1243
1182
def _iter_with_errors(iterable):
1295
1234
            base = getattr(self._medium_request._medium, 'base', None)
1296
1235
            if base is not None:
1297
1236
                mutter('             (to %s)', base)
1298
 
            self._request_start_time = osutils.timer_func()
 
1237
            self._request_start_time = time.time()
1299
1238
        self._write_protocol_version()
1300
1239
        self._write_headers(self._headers)
1301
1240
        self._write_structure(args)
1313
1252
            if path is not None:
1314
1253
                mutter('                  (to %s)', path)
1315
1254
            mutter('              %d bytes', len(body))
1316
 
            self._request_start_time = osutils.timer_func()
 
1255
            self._request_start_time = time.time()
1317
1256
        self._write_protocol_version()
1318
1257
        self._write_headers(self._headers)
1319
1258
        self._write_structure(args)
1332
1271
            path = getattr(self._medium_request._medium, '_path', None)
1333
1272
            if path is not None:
1334
1273
                mutter('                  (to %s)', path)
1335
 
            self._request_start_time = osutils.timer_func()
 
1274
            self._request_start_time = time.time()
1336
1275
        self._write_protocol_version()
1337
1276
        self._write_headers(self._headers)
1338
1277
        self._write_structure(args)
1349
1288
            path = getattr(self._medium_request._medium, '_path', None)
1350
1289
            if path is not None:
1351
1290
                mutter('                  (to %s)', path)
1352
 
            self._request_start_time = osutils.timer_func()
 
1291
            self._request_start_time = time.time()
1353
1292
        self._write_protocol_version()
1354
1293
        self._write_headers(self._headers)
1355
1294
        self._write_structure(args)