17
17
"""Server for smart-server protocol."""
19
from __future__ import absolute_import
28
from bzrlib.hooks import Hooks
26
from bzrlib.hooks import HookPoint, Hooks
29
27
from bzrlib import (
32
transport as _mod_transport,
34
from bzrlib.i18n import gettext
35
32
from bzrlib.lazy_import import lazy_import
36
33
lazy_import(globals(), """
37
from bzrlib.smart import (
34
from bzrlib.smart import medium
41
35
from bzrlib.transport import (
45
40
from bzrlib import (
58
52
hooks: An instance of SmartServerHooks.
61
# This is the timeout on the socket we use .accept() on. It is exposed here
62
# so the test suite can set it faster. (It thread.interrupt_main() will not
63
# fire a KeyboardInterrupt during socket.accept)
65
_SHUTDOWN_POLL_TIMEOUT = 1.0
66
_LOG_WAITING_TIMEOUT = 10.0
70
def __init__(self, backing_transport, root_client_path='/',
55
def __init__(self, backing_transport, root_client_path='/'):
72
56
"""Construct a new server.
74
58
To actually start it running, call either start_background_thread or
77
61
:param backing_transport: The transport to serve.
78
62
:param root_client_path: The client path that will correspond to root
79
63
of backing_transport.
80
:param client_timeout: See SmartServerSocketStreamMedium's timeout
83
65
self.backing_transport = backing_transport
84
66
self.root_client_path = root_client_path
85
self._client_timeout = client_timeout
86
self._active_connections = []
87
# This is set to indicate we want to wait for clients to finish before
89
self._gracefully_stopping = False
91
68
def start_server(self, host, port):
92
69
"""Create the server listening socket.
118
95
self._sockname = self._server_socket.getsockname()
119
96
self.port = self._sockname[1]
120
97
self._server_socket.listen(1)
121
self._server_socket.settimeout(self._ACCEPT_TIMEOUT)
122
# Once we start accept()ing connections, we set started.
98
self._server_socket.settimeout(1)
123
99
self._started = threading.Event()
124
# Once we stop accept()ing connections (and are closing the socket) we
126
100
self._stopped = threading.Event()
127
# Once we have finished waiting for all clients, etc. We set
129
self._fully_stopped = threading.Event()
131
102
def _backing_urls(self):
132
103
# There are three interesting urls:
134
105
# The URL that a commit done on the same machine as the server will
135
106
# have within the servers space. (e.g. file:///home/user/source)
136
107
# The URL that will be given to other hooks in the same process -
137
# the URL of the backing transport itself. (e.g. filtered-36195:///)
108
# the URL of the backing transport itself. (e.g. chroot+:///)
138
109
# We need all three because:
139
110
# * other machines see the first
140
111
# * local commits on this machine should be able to be mapped to
165
136
for hook in SmartTCPServer.hooks['server_stopped']:
166
137
hook(backing_urls, self.get_url())
168
def _stop_gracefully(self):
169
trace.note(gettext('Requested to stop gracefully'))
170
self._should_terminate = True
171
self._gracefully_stopping = True
172
for handler, _ in self._active_connections:
173
handler._stop_gracefully()
175
def _wait_for_clients_to_disconnect(self):
176
self._poll_active_connections()
177
if not self._active_connections:
179
trace.note(gettext('Waiting for %d client(s) to finish')
180
% (len(self._active_connections),))
181
t_next_log = self._timer() + self._LOG_WAITING_TIMEOUT
182
while self._active_connections:
184
if now >= t_next_log:
185
trace.note(gettext('Still waiting for %d client(s) to finish')
186
% (len(self._active_connections),))
187
t_next_log = now + self._LOG_WAITING_TIMEOUT
188
self._poll_active_connections(self._SHUTDOWN_POLL_TIMEOUT)
190
139
def serve(self, thread_name_suffix=''):
191
# Note: There is a temptation to do
192
# signals.register_on_hangup(id(self), self._stop_gracefully)
193
# However, that creates a temporary object which is a bound
194
# method. signals._on_sighup is a WeakKeyDictionary so it
195
# immediately gets garbage collected, because nothing else
196
# references it. Instead, we need to keep a real reference to the
197
# bound method for the lifetime of the serve() function.
198
stop_gracefully = self._stop_gracefully
199
signals.register_on_hangup(id(self), stop_gracefully)
200
140
self._should_terminate = False
201
141
# for hooks we are letting code know that a server has started (and
202
142
# later stopped).
213
153
except self._socket_error, e:
214
154
# if the socket is closed by stop_background_thread
215
# we might get a EBADF here, or if we get a signal we
216
# can get EINTR, any other socket errors should get
218
if e.args[0] not in (errno.EBADF, errno.EINTR):
219
trace.warning(gettext("listening socket error: %s")
155
# we might get a EBADF here, any other socket errors
157
if e.args[0] != errno.EBADF:
158
trace.warning("listening socket error: %s", e)
222
160
if self._should_terminate:
225
162
self.serve_conn(conn, thread_name_suffix)
226
# Cleanout any threads that have finished processing.
227
self._poll_active_connections()
228
163
except KeyboardInterrupt:
229
164
# dont log when CTRL-C'd.
232
167
trace.report_exception(sys.exc_info(), sys.stderr)
236
172
# ensure the server socket is closed.
237
173
self._server_socket.close()
238
174
except self._socket_error:
239
175
# ignore errors on close
242
signals.unregister_on_hangup(id(self))
243
177
self.run_server_stopped_hooks()
244
if self._gracefully_stopping:
245
self._wait_for_clients_to_disconnect()
246
self._fully_stopped.set()
248
179
def get_url(self):
249
180
"""Return the url of the server"""
250
return "bzr://%s:%s/" % (self._sockname[0], self._sockname[1])
252
def _make_handler(self, conn):
253
return medium.SmartServerSocketStreamMedium(
254
conn, self.backing_transport, self.root_client_path,
255
timeout=self._client_timeout)
257
def _poll_active_connections(self, timeout=0.0):
258
"""Check to see if any active connections have finished.
260
This will iterate through self._active_connections, and update any
261
connections that are finished.
263
:param timeout: The timeout to pass to thread.join(). By default, we
264
set it to 0, so that we don't hang if threads are not done yet.
268
for handler, thread in self._active_connections:
271
still_active.append((handler, thread))
272
self._active_connections = still_active
181
return "bzr://%s:%d/" % self._sockname
274
183
def serve_conn(self, conn, thread_name_suffix):
275
184
# For WIN32, where the timeout value from the listening socket
276
185
# propagates to the newly accepted socket.
277
186
conn.setblocking(True)
278
187
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
188
handler = medium.SmartServerSocketStreamMedium(
189
conn, self.backing_transport, self.root_client_path)
279
190
thread_name = 'smart-server-child' + thread_name_suffix
280
handler = self._make_handler(conn)
281
191
connection_thread = threading.Thread(
282
192
None, handler.serve, name=thread_name)
283
self._active_connections.append((handler, connection_thread))
193
# FIXME: This thread is never joined, it should at least be collected
194
# somewhere so that tests that want to check for leaked threads can get
195
# rid of them -- vila 20100531
284
196
connection_thread.setDaemon(True)
285
197
connection_thread.start()
286
198
return connection_thread
329
241
These are all empty initially, because by default nothing should get
332
Hooks.__init__(self, "bzrlib.smart.server", "SmartTCPServer.hooks")
333
self.add_hook('server_started',
245
self.create_hook(HookPoint('server_started',
334
246
"Called by the bzr server when it starts serving a directory. "
335
247
"server_started is called with (backing urls, public url), "
336
248
"where backing_url is a list of URLs giving the "
337
249
"server-specific directory locations, and public_url is the "
338
"public URL for the directory being served.", (0, 16))
339
self.add_hook('server_started_ex',
250
"public URL for the directory being served.", (0, 16), None))
251
self.create_hook(HookPoint('server_started_ex',
340
252
"Called by the bzr server when it starts serving a directory. "
341
253
"server_started is called with (backing_urls, server_obj).",
343
self.add_hook('server_stopped',
255
self.create_hook(HookPoint('server_stopped',
344
256
"Called by the bzr server when it stops serving a directory. "
345
257
"server_stopped is called with the same parameters as the "
346
"server_started hook: (backing_urls, public_url).", (0, 16))
347
self.add_hook('server_exception',
348
"Called by the bzr server when an exception occurs. "
349
"server_exception is called with the sys.exc_info() tuple "
350
"return true for the hook if the exception has been handled, "
351
"in which case the server will exit normally.", (2, 4))
258
"server_started hook: (backing_urls, public_url).", (0, 16), None))
353
260
SmartTCPServer.hooks = SmartServerHooks()
420
327
chroot_server = chroot.ChrootServer(transport)
421
328
chroot_server.start_server()
422
329
self.cleanups.append(chroot_server.stop_server)
423
transport = _mod_transport.get_transport_from_url(chroot_server.get_url())
330
transport = get_transport(chroot_server.get_url())
424
331
if self.base_path is not None:
425
332
# Decorate the server's backing transport with a filter that can
426
333
# expand homedirs.
427
334
expand_userdirs = self._make_expand_userdirs_filter(transport)
428
335
expand_userdirs.start_server()
429
336
self.cleanups.append(expand_userdirs.stop_server)
430
transport = _mod_transport.get_transport_from_url(expand_userdirs.get_url())
337
transport = get_transport(expand_userdirs.get_url())
431
338
self.transport = transport
433
def _get_stdin_stdout(self):
434
return sys.stdin, sys.stdout
436
def _make_smart_server(self, host, port, inet, timeout):
438
c = config.GlobalStack()
439
timeout = c.get('serve.client_timeout')
340
def _make_smart_server(self, host, port, inet):
441
stdin, stdout = self._get_stdin_stdout()
442
342
smart_server = medium.SmartServerPipeStreamMedium(
443
stdin, stdout, self.transport, timeout=timeout)
343
sys.stdin, sys.stdout, self.transport)
446
346
host = medium.BZR_DEFAULT_INTERFACE
448
348
port = medium.BZR_DEFAULT_PORT
449
smart_server = SmartTCPServer(self.transport,
450
client_timeout=timeout)
349
smart_server = SmartTCPServer(self.transport)
451
350
smart_server.start_server(host, port)
452
trace.note(gettext('listening on port: %s') % smart_server.port)
351
trace.note('listening on port: %s' % smart_server.port)
453
352
self.smart_server = smart_server
455
354
def _change_globals(self):
466
365
self.cleanups.append(restore_default_ui_factory_and_lockdir_timeout)
467
366
ui.ui_factory = ui.SilentUIFactory()
468
367
lockdir._DEFAULT_TIMEOUT_SECONDS = 0
469
orig = signals.install_sighup_handler()
470
def restore_signals():
471
signals.restore_sighup_handler(orig)
472
self.cleanups.append(restore_signals)
474
def set_up(self, transport, host, port, inet, timeout):
369
def set_up(self, transport, host, port, inet):
475
370
self._make_backing_transport(transport)
476
self._make_smart_server(host, port, inet, timeout)
371
self._make_smart_server(host, port, inet)
477
372
self._change_globals()
479
374
def tear_down(self):
484
def serve_bzr(transport, host=None, port=None, inet=False, timeout=None):
379
def serve_bzr(transport, host=None, port=None, inet=False):
485
380
"""This is the default implementation of 'bzr serve'.
487
382
It creates a TCP or pipe smart server on 'transport, and runs it. The
488
383
transport will be decorated with a chroot and pathfilter (using
489
384
os.path.expanduser).
491
386
bzr_server = BzrServerFactory()
493
bzr_server.set_up(transport, host, port, inet, timeout)
388
bzr_server.set_up(transport, host, port, inet)
494
389
bzr_server.smart_server.serve()
496
hook_caught_exception = False
497
for hook in SmartTCPServer.hooks['server_exception']:
498
hook_caught_exception = hook(sys.exc_info())
499
if not hook_caught_exception:
502
391
bzr_server.tear_down()