~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/server.py

Merge bzr.dev, update to use new hooks.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
18
18
 
19
19
import errno
20
20
import os.path
21
 
import select
22
21
import socket
23
22
import sys
 
23
import time
24
24
import threading
25
25
 
26
 
from bzrlib.hooks import HookPoint, Hooks
 
26
from bzrlib.hooks import Hooks
27
27
from bzrlib import (
28
28
    errors,
29
29
    trace,
30
 
    transport,
 
30
    transport as _mod_transport,
31
31
)
 
32
from bzrlib.i18n import gettext
32
33
from bzrlib.lazy_import import lazy_import
33
34
lazy_import(globals(), """
34
 
from bzrlib.smart import medium
 
35
from bzrlib.smart import (
 
36
    medium,
 
37
    signals,
 
38
    )
35
39
from bzrlib.transport import (
36
40
    chroot,
37
 
    get_transport,
38
41
    pathfilter,
39
42
    )
40
43
from bzrlib import (
 
44
    config,
41
45
    urlutils,
42
46
    )
43
47
""")
52
56
    hooks: An instance of SmartServerHooks.
53
57
    """
54
58
 
55
 
    def __init__(self, backing_transport, root_client_path='/'):
 
59
    # This is the timeout on the socket we use .accept() on. It is exposed here
 
60
    # so the test suite can set it faster. (It thread.interrupt_main() will not
 
61
    # fire a KeyboardInterrupt during socket.accept)
 
62
    _ACCEPT_TIMEOUT = 1.0
 
63
    _SHUTDOWN_POLL_TIMEOUT = 1.0
 
64
    _LOG_WAITING_TIMEOUT = 10.0
 
65
 
 
66
    _timer = time.time
 
67
 
 
68
    def __init__(self, backing_transport, root_client_path='/',
 
69
                 client_timeout=None):
56
70
        """Construct a new server.
57
71
 
58
72
        To actually start it running, call either start_background_thread or
61
75
        :param backing_transport: The transport to serve.
62
76
        :param root_client_path: The client path that will correspond to root
63
77
            of backing_transport.
 
78
        :param client_timeout: See SmartServerSocketStreamMedium's timeout
 
79
            parameter.
64
80
        """
65
81
        self.backing_transport = backing_transport
66
82
        self.root_client_path = root_client_path
 
83
        self._client_timeout = client_timeout
 
84
        self._active_connections = []
 
85
        # This is set to indicate we want to wait for clients to finish before
 
86
        # we disconnect.
 
87
        self._gracefully_stopping = False
67
88
 
68
89
    def start_server(self, host, port):
69
90
        """Create the server listening socket.
95
116
        self._sockname = self._server_socket.getsockname()
96
117
        self.port = self._sockname[1]
97
118
        self._server_socket.listen(1)
98
 
        self._server_socket.settimeout(1)
 
119
        self._server_socket.settimeout(self._ACCEPT_TIMEOUT)
 
120
        # Once we start accept()ing connections, we set started.
99
121
        self._started = threading.Event()
 
122
        # Once we stop accept()ing connections (and are closing the socket) we
 
123
        # set _stopped
100
124
        self._stopped = threading.Event()
 
125
        # Once we have finished waiting for all clients, etc. We set
 
126
        # _fully_stopped
 
127
        self._fully_stopped = threading.Event()
101
128
 
102
129
    def _backing_urls(self):
103
130
        # There are three interesting urls:
105
132
        # The URL that a commit done on the same machine as the server will
106
133
        # have within the servers space. (e.g. file:///home/user/source)
107
134
        # The URL that will be given to other hooks in the same process -
108
 
        # the URL of the backing transport itself. (e.g. chroot+:///)
 
135
        # the URL of the backing transport itself. (e.g. filtered-36195:///)
109
136
        # We need all three because:
110
137
        #  * other machines see the first
111
138
        #  * local commits on this machine should be able to be mapped to
136
163
        for hook in SmartTCPServer.hooks['server_stopped']:
137
164
            hook(backing_urls, self.get_url())
138
165
 
 
166
    def _stop_gracefully(self):
 
167
        trace.note(gettext('Requested to stop gracefully'))
 
168
        self._should_terminate = True
 
169
        self._gracefully_stopping = True
 
170
        for handler, _ in self._active_connections:
 
171
            handler._stop_gracefully()
 
172
 
 
173
    def _wait_for_clients_to_disconnect(self):
 
174
        self._poll_active_connections()
 
175
        if not self._active_connections:
 
176
            return
 
177
        trace.note(gettext('Waiting for %d client(s) to finish')
 
178
                   % (len(self._active_connections),))
 
179
        t_next_log = self._timer() + self._LOG_WAITING_TIMEOUT
 
180
        while self._active_connections:
 
181
            now = self._timer()
 
182
            if now >= t_next_log:
 
183
                trace.note(gettext('Still waiting for %d client(s) to finish')
 
184
                           % (len(self._active_connections),))
 
185
                t_next_log = now + self._LOG_WAITING_TIMEOUT
 
186
            self._poll_active_connections(self._SHUTDOWN_POLL_TIMEOUT)
 
187
 
139
188
    def serve(self, thread_name_suffix=''):
 
189
        # Note: There is a temptation to do
 
190
        #       signals.register_on_hangup(id(self), self._stop_gracefully)
 
191
        #       However, that creates a temporary object which is a bound
 
192
        #       method. signals._on_sighup is a WeakKeyDictionary so it
 
193
        #       immediately gets garbage collected, because nothing else
 
194
        #       references it. Instead, we need to keep a real reference to the
 
195
        #       bound method for the lifetime of the serve() function.
 
196
        stop_gracefully = self._stop_gracefully
 
197
        signals.register_on_hangup(id(self), stop_gracefully)
140
198
        self._should_terminate = False
141
199
        # for hooks we are letting code know that a server has started (and
142
200
        # later stopped).
152
210
                        pass
153
211
                    except self._socket_error, e:
154
212
                        # if the socket is closed by stop_background_thread
155
 
                        # we might get a EBADF here, any other socket errors
156
 
                        # should get logged.
157
 
                        if e.args[0] != errno.EBADF:
158
 
                            trace.warning("listening socket error: %s", e)
 
213
                        # we might get a EBADF here, or if we get a signal we
 
214
                        # can get EINTR, any other socket errors should get
 
215
                        # logged.
 
216
                        if e.args[0] not in (errno.EBADF, errno.EINTR):
 
217
                            trace.warning(gettext("listening socket error: %s")
 
218
                                          % (e,))
159
219
                    else:
160
220
                        if self._should_terminate:
 
221
                            conn.close()
161
222
                            break
162
223
                        self.serve_conn(conn, thread_name_suffix)
 
224
                    # Cleanout any threads that have finished processing.
 
225
                    self._poll_active_connections()
163
226
            except KeyboardInterrupt:
164
227
                # dont log when CTRL-C'd.
165
228
                raise
167
230
                trace.report_exception(sys.exc_info(), sys.stderr)
168
231
                raise
169
232
        finally:
170
 
            self._stopped.set()
171
233
            try:
172
234
                # ensure the server socket is closed.
173
235
                self._server_socket.close()
174
236
            except self._socket_error:
175
237
                # ignore errors on close
176
238
                pass
 
239
            self._stopped.set()
 
240
            signals.unregister_on_hangup(id(self))
177
241
            self.run_server_stopped_hooks()
 
242
        if self._gracefully_stopping:
 
243
            self._wait_for_clients_to_disconnect()
 
244
        self._fully_stopped.set()
178
245
 
179
246
    def get_url(self):
180
247
        """Return the url of the server"""
181
 
        return "bzr://%s:%d/" % self._sockname
 
248
        return "bzr://%s:%s/" % (self._sockname[0], self._sockname[1])
 
249
 
 
250
    def _make_handler(self, conn):
 
251
        return medium.SmartServerSocketStreamMedium(
 
252
            conn, self.backing_transport, self.root_client_path,
 
253
            timeout=self._client_timeout)
 
254
 
 
255
    def _poll_active_connections(self, timeout=0.0):
 
256
        """Check to see if any active connections have finished.
 
257
 
 
258
        This will iterate through self._active_connections, and update any
 
259
        connections that are finished.
 
260
 
 
261
        :param timeout: The timeout to pass to thread.join(). By default, we
 
262
            set it to 0, so that we don't hang if threads are not done yet.
 
263
        :return: None
 
264
        """
 
265
        still_active = []
 
266
        for handler, thread in self._active_connections:
 
267
            thread.join(timeout)
 
268
            if thread.isAlive():
 
269
                still_active.append((handler, thread))
 
270
        self._active_connections = still_active
182
271
 
183
272
    def serve_conn(self, conn, thread_name_suffix):
184
273
        # For WIN32, where the timeout value from the listening socket
185
274
        # propagates to the newly accepted socket.
186
275
        conn.setblocking(True)
187
276
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
188
 
        handler = medium.SmartServerSocketStreamMedium(
189
 
            conn, self.backing_transport, self.root_client_path)
190
277
        thread_name = 'smart-server-child' + thread_name_suffix
 
278
        handler = self._make_handler(conn)
191
279
        connection_thread = threading.Thread(
192
280
            None, handler.serve, name=thread_name)
193
 
        # FIXME: This thread is never joined, it should at least be collected
194
 
        # somewhere so that tests that want to check for leaked threads can get
195
 
        # rid of them -- vila 20100531
 
281
        self._active_connections.append((handler, connection_thread))
196
282
        connection_thread.setDaemon(True)
197
283
        connection_thread.start()
198
284
        return connection_thread
241
327
        These are all empty initially, because by default nothing should get
242
328
        notified.
243
329
        """
244
 
        Hooks.__init__(self)
245
 
        self.create_hook(HookPoint('server_started',
 
330
        Hooks.__init__(self, "bzrlib.smart.server", "SmartTCPServer.hooks")
 
331
        self.add_hook('server_started',
246
332
            "Called by the bzr server when it starts serving a directory. "
247
333
            "server_started is called with (backing urls, public url), "
248
334
            "where backing_url is a list of URLs giving the "
249
335
            "server-specific directory locations, and public_url is the "
250
 
            "public URL for the directory being served.", (0, 16), None))
251
 
        self.create_hook(HookPoint('server_started_ex',
 
336
            "public URL for the directory being served.", (0, 16))
 
337
        self.add_hook('server_started_ex',
252
338
            "Called by the bzr server when it starts serving a directory. "
253
339
            "server_started is called with (backing_urls, server_obj).",
254
 
            (1, 17), None))
255
 
        self.create_hook(HookPoint('server_stopped',
 
340
            (1, 17))
 
341
        self.add_hook('server_stopped',
256
342
            "Called by the bzr server when it stops serving a directory. "
257
343
            "server_stopped is called with the same parameters as the "
258
 
            "server_started hook: (backing_urls, public_url).", (0, 16), None))
 
344
            "server_started hook: (backing_urls, public_url).", (0, 16))
 
345
        self.add_hook('server_exception',
 
346
            "Called by the bzr server when an exception occurs. "
 
347
            "server_exception is called with the sys.exc_info() tuple "
 
348
            "return true for the hook if the exception has been handled, "
 
349
            "in which case the server will exit normally.", (2, 4))
259
350
 
260
351
SmartTCPServer.hooks = SmartServerHooks()
261
352
 
327
418
        chroot_server = chroot.ChrootServer(transport)
328
419
        chroot_server.start_server()
329
420
        self.cleanups.append(chroot_server.stop_server)
330
 
        transport = get_transport(chroot_server.get_url())
 
421
        transport = _mod_transport.get_transport_from_url(chroot_server.get_url())
331
422
        if self.base_path is not None:
332
423
            # Decorate the server's backing transport with a filter that can
333
424
            # expand homedirs.
334
425
            expand_userdirs = self._make_expand_userdirs_filter(transport)
335
426
            expand_userdirs.start_server()
336
427
            self.cleanups.append(expand_userdirs.stop_server)
337
 
            transport = get_transport(expand_userdirs.get_url())
 
428
            transport = _mod_transport.get_transport_from_url(expand_userdirs.get_url())
338
429
        self.transport = transport
339
430
 
340
 
    def _make_smart_server(self, host, port, inet):
 
431
    def _get_stdin_stdout(self):
 
432
        return sys.stdin, sys.stdout
 
433
 
 
434
    def _make_smart_server(self, host, port, inet, timeout):
 
435
        if timeout is None:
 
436
            c = config.GlobalStack()
 
437
            timeout = c.get('serve.client_timeout')
341
438
        if inet:
 
439
            stdin, stdout = self._get_stdin_stdout()
342
440
            smart_server = medium.SmartServerPipeStreamMedium(
343
 
                sys.stdin, sys.stdout, self.transport)
 
441
                stdin, stdout, self.transport, timeout=timeout)
344
442
        else:
345
443
            if host is None:
346
444
                host = medium.BZR_DEFAULT_INTERFACE
347
445
            if port is None:
348
446
                port = medium.BZR_DEFAULT_PORT
349
 
            smart_server = SmartTCPServer(self.transport)
 
447
            smart_server = SmartTCPServer(self.transport,
 
448
                                          client_timeout=timeout)
350
449
            smart_server.start_server(host, port)
351
 
            trace.note('listening on port: %s' % smart_server.port)
 
450
            trace.note(gettext('listening on port: %s') % smart_server.port)
352
451
        self.smart_server = smart_server
353
452
 
354
453
    def _change_globals(self):
365
464
        self.cleanups.append(restore_default_ui_factory_and_lockdir_timeout)
366
465
        ui.ui_factory = ui.SilentUIFactory()
367
466
        lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
467
        orig = signals.install_sighup_handler()
 
468
        def restore_signals():
 
469
            signals.restore_sighup_handler(orig)
 
470
        self.cleanups.append(restore_signals)
368
471
 
369
 
    def set_up(self, transport, host, port, inet):
 
472
    def set_up(self, transport, host, port, inet, timeout):
370
473
        self._make_backing_transport(transport)
371
 
        self._make_smart_server(host, port, inet)
 
474
        self._make_smart_server(host, port, inet, timeout)
372
475
        self._change_globals()
373
476
 
374
477
    def tear_down(self):
376
479
            cleanup()
377
480
 
378
481
 
379
 
def serve_bzr(transport, host=None, port=None, inet=False):
 
482
def serve_bzr(transport, host=None, port=None, inet=False, timeout=None):
380
483
    """This is the default implementation of 'bzr serve'.
381
 
    
 
484
 
382
485
    It creates a TCP or pipe smart server on 'transport, and runs it.  The
383
486
    transport will be decorated with a chroot and pathfilter (using
384
487
    os.path.expanduser).
385
488
    """
386
489
    bzr_server = BzrServerFactory()
387
490
    try:
388
 
        bzr_server.set_up(transport, host, port, inet)
 
491
        bzr_server.set_up(transport, host, port, inet, timeout)
389
492
        bzr_server.smart_server.serve()
 
493
    except:
 
494
        hook_caught_exception = False
 
495
        for hook in SmartTCPServer.hooks['server_exception']:
 
496
            hook_caught_exception = hook(sys.exc_info())
 
497
        if not hook_caught_exception:
 
498
            raise
390
499
    finally:
391
500
        bzr_server.tear_down()
392