25
from bzrlib.hooks import HookPoint, Hooks
26
from bzrlib.hooks import Hooks
26
27
from bzrlib import (
30
transport as _mod_transport,
32
from bzrlib.i18n import gettext
31
33
from bzrlib.lazy_import import lazy_import
32
34
lazy_import(globals(), """
33
from bzrlib.smart import medium
35
from bzrlib.smart import (
34
39
from bzrlib.transport import (
39
43
from bzrlib import (
51
56
hooks: An instance of SmartServerHooks.
54
def __init__(self, backing_transport, host='127.0.0.1', port=0,
55
root_client_path='/'):
59
# This is the timeout on the socket we use .accept() on. It is exposed here
60
# so the test suite can set it faster. (thread.interrupt_main() will not
61
# fire a KeyboardInterrupt during socket.accept)
63
_SHUTDOWN_POLL_TIMEOUT = 1.0
64
_LOG_WAITING_TIMEOUT = 10.0
68
def __init__(self, backing_transport, root_client_path='/',
56
70
"""Construct a new server.
58
72
To actually start it running, call either start_background_thread or
61
75
:param backing_transport: The transport to serve.
76
:param root_client_path: The client path that will correspond to root
78
:param client_timeout: See SmartServerSocketStreamMedium's timeout
81
self.backing_transport = backing_transport
82
self.root_client_path = root_client_path
83
self._client_timeout = client_timeout
84
self._active_connections = []
85
# This is set to indicate we want to wait for clients to finish before
87
self._gracefully_stopping = False
89
def start_server(self, host, port):
90
"""Create the server listening socket.
62
92
:param host: Name of the interface to listen on.
63
93
:param port: TCP port to listen on, or 0 to allocate a transient port.
64
:param root_client_path: The client path that will correspond to root
67
95
# let connections timeout so that we get a chance to terminate
68
96
# Keep a reference to the exceptions we want to catch because the socket
88
116
self._sockname = self._server_socket.getsockname()
89
117
self.port = self._sockname[1]
90
118
self._server_socket.listen(1)
91
self._server_socket.settimeout(1)
92
self.backing_transport = backing_transport
119
self._server_socket.settimeout(self._ACCEPT_TIMEOUT)
120
# Once we start accept()ing connections, we set started.
93
121
self._started = threading.Event()
122
# Once we stop accept()ing connections (and are closing the socket) we
94
124
self._stopped = threading.Event()
95
self.root_client_path = root_client_path
125
# Once we have finished waiting for all clients, etc. We set
127
self._fully_stopped = threading.Event()
97
def serve(self, thread_name_suffix=''):
98
self._should_terminate = False
99
# for hooks we are letting code know that a server has started (and
129
def _backing_urls(self):
101
130
# There are three interesting urls:
102
131
# The URL the server can be contacted on. (e.g. bzr://host/)
103
132
# The URL that a commit done on the same machine as the server will
104
133
# have within the server's space. (e.g. file:///home/user/source)
105
134
# The URL that will be given to other hooks in the same process -
106
# the URL of the backing transport itself. (e.g. chroot+:///)
135
# the URL of the backing transport itself. (e.g. filtered-36195:///)
107
136
# We need all three because:
108
137
# * other machines see the first
109
138
# * local commits on this machine should be able to be mapped to
113
142
# The latter two urls are different aliases to the server's url,
114
143
# so we group those in a list - as there might be more aliases
116
backing_urls = [self.backing_transport.base]
145
urls = [self.backing_transport.base]
118
backing_urls.append(self.backing_transport.external_url())
147
urls.append(self.backing_transport.external_url())
119
148
except errors.InProcessTransport:
152
def run_server_started_hooks(self, backing_urls=None):
    """Invoke the 'server_started' and 'server_started_ex' hooks.

    :param backing_urls: List of backing URLs passed as the first argument
        to every hook. When None, it is obtained from
        self._backing_urls().
    """
    if backing_urls is None:
        backing_urls = self._backing_urls()
    # 'server_started' hooks are given the result of self.get_url() ...
    for hook in SmartTCPServer.hooks['server_started']:
        hook(backing_urls, self.get_url())
    # ... whereas 'server_started_ex' hooks are given the server object.
    for hook in SmartTCPServer.hooks['server_started_ex']:
        hook(backing_urls, self)
160
def run_server_stopped_hooks(self, backing_urls=None):
    """Invoke the 'server_stopped' hooks.

    :param backing_urls: List of backing URLs passed as the first argument
        to every hook. When None, it is obtained from
        self._backing_urls().
    """
    if backing_urls is None:
        backing_urls = self._backing_urls()
    # Each hook is called with (backing_urls, self.get_url()).
    for hook in SmartTCPServer.hooks['server_stopped']:
        hook(backing_urls, self.get_url())
166
def _stop_gracefully(self):
    """Request a graceful stop of the server.

    Sets self._should_terminate and self._gracefully_stopping, then asks
    the handler of every entry in self._active_connections to stop
    gracefully as well.
    """
    # XXX: ATM, once we see the self._should_terminate we immediately exit
    #      without waiting for client threads to shut down. (I don't know
    #      if we *can* because they are marked as Daemon.)
    #      Which means that while in-theory this is a graceful shutdown,
    #      because we don't actively close the connections, etc, we don't
    #      have a good way (yet) to poll the spawned clients and
    #      [the original note breaks off here].
    # NOTE(review): this XXX looks stale - the loop below does notify each
    # active handler; confirm and update or remove the note.
    trace.note('Requested to stop gracefully')
    self._should_terminate = True
    self._gracefully_stopping = True
    # Each entry is a (handler, thread) pair; only the handler is needed.
    for handler, _ in self._active_connections:
        handler._stop_gracefully()
179
def _wait_for_clients_to_disconnect(self):
180
self._poll_active_connections()
181
if not self._active_connections:
183
trace.note('Waiting for %d client(s) to finish'
184
% (len(self._active_connections),))
185
t_next_log = self._timer() + self._LOG_WAITING_TIMEOUT
186
while self._active_connections:
188
if now >= t_next_log:
189
trace.note('Still waiting for %d client(s) to finish'
190
% (len(self._active_connections),))
191
t_next_log = now + self._LOG_WAITING_TIMEOUT
192
self._poll_active_connections(self._SHUTDOWN_POLL_TIMEOUT)
194
def serve(self, thread_name_suffix=''):
195
# Note: There is a temptation to do
196
# signals.register_on_hangup(id(self), self._stop_gracefully)
197
# However, that creates a temporary object which is a bound
198
# method. signals._on_sighup is a WeakKeyDictionary so it
199
# immediately gets garbage collected, because nothing else
200
# references it. Instead, we need to keep a real reference to the
201
# bound method for the lifetime of the serve() function.
202
stop_gracefully = self._stop_gracefully
203
signals.register_on_hangup(id(self), stop_gracefully)
204
self._should_terminate = False
205
# for hooks we are letting code know that a server has started (and
207
self.run_server_started_hooks()
125
208
self._started.set()
135
218
# if the socket is closed by stop_background_thread
136
219
# we might get a EBADF here, any other socket errors
137
220
# should get logged.
138
if e.args[0] != errno.EBADF:
221
if e.args[0] not in (errno.EBADF, errno.EINTR):
139
222
trace.warning("listening socket error: %s", e)
141
224
if self._should_terminate:
143
227
self.serve_conn(conn, thread_name_suffix)
228
# Cleanout any threads that have finished processing.
229
self._poll_active_connections()
144
230
except KeyboardInterrupt:
145
231
# don't log when CTRL-C'd.
148
234
trace.report_exception(sys.exc_info(), sys.stderr)
153
238
# ensure the server socket is closed.
154
239
self._server_socket.close()
155
240
except self._socket_error:
156
241
# ignore errors on close
158
for hook in SmartTCPServer.hooks['server_stopped']:
159
hook(backing_urls, self.get_url())
244
signals.unregister_on_hangup(id(self))
245
self.run_server_stopped_hooks()
246
if self._gracefully_stopping:
247
self._wait_for_clients_to_disconnect()
248
self._fully_stopped.set()
161
250
def get_url(self):
    """Return the url of the server, e.g. ``bzr://127.0.0.1:4155/``.

    Only the first two items of self._sockname (host and port) are used.
    Formatting the whole tuple with a two-placeholder format string raises
    TypeError when the socket name has more than two items, as it does for
    IPv6 sockets.
    """
    return "bzr://%s:%s/" % (self._sockname[0], self._sockname[1])
254
def _make_handler(self, conn):
    """Create the medium object that will serve one connection.

    :param conn: The connection object handed to the medium (serve_conn
        passes the accepted socket).
    :return: A medium.SmartServerSocketStreamMedium bound to
        self.backing_transport and self.root_client_path, using
        self._client_timeout as its timeout.
    """
    return medium.SmartServerSocketStreamMedium(
        conn, self.backing_transport, self.root_client_path,
        timeout=self._client_timeout)
259
def _poll_active_connections(self, timeout=0.0):
260
"""Check to see if any active connections have finished.
262
This will iterate through self._active_connections, and update any
263
connections that are finished.
265
:param timeout: The timeout to pass to thread.join(). By default, we
266
set it to 0, so that we don't hang if threads are not done yet.
270
for handler, thread in self._active_connections:
273
still_active.append((handler, thread))
274
self._active_connections = still_active
165
276
def serve_conn(self, conn, thread_name_suffix):
    """Serve a single accepted connection on its own daemon thread.

    :param conn: The accepted connection socket.
    :param thread_name_suffix: Text appended to 'smart-server-child' to
        form the name of the new thread.
    :return: The started threading.Thread that runs handler.serve.
    """
    # For WIN32, where the timeout value from the listening socket
    # propagates to the newly accepted socket.
    conn.setblocking(True)
    conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    thread_name = 'smart-server-child' + thread_name_suffix
    # Build the handler exactly once, via _make_handler(). A second, direct
    # construction of the medium here would be thrown away immediately and
    # would not carry the client timeout.
    handler = self._make_handler(conn)
    connection_thread = threading.Thread(
        None, handler.serve, name=thread_name)
    # Register the (handler, thread) pair before the thread starts.
    self._active_connections.append((handler, connection_thread))
    connection_thread.setDaemon(True)
    connection_thread.start()
    return connection_thread
178
290
def start_background_thread(self, thread_name_suffix=''):
179
291
self._started.clear()
219
331
These are all empty initially, because by default nothing should get
223
self.create_hook(HookPoint('server_started',
334
Hooks.__init__(self, "bzrlib.smart.server", "SmartTCPServer.hooks")
335
self.add_hook('server_started',
224
336
"Called by the bzr server when it starts serving a directory. "
225
337
"server_started is called with (backing urls, public url), "
226
338
"where backing_url is a list of URLs giving the "
227
339
"server-specific directory locations, and public_url is the "
228
"public URL for the directory being served.", (0, 16), None))
229
self.create_hook(HookPoint('server_started_ex',
340
"public URL for the directory being served.", (0, 16))
341
self.add_hook('server_started_ex',
230
342
"Called by the bzr server when it starts serving a directory. "
231
343
"server_started is called with (backing_urls, server_obj).",
233
self.create_hook(HookPoint('server_stopped',
345
self.add_hook('server_stopped',
234
346
"Called by the bzr server when it stops serving a directory. "
235
347
"server_stopped is called with the same parameters as the "
236
"server_started hook: (backing_urls, public_url).", (0, 16), None))
348
"server_started hook: (backing_urls, public_url).", (0, 16))
349
self.add_hook('server_exception',
350
"Called by the bzr server when an exception occurs. "
351
"server_exception is called with the sys.exc_info() tuple "
352
"return true for the hook if the exception has been handled, "
353
"in which case the server will exit normally.", (2, 4))
238
355
SmartTCPServer.hooks = SmartServerHooks()
305
422
chroot_server = chroot.ChrootServer(transport)
306
423
chroot_server.start_server()
307
424
self.cleanups.append(chroot_server.stop_server)
308
transport = get_transport(chroot_server.get_url())
425
transport = _mod_transport.get_transport_from_url(chroot_server.get_url())
309
426
if self.base_path is not None:
310
427
# Decorate the server's backing transport with a filter that can
311
428
# expand homedirs.
312
429
expand_userdirs = self._make_expand_userdirs_filter(transport)
313
430
expand_userdirs.start_server()
314
431
self.cleanups.append(expand_userdirs.stop_server)
315
transport = get_transport(expand_userdirs.get_url())
432
transport = _mod_transport.get_transport_from_url(expand_userdirs.get_url())
316
433
self.transport = transport
318
def _make_smart_server(self, host, port, inet):
435
def _get_stdin_stdout(self):
436
return sys.stdin, sys.stdout
438
def _make_smart_server(self, host, port, inet, timeout):
440
c = config.GlobalStack()
441
timeout = c.get('serve.client_timeout')
443
stdin, stdout = self._get_stdin_stdout()
320
444
smart_server = medium.SmartServerPipeStreamMedium(
321
sys.stdin, sys.stdout, self.transport)
445
stdin, stdout, self.transport, timeout=timeout)
324
448
host = medium.BZR_DEFAULT_INTERFACE
326
450
port = medium.BZR_DEFAULT_PORT
327
smart_server = SmartTCPServer(self.transport, host=host, port=port)
328
trace.note('listening on port: %s' % smart_server.port)
451
smart_server = SmartTCPServer(self.transport,
452
client_timeout=timeout)
453
smart_server.start_server(host, port)
454
trace.note(gettext('listening on port: %s') % smart_server.port)
329
455
self.smart_server = smart_server
331
457
def _change_globals(self):
342
468
self.cleanups.append(restore_default_ui_factory_and_lockdir_timeout)
343
469
ui.ui_factory = ui.SilentUIFactory()
344
470
lockdir._DEFAULT_TIMEOUT_SECONDS = 0
471
orig = signals.install_sighup_handler()
472
def restore_signals():
473
signals.restore_sighup_handler(orig)
474
self.cleanups.append(restore_signals)
346
def set_up(self, transport, host, port, inet, timeout):
    """Prepare the server: backing transport, smart server, then globals.

    :param transport: Passed to self._make_backing_transport().
    :param host: Passed to self._make_smart_server().
    :param port: Passed to self._make_smart_server().
    :param inet: Passed to self._make_smart_server().
    :param timeout: Passed to self._make_smart_server().
    """
    self._make_backing_transport(transport)
    # Only the four-argument call is kept; the older call without
    # ``timeout`` does not match the current _make_smart_server signature.
    self._make_smart_server(host, port, inet, timeout)
    self._change_globals()
351
481
def tear_down(self):
356
def serve_bzr(transport, host=None, port=None, inet=False):
486
def serve_bzr(transport, host=None, port=None, inet=False, timeout=None):
357
487
"""This is the default implementation of 'bzr serve'.
359
489
It creates a TCP or pipe smart server on 'transport', and runs it. The
360
490
transport will be decorated with a chroot and pathfilter (using
361
491
os.path.expanduser).
363
493
bzr_server = BzrServerFactory()
365
bzr_server.set_up(transport, host, port, inet)
495
bzr_server.set_up(transport, host, port, inet, timeout)
366
496
bzr_server.smart_server.serve()
498
hook_caught_exception = False
499
for hook in SmartTCPServer.hooks['server_exception']:
500
hook_caught_exception = hook(sys.exc_info())
501
if not hook_caught_exception:
368
504
bzr_server.tear_down()