232
227
raise NotImplementedError
235
class ThreadWithException(threading.Thread):
236
"""A catching exception thread.
238
If an exception occurs during the thread execution, it's caught and
239
re-raised when the thread is joined().
242
def __init__(self, *args, **kwargs):
243
# There are cases where the calling thread must wait, yet, if an
244
# exception occurs, the event should be set so the caller is not
245
# blocked. The main example is a calling thread that wants to wait for
246
# the called thread to be in a given state before continuing.
248
event = kwargs.pop('event')
250
# If the caller didn't pass a specific event, create our own
251
event = threading.Event()
252
super(ThreadWithException, self).__init__(*args, **kwargs)
253
self.set_event(event)
254
self.exception = None
256
def set_event(self, event):
260
"""Overrides Thread.run to capture any exception."""
263
super(ThreadWithException, self).run()
265
self.exception = sys.exc_info()
267
# Make sure the calling thread is released
271
def join(self, timeout=5):
272
"""Overrides Thread.join to raise any exception caught.
275
Calling join(timeout=0) will raise the caught exception or return None
276
if the thread is still alive.
278
The default timeout is set to 5 and should expire only when a thread
279
serving a client connection is hung.
281
super(ThreadWithException, self).join(timeout)
282
if self.exception is not None:
283
exc_class, exc_value, exc_tb = self.exception
284
self.execption = None # The exception should be raised only once
285
raise exc_class, exc_value, exc_tb
286
if timeout and self.isAlive():
287
# The timeout expired without joining the thread, the thread is
288
# therefore stucked and that's a failure as far as the test is
289
# concerned. We used to hang here.
290
raise AssertionError('thread %s hung' % (self.name,))
292
def pending_exception(self):
293
"""Raise the caught exception.
295
This does nothing if no exception occurred.
300
class TestingTCPServerMixin:
301
"""Mixin to support running SocketServer.TCPServer in a thread.
303
Tests are connecting from the main thread, the server has to be run in a
307
def __init__(self, sibling_class):
308
self.sibling_class = sibling_class
309
self.started = threading.Event()
310
self.serving = threading.Event()
311
self.stopped = threading.Event()
312
# We collect the resources used by the clients so we can release them
316
def server_bind(self):
    """Bind the listening socket through the sibling class.

    After the sibling class has bound the socket, interpreters older than
    2.5 additionally get server_address refreshed from the socket itself.
    """
    # We need to override the SocketServer bind, yet, we still want to use
    # it so we need to use the sibling class to call it explicitly
    self.sibling_class.server_bind(self)
    # The following has been fixed in 2.5 so we need to provide it for
    # older python versions.
    # sys.version is a string, so comparing it with a tuple is not a version
    # check; sys.version_info is the tuple meant for this comparison.
    if sys.version_info < (2, 5):
        self.server_address = self.socket.getsockname()
328
# We are listening and ready to accept connections
330
while self.serving.isSet():
331
# Really a connection but the python framework is generic and
333
self.handle_request()
334
# Let's close the listening socket
338
def verify_request(self, request, client_address):
    """Verify the request.

    Return True if we should proceed with this request, False if we should
    not even touch a single byte in the socket ! This is useful when we
    stop the server with a dummy last connection.

    :param request: Unused; the decision depends only on the serving flag.
    :param client_address: Unused.
    :return: The current state of the ``serving`` event.
    """
    return self.serving.isSet()
347
def handle_error(self, request, client_address):
348
# Stop serving and re-raise the last exception seen
350
self.sibling_class.handle_error(self, request, client_address)
353
# The following methods are called by the main thread
355
def stop_client_connections(self):
357
c = self.clients.pop()
358
self.shutdown_client(c)
360
def shutdown_client_socket(self, sock):
361
"""Properly shutdown a client socket.
363
Under some circumstances (as in bug #383920), we need to force the
364
shutdown as python delays it until gc occur otherwise and the client
367
This should be called only when no other thread is trying to use the
371
# The request process has been completed, the thread is about to
372
# die, let's shutdown the socket if we can.
373
sock.shutdown(socket.SHUT_RDWR)
375
except (socket.error, select.error), e:
376
if e[0] in (errno.EBADF, errno.ENOTCONN):
377
# Right, the socket is already down
383
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
385
def __init__(self, server_address, request_handler_class):
    """Initialise the mixin state, then the plain TCP server.

    :param server_address: Forwarded to SocketServer.TCPServer.__init__.
    :param request_handler_class: Forwarded to
        SocketServer.TCPServer.__init__.
    """
    # The mixin records which SocketServer class is its sibling so it can
    # call that class' methods explicitly.
    TestingTCPServerMixin.__init__(self, SocketServer.TCPServer)
    SocketServer.TCPServer.__init__(self, server_address,
                                    request_handler_class)
390
def get_request(self):
391
"""Get the request and client address from the socket."""
392
sock, addr = self.sibling_class.get_request(self)
393
self.clients.append((sock, addr))
396
# The following methods are called by the main thread
398
def shutdown_client(self, client):
400
self.shutdown_client_socket(sock)
404
class TestingThreadingTCPServer(TestingTCPServerMixin,
405
SocketServer.ThreadingTCPServer):
407
def __init__(self, server_address, request_handler_class):
    """Initialise the mixin state, then the underlying TCP server.

    :param server_address: Forwarded to SocketServer.TCPServer.__init__.
    :param request_handler_class: Forwarded to
        SocketServer.TCPServer.__init__.
    """
    # The mixin records which SocketServer class is its sibling so it can
    # call that class' methods explicitly.
    TestingTCPServerMixin.__init__(self, SocketServer.ThreadingTCPServer)
    # NOTE(review): the constructor is taken from TCPServer rather than from
    # the ThreadingTCPServer sibling; presumably equivalent because the
    # threading variant adds no constructor of its own -- confirm.
    SocketServer.TCPServer.__init__(self, server_address,
                                    request_handler_class)
412
def get_request (self):
413
"""Get the request and client address from the socket."""
414
sock, addr = self.sibling_class.get_request(self)
415
# The thread is not created yet, it will be updated in process_request
416
self.clients.append((sock, addr, None))
419
def process_request_thread(self, started, stopped, request, client_address):
421
SocketServer.ThreadingTCPServer.process_request_thread(
422
self, request, client_address)
423
self.close_request(request)
426
def process_request(self, request, client_address):
427
"""Start a new thread to process the request."""
428
started = threading.Event()
429
stopped = threading.Event()
430
t = ThreadWithException(
432
target = self.process_request_thread,
433
args = (started, stopped, request, client_address))
434
t.name = '%s -> %s' % (client_address, self.server_address)
435
# Update the client description
437
self.clients.append((request, client_address, t))
440
# If an exception occurred during the thread start, it will get raised.
441
t.pending_exception()
443
# The following methods are called by the main thread
445
def shutdown_client(self, client):
446
sock, addr, t = client
447
self.shutdown_client_socket(sock)
449
# The thread has been created only if the request is processed but
450
# after the connection is inited. This could happen during server
451
# shutdown. If an exception occurred in the thread it will be
456
class TestingTCPServerInAThread(transport.Server):
457
"""A server in a thread that re-raise thread exceptions."""
459
def __init__(self, server_address, server_class, request_handler_class):
460
self.server_class = server_class
461
self.request_handler_class = request_handler_class
462
self.host, self.port = server_address
466
return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
468
def create_server(self):
    """Instantiate the configured server class.

    :return: ``self.server_class`` called with the ``(host, port)`` pair and
        ``self.request_handler_class``.
    """
    return self.server_class((self.host, self.port),
                             self.request_handler_class)
472
def start_server(self):
    """Create the server and run it in its own thread.

    Blocks until the server's ``started`` event is set, then records the
    address the server reports and re-raises any exception the server
    thread has pending.
    """
    self.server = self.create_server()
    self._server_thread = ThreadWithException(
        event=self.server.started, target=self.run_server)
    self._server_thread.start()
    # Wait for the server thread to start (i.e. release the lock)
    self.server.started.wait()
    # Get the real address, especially the port
    self.host, self.port = self.server.server_address
    self._server_thread.name = '(%s:%s)' % (self.host, self.port)
    # If an exception occurred during the server start, it will get raised,
    # otherwise, the server is blocked on its accept() call.
    self._server_thread.pending_exception()
    # From now on, the thread is given the server's ``stopped`` event
    # instead of ``started``.
    self._server_thread.set_event(self.server.stopped)
489
def run_server(self):
492
def stop_server(self):
493
if self.server is None:
496
# The server has been started successfully, shut it down now. As
497
# soon as we stop serving, no more connections are accepted except
498
# one to get out of the blocking listen.
499
self.server.serving.clear()
500
# The server is listening for a last connection, let's give it:
503
last_conn = osutils.connect_socket((self.host, self.port))
504
except socket.error, e:
505
# But ignore connection errors as the point is to unblock the
506
# server thread, it may happen that it's not blocked or even
509
# We start shutting down the client while the server itself is
511
self.server.stop_client_connections()
512
# Now we wait for the thread running self.server.serve() to finish
513
self.server.stopped.wait()
514
if last_conn is not None:
515
# Close the last connection without trying to use it. The
516
# server will not process a single byte on that socket to avoid
517
# complications (SSL starts with a handshake for example).
519
# Check for any exception that could have occurred in the server
521
self._server_thread.join()
523
# Make sure we can be called twice safely, note that this means
524
# that we will raise a single exception even if several occurred in
525
# the various threads involved.
529
230
class SmartTCPServer_for_testing(server.SmartTCPServer):
530
231
"""Server suitable for use by transport tests.