~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Martin von Gagern
  • Date: 2011-06-01 12:53:56 UTC
  • mto: This revision was merged to the branch mainline in revision 6009.
  • Revision ID: martin.vgagern@gmx.net-20110601125356-lwozv2vecea6hxfz
Change from no_decorate to classify as name for the argument.

The command line switch remains as --no-classify, to keep backwards
compatibility.  Users are free to include --no-classify in an alias, and
still use --classify to change back.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
27
 
import errno
28
27
import os
29
28
import sys
30
 
import time
31
29
import urllib
32
30
 
33
31
import bzrlib
34
32
from bzrlib.lazy_import import lazy_import
35
33
lazy_import(globals(), """
36
 
import select
37
34
import socket
38
35
import thread
39
36
import weakref
45
42
    ui,
46
43
    urlutils,
47
44
    )
48
 
from bzrlib.i18n import gettext
49
 
from bzrlib.smart import client, protocol, request, signals, vfs
 
45
from bzrlib.smart import client, protocol, request, vfs
50
46
from bzrlib.transport import ssh
51
47
""")
52
48
from bzrlib import osutils
179
175
        ui.ui_factory.report_transport_activity(self, bytes, direction)
180
176
 
181
177
 
182
 
_bad_file_descriptor = (errno.EBADF,)
183
 
if sys.platform == 'win32':
184
 
    # Given on Windows if you pass a closed socket to select.select. Probably
185
 
    # also given if you pass a file handle to select.
186
 
    WSAENOTSOCK = 10038
187
 
    _bad_file_descriptor += (WSAENOTSOCK,)
188
 
 
189
 
 
190
178
class SmartServerStreamMedium(SmartMedium):
191
179
    """Handles smart commands coming over a stream.
192
180
 
205
193
        the stream.  See also the _push_back method.
206
194
    """
207
195
 
208
 
    _timer = time.time
209
 
 
210
 
    def __init__(self, backing_transport, root_client_path='/', timeout=None):
 
196
    def __init__(self, backing_transport, root_client_path='/'):
211
197
        """Construct new server.
212
198
 
213
199
        :param backing_transport: Transport for the directory served.
216
202
        self.backing_transport = backing_transport
217
203
        self.root_client_path = root_client_path
218
204
        self.finished = False
219
 
        if timeout is None:
220
 
            raise AssertionError('You must supply a timeout.')
221
 
        self._client_timeout = timeout
222
 
        self._client_poll_timeout = min(timeout / 10.0, 1.0)
223
205
        SmartMedium.__init__(self)
224
206
 
225
207
    def serve(self):
230
212
        try:
231
213
            while not self.finished:
232
214
                server_protocol = self._build_protocol()
233
 
                # TODO: This seems inelegant:
234
 
                if server_protocol is None:
235
 
                    # We could 'continue' only to notice that self.finished is
236
 
                    # True...
237
 
                    break
238
215
                self._serve_one_request(server_protocol)
239
 
        except errors.ConnectionTimeout, e:
240
 
            trace.note('%s' % (e,))
241
 
            trace.log_exception_quietly()
242
 
            self._disconnect_client()
243
 
            # We reported it, no reason to make a big fuss.
244
 
            return
245
216
        except Exception, e:
246
217
            stderr.write("%s terminating on exception %s\n" % (self, e))
247
218
            raise
248
 
        self._disconnect_client()
249
 
 
250
 
    def _stop_gracefully(self):
251
 
        """When we finish this message, stop looking for more."""
252
 
        trace.mutter('Stopping %s' % (self,))
253
 
        self.finished = True
254
 
 
255
 
    def _disconnect_client(self):
256
 
        """Close the current connection. We stopped due to a timeout/etc."""
257
 
        # The default implementation is a no-op, because that is all we used to
258
 
        # do when disconnecting from a client. I suppose we never had the
259
 
        # *server* initiate a disconnect, before
260
 
 
261
 
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
262
 
        """Wait for more bytes to be read, but timeout if none available.
263
 
 
264
 
        This allows us to detect idle connections, and stop trying to read from
265
 
        them, without setting the socket itself to non-blocking. This also
266
 
        allows us to specify when we watch for idle timeouts.
267
 
 
268
 
        :return: Did we timeout? (True if we timed out, False if there is data
269
 
            to be read)
270
 
        """
271
 
        raise NotImplementedError(self._wait_for_bytes_with_timeout)
272
219
 
273
220
    def _build_protocol(self):
274
221
        """Identifies the version of the incoming request, and returns an
279
226
 
280
227
        :returns: a SmartServerRequestProtocol.
281
228
        """
282
 
        self._wait_for_bytes_with_timeout(self._client_timeout)
283
 
        if self.finished:
284
 
            # We're stopping, so don't try to do any more work
285
 
            return None
286
229
        bytes = self._get_line()
287
230
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
288
231
        protocol = protocol_factory(
290
233
        protocol.accept_bytes(unused_bytes)
291
234
        return protocol
292
235
 
293
 
    def _wait_on_descriptor(self, fd, timeout_seconds):
294
 
        """select() on a file descriptor, waiting for nonblocking read()
295
 
 
296
 
        This will raise a ConnectionTimeout exception if we do not get a
297
 
        readable handle before timeout_seconds.
298
 
        :return: None
299
 
        """
300
 
        t_end = self._timer() + timeout_seconds
301
 
        poll_timeout = min(timeout_seconds, self._client_poll_timeout)
302
 
        rs = xs = None
303
 
        while not rs and not xs and self._timer() < t_end:
304
 
            if self.finished:
305
 
                return
306
 
            try:
307
 
                rs, _, xs = select.select([fd], [], [fd], poll_timeout)
308
 
            except (select.error, socket.error) as e:
309
 
                err = getattr(e, 'errno', None)
310
 
                if err is None and getattr(e, 'args', None) is not None:
311
 
                    # select.error doesn't have 'errno', it just has args[0]
312
 
                    err = e.args[0]
313
 
                if err in _bad_file_descriptor:
314
 
                    return # Not a socket indicates read() will fail
315
 
                elif err == errno.EINTR:
316
 
                    # Interrupted, keep looping.
317
 
                    continue
318
 
                raise
319
 
        if rs or xs:
320
 
            return
321
 
        raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
322
 
                                       % (timeout_seconds,))
323
 
 
324
236
    def _serve_one_request(self, protocol):
325
237
        """Read one request from input, process, send back a response.
326
238
 
347
259
 
348
260
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
349
261
 
350
 
    def __init__(self, sock, backing_transport, root_client_path='/',
351
 
                 timeout=None):
 
262
    def __init__(self, sock, backing_transport, root_client_path='/'):
352
263
        """Constructor.
353
264
 
354
265
        :param sock: the socket the server will read from.  It will be put
355
266
            into blocking mode.
356
267
        """
357
268
        SmartServerStreamMedium.__init__(
358
 
            self, backing_transport, root_client_path=root_client_path,
359
 
            timeout=timeout)
 
269
            self, backing_transport, root_client_path=root_client_path)
360
270
        sock.setblocking(True)
361
271
        self.socket = sock
362
 
        # Get the getpeername now, as we might be closed later when we care.
363
 
        try:
364
 
            self._client_info = sock.getpeername()
365
 
        except socket.error:
366
 
            self._client_info = '<unknown>'
367
 
 
368
 
    def __str__(self):
369
 
        return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
370
 
 
371
 
    def __repr__(self):
372
 
        return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
373
 
            self._client_info)
374
272
 
375
273
    def _serve_one_request_unguarded(self, protocol):
376
274
        while protocol.next_read_size():
385
283
 
386
284
        self._push_back(protocol.unused_data)
387
285
 
388
 
    def _disconnect_client(self):
389
 
        """Close the current connection. We stopped due to a timeout/etc."""
390
 
        self.socket.close()
391
 
 
392
 
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
393
 
        """Wait for more bytes to be read, but timeout if none available.
394
 
 
395
 
        This allows us to detect idle connections, and stop trying to read from
396
 
        them, without setting the socket itself to non-blocking. This also
397
 
        allows us to specify when we watch for idle timeouts.
398
 
 
399
 
        :return: None, this will raise ConnectionTimeout if we time out before
400
 
            data is available.
401
 
        """
402
 
        return self._wait_on_descriptor(self.socket, timeout_seconds)
403
 
 
404
286
    def _read_bytes(self, desired_count):
405
287
        return osutils.read_bytes_from_socket(
406
288
            self.socket, self._report_activity)
423
305
 
424
306
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
425
307
 
426
 
    def __init__(self, in_file, out_file, backing_transport, timeout=None):
 
308
    def __init__(self, in_file, out_file, backing_transport):
427
309
        """Construct new server.
428
310
 
429
311
        :param in_file: Python file from which requests can be read.
430
312
        :param out_file: Python file to write responses.
431
313
        :param backing_transport: Transport for the directory served.
432
314
        """
433
 
        SmartServerStreamMedium.__init__(self, backing_transport,
434
 
            timeout=timeout)
 
315
        SmartServerStreamMedium.__init__(self, backing_transport)
435
316
        if sys.platform == 'win32':
436
317
            # force binary mode for files
437
318
            import msvcrt
442
323
        self._in = in_file
443
324
        self._out = out_file
444
325
 
445
 
    def serve(self):
446
 
        """See SmartServerStreamMedium.serve"""
447
 
        # This is the regular serve, except it adds signal trapping for soft
448
 
        # shutdown.
449
 
        stop_gracefully = self._stop_gracefully
450
 
        signals.register_on_hangup(id(self), stop_gracefully)
451
 
        try:
452
 
            return super(SmartServerPipeStreamMedium, self).serve()
453
 
        finally:
454
 
            signals.unregister_on_hangup(id(self))
455
 
 
456
326
    def _serve_one_request_unguarded(self, protocol):
457
327
        while True:
458
328
            # We need to be careful not to read past the end of the current
471
341
                return
472
342
            protocol.accept_bytes(bytes)
473
343
 
474
 
    def _disconnect_client(self):
475
 
        self._in.close()
476
 
        self._out.flush()
477
 
        self._out.close()
478
 
 
479
 
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
480
 
        """Wait for more bytes to be read, but timeout if none available.
481
 
 
482
 
        This allows us to detect idle connections, and stop trying to read from
483
 
        them, without setting the socket itself to non-blocking. This also
484
 
        allows us to specify when we watch for idle timeouts.
485
 
 
486
 
        :return: None, this will raise ConnectionTimeout if we time out before
487
 
            data is available.
488
 
        """
489
 
        if (getattr(self._in, 'fileno', None) is None
490
 
            or sys.platform == 'win32'):
491
 
            # You can't select() file descriptors on Windows.
492
 
            return
493
 
        return self._wait_on_descriptor(self._in, timeout_seconds)
494
 
 
495
344
    def _read_bytes(self, desired_count):
496
345
        return self._in.read(desired_count)
497
346
 
641
490
        return self._medium._get_line()
642
491
 
643
492
 
644
 
class _VfsRefuser(object):
645
 
    """An object that refuses all VFS requests.
646
 
 
647
 
    """
648
 
 
649
 
    def __init__(self):
650
 
        client._SmartClient.hooks.install_named_hook(
651
 
            'call', self.check_vfs, 'vfs refuser')
652
 
 
653
 
    def check_vfs(self, params):
654
 
        try:
655
 
            request_method = request.request_handlers.get(params.method)
656
 
        except KeyError:
657
 
            # A method we don't know about doesn't count as a VFS method.
658
 
            return
659
 
        if issubclass(request_method, vfs.VfsRequest):
660
 
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
661
 
 
662
 
 
663
493
class _DebugCounter(object):
664
494
    """An object that counts the HPSS calls made to each client medium.
665
495
 
712
542
        value['count'] = 0
713
543
        value['vfs_count'] = 0
714
544
        if count != 0:
715
 
            trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
716
 
                       count, vfs_count, medium_repr))
 
545
            trace.note('HPSS calls: %d (%d vfs) %s',
 
546
                       count, vfs_count, medium_repr)
717
547
 
718
548
    def flush_all(self):
719
549
        for ref in list(self.counts.keys()):
720
550
            self.done(ref)
721
551
 
722
552
_debug_counter = None
723
 
_vfs_refuser = None
724
553
 
725
554
 
726
555
class SmartClientMedium(SmartMedium):
743
572
            if _debug_counter is None:
744
573
                _debug_counter = _DebugCounter()
745
574
            _debug_counter.track(self)
746
 
        if 'hpss_client_no_vfs' in debug.debug_flags:
747
 
            global _vfs_refuser
748
 
            if _vfs_refuser is None:
749
 
                _vfs_refuser = _VfsRefuser()
750
575
 
751
576
    def _is_remote_before(self, version_tuple):
752
577
        """Is it possible the remote side supports RPCs for a given version?