32
29
transport as _mod_transport,
34
from bzrlib.i18n import gettext
35
31
from bzrlib.lazy_import import lazy_import
36
32
lazy_import(globals(), """
37
from bzrlib.smart import (
33
from bzrlib.smart import medium
41
34
from bzrlib.transport import (
45
38
from bzrlib import (
58
50
hooks: An instance of SmartServerHooks.
61
# This is the timeout on the socket we use .accept() on. It is exposed here
62
# so the test suite can set it faster. (It thread.interrupt_main() will not
63
# fire a KeyboardInterrupt during socket.accept)
65
_SHUTDOWN_POLL_TIMEOUT = 1.0
66
_LOG_WAITING_TIMEOUT = 10.0
70
def __init__(self, backing_transport, root_client_path='/',
53
def __init__(self, backing_transport, root_client_path='/'):
72
54
"""Construct a new server.
74
56
To actually start it running, call either start_background_thread or
77
59
:param backing_transport: The transport to serve.
78
60
:param root_client_path: The client path that will correspond to root
79
61
of backing_transport.
80
:param client_timeout: See SmartServerSocketStreamMedium's timeout
83
63
self.backing_transport = backing_transport
84
64
self.root_client_path = root_client_path
85
self._client_timeout = client_timeout
86
self._active_connections = []
87
# This is set to indicate we want to wait for clients to finish before
89
self._gracefully_stopping = False
91
66
def start_server(self, host, port):
92
67
"""Create the server listening socket.
118
93
self._sockname = self._server_socket.getsockname()
119
94
self.port = self._sockname[1]
120
95
self._server_socket.listen(1)
121
self._server_socket.settimeout(self._ACCEPT_TIMEOUT)
122
# Once we start accept()ing connections, we set started.
96
self._server_socket.settimeout(1)
123
97
self._started = threading.Event()
124
# Once we stop accept()ing connections (and are closing the socket) we
126
98
self._stopped = threading.Event()
127
# Once we have finished waiting for all clients, etc. We set
129
self._fully_stopped = threading.Event()
131
100
def _backing_urls(self):
132
101
# There are three interesting urls:
165
134
for hook in SmartTCPServer.hooks['server_stopped']:
166
135
hook(backing_urls, self.get_url())
168
def _stop_gracefully(self):
169
trace.note(gettext('Requested to stop gracefully'))
170
self._should_terminate = True
171
self._gracefully_stopping = True
172
for handler, _ in self._active_connections:
173
handler._stop_gracefully()
175
def _wait_for_clients_to_disconnect(self):
176
self._poll_active_connections()
177
if not self._active_connections:
179
trace.note(gettext('Waiting for %d client(s) to finish')
180
% (len(self._active_connections),))
181
t_next_log = self._timer() + self._LOG_WAITING_TIMEOUT
182
while self._active_connections:
184
if now >= t_next_log:
185
trace.note(gettext('Still waiting for %d client(s) to finish')
186
% (len(self._active_connections),))
187
t_next_log = now + self._LOG_WAITING_TIMEOUT
188
self._poll_active_connections(self._SHUTDOWN_POLL_TIMEOUT)
190
137
def serve(self, thread_name_suffix=''):
191
# Note: There is a temptation to do
192
# signals.register_on_hangup(id(self), self._stop_gracefully)
193
# However, that creates a temporary object which is a bound
194
# method. signals._on_sighup is a WeakKeyDictionary so it
195
# immediately gets garbage collected, because nothing else
196
# references it. Instead, we need to keep a real reference to the
197
# bound method for the lifetime of the serve() function.
198
stop_gracefully = self._stop_gracefully
199
signals.register_on_hangup(id(self), stop_gracefully)
200
138
self._should_terminate = False
201
139
# for hooks we are letting code know that a server has started (and
202
140
# later stopped).
213
151
except self._socket_error, e:
214
152
# if the socket is closed by stop_background_thread
215
# we might get a EBADF here, or if we get a signal we
216
# can get EINTR, any other socket errors should get
218
if e.args[0] not in (errno.EBADF, errno.EINTR):
219
trace.warning(gettext("listening socket error: %s")
153
# we might get a EBADF here, any other socket errors
155
if e.args[0] != errno.EBADF:
156
trace.warning("listening socket error: %s", e)
222
158
if self._should_terminate:
225
160
self.serve_conn(conn, thread_name_suffix)
226
# Cleanout any threads that have finished processing.
227
self._poll_active_connections()
228
161
except KeyboardInterrupt:
229
162
# dont log when CTRL-C'd.
232
165
trace.report_exception(sys.exc_info(), sys.stderr)
236
170
# ensure the server socket is closed.
237
171
self._server_socket.close()
238
172
except self._socket_error:
239
173
# ignore errors on close
242
signals.unregister_on_hangup(id(self))
243
175
self.run_server_stopped_hooks()
244
if self._gracefully_stopping:
245
self._wait_for_clients_to_disconnect()
246
self._fully_stopped.set()
248
177
def get_url(self):
249
178
"""Return the url of the server"""
250
179
return "bzr://%s:%s/" % (self._sockname[0], self._sockname[1])
252
def _make_handler(self, conn):
253
return medium.SmartServerSocketStreamMedium(
254
conn, self.backing_transport, self.root_client_path,
255
timeout=self._client_timeout)
257
def _poll_active_connections(self, timeout=0.0):
258
"""Check to see if any active connections have finished.
260
This will iterate through self._active_connections, and update any
261
connections that are finished.
263
:param timeout: The timeout to pass to thread.join(). By default, we
264
set it to 0, so that we don't hang if threads are not done yet.
268
for handler, thread in self._active_connections:
271
still_active.append((handler, thread))
272
self._active_connections = still_active
274
181
def serve_conn(self, conn, thread_name_suffix):
275
182
# For WIN32, where the timeout value from the listening socket
276
183
# propagates to the newly accepted socket.
277
184
conn.setblocking(True)
278
185
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
186
handler = medium.SmartServerSocketStreamMedium(
187
conn, self.backing_transport, self.root_client_path)
279
188
thread_name = 'smart-server-child' + thread_name_suffix
280
handler = self._make_handler(conn)
281
189
connection_thread = threading.Thread(
282
190
None, handler.serve, name=thread_name)
283
self._active_connections.append((handler, connection_thread))
191
# FIXME: This thread is never joined, it should at least be collected
192
# somewhere so that tests that want to check for leaked threads can get
193
# rid of them -- vila 20100531
284
194
connection_thread.setDaemon(True)
285
195
connection_thread.start()
286
196
return connection_thread
420
330
chroot_server = chroot.ChrootServer(transport)
421
331
chroot_server.start_server()
422
332
self.cleanups.append(chroot_server.stop_server)
423
transport = _mod_transport.get_transport_from_url(chroot_server.get_url())
333
transport = _mod_transport.get_transport(chroot_server.get_url())
424
334
if self.base_path is not None:
425
335
# Decorate the server's backing transport with a filter that can
426
336
# expand homedirs.
427
337
expand_userdirs = self._make_expand_userdirs_filter(transport)
428
338
expand_userdirs.start_server()
429
339
self.cleanups.append(expand_userdirs.stop_server)
430
transport = _mod_transport.get_transport_from_url(expand_userdirs.get_url())
340
transport = _mod_transport.get_transport(expand_userdirs.get_url())
431
341
self.transport = transport
433
def _get_stdin_stdout(self):
434
return sys.stdin, sys.stdout
436
def _make_smart_server(self, host, port, inet, timeout):
438
c = config.GlobalStack()
439
timeout = c.get('serve.client_timeout')
343
def _make_smart_server(self, host, port, inet):
441
stdin, stdout = self._get_stdin_stdout()
442
345
smart_server = medium.SmartServerPipeStreamMedium(
443
stdin, stdout, self.transport, timeout=timeout)
346
sys.stdin, sys.stdout, self.transport)
446
349
host = medium.BZR_DEFAULT_INTERFACE
448
351
port = medium.BZR_DEFAULT_PORT
449
smart_server = SmartTCPServer(self.transport,
450
client_timeout=timeout)
352
smart_server = SmartTCPServer(self.transport)
451
353
smart_server.start_server(host, port)
452
trace.note(gettext('listening on port: %s') % smart_server.port)
354
trace.note('listening on port: %s' % smart_server.port)
453
355
self.smart_server = smart_server
455
357
def _change_globals(self):
466
368
self.cleanups.append(restore_default_ui_factory_and_lockdir_timeout)
467
369
ui.ui_factory = ui.SilentUIFactory()
468
370
lockdir._DEFAULT_TIMEOUT_SECONDS = 0
469
orig = signals.install_sighup_handler()
470
def restore_signals():
471
signals.restore_sighup_handler(orig)
472
self.cleanups.append(restore_signals)
474
def set_up(self, transport, host, port, inet, timeout):
372
def set_up(self, transport, host, port, inet):
475
373
self._make_backing_transport(transport)
476
self._make_smart_server(host, port, inet, timeout)
374
self._make_smart_server(host, port, inet)
477
375
self._change_globals()
479
377
def tear_down(self):
480
378
for cleanup in reversed(self.cleanups):
484
def serve_bzr(transport, host=None, port=None, inet=False, timeout=None):
381
def serve_bzr(transport, host=None, port=None, inet=False):
485
382
"""This is the default implementation of 'bzr serve'.
487
384
It creates a TCP or pipe smart server on 'transport, and runs it. The
488
385
transport will be decorated with a chroot and pathfilter (using
489
386
os.path.expanduser).
491
388
bzr_server = BzrServerFactory()
493
bzr_server.set_up(transport, host, port, inet, timeout)
390
bzr_server.set_up(transport, host, port, inet)
494
391
bzr_server.smart_server.serve()
496
393
hook_caught_exception = False