~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Andrew Bennetts
  • Date: 2010-01-15 03:58:20 UTC
  • mfrom: (4963 +trunk)
  • mto: (4973.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4975.
  • Revision ID: andrew.bennetts@canonical.com-20100115035820-ilb3t36swgzq6v1l
MergeĀ lp:bzr.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
22
22
from cStringIO import StringIO
23
23
import struct
24
24
import sys
 
25
import thread
 
26
import threading
25
27
import time
26
28
 
27
29
import bzrlib
28
 
from bzrlib import debug
29
 
from bzrlib import errors
 
30
from bzrlib import (
 
31
    debug,
 
32
    errors,
 
33
    osutils,
 
34
    )
30
35
from bzrlib.smart import message, request
31
36
from bzrlib.trace import log_exception_quietly, mutter
32
37
from bzrlib.bencode import bdecode_as_tuple, bencode
615
620
            mutter('hpss call:   %s', repr(args)[1:-1])
616
621
            if getattr(self._request._medium, 'base', None) is not None:
617
622
                mutter('             (to %s)', self._request._medium.base)
618
 
            self._request_start_time = time.time()
 
623
            self._request_start_time = osutils.timer_func()
619
624
        self._write_args(args)
620
625
        self._request.finished_writing()
621
626
        self._last_verb = args[0]
630
635
            if getattr(self._request._medium, '_path', None) is not None:
631
636
                mutter('                  (to %s)', self._request._medium._path)
632
637
            mutter('              %d bytes', len(body))
633
 
            self._request_start_time = time.time()
 
638
            self._request_start_time = osutils.timer_func()
634
639
            if 'hpssdetail' in debug.debug_flags:
635
640
                mutter('hpss body content: %s', body)
636
641
        self._write_args(args)
649
654
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
650
655
            if getattr(self._request._medium, '_path', None) is not None:
651
656
                mutter('                  (to %s)', self._request._medium._path)
652
 
            self._request_start_time = time.time()
 
657
            self._request_start_time = osutils.timer_func()
653
658
        self._write_args(args)
654
659
        readv_bytes = self._serialise_offsets(body)
655
660
        bytes = self._encode_bulk_data(readv_bytes)
681
686
        if 'hpss' in debug.debug_flags:
682
687
            if self._request_start_time is not None:
683
688
                mutter('   result:   %6.3fs  %s',
684
 
                       time.time() - self._request_start_time,
 
689
                       osutils.timer_func() - self._request_start_time,
685
690
                       repr(result)[1:-1])
686
691
                self._request_start_time = None
687
692
            else:
1062
1067
class _ProtocolThreeEncoder(object):
1063
1068
 
1064
1069
    response_marker = request_marker = MESSAGE_VERSION_THREE
 
1070
    BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1065
1071
 
1066
1072
    def __init__(self, write_func):
1067
1073
        self._buf = []
 
1074
        self._buf_len = 0
1068
1075
        self._real_write_func = write_func
1069
1076
 
1070
1077
    def _write_func(self, bytes):
 
1078
        # TODO: It is probably more appropriate to use sum(map(len, _buf))
 
1079
        #       for total number of bytes to write, rather than buffer based on
 
1080
        #       the number of write() calls
 
1081
        # TODO: Another possibility would be to turn this into an async model.
 
1082
        #       Where we let another thread know that we have some bytes if
 
1083
        #       they want it, but we don't actually block for it
 
1084
        #       Note that osutils.send_all always sends 64kB chunks anyway, so
 
1085
        #       we might just push out smaller bits at a time?
1071
1086
        self._buf.append(bytes)
1072
 
        if len(self._buf) > 100:
 
1087
        self._buf_len += len(bytes)
 
1088
        if self._buf_len > self.BUFFER_SIZE:
1073
1089
            self.flush()
1074
1090
 
1075
1091
    def flush(self):
1076
1092
        if self._buf:
1077
1093
            self._real_write_func(''.join(self._buf))
1078
1094
            del self._buf[:]
 
1095
            self._buf_len = 0
1079
1096
 
1080
1097
    def _serialise_offsets(self, offsets):
1081
1098
        """Serialise a readv offset list."""
1130
1147
        _ProtocolThreeEncoder.__init__(self, write_func)
1131
1148
        self.response_sent = False
1132
1149
        self._headers = {'Software version': bzrlib.__version__}
 
1150
        if 'hpss' in debug.debug_flags:
 
1151
            self._thread_id = thread.get_ident()
 
1152
            self._response_start_time = None
 
1153
 
 
1154
    def _trace(self, action, message, extra_bytes=None, include_time=False):
 
1155
        if self._response_start_time is None:
 
1156
            self._response_start_time = osutils.timer_func()
 
1157
        if include_time:
 
1158
            t = '%5.3fs ' % (time.clock() - self._response_start_time)
 
1159
        else:
 
1160
            t = ''
 
1161
        if extra_bytes is None:
 
1162
            extra = ''
 
1163
        else:
 
1164
            extra = ' ' + repr(extra_bytes[:40])
 
1165
            if len(extra) > 33:
 
1166
                extra = extra[:29] + extra[-1] + '...'
 
1167
        mutter('%12s: [%s] %s%s%s'
 
1168
               % (action, self._thread_id, t, message, extra))
1133
1169
 
1134
1170
    def send_error(self, exception):
1135
1171
        if self.response_sent:
1141
1177
                ('UnknownMethod', exception.verb))
1142
1178
            self.send_response(failure)
1143
1179
            return
 
1180
        if 'hpss' in debug.debug_flags:
 
1181
            self._trace('error', str(exception))
1144
1182
        self.response_sent = True
1145
1183
        self._write_protocol_version()
1146
1184
        self._write_headers(self._headers)
1160
1198
            self._write_success_status()
1161
1199
        else:
1162
1200
            self._write_error_status()
 
1201
        if 'hpss' in debug.debug_flags:
 
1202
            self._trace('response', repr(response.args))
1163
1203
        self._write_structure(response.args)
1164
1204
        if response.body is not None:
1165
1205
            self._write_prefixed_body(response.body)
 
1206
            if 'hpss' in debug.debug_flags:
 
1207
                self._trace('body', '%d bytes' % (len(response.body),),
 
1208
                            response.body, include_time=True)
1166
1209
        elif response.body_stream is not None:
 
1210
            count = num_bytes = 0
 
1211
            first_chunk = None
1167
1212
            for exc_info, chunk in _iter_with_errors(response.body_stream):
 
1213
                count += 1
1168
1214
                if exc_info is not None:
1169
1215
                    self._write_error_status()
1170
1216
                    error_struct = request._translate_error(exc_info[1])
1175
1221
                        self._write_error_status()
1176
1222
                        self._write_structure(chunk.args)
1177
1223
                        break
 
1224
                    num_bytes += len(chunk)
 
1225
                    if first_chunk is None:
 
1226
                        first_chunk = chunk
1178
1227
                    self._write_prefixed_body(chunk)
 
1228
                    if 'hpssdetail' in debug.debug_flags:
 
1229
                        # Not worth timing separately, as _write_func is
 
1230
                        # actually buffered
 
1231
                        self._trace('body chunk',
 
1232
                                    '%d bytes' % (len(chunk),),
 
1233
                                    chunk, suppress_time=True)
 
1234
            if 'hpss' in debug.debug_flags:
 
1235
                self._trace('body stream',
 
1236
                            '%d bytes %d chunks' % (num_bytes, count),
 
1237
                            first_chunk)
1179
1238
        self._write_end()
 
1239
        if 'hpss' in debug.debug_flags:
 
1240
            self._trace('response end', '', include_time=True)
1180
1241
 
1181
1242
 
1182
1243
def _iter_with_errors(iterable):
1234
1295
            base = getattr(self._medium_request._medium, 'base', None)
1235
1296
            if base is not None:
1236
1297
                mutter('             (to %s)', base)
1237
 
            self._request_start_time = time.time()
 
1298
            self._request_start_time = osutils.timer_func()
1238
1299
        self._write_protocol_version()
1239
1300
        self._write_headers(self._headers)
1240
1301
        self._write_structure(args)
1252
1313
            if path is not None:
1253
1314
                mutter('                  (to %s)', path)
1254
1315
            mutter('              %d bytes', len(body))
1255
 
            self._request_start_time = time.time()
 
1316
            self._request_start_time = osutils.timer_func()
1256
1317
        self._write_protocol_version()
1257
1318
        self._write_headers(self._headers)
1258
1319
        self._write_structure(args)
1271
1332
            path = getattr(self._medium_request._medium, '_path', None)
1272
1333
            if path is not None:
1273
1334
                mutter('                  (to %s)', path)
1274
 
            self._request_start_time = time.time()
 
1335
            self._request_start_time = osutils.timer_func()
1275
1336
        self._write_protocol_version()
1276
1337
        self._write_headers(self._headers)
1277
1338
        self._write_structure(args)
1288
1349
            path = getattr(self._medium_request._medium, '_path', None)
1289
1350
            if path is not None:
1290
1351
                mutter('                  (to %s)', path)
1291
 
            self._request_start_time = time.time()
 
1352
            self._request_start_time = osutils.timer_func()
1292
1353
        self._write_protocol_version()
1293
1354
        self._write_headers(self._headers)
1294
1355
        self._write_structure(args)