~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/server.py

  • Committer: Vincent Ladeuil
  • Date: 2011-10-31 22:11:59 UTC
  • mfrom: (6236 +trunk)
  • mto: This revision was merged to the branch mainline in revision 6252.
  • Revision ID: v.ladeuil+lp@free.fr-20111031221159-zmbd6izz1flzyisa
Merge trunk before submission

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
import os.path
21
21
import socket
22
22
import sys
 
23
import time
23
24
import threading
24
25
 
25
26
from bzrlib.hooks import Hooks
28
29
    trace,
29
30
    transport as _mod_transport,
30
31
)
 
32
from bzrlib.i18n import gettext
31
33
from bzrlib.lazy_import import lazy_import
32
34
lazy_import(globals(), """
33
 
from bzrlib.smart import medium
 
35
from bzrlib.smart import (
 
36
    medium,
 
37
    signals,
 
38
    )
34
39
from bzrlib.transport import (
35
40
    chroot,
36
41
    pathfilter,
37
42
    )
38
43
from bzrlib import (
 
44
    config,
39
45
    urlutils,
40
46
    )
41
47
""")
50
56
    hooks: An instance of SmartServerHooks.
51
57
    """
52
58
 
53
 
    def __init__(self, backing_transport, root_client_path='/'):
 
59
    # This is the timeout on the socket we use .accept() on. It is exposed here
 
60
    # so the test suite can set it faster. (It thread.interrupt_main() will not
 
61
    # fire a KeyboardInterrupt during socket.accept)
 
62
    _ACCEPT_TIMEOUT = 1.0
 
63
    _SHUTDOWN_POLL_TIMEOUT = 1.0
 
64
    _LOG_WAITING_TIMEOUT = 10.0
 
65
 
 
66
    _timer = time.time
 
67
 
 
68
    def __init__(self, backing_transport, root_client_path='/',
 
69
                 client_timeout=None):
54
70
        """Construct a new server.
55
71
 
56
72
        To actually start it running, call either start_background_thread or
59
75
        :param backing_transport: The transport to serve.
60
76
        :param root_client_path: The client path that will correspond to root
61
77
            of backing_transport.
 
78
        :param client_timeout: See SmartServerSocketStreamMedium's timeout
 
79
            parameter.
62
80
        """
63
81
        self.backing_transport = backing_transport
64
82
        self.root_client_path = root_client_path
 
83
        self._client_timeout = client_timeout
 
84
        self._active_connections = []
 
85
        # This is set to indicate we want to wait for clients to finish before
 
86
        # we disconnect.
 
87
        self._gracefully_stopping = False
65
88
 
66
89
    def start_server(self, host, port):
67
90
        """Create the server listening socket.
93
116
        self._sockname = self._server_socket.getsockname()
94
117
        self.port = self._sockname[1]
95
118
        self._server_socket.listen(1)
96
 
        self._server_socket.settimeout(1)
 
119
        self._server_socket.settimeout(self._ACCEPT_TIMEOUT)
 
120
        # Once we start accept()ing connections, we set started.
97
121
        self._started = threading.Event()
 
122
        # Once we stop accept()ing connections (and are closing the socket) we
 
123
        # set _stopped
98
124
        self._stopped = threading.Event()
 
125
        # Once we have finished waiting for all clients, etc. We set
 
126
        # _fully_stopped
 
127
        self._fully_stopped = threading.Event()
99
128
 
100
129
    def _backing_urls(self):
101
130
        # There are three interesting urls:
134
163
        for hook in SmartTCPServer.hooks['server_stopped']:
135
164
            hook(backing_urls, self.get_url())
136
165
 
 
166
    def _stop_gracefully(self):
 
167
        trace.note(gettext('Requested to stop gracefully'))
 
168
        self._should_terminate = True
 
169
        self._gracefully_stopping = True
 
170
        for handler, _ in self._active_connections:
 
171
            handler._stop_gracefully()
 
172
 
 
173
    def _wait_for_clients_to_disconnect(self):
 
174
        self._poll_active_connections()
 
175
        if not self._active_connections:
 
176
            return
 
177
        trace.note(gettext('Waiting for %d client(s) to finish')
 
178
                   % (len(self._active_connections),))
 
179
        t_next_log = self._timer() + self._LOG_WAITING_TIMEOUT
 
180
        while self._active_connections:
 
181
            now = self._timer()
 
182
            if now >= t_next_log:
 
183
                trace.note(gettext('Still waiting for %d client(s) to finish')
 
184
                           % (len(self._active_connections),))
 
185
                t_next_log = now + self._LOG_WAITING_TIMEOUT
 
186
            self._poll_active_connections(self._SHUTDOWN_POLL_TIMEOUT)
 
187
 
137
188
    def serve(self, thread_name_suffix=''):
 
189
        # Note: There is a temptation to do
 
190
        #       signals.register_on_hangup(id(self), self._stop_gracefully)
 
191
        #       However, that creates a temporary object which is a bound
 
192
        #       method. signals._on_sighup is a WeakKeyDictionary so it
 
193
        #       immediately gets garbage collected, because nothing else
 
194
        #       references it. Instead, we need to keep a real reference to the
 
195
        #       bound method for the lifetime of the serve() function.
 
196
        stop_gracefully = self._stop_gracefully
 
197
        signals.register_on_hangup(id(self), stop_gracefully)
138
198
        self._should_terminate = False
139
199
        # for hooks we are letting code know that a server has started (and
140
200
        # later stopped).
150
210
                        pass
151
211
                    except self._socket_error, e:
152
212
                        # if the socket is closed by stop_background_thread
153
 
                        # we might get a EBADF here, any other socket errors
154
 
                        # should get logged.
155
 
                        if e.args[0] != errno.EBADF:
156
 
                            trace.warning("listening socket error: %s", e)
 
213
                        # we might get a EBADF here, or if we get a signal we
 
214
                        # can get EINTR, any other socket errors should get
 
215
                        # logged.
 
216
                        if e.args[0] not in (errno.EBADF, errno.EINTR):
 
217
                            trace.warning(gettext("listening socket error: %s")
 
218
                                          % (e,))
157
219
                    else:
158
220
                        if self._should_terminate:
 
221
                            conn.close()
159
222
                            break
160
223
                        self.serve_conn(conn, thread_name_suffix)
 
224
                    # Cleanout any threads that have finished processing.
 
225
                    self._poll_active_connections()
161
226
            except KeyboardInterrupt:
162
227
                # dont log when CTRL-C'd.
163
228
                raise
165
230
                trace.report_exception(sys.exc_info(), sys.stderr)
166
231
                raise
167
232
        finally:
168
 
            self._stopped.set()
169
233
            try:
170
234
                # ensure the server socket is closed.
171
235
                self._server_socket.close()
172
236
            except self._socket_error:
173
237
                # ignore errors on close
174
238
                pass
 
239
            self._stopped.set()
 
240
            signals.unregister_on_hangup(id(self))
175
241
            self.run_server_stopped_hooks()
 
242
        if self._gracefully_stopping:
 
243
            self._wait_for_clients_to_disconnect()
 
244
        self._fully_stopped.set()
176
245
 
177
246
    def get_url(self):
178
247
        """Return the url of the server"""
179
248
        return "bzr://%s:%s/" % (self._sockname[0], self._sockname[1])
180
249
 
 
250
    def _make_handler(self, conn):
 
251
        return medium.SmartServerSocketStreamMedium(
 
252
            conn, self.backing_transport, self.root_client_path,
 
253
            timeout=self._client_timeout)
 
254
 
 
255
    def _poll_active_connections(self, timeout=0.0):
 
256
        """Check to see if any active connections have finished.
 
257
 
 
258
        This will iterate through self._active_connections, and update any
 
259
        connections that are finished.
 
260
 
 
261
        :param timeout: The timeout to pass to thread.join(). By default, we
 
262
            set it to 0, so that we don't hang if threads are not done yet.
 
263
        :return: None
 
264
        """
 
265
        still_active = []
 
266
        for handler, thread in self._active_connections:
 
267
            thread.join(timeout)
 
268
            if thread.isAlive():
 
269
                still_active.append((handler, thread))
 
270
        self._active_connections = still_active
 
271
 
181
272
    def serve_conn(self, conn, thread_name_suffix):
182
273
        # For WIN32, where the timeout value from the listening socket
183
274
        # propagates to the newly accepted socket.
184
275
        conn.setblocking(True)
185
276
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
186
 
        handler = medium.SmartServerSocketStreamMedium(
187
 
            conn, self.backing_transport, self.root_client_path)
188
277
        thread_name = 'smart-server-child' + thread_name_suffix
 
278
        handler = self._make_handler(conn)
189
279
        connection_thread = threading.Thread(
190
280
            None, handler.serve, name=thread_name)
191
 
        # FIXME: This thread is never joined, it should at least be collected
192
 
        # somewhere so that tests that want to check for leaked threads can get
193
 
        # rid of them -- vila 20100531
 
281
        self._active_connections.append((handler, connection_thread))
194
282
        connection_thread.setDaemon(True)
195
283
        connection_thread.start()
196
284
        return connection_thread
340
428
            transport = _mod_transport.get_transport_from_url(expand_userdirs.get_url())
341
429
        self.transport = transport
342
430
 
343
 
    def _make_smart_server(self, host, port, inet):
 
431
    def _get_stdin_stdout(self):
 
432
        return sys.stdin, sys.stdout
 
433
 
 
434
    def _make_smart_server(self, host, port, inet, timeout):
 
435
        if timeout is None:
 
436
            c = config.GlobalStack()
 
437
            timeout = c.get('serve.client_timeout')
344
438
        if inet:
 
439
            stdin, stdout = self._get_stdin_stdout()
345
440
            smart_server = medium.SmartServerPipeStreamMedium(
346
 
                sys.stdin, sys.stdout, self.transport)
 
441
                stdin, stdout, self.transport, timeout=timeout)
347
442
        else:
348
443
            if host is None:
349
444
                host = medium.BZR_DEFAULT_INTERFACE
350
445
            if port is None:
351
446
                port = medium.BZR_DEFAULT_PORT
352
 
            smart_server = SmartTCPServer(self.transport)
 
447
            smart_server = SmartTCPServer(self.transport,
 
448
                                          client_timeout=timeout)
353
449
            smart_server.start_server(host, port)
354
 
            trace.note('listening on port: %s' % smart_server.port)
 
450
            trace.note(gettext('listening on port: %s') % smart_server.port)
355
451
        self.smart_server = smart_server
356
452
 
357
453
    def _change_globals(self):
368
464
        self.cleanups.append(restore_default_ui_factory_and_lockdir_timeout)
369
465
        ui.ui_factory = ui.SilentUIFactory()
370
466
        lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
467
        orig = signals.install_sighup_handler()
 
468
        def restore_signals():
 
469
            signals.restore_sighup_handler(orig)
 
470
        self.cleanups.append(restore_signals)
371
471
 
372
 
    def set_up(self, transport, host, port, inet):
 
472
    def set_up(self, transport, host, port, inet, timeout):
373
473
        self._make_backing_transport(transport)
374
 
        self._make_smart_server(host, port, inet)
 
474
        self._make_smart_server(host, port, inet, timeout)
375
475
        self._change_globals()
376
476
 
377
477
    def tear_down(self):
378
478
        for cleanup in reversed(self.cleanups):
379
479
            cleanup()
380
480
 
381
 
def serve_bzr(transport, host=None, port=None, inet=False):
 
481
 
 
482
def serve_bzr(transport, host=None, port=None, inet=False, timeout=None):
382
483
    """This is the default implementation of 'bzr serve'.
383
 
    
 
484
 
384
485
    It creates a TCP or pipe smart server on 'transport, and runs it.  The
385
486
    transport will be decorated with a chroot and pathfilter (using
386
487
    os.path.expanduser).
387
488
    """
388
489
    bzr_server = BzrServerFactory()
389
490
    try:
390
 
        bzr_server.set_up(transport, host, port, inet)
 
491
        bzr_server.set_up(transport, host, port, inet, timeout)
391
492
        bzr_server.smart_server.serve()
392
493
    except:
393
494
        hook_caught_exception = False