25
from bzrlib.hooks import HookPoint, Hooks
26
from bzrlib.hooks import Hooks
26
27
from bzrlib import (
30
transport as _mod_transport,
32
from bzrlib.i18n import gettext
31
33
from bzrlib.lazy_import import lazy_import
32
34
lazy_import(globals(), """
33
from bzrlib.smart import medium
35
from bzrlib.smart import (
34
39
from bzrlib.transport import (
39
43
from bzrlib import (
51
56
hooks: An instance of SmartServerHooks.
54
def __init__(self, backing_transport, host='127.0.0.1', port=0,
55
root_client_path='/'):
59
# This is the timeout on the socket we use .accept() on. It is exposed here
60
# so the test suite can set it faster. (It thread.interrupt_main() will not
61
# fire a KeyboardInterrupt during socket.accept)
63
_SHUTDOWN_POLL_TIMEOUT = 1.0
64
_LOG_WAITING_TIMEOUT = 10.0
68
def __init__(self, backing_transport, root_client_path='/',
56
70
"""Construct a new server.
58
72
To actually start it running, call either start_background_thread or
61
75
:param backing_transport: The transport to serve.
76
:param root_client_path: The client path that will correspond to root
78
:param client_timeout: See SmartServerSocketStreamMedium's timeout
81
self.backing_transport = backing_transport
82
self.root_client_path = root_client_path
83
self._client_timeout = client_timeout
84
self._active_connections = []
85
# This is set to indicate we want to wait for clients to finish before
87
self._gracefully_stopping = False
89
def start_server(self, host, port):
90
"""Create the server listening socket.
62
92
:param host: Name of the interface to listen on.
63
93
:param port: TCP port to listen on, or 0 to allocate a transient port.
64
:param root_client_path: The client path that will correspond to root
67
95
# let connections timeout so that we get a chance to terminate
68
96
# Keep a reference to the exceptions we want to catch because the socket
88
116
self._sockname = self._server_socket.getsockname()
89
117
self.port = self._sockname[1]
90
118
self._server_socket.listen(1)
91
self._server_socket.settimeout(1)
92
self.backing_transport = backing_transport
119
self._server_socket.settimeout(self._ACCEPT_TIMEOUT)
120
# Once we start accept()ing connections, we set started.
93
121
self._started = threading.Event()
122
# Once we stop accept()ing connections (and are closing the socket) we
94
124
self._stopped = threading.Event()
95
self.root_client_path = root_client_path
125
# Once we have finished waiting for all clients, etc. We set
127
self._fully_stopped = threading.Event()
97
def serve(self, thread_name_suffix=''):
98
self._should_terminate = False
99
# for hooks we are letting code know that a server has started (and
129
def _backing_urls(self):
101
130
# There are three interesting urls:
102
131
# The URL the server can be contacted on. (e.g. bzr://host/)
103
132
# The URL that a commit done on the same machine as the server will
104
133
# have within the servers space. (e.g. file:///home/user/source)
105
134
# The URL that will be given to other hooks in the same process -
106
# the URL of the backing transport itself. (e.g. chroot+:///)
135
# the URL of the backing transport itself. (e.g. filtered-36195:///)
107
136
# We need all three because:
108
137
# * other machines see the first
109
138
# * local commits on this machine should be able to be mapped to
113
142
# The latter two urls are different aliases to the servers url,
114
143
# so we group those in a list - as there might be more aliases
116
backing_urls = [self.backing_transport.base]
145
urls = [self.backing_transport.base]
118
backing_urls.append(self.backing_transport.external_url())
147
urls.append(self.backing_transport.external_url())
119
148
except errors.InProcessTransport:
152
def run_server_started_hooks(self, backing_urls=None):
153
if backing_urls is None:
154
backing_urls = self._backing_urls()
121
155
for hook in SmartTCPServer.hooks['server_started']:
122
156
hook(backing_urls, self.get_url())
123
157
for hook in SmartTCPServer.hooks['server_started_ex']:
124
158
hook(backing_urls, self)
160
def run_server_stopped_hooks(self, backing_urls=None):
161
if backing_urls is None:
162
backing_urls = self._backing_urls()
163
for hook in SmartTCPServer.hooks['server_stopped']:
164
hook(backing_urls, self.get_url())
166
def _stop_gracefully(self):
167
trace.note(gettext('Requested to stop gracefully'))
168
self._should_terminate = True
169
self._gracefully_stopping = True
170
for handler, _ in self._active_connections:
171
handler._stop_gracefully()
173
def _wait_for_clients_to_disconnect(self):
174
self._poll_active_connections()
175
if not self._active_connections:
177
trace.note(gettext('Waiting for %d client(s) to finish')
178
% (len(self._active_connections),))
179
t_next_log = self._timer() + self._LOG_WAITING_TIMEOUT
180
while self._active_connections:
182
if now >= t_next_log:
183
trace.note(gettext('Still waiting for %d client(s) to finish')
184
% (len(self._active_connections),))
185
t_next_log = now + self._LOG_WAITING_TIMEOUT
186
self._poll_active_connections(self._SHUTDOWN_POLL_TIMEOUT)
188
def serve(self, thread_name_suffix=''):
189
# Note: There is a temptation to do
190
# signals.register_on_hangup(id(self), self._stop_gracefully)
191
# However, that creates a temporary object which is a bound
192
# method. signals._on_sighup is a WeakKeyDictionary so it
193
# immediately gets garbage collected, because nothing else
194
# references it. Instead, we need to keep a real reference to the
195
# bound method for the lifetime of the serve() function.
196
stop_gracefully = self._stop_gracefully
197
signals.register_on_hangup(id(self), stop_gracefully)
198
self._should_terminate = False
199
# for hooks we are letting code know that a server has started (and
201
self.run_server_started_hooks()
125
202
self._started.set()
134
211
except self._socket_error, e:
135
212
# if the socket is closed by stop_background_thread
136
# we might get a EBADF here, any other socket errors
138
if e.args[0] != errno.EBADF:
139
trace.warning("listening socket error: %s", e)
213
# we might get a EBADF here, or if we get a signal we
214
# can get EINTR, any other socket errors should get
216
if e.args[0] not in (errno.EBADF, errno.EINTR):
217
trace.warning(gettext("listening socket error: %s")
141
220
if self._should_terminate:
143
223
self.serve_conn(conn, thread_name_suffix)
224
# Cleanout any threads that have finished processing.
225
self._poll_active_connections()
144
226
except KeyboardInterrupt:
145
227
# dont log when CTRL-C'd.
148
230
trace.report_exception(sys.exc_info(), sys.stderr)
153
234
# ensure the server socket is closed.
154
235
self._server_socket.close()
155
236
except self._socket_error:
156
237
# ignore errors on close
158
for hook in SmartTCPServer.hooks['server_stopped']:
159
hook(backing_urls, self.get_url())
240
signals.unregister_on_hangup(id(self))
241
self.run_server_stopped_hooks()
242
if self._gracefully_stopping:
243
self._wait_for_clients_to_disconnect()
244
self._fully_stopped.set()
161
246
def get_url(self):
162
247
"""Return the url of the server"""
163
return "bzr://%s:%d/" % self._sockname
248
return "bzr://%s:%s/" % (self._sockname[0], self._sockname[1])
250
def _make_handler(self, conn):
251
return medium.SmartServerSocketStreamMedium(
252
conn, self.backing_transport, self.root_client_path,
253
timeout=self._client_timeout)
255
def _poll_active_connections(self, timeout=0.0):
256
"""Check to see if any active connections have finished.
258
This will iterate through self._active_connections, and update any
259
connections that are finished.
261
:param timeout: The timeout to pass to thread.join(). By default, we
262
set it to 0, so that we don't hang if threads are not done yet.
266
for handler, thread in self._active_connections:
269
still_active.append((handler, thread))
270
self._active_connections = still_active
165
272
def serve_conn(self, conn, thread_name_suffix):
166
273
# For WIN32, where the timeout value from the listening socket
167
274
# propagates to the newly accepted socket.
168
275
conn.setblocking(True)
169
276
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
170
handler = medium.SmartServerSocketStreamMedium(
171
conn, self.backing_transport, self.root_client_path)
172
277
thread_name = 'smart-server-child' + thread_name_suffix
278
handler = self._make_handler(conn)
173
279
connection_thread = threading.Thread(
174
280
None, handler.serve, name=thread_name)
281
self._active_connections.append((handler, connection_thread))
175
282
connection_thread.setDaemon(True)
176
283
connection_thread.start()
284
return connection_thread
178
286
def start_background_thread(self, thread_name_suffix=''):
179
287
self._started.clear()
219
327
These are all empty initially, because by default nothing should get
223
self.create_hook(HookPoint('server_started',
330
Hooks.__init__(self, "bzrlib.smart.server", "SmartTCPServer.hooks")
331
self.add_hook('server_started',
224
332
"Called by the bzr server when it starts serving a directory. "
225
333
"server_started is called with (backing urls, public url), "
226
334
"where backing_url is a list of URLs giving the "
227
335
"server-specific directory locations, and public_url is the "
228
"public URL for the directory being served.", (0, 16), None))
229
self.create_hook(HookPoint('server_started_ex',
336
"public URL for the directory being served.", (0, 16))
337
self.add_hook('server_started_ex',
230
338
"Called by the bzr server when it starts serving a directory. "
231
339
"server_started is called with (backing_urls, server_obj).",
233
self.create_hook(HookPoint('server_stopped',
341
self.add_hook('server_stopped',
234
342
"Called by the bzr server when it stops serving a directory. "
235
343
"server_stopped is called with the same parameters as the "
236
"server_started hook: (backing_urls, public_url).", (0, 16), None))
344
"server_started hook: (backing_urls, public_url).", (0, 16))
345
self.add_hook('server_exception',
346
"Called by the bzr server when an exception occurs. "
347
"server_exception is called with the sys.exc_info() tuple "
348
"return true for the hook if the exception has been handled, "
349
"in which case the server will exit normally.", (2, 4))
238
351
SmartTCPServer.hooks = SmartServerHooks()
305
418
chroot_server = chroot.ChrootServer(transport)
306
419
chroot_server.start_server()
307
420
self.cleanups.append(chroot_server.stop_server)
308
transport = get_transport(chroot_server.get_url())
421
transport = _mod_transport.get_transport_from_url(chroot_server.get_url())
309
422
if self.base_path is not None:
310
423
# Decorate the server's backing transport with a filter that can
311
424
# expand homedirs.
312
425
expand_userdirs = self._make_expand_userdirs_filter(transport)
313
426
expand_userdirs.start_server()
314
427
self.cleanups.append(expand_userdirs.stop_server)
315
transport = get_transport(expand_userdirs.get_url())
428
transport = _mod_transport.get_transport_from_url(expand_userdirs.get_url())
316
429
self.transport = transport
318
def _make_smart_server(self, host, port, inet):
431
def _get_stdin_stdout(self):
432
return sys.stdin, sys.stdout
434
def _make_smart_server(self, host, port, inet, timeout):
436
c = config.GlobalStack()
437
timeout = c.get('serve.client_timeout')
439
stdin, stdout = self._get_stdin_stdout()
320
440
smart_server = medium.SmartServerPipeStreamMedium(
321
sys.stdin, sys.stdout, self.transport)
441
stdin, stdout, self.transport, timeout=timeout)
324
444
host = medium.BZR_DEFAULT_INTERFACE
326
446
port = medium.BZR_DEFAULT_PORT
327
smart_server = SmartTCPServer(self.transport, host=host, port=port)
328
trace.note('listening on port: %s' % smart_server.port)
447
smart_server = SmartTCPServer(self.transport,
448
client_timeout=timeout)
449
smart_server.start_server(host, port)
450
trace.note(gettext('listening on port: %s') % smart_server.port)
329
451
self.smart_server = smart_server
331
453
def _change_globals(self):
342
464
self.cleanups.append(restore_default_ui_factory_and_lockdir_timeout)
343
465
ui.ui_factory = ui.SilentUIFactory()
344
466
lockdir._DEFAULT_TIMEOUT_SECONDS = 0
467
orig = signals.install_sighup_handler()
468
def restore_signals():
469
signals.restore_sighup_handler(orig)
470
self.cleanups.append(restore_signals)
346
def set_up(self, transport, host, port, inet):
472
def set_up(self, transport, host, port, inet, timeout):
347
473
self._make_backing_transport(transport)
348
self._make_smart_server(host, port, inet)
474
self._make_smart_server(host, port, inet, timeout)
349
475
self._change_globals()
351
477
def tear_down(self):
356
def serve_bzr(transport, host=None, port=None, inet=False):
482
def serve_bzr(transport, host=None, port=None, inet=False, timeout=None):
357
483
"""This is the default implementation of 'bzr serve'.
359
485
It creates a TCP or pipe smart server on 'transport, and runs it. The
360
486
transport will be decorated with a chroot and pathfilter (using
361
487
os.path.expanduser).
363
489
bzr_server = BzrServerFactory()
365
bzr_server.set_up(transport, host, port, inet)
491
bzr_server.set_up(transport, host, port, inet, timeout)
366
492
bzr_server.smart_server.serve()
494
hook_caught_exception = False
495
for hook in SmartTCPServer.hooks['server_exception']:
496
hook_caught_exception = hook(sys.exc_info())
497
if not hook_caught_exception:
368
500
bzr_server.tear_down()