~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
 
27
import errno
27
28
import os
28
29
import sys
 
30
import time
29
31
import urllib
30
32
 
31
33
import bzrlib
32
34
from bzrlib.lazy_import import lazy_import
33
35
lazy_import(globals(), """
 
36
import select
34
37
import socket
35
38
import thread
36
39
import weakref
43
46
    urlutils,
44
47
    )
45
48
from bzrlib.i18n import gettext
46
 
from bzrlib.smart import client, protocol, request, vfs
 
49
from bzrlib.smart import client, protocol, request, signals, vfs
47
50
from bzrlib.transport import ssh
48
51
""")
49
52
from bzrlib import osutils
176
179
        ui.ui_factory.report_transport_activity(self, bytes, direction)
177
180
 
178
181
 
 
182
_bad_file_descriptor = (errno.EBADF,)
 
183
if sys.platform == 'win32':
 
184
    # Given on Windows if you pass a closed socket to select.select. Probably
 
185
    # also given if you pass a file handle to select.
 
186
    WSAENOTSOCK = 10038
 
187
    _bad_file_descriptor += (WSAENOTSOCK,)
 
188
 
 
189
 
179
190
class SmartServerStreamMedium(SmartMedium):
180
191
    """Handles smart commands coming over a stream.
181
192
 
194
205
        the stream.  See also the _push_back method.
195
206
    """
196
207
 
197
 
    def __init__(self, backing_transport, root_client_path='/'):
 
208
    _timer = time.time
 
209
 
 
210
    def __init__(self, backing_transport, root_client_path='/', timeout=None):
198
211
        """Construct new server.
199
212
 
200
213
        :param backing_transport: Transport for the directory served.
203
216
        self.backing_transport = backing_transport
204
217
        self.root_client_path = root_client_path
205
218
        self.finished = False
 
219
        if timeout is None:
 
220
            raise AssertionError('You must supply a timeout.')
 
221
        self._client_timeout = timeout
 
222
        self._client_poll_timeout = min(timeout / 10.0, 1.0)
206
223
        SmartMedium.__init__(self)
207
224
 
208
225
    def serve(self):
213
230
        try:
214
231
            while not self.finished:
215
232
                server_protocol = self._build_protocol()
 
233
                # TODO: This seems inelegant:
 
234
                if server_protocol is None:
 
235
                    # We could 'continue' only to notice that self.finished is
 
236
                    # True...
 
237
                    break
216
238
                self._serve_one_request(server_protocol)
 
239
        except errors.ConnectionTimeout, e:
 
240
            trace.note('%s' % (e,))
 
241
            trace.log_exception_quietly()
 
242
            self._disconnect_client()
 
243
            # We reported it, no reason to make a big fuss.
 
244
            return
217
245
        except Exception, e:
218
246
            stderr.write("%s terminating on exception %s\n" % (self, e))
219
247
            raise
 
248
        self._disconnect_client()
 
249
 
 
250
    def _stop_gracefully(self):
 
251
        """When we finish this message, stop looking for more."""
 
252
        trace.mutter('Stopping %s' % (self,))
 
253
        self.finished = True
 
254
 
 
255
    def _disconnect_client(self):
 
256
        """Close the current connection. We stopped due to a timeout/etc."""
 
257
        # The default implementation is a no-op, because that is all we used to
 
258
        # do when disconnecting from a client. I suppose we never had the
 
259
        # *server* initiate a disconnect, before
 
260
 
 
261
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
262
        """Wait for more bytes to be read, but timeout if none available.
 
263
 
 
264
        This allows us to detect idle connections, and stop trying to read from
 
265
        them, without setting the socket itself to non-blocking. This also
 
266
        allows us to specify when we watch for idle timeouts.
 
267
 
 
268
        :return: Did we timeout? (True if we timed out, False if there is data
 
269
            to be read)
 
270
        """
 
271
        raise NotImplementedError(self._wait_for_bytes_with_timeout)
220
272
 
221
273
    def _build_protocol(self):
222
274
        """Identifies the version of the incoming request, and returns an
227
279
 
228
280
        :returns: a SmartServerRequestProtocol.
229
281
        """
 
282
        self._wait_for_bytes_with_timeout(self._client_timeout)
 
283
        if self.finished:
 
284
            # We're stopping, so don't try to do any more work
 
285
            return None
230
286
        bytes = self._get_line()
231
287
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
232
288
        protocol = protocol_factory(
234
290
        protocol.accept_bytes(unused_bytes)
235
291
        return protocol
236
292
 
 
293
    def _wait_on_descriptor(self, fd, timeout_seconds):
 
294
        """select() on a file descriptor, waiting for nonblocking read()
 
295
 
 
296
        This will raise a ConnectionTimeout exception if we do not get a
 
297
        readable handle before timeout_seconds.
 
298
        :return: None
 
299
        """
 
300
        t_end = self._timer() + timeout_seconds
 
301
        poll_timeout = min(timeout_seconds, self._client_poll_timeout)
 
302
        rs = xs = None
 
303
        while not rs and not xs and self._timer() < t_end:
 
304
            if self.finished:
 
305
                return
 
306
            try:
 
307
                rs, _, xs = select.select([fd], [], [fd], poll_timeout)
 
308
            except (select.error, socket.error) as e:
 
309
                err = getattr(e, 'errno', None)
 
310
                if err is None and getattr(e, 'args', None) is not None:
 
311
                    # select.error doesn't have 'errno', it just has args[0]
 
312
                    err = e.args[0]
 
313
                if err in _bad_file_descriptor:
 
314
                    return # Not a socket indicates read() will fail
 
315
                elif err == errno.EINTR:
 
316
                    # Interrupted, keep looping.
 
317
                    continue
 
318
                raise
 
319
        if rs or xs:
 
320
            return
 
321
        raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
 
322
                                       % (timeout_seconds,))
 
323
 
237
324
    def _serve_one_request(self, protocol):
238
325
        """Read one request from input, process, send back a response.
239
326
 
260
347
 
261
348
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
262
349
 
263
 
    def __init__(self, sock, backing_transport, root_client_path='/'):
 
350
    def __init__(self, sock, backing_transport, root_client_path='/',
 
351
                 timeout=None):
264
352
        """Constructor.
265
353
 
266
354
        :param sock: the socket the server will read from.  It will be put
267
355
            into blocking mode.
268
356
        """
269
357
        SmartServerStreamMedium.__init__(
270
 
            self, backing_transport, root_client_path=root_client_path)
 
358
            self, backing_transport, root_client_path=root_client_path,
 
359
            timeout=timeout)
271
360
        sock.setblocking(True)
272
361
        self.socket = sock
 
362
        # Get the getpeername now, as we might be closed later when we care.
 
363
        try:
 
364
            self._client_info = sock.getpeername()
 
365
        except socket.error:
 
366
            self._client_info = '<unknown>'
 
367
 
 
368
    def __str__(self):
 
369
        return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
 
370
 
 
371
    def __repr__(self):
 
372
        return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
 
373
            self._client_info)
273
374
 
274
375
    def _serve_one_request_unguarded(self, protocol):
275
376
        while protocol.next_read_size():
284
385
 
285
386
        self._push_back(protocol.unused_data)
286
387
 
 
388
    def _disconnect_client(self):
 
389
        """Close the current connection. We stopped due to a timeout/etc."""
 
390
        self.socket.close()
 
391
 
 
392
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
393
        """Wait for more bytes to be read, but timeout if none available.
 
394
 
 
395
        This allows us to detect idle connections, and stop trying to read from
 
396
        them, without setting the socket itself to non-blocking. This also
 
397
        allows us to specify when we watch for idle timeouts.
 
398
 
 
399
        :return: None, this will raise ConnectionTimeout if we time out before
 
400
            data is available.
 
401
        """
 
402
        return self._wait_on_descriptor(self.socket, timeout_seconds)
 
403
 
287
404
    def _read_bytes(self, desired_count):
288
405
        return osutils.read_bytes_from_socket(
289
406
            self.socket, self._report_activity)
306
423
 
307
424
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
308
425
 
309
 
    def __init__(self, in_file, out_file, backing_transport):
 
426
    def __init__(self, in_file, out_file, backing_transport, timeout=None):
310
427
        """Construct new server.
311
428
 
312
429
        :param in_file: Python file from which requests can be read.
313
430
        :param out_file: Python file to write responses.
314
431
        :param backing_transport: Transport for the directory served.
315
432
        """
316
 
        SmartServerStreamMedium.__init__(self, backing_transport)
 
433
        SmartServerStreamMedium.__init__(self, backing_transport,
 
434
            timeout=timeout)
317
435
        if sys.platform == 'win32':
318
436
            # force binary mode for files
319
437
            import msvcrt
324
442
        self._in = in_file
325
443
        self._out = out_file
326
444
 
 
445
    def serve(self):
 
446
        """See SmartServerStreamMedium.serve"""
 
447
        # This is the regular serve, except it adds signal trapping for soft
 
448
        # shutdown.
 
449
        stop_gracefully = self._stop_gracefully
 
450
        signals.register_on_hangup(id(self), stop_gracefully)
 
451
        try:
 
452
            return super(SmartServerPipeStreamMedium, self).serve()
 
453
        finally:
 
454
            signals.unregister_on_hangup(id(self))
 
455
 
327
456
    def _serve_one_request_unguarded(self, protocol):
328
457
        while True:
329
458
            # We need to be careful not to read past the end of the current
342
471
                return
343
472
            protocol.accept_bytes(bytes)
344
473
 
 
474
    def _disconnect_client(self):
 
475
        self._in.close()
 
476
        self._out.flush()
 
477
        self._out.close()
 
478
 
 
479
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
480
        """Wait for more bytes to be read, but timeout if none available.
 
481
 
 
482
        This allows us to detect idle connections, and stop trying to read from
 
483
        them, without setting the socket itself to non-blocking. This also
 
484
        allows us to specify when we watch for idle timeouts.
 
485
 
 
486
        :return: None, this will raise ConnectionTimeout if we time out before
 
487
            data is available.
 
488
        """
 
489
        if (getattr(self._in, 'fileno', None) is None
 
490
            or sys.platform == 'win32'):
 
491
            # You can't select() file descriptors on Windows.
 
492
            return
 
493
        return self._wait_on_descriptor(self._in, timeout_seconds)
 
494
 
345
495
    def _read_bytes(self, desired_count):
346
496
        return self._in.read(desired_count)
347
497