~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/server.py

  • Committer: John Arbash Meinel
  • Date: 2011-09-26 07:56:05 UTC
  • mfrom: (6165 +trunk)
  • mto: This revision was merged to the branch mainline in revision 6170.
  • Revision ID: john@arbash-meinel.com-20110926075605-czuukdiawz68dpbd
Merge bzr.dev 6165, resolve conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
import os.path
21
21
import socket
22
22
import sys
 
23
import time
23
24
import threading
24
25
 
25
 
from bzrlib.hooks import HookPoint, Hooks
 
26
from bzrlib.hooks import Hooks
26
27
from bzrlib import (
27
28
    errors,
28
29
    trace,
29
 
    transport,
 
30
    transport as _mod_transport,
30
31
)
 
32
from bzrlib.i18n import gettext
31
33
from bzrlib.lazy_import import lazy_import
32
34
lazy_import(globals(), """
33
 
from bzrlib.smart import medium
 
35
from bzrlib.smart import (
 
36
    medium,
 
37
    signals,
 
38
    )
34
39
from bzrlib.transport import (
35
40
    chroot,
36
 
    get_transport,
37
41
    pathfilter,
38
42
    )
39
43
from bzrlib import (
 
44
    config,
40
45
    urlutils,
41
46
    )
42
47
""")
51
56
    hooks: An instance of SmartServerHooks.
52
57
    """
53
58
 
54
 
    def __init__(self, backing_transport, host='127.0.0.1', port=0,
55
 
                 root_client_path='/'):
 
59
    # This is the timeout on the socket we use .accept() on. It is exposed here
 
60
    # so the test suite can set it faster. (It thread.interrupt_main() will not
 
61
    # fire a KeyboardInterrupt during socket.accept)
 
62
    _ACCEPT_TIMEOUT = 1.0
 
63
    _SHUTDOWN_POLL_TIMEOUT = 1.0
 
64
    _LOG_WAITING_TIMEOUT = 10.0
 
65
 
 
66
    _timer = time.time
 
67
 
 
68
    def __init__(self, backing_transport, root_client_path='/',
 
69
                 client_timeout=None):
56
70
        """Construct a new server.
57
71
 
58
72
        To actually start it running, call either start_background_thread or
59
73
        serve.
60
74
 
61
75
        :param backing_transport: The transport to serve.
 
76
        :param root_client_path: The client path that will correspond to root
 
77
            of backing_transport.
 
78
        :param client_timeout: See SmartServerSocketStreamMedium's timeout
 
79
            parameter.
 
80
        """
 
81
        self.backing_transport = backing_transport
 
82
        self.root_client_path = root_client_path
 
83
        self._client_timeout = client_timeout
 
84
        self._active_connections = []
 
85
        # This is set to indicate we want to wait for clients to finish before
 
86
        # we disconnect.
 
87
        self._gracefully_stopping = False
 
88
 
 
89
    def start_server(self, host, port):
 
90
        """Create the server listening socket.
 
91
 
62
92
        :param host: Name of the interface to listen on.
63
93
        :param port: TCP port to listen on, or 0 to allocate a transient port.
64
 
        :param root_client_path: The client path that will correspond to root
65
 
            of backing_transport.
66
94
        """
67
95
        # let connections timeout so that we get a chance to terminate
68
96
        # Keep a reference to the exceptions we want to catch because the socket
88
116
        self._sockname = self._server_socket.getsockname()
89
117
        self.port = self._sockname[1]
90
118
        self._server_socket.listen(1)
91
 
        self._server_socket.settimeout(1)
92
 
        self.backing_transport = backing_transport
 
119
        self._server_socket.settimeout(self._ACCEPT_TIMEOUT)
 
120
        # Once we start accept()ing connections, we set started.
93
121
        self._started = threading.Event()
 
122
        # Once we stop accept()ing connections (and are closing the socket) we
 
123
        # set _stopped
94
124
        self._stopped = threading.Event()
95
 
        self.root_client_path = root_client_path
 
125
        # Once we have finished waiting for all clients, etc. We set
 
126
        # _fully_stopped
 
127
        self._fully_stopped = threading.Event()
96
128
 
97
 
    def serve(self, thread_name_suffix=''):
98
 
        self._should_terminate = False
99
 
        # for hooks we are letting code know that a server has started (and
100
 
        # later stopped).
 
129
    def _backing_urls(self):
101
130
        # There are three interesting urls:
102
131
        # The URL the server can be contacted on. (e.g. bzr://host/)
103
132
        # The URL that a commit done on the same machine as the server will
104
133
        # have within the servers space. (e.g. file:///home/user/source)
105
134
        # The URL that will be given to other hooks in the same process -
106
 
        # the URL of the backing transport itself. (e.g. chroot+:///)
 
135
        # the URL of the backing transport itself. (e.g. filtered-36195:///)
107
136
        # We need all three because:
108
137
        #  * other machines see the first
109
138
        #  * local commits on this machine should be able to be mapped to
113
142
        # The latter two urls are different aliases to the servers url,
114
143
        # so we group those in a list - as there might be more aliases
115
144
        # in the future.
116
 
        backing_urls = [self.backing_transport.base]
 
145
        urls = [self.backing_transport.base]
117
146
        try:
118
 
            backing_urls.append(self.backing_transport.external_url())
 
147
            urls.append(self.backing_transport.external_url())
119
148
        except errors.InProcessTransport:
120
149
            pass
 
150
        return urls
 
151
 
 
152
    def run_server_started_hooks(self, backing_urls=None):
 
153
        if backing_urls is None:
 
154
            backing_urls = self._backing_urls()
121
155
        for hook in SmartTCPServer.hooks['server_started']:
122
156
            hook(backing_urls, self.get_url())
123
157
        for hook in SmartTCPServer.hooks['server_started_ex']:
124
158
            hook(backing_urls, self)
 
159
 
 
160
    def run_server_stopped_hooks(self, backing_urls=None):
 
161
        if backing_urls is None:
 
162
            backing_urls = self._backing_urls()
 
163
        for hook in SmartTCPServer.hooks['server_stopped']:
 
164
            hook(backing_urls, self.get_url())
 
165
 
 
166
    def _stop_gracefully(self):
 
167
        # XXX: ATM, once we see the self._should_terminate we immediately exit
 
168
        #      without waiting for client threads to shut down. (I don't know
 
169
        #      if we *can* because they are marked as Daemon.)
 
170
        #      Which means that while in-theory this is a graceful shutdown,
 
171
        #      because we don't actively close the connections, etc, we don't
 
172
        #      have a good way (yet) to poll the spawned clients and
 
173
        trace.note('Requested to stop gracefully')
 
174
        self._should_terminate = True
 
175
        self._gracefully_stopping = True
 
176
        for handler, _ in self._active_connections:
 
177
            handler._stop_gracefully()
 
178
 
 
179
    def _wait_for_clients_to_disconnect(self):
 
180
        self._poll_active_connections()
 
181
        if not self._active_connections:
 
182
            return
 
183
        trace.note('Waiting for %d client(s) to finish'
 
184
                   % (len(self._active_connections),))
 
185
        t_next_log = self._timer() + self._LOG_WAITING_TIMEOUT
 
186
        while self._active_connections:
 
187
            now = self._timer()
 
188
            if now >= t_next_log:
 
189
                trace.note('Still waiting for %d client(s) to finish'
 
190
                           % (len(self._active_connections),))
 
191
                t_next_log = now + self._LOG_WAITING_TIMEOUT
 
192
            self._poll_active_connections(self._SHUTDOWN_POLL_TIMEOUT)
 
193
 
 
194
    def serve(self, thread_name_suffix=''):
 
195
        # Note: There is a temptation to do
 
196
        #       signals.register_on_hangup(id(self), self._stop_gracefully)
 
197
        #       However, that creates a temporary object which is a bound
 
198
        #       method. signals._on_sighup is a WeakKeyDictionary so it
 
199
        #       immediately gets garbage collected, because nothing else
 
200
        #       references it. Instead, we need to keep a real reference to the
 
201
        #       bound method for the lifetime of the serve() function.
 
202
        stop_gracefully = self._stop_gracefully
 
203
        signals.register_on_hangup(id(self), stop_gracefully)
 
204
        self._should_terminate = False
 
205
        # for hooks we are letting code know that a server has started (and
 
206
        # later stopped).
 
207
        self.run_server_started_hooks()
125
208
        self._started.set()
126
209
        try:
127
210
            try:
135
218
                        # if the socket is closed by stop_background_thread
136
219
                        # we might get a EBADF here, any other socket errors
137
220
                        # should get logged.
138
 
                        if e.args[0] != errno.EBADF:
 
221
                        if e.args[0] not in (errno.EBADF, errno.EINTR):
139
222
                            trace.warning("listening socket error: %s", e)
140
223
                    else:
141
224
                        if self._should_terminate:
 
225
                            conn.close()
142
226
                            break
143
227
                        self.serve_conn(conn, thread_name_suffix)
 
228
                    # Cleanout any threads that have finished processing.
 
229
                    self._poll_active_connections()
144
230
            except KeyboardInterrupt:
145
231
                # dont log when CTRL-C'd.
146
232
                raise
148
234
                trace.report_exception(sys.exc_info(), sys.stderr)
149
235
                raise
150
236
        finally:
151
 
            self._stopped.set()
152
237
            try:
153
238
                # ensure the server socket is closed.
154
239
                self._server_socket.close()
155
240
            except self._socket_error:
156
241
                # ignore errors on close
157
242
                pass
158
 
            for hook in SmartTCPServer.hooks['server_stopped']:
159
 
                hook(backing_urls, self.get_url())
 
243
            self._stopped.set()
 
244
            signals.unregister_on_hangup(id(self))
 
245
            self.run_server_stopped_hooks()
 
246
        if self._gracefully_stopping:
 
247
            self._wait_for_clients_to_disconnect()
 
248
        self._fully_stopped.set()
160
249
 
161
250
    def get_url(self):
162
251
        """Return the url of the server"""
163
 
        return "bzr://%s:%d/" % self._sockname
 
252
        return "bzr://%s:%s/" % (self._sockname[0], self._sockname[1])
 
253
 
 
254
    def _make_handler(self, conn):
 
255
        return medium.SmartServerSocketStreamMedium(
 
256
            conn, self.backing_transport, self.root_client_path,
 
257
            timeout=self._client_timeout)
 
258
 
 
259
    def _poll_active_connections(self, timeout=0.0):
 
260
        """Check to see if any active connections have finished.
 
261
 
 
262
        This will iterate through self._active_connections, and update any
 
263
        connections that are finished.
 
264
 
 
265
        :param timeout: The timeout to pass to thread.join(). By default, we
 
266
            set it to 0, so that we don't hang if threads are not done yet.
 
267
        :return: None
 
268
        """
 
269
        still_active = []
 
270
        for handler, thread in self._active_connections:
 
271
            thread.join(timeout)
 
272
            if thread.isAlive():
 
273
                still_active.append((handler, thread))
 
274
        self._active_connections = still_active
164
275
 
165
276
    def serve_conn(self, conn, thread_name_suffix):
166
277
        # For WIN32, where the timeout value from the listening socket
167
278
        # propagates to the newly accepted socket.
168
279
        conn.setblocking(True)
169
280
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
170
 
        handler = medium.SmartServerSocketStreamMedium(
171
 
            conn, self.backing_transport, self.root_client_path)
172
281
        thread_name = 'smart-server-child' + thread_name_suffix
 
282
        handler = self._make_handler(conn)
173
283
        connection_thread = threading.Thread(
174
284
            None, handler.serve, name=thread_name)
 
285
        self._active_connections.append((handler, connection_thread))
175
286
        connection_thread.setDaemon(True)
176
287
        connection_thread.start()
 
288
        return connection_thread
177
289
 
178
290
    def start_background_thread(self, thread_name_suffix=''):
179
291
        self._started.clear()
219
331
        These are all empty initially, because by default nothing should get
220
332
        notified.
221
333
        """
222
 
        Hooks.__init__(self)
223
 
        self.create_hook(HookPoint('server_started',
 
334
        Hooks.__init__(self, "bzrlib.smart.server", "SmartTCPServer.hooks")
 
335
        self.add_hook('server_started',
224
336
            "Called by the bzr server when it starts serving a directory. "
225
337
            "server_started is called with (backing urls, public url), "
226
338
            "where backing_url is a list of URLs giving the "
227
339
            "server-specific directory locations, and public_url is the "
228
 
            "public URL for the directory being served.", (0, 16), None))
229
 
        self.create_hook(HookPoint('server_started_ex',
 
340
            "public URL for the directory being served.", (0, 16))
 
341
        self.add_hook('server_started_ex',
230
342
            "Called by the bzr server when it starts serving a directory. "
231
343
            "server_started is called with (backing_urls, server_obj).",
232
 
            (1, 17), None))
233
 
        self.create_hook(HookPoint('server_stopped',
 
344
            (1, 17))
 
345
        self.add_hook('server_stopped',
234
346
            "Called by the bzr server when it stops serving a directory. "
235
347
            "server_stopped is called with the same parameters as the "
236
 
            "server_started hook: (backing_urls, public_url).", (0, 16), None))
 
348
            "server_started hook: (backing_urls, public_url).", (0, 16))
 
349
        self.add_hook('server_exception',
 
350
            "Called by the bzr server when an exception occurs. "
 
351
            "server_exception is called with the sys.exc_info() tuple "
 
352
            "return true for the hook if the exception has been handled, "
 
353
            "in which case the server will exit normally.", (2, 4))
237
354
 
238
355
SmartTCPServer.hooks = SmartServerHooks()
239
356
 
305
422
        chroot_server = chroot.ChrootServer(transport)
306
423
        chroot_server.start_server()
307
424
        self.cleanups.append(chroot_server.stop_server)
308
 
        transport = get_transport(chroot_server.get_url())
 
425
        transport = _mod_transport.get_transport_from_url(chroot_server.get_url())
309
426
        if self.base_path is not None:
310
427
            # Decorate the server's backing transport with a filter that can
311
428
            # expand homedirs.
312
429
            expand_userdirs = self._make_expand_userdirs_filter(transport)
313
430
            expand_userdirs.start_server()
314
431
            self.cleanups.append(expand_userdirs.stop_server)
315
 
            transport = get_transport(expand_userdirs.get_url())
 
432
            transport = _mod_transport.get_transport_from_url(expand_userdirs.get_url())
316
433
        self.transport = transport
317
434
 
318
 
    def _make_smart_server(self, host, port, inet):
 
435
    def _get_stdin_stdout(self):
 
436
        return sys.stdin, sys.stdout
 
437
 
 
438
    def _make_smart_server(self, host, port, inet, timeout):
 
439
        if timeout is None:
 
440
            c = config.GlobalStack()
 
441
            timeout = c.get('serve.client_timeout')
319
442
        if inet:
 
443
            stdin, stdout = self._get_stdin_stdout()
320
444
            smart_server = medium.SmartServerPipeStreamMedium(
321
 
                sys.stdin, sys.stdout, self.transport)
 
445
                stdin, stdout, self.transport, timeout=timeout)
322
446
        else:
323
447
            if host is None:
324
448
                host = medium.BZR_DEFAULT_INTERFACE
325
449
            if port is None:
326
450
                port = medium.BZR_DEFAULT_PORT
327
 
            smart_server = SmartTCPServer(self.transport, host=host, port=port)
328
 
            trace.note('listening on port: %s' % smart_server.port)
 
451
            smart_server = SmartTCPServer(self.transport,
 
452
                                          client_timeout=timeout)
 
453
            smart_server.start_server(host, port)
 
454
            trace.note(gettext('listening on port: %s') % smart_server.port)
329
455
        self.smart_server = smart_server
330
456
 
331
457
    def _change_globals(self):
342
468
        self.cleanups.append(restore_default_ui_factory_and_lockdir_timeout)
343
469
        ui.ui_factory = ui.SilentUIFactory()
344
470
        lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
471
        orig = signals.install_sighup_handler()
 
472
        def restore_signals():
 
473
            signals.restore_sighup_handler(orig)
 
474
        self.cleanups.append(restore_signals)
345
475
 
346
 
    def set_up(self, transport, host, port, inet):
 
476
    def set_up(self, transport, host, port, inet, timeout):
347
477
        self._make_backing_transport(transport)
348
 
        self._make_smart_server(host, port, inet)
 
478
        self._make_smart_server(host, port, inet, timeout)
349
479
        self._change_globals()
350
480
 
351
481
    def tear_down(self):
353
483
            cleanup()
354
484
 
355
485
 
356
 
def serve_bzr(transport, host=None, port=None, inet=False):
 
486
def serve_bzr(transport, host=None, port=None, inet=False, timeout=None):
357
487
    """This is the default implementation of 'bzr serve'.
358
 
    
 
488
 
359
489
    It creates a TCP or pipe smart server on 'transport, and runs it.  The
360
490
    transport will be decorated with a chroot and pathfilter (using
361
491
    os.path.expanduser).
362
492
    """
363
493
    bzr_server = BzrServerFactory()
364
494
    try:
365
 
        bzr_server.set_up(transport, host, port, inet)
 
495
        bzr_server.set_up(transport, host, port, inet, timeout)
366
496
        bzr_server.smart_server.serve()
 
497
    except:
 
498
        hook_caught_exception = False
 
499
        for hook in SmartTCPServer.hooks['server_exception']:
 
500
            hook_caught_exception = hook(sys.exc_info())
 
501
        if not hook_caught_exception:
 
502
            raise
367
503
    finally:
368
504
        bzr_server.tear_down()
369