235
233
def start_server(self, backing_server=None):
236
234
"""Setup the Chroot on backing_server."""
237
235
if backing_server is not None:
238
self.backing_transport = transport.get_transport_from_url(
236
self.backing_transport = transport.get_transport(
239
237
backing_server.get_url())
241
self.backing_transport = transport.get_transport_from_path('.')
239
self.backing_transport = transport.get_transport('.')
242
240
super(TestingChrootServer, self).start_server()
244
242
def get_bogus_url(self):
245
243
raise NotImplementedError
248
class TestThread(cethread.CatchingExceptionThread):
246
class ThreadWithException(threading.Thread):
247
"""A catching exception thread.
249
If an exception occurs during the thread execution, it's caught and
250
re-raised when the thread is joined().
253
def __init__(self, *args, **kwargs):
254
# There are cases where the calling thread must wait, yet, if an
255
# exception occurs, the event should be set so the caller is not
256
# blocked. The main example is a calling thread that want to wait for
257
# the called thread to be in a given state before continuing.
259
event = kwargs.pop('event')
261
# If the caller didn't pass a specific event, create our own
262
event = threading.Event()
263
super(ThreadWithException, self).__init__(*args, **kwargs)
264
self.set_ready_event(event)
265
self.exception = None
266
self.ignored_exceptions = None # see set_ignored_exceptions
268
# compatibility thunk for python-2.4 and python-2.5...
269
if sys.version_info < (2, 6):
270
name = property(threading.Thread.getName, threading.Thread.setName)
272
def set_ready_event(self, event):
273
"""Set the ``ready`` event used to synchronize exception catching.
275
When the thread uses an event to synchronize itself with another thread
276
(setting it when the other thread can wake up from a ``wait`` call),
277
the event must be set after catching an exception or the other thread
280
Some threads require multiple events and should set the relevant one
285
def set_ignored_exceptions(self, ignored):
286
"""Declare which exceptions will be ignored.
288
:param ignored: Can be either:
289
- None: all exceptions will be raised,
290
- an exception class: the instances of this class will be ignored,
291
- a tuple of exception classes: the instances of any class of the
292
list will be ignored,
293
- a callable: that will be passed the exception object
294
and should return True if the exception should be ignored
297
self.ignored_exceptions = None
298
elif isinstance(ignored, (Exception, tuple)):
299
self.ignored_exceptions = lambda e: isinstance(e, ignored)
301
self.ignored_exceptions = ignored
304
"""Overrides Thread.run to capture any exception."""
308
super(ThreadWithException, self).run()
310
self.exception = sys.exc_info()
312
# Make sure the calling thread is released
250
316
def join(self, timeout=5):
251
"""Overrides to use a default timeout.
317
"""Overrides Thread.join to raise any exception caught.
320
Calling join(timeout=0) will raise the caught exception or return None
321
if the thread is still alive.
253
323
The default timeout is set to 5 and should expire only when a thread
254
324
serving a client connection is hung.
256
super(TestThread, self).join(timeout)
326
super(ThreadWithException, self).join(timeout)
327
if self.exception is not None:
328
exc_class, exc_value, exc_tb = self.exception
329
self.exception = None # The exception should be raised only once
330
if (self.ignored_exceptions is None
331
or not self.ignored_exceptions(exc_value)):
332
# Raise non ignored exceptions
333
raise exc_class, exc_value, exc_tb
257
334
if timeout and self.isAlive():
258
335
# The timeout expired without joining the thread, the thread is
259
336
# therefore stucked and that's a failure as far as the test is
334
418
# The following can be used for debugging purposes, it will display the
335
419
# exception and the traceback just when it occurs instead of waiting
336
420
# for the thread to be joined.
337
422
# SocketServer.BaseServer.handle_error(self, request, client_address)
339
# We call close_request manually, because we are going to raise an
340
# exception. The SocketServer implementation calls:
343
# But because we raise the exception, close_request will never be
344
# triggered. This helps client not block waiting for a response when
345
# the server gets an exception.
346
self.close_request(request)
349
425
def ignored_exceptions_during_shutdown(self, e):
429
505
SocketServer.ThreadingTCPServer.__init__(self, server_address,
430
506
request_handler_class)
432
def get_request(self):
508
def get_request (self):
433
509
"""Get the request and client address from the socket."""
434
510
sock, addr = TestingTCPServerMixin.get_request(self)
435
# The thread is not created yet, it will be updated in process_request
511
# The thread is not create yet, it will be updated in process_request
436
512
self.clients.append((sock, addr, None))
437
513
return sock, addr
439
def process_request_thread(self, started, detached, stopped,
440
request, client_address):
515
def process_request_thread(self, started, stopped, request, client_address):
442
# We will be on our own once the server tells us we're detached
444
517
SocketServer.ThreadingTCPServer.process_request_thread(
445
518
self, request, client_address)
446
519
self.close_request(request)
449
522
def process_request(self, request, client_address):
450
523
"""Start a new thread to process the request."""
451
524
started = threading.Event()
452
detached = threading.Event()
453
525
stopped = threading.Event()
526
t = ThreadWithException(
456
528
name='%s -> %s' % (client_address, self.server_address),
457
529
target = self.process_request_thread,
458
args = (started, detached, stopped, request, client_address))
530
args = (started, stopped, request, client_address))
459
531
# Update the client description
460
532
self.clients.pop()
461
533
self.clients.append((request, client_address, t))
462
# Propagate the exception handler since we must use the same one as
463
# TestingTCPServer for connections running in their own threads.
534
# Propagate the exception handler since we must use the same one for
535
# connections running in their own threads than TestingTCPServer.
464
536
t.set_ignored_exceptions(self.ignored_exceptions)
540
sys.stderr.write('Client thread %s started\n' % (t.name,))
467
541
# If an exception occured during the thread start, it will get raised.
468
542
t.pending_exception()
470
sys.stderr.write('Client thread %s started\n' % (t.name,))
471
# Tell the thread, it's now on its own for exception handling.
474
544
# The following methods are called by the main thread
605
675
def __init__(self, request, client_address, server):
606
676
medium.SmartServerSocketStreamMedium.__init__(
607
677
self, request, server.backing_transport,
608
server.root_client_path,
609
timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
678
server.root_client_path)
610
679
request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
611
680
SocketServer.BaseRequestHandler.__init__(self, request, client_address,
614
683
def handle(self):
616
while not self.finished:
617
server_protocol = self._build_protocol()
618
self._serve_one_request(server_protocol)
619
except errors.ConnectionTimeout:
620
# idle connections aren't considered a failure of the server
624
_DEFAULT_TESTING_CLIENT_TIMEOUT = 60.0
684
while not self.finished:
685
server_protocol = self._build_protocol()
686
self._serve_one_request(server_protocol)
626
689
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):