~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Patch Queue Manager
  • Date: 2011-10-06 10:15:06 UTC
  • mfrom: (6195.1.1 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20111006101506-mychax14dy7yjee2
(vila) Tag bzr-2.5b2 missed during freeze (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
 
27
import errno
27
28
import os
28
29
import sys
 
30
import time
29
31
import urllib
30
32
 
31
33
import bzrlib
32
34
from bzrlib.lazy_import import lazy_import
33
35
lazy_import(globals(), """
 
36
import select
34
37
import socket
35
38
import thread
36
39
import weakref
42
45
    ui,
43
46
    urlutils,
44
47
    )
45
 
from bzrlib.smart import client, protocol, request, vfs
 
48
from bzrlib.i18n import gettext
 
49
from bzrlib.smart import client, protocol, request, signals, vfs
46
50
from bzrlib.transport import ssh
47
51
""")
48
52
from bzrlib import osutils
175
179
        ui.ui_factory.report_transport_activity(self, bytes, direction)
176
180
 
177
181
 
 
182
_bad_file_descriptor = (errno.EBADF,)
 
183
if sys.platform == 'win32':
 
184
    # Given on Windows if you pass a closed socket to select.select. Probably
 
185
    # also given if you pass a file handle to select.
 
186
    WSAENOTSOCK = 10038
 
187
    _bad_file_descriptor += (WSAENOTSOCK,)
 
188
 
 
189
 
178
190
class SmartServerStreamMedium(SmartMedium):
179
191
    """Handles smart commands coming over a stream.
180
192
 
193
205
        the stream.  See also the _push_back method.
194
206
    """
195
207
 
196
 
    def __init__(self, backing_transport, root_client_path='/'):
 
208
    _timer = time.time
 
209
 
 
210
    def __init__(self, backing_transport, root_client_path='/', timeout=None):
197
211
        """Construct new server.
198
212
 
199
213
        :param backing_transport: Transport for the directory served.
202
216
        self.backing_transport = backing_transport
203
217
        self.root_client_path = root_client_path
204
218
        self.finished = False
 
219
        if timeout is None:
 
220
            raise AssertionError('You must supply a timeout.')
 
221
        self._client_timeout = timeout
 
222
        self._client_poll_timeout = min(timeout / 10.0, 1.0)
205
223
        SmartMedium.__init__(self)
206
224
 
207
225
    def serve(self):
213
231
            while not self.finished:
214
232
                server_protocol = self._build_protocol()
215
233
                self._serve_one_request(server_protocol)
 
234
        except errors.ConnectionTimeout, e:
 
235
            trace.note('%s' % (e,))
 
236
            trace.log_exception_quietly()
 
237
            self._disconnect_client()
 
238
            # We reported it, no reason to make a big fuss.
 
239
            return
216
240
        except Exception, e:
217
241
            stderr.write("%s terminating on exception %s\n" % (self, e))
218
242
            raise
 
243
        self._disconnect_client()
 
244
 
 
245
    def _stop_gracefully(self):
 
246
        """When we finish this message, stop looking for more."""
 
247
        trace.mutter('Stopping %s' % (self,))
 
248
        self.finished = True
 
249
 
 
250
    def _disconnect_client(self):
 
251
        """Close the current connection. We stopped due to a timeout/etc."""
 
252
        # The default implementation is a no-op, because that is all we used to
 
253
        # do when disconnecting from a client. I suppose we never had the
 
254
        # *server* initiate a disconnect, before
 
255
 
 
256
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
257
        """Wait for more bytes to be read, but timeout if none available.
 
258
 
 
259
        This allows us to detect idle connections, and stop trying to read from
 
260
        them, without setting the socket itself to non-blocking. This also
 
261
        allows us to specify when we watch for idle timeouts.
 
262
 
 
263
        :return: Did we timeout? (True if we timed out, False if there is data
 
264
            to be read)
 
265
        """
 
266
        raise NotImplementedError(self._wait_for_bytes_with_timeout)
219
267
 
220
268
    def _build_protocol(self):
221
269
        """Identifies the version of the incoming request, and returns an
226
274
 
227
275
        :returns: a SmartServerRequestProtocol.
228
276
        """
 
277
        self._wait_for_bytes_with_timeout(self._client_timeout)
 
278
        if self.finished:
 
279
            # We're stopping, so don't try to do any more work
 
280
            return None
229
281
        bytes = self._get_line()
230
282
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
231
283
        protocol = protocol_factory(
233
285
        protocol.accept_bytes(unused_bytes)
234
286
        return protocol
235
287
 
 
288
    def _wait_on_descriptor(self, fd, timeout_seconds):
 
289
        """select() on a file descriptor, waiting for nonblocking read()
 
290
 
 
291
        This will raise a ConnectionTimeout exception if we do not get a
 
292
        readable handle before timeout_seconds.
 
293
        :return: None
 
294
        """
 
295
        t_end = self._timer() + timeout_seconds
 
296
        poll_timeout = min(timeout_seconds, self._client_poll_timeout)
 
297
        rs = xs = None
 
298
        while not rs and not xs and self._timer() < t_end:
 
299
            if self.finished:
 
300
                return
 
301
            try:
 
302
                rs, _, xs = select.select([fd], [], [fd], poll_timeout)
 
303
            except (select.error, socket.error) as e:
 
304
                err = getattr(e, 'errno', None)
 
305
                if err is None and getattr(e, 'args', None) is not None:
 
306
                    # select.error doesn't have 'errno', it just has args[0]
 
307
                    err = e.args[0]
 
308
                if err in _bad_file_descriptor:
 
309
                    return # Not a socket indicates read() will fail
 
310
                elif err == errno.EINTR:
 
311
                    # Interrupted, keep looping.
 
312
                    continue
 
313
                raise
 
314
        if rs or xs:
 
315
            return
 
316
        raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
 
317
                                       % (timeout_seconds,))
 
318
 
236
319
    def _serve_one_request(self, protocol):
237
320
        """Read one request from input, process, send back a response.
238
321
 
239
322
        :param protocol: a SmartServerRequestProtocol.
240
323
        """
 
324
        if protocol is None:
 
325
            return
241
326
        try:
242
327
            self._serve_one_request_unguarded(protocol)
243
328
        except KeyboardInterrupt:
259
344
 
260
345
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
261
346
 
262
 
    def __init__(self, sock, backing_transport, root_client_path='/'):
 
347
    def __init__(self, sock, backing_transport, root_client_path='/',
 
348
                 timeout=None):
263
349
        """Constructor.
264
350
 
265
351
        :param sock: the socket the server will read from.  It will be put
266
352
            into blocking mode.
267
353
        """
268
354
        SmartServerStreamMedium.__init__(
269
 
            self, backing_transport, root_client_path=root_client_path)
 
355
            self, backing_transport, root_client_path=root_client_path,
 
356
            timeout=timeout)
270
357
        sock.setblocking(True)
271
358
        self.socket = sock
 
359
        # Get the getpeername now, as we might be closed later when we care.
 
360
        try:
 
361
            self._client_info = sock.getpeername()
 
362
        except socket.error:
 
363
            self._client_info = '<unknown>'
 
364
 
 
365
    def __str__(self):
 
366
        return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
 
367
 
 
368
    def __repr__(self):
 
369
        return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
 
370
            self._client_info)
272
371
 
273
372
    def _serve_one_request_unguarded(self, protocol):
274
373
        while protocol.next_read_size():
283
382
 
284
383
        self._push_back(protocol.unused_data)
285
384
 
 
385
    def _disconnect_client(self):
 
386
        """Close the current connection. We stopped due to a timeout/etc."""
 
387
        self.socket.close()
 
388
 
 
389
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
390
        """Wait for more bytes to be read, but timeout if none available.
 
391
 
 
392
        This allows us to detect idle connections, and stop trying to read from
 
393
        them, without setting the socket itself to non-blocking. This also
 
394
        allows us to specify when we watch for idle timeouts.
 
395
 
 
396
        :return: None, this will raise ConnectionTimeout if we time out before
 
397
            data is available.
 
398
        """
 
399
        return self._wait_on_descriptor(self.socket, timeout_seconds)
 
400
 
286
401
    def _read_bytes(self, desired_count):
287
402
        return osutils.read_bytes_from_socket(
288
403
            self.socket, self._report_activity)
305
420
 
306
421
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
307
422
 
308
 
    def __init__(self, in_file, out_file, backing_transport):
 
423
    def __init__(self, in_file, out_file, backing_transport, timeout=None):
309
424
        """Construct new server.
310
425
 
311
426
        :param in_file: Python file from which requests can be read.
312
427
        :param out_file: Python file to write responses.
313
428
        :param backing_transport: Transport for the directory served.
314
429
        """
315
 
        SmartServerStreamMedium.__init__(self, backing_transport)
 
430
        SmartServerStreamMedium.__init__(self, backing_transport,
 
431
            timeout=timeout)
316
432
        if sys.platform == 'win32':
317
433
            # force binary mode for files
318
434
            import msvcrt
323
439
        self._in = in_file
324
440
        self._out = out_file
325
441
 
 
442
    def serve(self):
 
443
        """See SmartServerStreamMedium.serve"""
 
444
        # This is the regular serve, except it adds signal trapping for soft
 
445
        # shutdown.
 
446
        stop_gracefully = self._stop_gracefully
 
447
        signals.register_on_hangup(id(self), stop_gracefully)
 
448
        try:
 
449
            return super(SmartServerPipeStreamMedium, self).serve()
 
450
        finally:
 
451
            signals.unregister_on_hangup(id(self))
 
452
 
326
453
    def _serve_one_request_unguarded(self, protocol):
327
454
        while True:
328
455
            # We need to be careful not to read past the end of the current
341
468
                return
342
469
            protocol.accept_bytes(bytes)
343
470
 
 
471
    def _disconnect_client(self):
 
472
        self._in.close()
 
473
        self._out.flush()
 
474
        self._out.close()
 
475
 
 
476
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
477
        """Wait for more bytes to be read, but timeout if none available.
 
478
 
 
479
        This allows us to detect idle connections, and stop trying to read from
 
480
        them, without setting the socket itself to non-blocking. This also
 
481
        allows us to specify when we watch for idle timeouts.
 
482
 
 
483
        :return: None, this will raise ConnectionTimeout if we time out before
 
484
            data is available.
 
485
        """
 
486
        if (getattr(self._in, 'fileno', None) is None
 
487
            or sys.platform == 'win32'):
 
488
            # You can't select() file descriptors on Windows.
 
489
            return
 
490
        return self._wait_on_descriptor(self._in, timeout_seconds)
 
491
 
344
492
    def _read_bytes(self, desired_count):
345
493
        return self._in.read(desired_count)
346
494
 
561
709
        value['count'] = 0
562
710
        value['vfs_count'] = 0
563
711
        if count != 0:
564
 
            trace.note('HPSS calls: %d (%d vfs) %s',
565
 
                       count, vfs_count, medium_repr)
 
712
            trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
 
713
                       count, vfs_count, medium_repr))
566
714
 
567
715
    def flush_all(self):
568
716
        for ref in list(self.counts.keys()):