247
250
# If the caller didn't pass a specific event, create our own
248
251
event = threading.Event()
249
252
super(ThreadWithException, self).__init__(*args, **kwargs)
253
self.set_event(event)
254
self.exception = None
256
def set_event(self, event):
250
257
self.ready = event
251
self.exception = None
254
260
"""Overrides Thread.run to capture any exception."""
255
261
self.ready.clear()
257
super(ThreadWithException, self).run()
259
self.exception = sys.exc_info()
264
super(ThreadWithException, self).run()
266
self.exception = sys.exc_info()
261
268
# Make sure the calling thread is released
265
def join(self, *args, **kwargs):
272
def join(self, timeout=5):
266
273
"""Overrides Thread.join to raise any exception caught.
269
276
Calling join(timeout=0) will raise the caught exception or return None
270
is the thread is still alive.
277
if the thread is still alive.
279
The default timeout is set to 5 and should expire only when a thread
280
serving a client connection is hung.
272
# Note that we don't care about the timeout parameter here: either the
273
# thread has raised an exception and it should be raised (and join()
274
# should succeed whatever the timeout is) or it's still alive which
275
# means it didn't encounter an exception.
276
super(ThreadWithException, self).join(*args, **kwargs)
282
super(ThreadWithException, self).join(timeout)
277
283
if self.exception is not None:
278
284
exc_class, exc_value, exc_tb = self.exception
285
self.execption = None # The exception should be raised only once
279
286
raise exc_class, exc_value, exc_tb
287
if timeout and self.isAlive():
288
# The timeout expired without joining the thread, the thread is
289
# therefore stucked and that's a failure as far as the test is
290
# concerned. We used to hang here.
291
raise AssertionError('thread %s hung' % (self.name,))
293
def pending_exception(self):
294
"""Raise the caught exception.
296
This does nothing if no exception occurred.
301
class TestingTCPServerMixin:
302
"""Mixin to support running SocketServer.TCPServer in a thread.
304
Tests are connecting from the main thread, the server has to be run in a
308
# FIXME: sibling_class is a hack -- vila 20100604
309
def __init__(self, sibling_class):
310
self.sibling_class = sibling_class
311
self.started = threading.Event()
312
self.serving = threading.Event()
313
self.stopped = threading.Event()
314
# We collect the resources used by the clients so we can release them
318
def server_bind(self):
319
# We need to override the SocketServer bind, yet, we still want to use
320
# it so we need to use the sibling class to call it explicitly
321
self.sibling_class.server_bind(self)
322
# The following has been fixed in 2.5 so we need to provide it for
323
# older python versions.
324
if sys.version < (2, 5):
325
self.server_address = self.socket.getsockname()
330
# We are listening and ready to accept connections
332
while self.serving.isSet():
333
# Really a connection but the python framework is generic and
335
self.handle_request()
336
# Let's close the listening socket
340
def handle_request(self):
341
"""Handle one request.
343
The python version swallows some socket exceptions and we don't use
344
timeout, so we override to better control the server behavior.
346
request, client_address = self.get_request()
347
if self.verify_request(request, client_address):
349
self.process_request(request, client_address)
351
self.handle_error(request, client_address)
352
self.close_request(request)
354
def verify_request(self, request, client_address):
355
"""Verify the request.
357
Return True if we should proceed with this request, False if we should
358
not even touch a single byte in the socket ! This is useful when we
359
stop the server with a dummy last connection.
361
return self.serving.isSet()
363
def handle_error(self, request, client_address):
364
# Stop serving and re-raise the last exception seen
366
# self.sibling_class.handle_error(self, request, client_address)
369
# The following methods are called by the main thread
371
def stop_client_connections(self):
373
c = self.clients.pop()
374
self.shutdown_client(c)
376
def shutdown_client_socket(self, sock):
377
"""Properly shutdown a client socket.
379
Under some circumstances (as in bug #383920), we need to force the
380
shutdown as python delays it until gc occur otherwise and the client
383
This should be called only when no other thread is trying to use the
387
# The request process has been completed, the thread is about to
388
# die, let's shutdown the socket if we can.
389
sock.shutdown(socket.SHUT_RDWR)
391
except (socket.error, select.error), e:
392
if e[0] in (errno.EBADF, errno.ENOTCONN):
393
# Right, the socket is already down
399
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
401
def __init__(self, server_address, request_handler_class):
402
TestingTCPServerMixin.__init__(self, SocketServer.TCPServer)
403
SocketServer.TCPServer.__init__(self, server_address,
404
request_handler_class)
406
def get_request(self):
407
"""Get the request and client address from the socket."""
408
sock, addr = self.sibling_class.get_request(self)
409
self.clients.append((sock, addr))
412
# The following methods are called by the main thread
414
def shutdown_client(self, client):
416
self.shutdown_client_socket(sock)
420
class TestingThreadingTCPServer(TestingTCPServerMixin,
421
SocketServer.ThreadingTCPServer):
423
def __init__(self, server_address, request_handler_class):
424
TestingTCPServerMixin.__init__(self, SocketServer.ThreadingTCPServer)
425
SocketServer.TCPServer.__init__(self, server_address,
426
request_handler_class)
428
def get_request (self):
429
"""Get the request and client address from the socket."""
430
sock, addr = self.sibling_class.get_request(self)
431
# The thread is not create yet, it will be updated in process_request
432
self.clients.append((sock, addr, None))
435
def process_request_thread(self, started, stopped, request, client_address):
437
SocketServer.ThreadingTCPServer.process_request_thread(
438
self, request, client_address)
439
self.close_request(request)
442
def process_request(self, request, client_address):
443
"""Start a new thread to process the request."""
444
started = threading.Event()
445
stopped = threading.Event()
446
t = ThreadWithException(
448
target = self.process_request_thread,
449
args = (started, stopped, request, client_address))
450
t.name = '%s -> %s' % (client_address, self.server_address)
451
# Update the client description
453
self.clients.append((request, client_address, t))
456
# If an exception occured during the thread start, it will get raised.
457
t.pending_exception()
459
# The following methods are called by the main thread
461
def shutdown_client(self, client):
462
sock, addr, t = client
463
self.shutdown_client_socket(sock)
465
# The thread has been created only if the request is processed but
466
# after the connection is inited. This could happen during server
467
# shutdown. If an exception occurred in the thread it will be
472
class TestingTCPServerInAThread(transport.Server):
473
"""A server in a thread that re-raise thread exceptions."""
475
def __init__(self, server_address, server_class, request_handler_class):
476
self.server_class = server_class
477
self.request_handler_class = request_handler_class
478
self.host, self.port = server_address
482
return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
484
def create_server(self):
485
return self.server_class((self.host, self.port),
486
self.request_handler_class)
488
def start_server(self):
489
self.server = self.create_server()
490
self._server_thread = ThreadWithException(
491
event=self.server.started, target=self.run_server)
492
self._server_thread.start()
493
# Wait for the server thread to start (i.e release the lock)
494
self.server.started.wait()
495
# Get the real address, especially the port
496
self.host, self.port = self.server.server_address
497
self._server_thread.name = '(%s:%s)' % (self.host, self.port)
498
# If an exception occured during the server start, it will get raised,
499
# otherwise, the server is blocked on its accept() call.
500
self._server_thread.pending_exception()
501
# From now on, we'll use a different event to ensure the server can set
503
self._server_thread.set_event(self.server.stopped)
505
def run_server(self):
508
def stop_server(self):
509
if self.server is None:
512
# The server has been started successfully, shut it down now. As
513
# soon as we stop serving, no more connection are accepted except
514
# one to get out of the blocking listen.
515
self.server.serving.clear()
516
# The server is listening for a last connection, let's give it:
519
last_conn = osutils.connect_socket((self.host, self.port))
520
except socket.error, e:
521
# But ignore connection errors as the point is to unblock the
522
# server thread, it may happen that it's not blocked or even
525
# We start shutting down the client while the server itself is
527
self.server.stop_client_connections()
528
# Now we wait for the thread running self.server.serve() to finish
529
self.server.stopped.wait()
530
if last_conn is not None:
531
# Close the last connection without trying to use it. The
532
# server will not process a single byte on that socket to avoid
533
# complications (SSL starts with a handshake for example).
535
# Check for any exception that could have occurred in the server
537
self._server_thread.join()
539
# Make sure we can be called twice safely, note that this means
540
# that we will raise a single exception even if several occurred in
541
# the various threads involved.
282
545
class SmartTCPServer_for_testing(server.SmartTCPServer):