223
243
raise NotImplementedError
226
class SmartTCPServer_for_testing(server.SmartTCPServer):
246
class ThreadWithException(threading.Thread):
247
"""A catching exception thread.
249
If an exception occurs during the thread execution, it's caught and
250
re-raised when the thread is joined().
253
def __init__(self, *args, **kwargs):
254
# There are cases where the calling thread must wait, yet, if an
255
# exception occurs, the event should be set so the caller is not
256
# blocked. The main example is a calling thread that wants to wait for
257
# the called thread to be in a given state before continuing.
259
event = kwargs.pop('event')
261
# If the caller didn't pass a specific event, create our own
262
event = threading.Event()
263
super(ThreadWithException, self).__init__(*args, **kwargs)
264
self.set_ready_event(event)
265
self.exception = None
266
self.ignored_exceptions = None # see set_ignored_exceptions
268
# compatibility thunk for python-2.4 and python-2.5...
269
if sys.version_info < (2, 6):
270
name = property(threading.Thread.getName, threading.Thread.setName)
272
def set_ready_event(self, event):
273
"""Set the ``ready`` event used to synchronize exception catching.
275
When the thread uses an event to synchronize itself with another thread
276
(setting it when the other thread can wake up from a ``wait`` call),
277
the event must be set after catching an exception or the other thread
280
Some threads require multiple events and should set the relevant one
285
def set_ignored_exceptions(self, ignored):
    """Declare which exceptions will be ignored.

    The value is normalized and stored in ``self.ignored_exceptions`` as
    either None or a one-argument predicate taking the exception object.

    :param ignored: Can be either:

       - None: all exceptions will be raised,
       - an exception class: the instances of this class will be ignored,
       - a tuple of exception classes: the instances of any class of the
         list will be ignored,
       - a callable: that will be passed the exception object
         and should return True if the exception should be ignored
    """
    # Imported locally so this method does not rely on a module-level
    # import being present.
    import inspect
    if ignored is None:
        self.ignored_exceptions = None
    elif isinstance(ignored, tuple) or inspect.isclass(ignored):
        # A class is itself callable, so it has to be recognized here
        # rather than fall through to the "callable" branch: calling it
        # would build a (truthy) exception instance and every exception
        # would then be reported as ignored.
        self.ignored_exceptions = lambda e: isinstance(e, ignored)
    else:
        self.ignored_exceptions = ignored
304
"""Overrides Thread.run to capture any exception."""
308
super(ThreadWithException, self).run()
310
self.exception = sys.exc_info()
312
# Make sure the calling thread is released
316
def join(self, timeout=5):
317
"""Overrides Thread.join to raise any exception caught.
320
Calling join(timeout=0) will raise the caught exception or return None
321
if the thread is still alive.
323
The default timeout is set to 5 and should expire only when a thread
324
serving a client connection is hung.
326
super(ThreadWithException, self).join(timeout)
327
if self.exception is not None:
328
exc_class, exc_value, exc_tb = self.exception
329
self.exception = None # The exception should be raised only once
330
if (self.ignored_exceptions is None
331
or not self.ignored_exceptions(exc_value)):
332
# Raise non ignored exceptions
333
raise exc_class, exc_value, exc_tb
334
if timeout and self.isAlive():
335
# The timeout expired without joining the thread, the thread is
336
# therefore stucked and that's a failure as far as the test is
337
# concerned. We used to hang here.
339
# FIXME: we need to kill the thread, but as far as the test is
340
# concerned, raising an assertion is too strong. On most of the
341
# platforms, this doesn't occur, so just mentioning the problem is
342
# enough for now -- vila 2010824
343
sys.stderr.write('thread %s hung\n' % (self.name,))
344
#raise AssertionError('thread %s hung' % (self.name,))
346
def pending_exception(self):
347
"""Raise the caught exception.
349
This does nothing if no exception occurred.
354
class TestingTCPServerMixin:
355
"""Mixin to support running SocketServer.TCPServer in a thread.
357
Tests are connecting from the main thread, the server has to be run in a
362
self.started = threading.Event()
364
self.stopped = threading.Event()
365
# We collect the resources used by the clients so we can release them
368
self.ignored_exceptions = None
370
def server_bind(self):
    """Bind the listening socket and record the address it reports."""
    listener = self.socket
    listener.bind(self.server_address)
    # Read the name back from the socket so ``server_address`` holds what
    # the socket reports after binding rather than what was requested.
    self.server_address = listener.getsockname()
377
# We are listening and ready to accept connections
381
# Really a connection but the python framework is generic and
383
self.handle_request()
384
# Let's close the listening socket
389
def handle_request(self):
390
"""Handle one request.
392
The python version swallows some socket exceptions and we don't use
393
timeout, so we override it to better control the server behavior.
395
request, client_address = self.get_request()
396
if self.verify_request(request, client_address):
398
self.process_request(request, client_address)
400
self.handle_error(request, client_address)
401
self.close_request(request)
403
def get_request(self):
    """Accept one connection on the listening socket.

    :return: Whatever ``self.socket.accept()`` returns.
    """
    accepted = self.socket.accept()
    return accepted
406
def verify_request(self, request, client_address):
407
"""Verify the request.
409
Return True if we should proceed with this request, False if we should
410
not even touch a single byte in the socket ! This is useful when we
411
stop the server with a dummy last connection.
415
def handle_error(self, request, client_address):
416
# Stop serving and re-raise the last exception seen
418
# The following can be used for debugging purposes, it will display the
419
# exception and the traceback just when it occurs instead of waiting
420
# for the thread to be joined.
422
# SocketServer.BaseServer.handle_error(self, request, client_address)
425
def ignored_exceptions_during_shutdown(self, e):
426
if sys.platform == 'win32':
427
accepted_errnos = [errno.EBADF, errno.WSAEBADF, errno.WSAENOTCONN,
428
errno.WSAECONNRESET, errno.WSAESHUTDOWN]
430
accepted_errnos = [errno.EBADF, errno.ENOTCONN, errno.ECONNRESET]
431
if isinstance(e, socket.error) and e[0] in accepted_errnos:
435
# The following methods are called by the main thread
437
def stop_client_connections(self):
439
c = self.clients.pop()
440
self.shutdown_client(c)
442
def shutdown_socket(self, sock):
443
"""Properly shutdown a socket.
445
This should be called only when no other thread is trying to use the
449
sock.shutdown(socket.SHUT_RDWR)
452
if self.ignored_exceptions(e):
457
# The following methods are called by the main thread
459
def set_ignored_exceptions(self, thread, ignored_exceptions):
    """Remember ``ignored_exceptions`` and install it on ``thread``.

    :param thread: Object whose ``set_ignored_exceptions`` is called with
        the value just stored on the server.
    :param ignored_exceptions: Value stored in ``self.ignored_exceptions``.
    """
    self.ignored_exceptions = ignored_exceptions
    install = thread.set_ignored_exceptions
    install(self.ignored_exceptions)
463
def _pending_exception(self, thread):
464
"""Raise server uncaught exception.
466
Daughter classes can override this if they use daughter threads.
468
thread.pending_exception()
471
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
473
def __init__(self, server_address, request_handler_class):
    """Initialize the mixin first, then the underlying ``TCPServer``.

    :param server_address: Passed through to ``SocketServer.TCPServer``.
    :param request_handler_class: Passed through to
        ``SocketServer.TCPServer``.
    """
    TestingTCPServerMixin.__init__(self)
    SocketServer.TCPServer.__init__(
        self, server_address, request_handler_class)
478
def get_request(self):
479
"""Get the request and client address from the socket."""
480
sock, addr = TestingTCPServerMixin.get_request(self)
481
self.clients.append((sock, addr))
484
# The following methods are called by the main thread
486
def shutdown_client(self, client):
488
self.shutdown_socket(sock)
491
class TestingThreadingTCPServer(TestingTCPServerMixin,
492
SocketServer.ThreadingTCPServer):
494
def __init__(self, server_address, request_handler_class):
    """Initialize the mixin first, then the ``ThreadingTCPServer`` base.

    :param server_address: Passed through to
        ``SocketServer.ThreadingTCPServer``.
    :param request_handler_class: Passed through to
        ``SocketServer.ThreadingTCPServer``.
    """
    TestingTCPServerMixin.__init__(self)
    SocketServer.ThreadingTCPServer.__init__(
        self, server_address, request_handler_class)
499
def get_request (self):
500
"""Get the request and client address from the socket."""
501
sock, addr = TestingTCPServerMixin.get_request(self)
502
# The thread is not created yet, it will be updated in process_request
503
self.clients.append((sock, addr, None))
506
def process_request_thread(self, started, stopped, request, client_address):
508
SocketServer.ThreadingTCPServer.process_request_thread(
509
self, request, client_address)
510
self.close_request(request)
513
def process_request(self, request, client_address):
514
"""Start a new thread to process the request."""
515
started = threading.Event()
516
stopped = threading.Event()
517
t = ThreadWithException(
519
name='%s -> %s' % (client_address, self.server_address),
520
target = self.process_request_thread,
521
args = (started, stopped, request, client_address))
522
# Update the client description
524
self.clients.append((request, client_address, t))
525
# Propagate the exception handler since we must use the same one for
526
# connections running in their own threads than TestingTCPServer.
527
t.set_ignored_exceptions(self.ignored_exceptions)
531
sys.stderr.write('Client thread %s started\n' % (t.name,))
532
# If an exception occurred during the thread start, it will get raised.
533
t.pending_exception()
535
# The following methods are called by the main thread
537
def shutdown_client(self, client):
538
sock, addr, connection_thread = client
539
self.shutdown_socket(sock)
540
if connection_thread is not None:
541
# The thread has been created only if the request is processed but
542
# after the connection is inited. This could happen during server
543
# shutdown. If an exception occurred in the thread it will be
546
sys.stderr.write('Client thread %s will be joined\n'
547
% (connection_thread.name,))
548
connection_thread.join()
550
def set_ignored_exceptions(self, thread, ignored_exceptions):
551
TestingTCPServerMixin.set_ignored_exceptions(self, thread,
553
for sock, addr, connection_thread in self.clients:
554
if connection_thread is not None:
555
connection_thread.set_ignored_exceptions(
556
self.ignored_exceptions)
558
def _pending_exception(self, thread):
    """Check the connection threads, then the server thread.

    ``pending_exception()`` is called on every recorded client that has a
    connection thread, before delegating to the mixin for ``thread``.
    """
    for _sock, _addr, worker in self.clients:
        if worker is None:
            # No thread was attached to this client entry
            continue
        worker.pending_exception()
    TestingTCPServerMixin._pending_exception(self, thread)
565
class TestingTCPServerInAThread(transport.Server):
566
"""A server in a thread that re-raise thread exceptions."""
568
def __init__(self, server_address, server_class, request_handler_class):
569
self.server_class = server_class
570
self.request_handler_class = request_handler_class
571
self.host, self.port = server_address
573
self._server_thread = None
576
return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
578
def create_server(self):
    """Instantiate ``self.server_class`` for our address and handler.

    :return: The object built by
        ``self.server_class((host, port), request_handler_class)``.
    """
    address = (self.host, self.port)
    handler_class = self.request_handler_class
    return self.server_class(address, handler_class)
582
def start_server(self):
583
self.server = self.create_server()
584
self._server_thread = ThreadWithException(
585
event=self.server.started,
586
target=self.run_server)
587
self._server_thread.start()
588
# Wait for the server thread to start (i.e release the lock)
589
self.server.started.wait()
590
# Get the real address, especially the port
591
self.host, self.port = self.server.server_address
592
self._server_thread.name = self.server.server_address
594
sys.stderr.write('Server thread %s started\n'
595
% (self._server_thread.name,))
596
# If an exception occurred during the server start, it will get raised,
597
# otherwise, the server is blocked on its accept() call.
598
self._server_thread.pending_exception()
599
# From now on, we'll use a different event to ensure the server can set
601
self._server_thread.set_ready_event(self.server.stopped)
603
def run_server(self):
606
def stop_server(self):
607
if self.server is None:
610
# The server has been started successfully, shut it down now. As
611
# soon as we stop serving, no more connections are accepted except
612
# one to get out of the blocking listen.
613
self.set_ignored_exceptions(
614
self.server.ignored_exceptions_during_shutdown)
615
self.server.serving = False
617
sys.stderr.write('Server thread %s will be joined\n'
618
% (self._server_thread.name,))
619
# The server is listening for a last connection, let's give it:
622
last_conn = osutils.connect_socket((self.host, self.port))
623
except socket.error, e:
624
# But ignore connection errors as the point is to unblock the
625
# server thread, it may happen that it's not blocked or even
628
# We start shutting down the client while the server itself is
630
self.server.stop_client_connections()
631
# Now we wait for the thread running self.server.serve() to finish
632
self.server.stopped.wait()
633
if last_conn is not None:
634
# Close the last connection without trying to use it. The
635
# server will not process a single byte on that socket to avoid
636
# complications (SSL starts with a handshake for example).
638
# Check for any exception that could have occurred in the server
641
self._server_thread.join()
643
if self.server.ignored_exceptions(e):
648
# Make sure we can be called twice safely, note that this means
649
# that we will raise a single exception even if several occurred in
650
# the various threads involved.
653
def set_ignored_exceptions(self, ignored_exceptions):
654
"""Install an exception handler for the server."""
655
self.server.set_ignored_exceptions(self._server_thread,
658
def pending_exception(self):
    """Raise any uncaught exception from the server.

    Delegates to ``self.server._pending_exception`` with the server thread.
    """
    server = self.server
    server._pending_exception(self._server_thread)
663
class TestingSmartConnectionHandler(SocketServer.BaseRequestHandler,
664
medium.SmartServerSocketStreamMedium):
666
def __init__(self, request, client_address, server):
667
medium.SmartServerSocketStreamMedium.__init__(
668
self, request, server.backing_transport,
669
server.root_client_path)
670
request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
671
SocketServer.BaseRequestHandler.__init__(self, request, client_address,
675
while not self.finished:
676
server_protocol = self._build_protocol()
677
self._serve_one_request(server_protocol)
680
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
682
def __init__(self, server_address, request_handler_class,
683
backing_transport, root_client_path):
684
TestingThreadingTCPServer.__init__(self, server_address,
685
request_handler_class)
686
server.SmartTCPServer.__init__(self, backing_transport,
689
# FIXME: No test are exercising the hooks for the test server
691
self.run_server_started_hooks()
693
TestingThreadingTCPServer.serve(self)
695
self.run_server_stopped_hooks()
698
"""Return the url of the server"""
699
return "bzr://%s:%d/" % self.server_address
702
class SmartTCPServer_for_testing(TestingTCPServerInAThread):
227
703
"""Server suitable for use by transport tests.
229
705
This server is backed by the process's cwd.
232
707
def __init__(self, thread_name_suffix=''):
233
super(SmartTCPServer_for_testing, self).__init__(None)
234
708
self.client_path_extra = None
235
709
self.thread_name_suffix = thread_name_suffix
237
def get_backing_transport(self, backing_transport_server):
    """Get a backing transport from a server we are decorating.

    :param backing_transport_server: Object whose ``get_url()`` gives the
        URL handed to ``transport.get_transport``.
    :return: The result of ``transport.get_transport`` for that URL.
    """
    backing_url = backing_transport_server.get_url()
    return transport.get_transport(backing_url)
710
self.host = '127.0.0.1'
712
super(SmartTCPServer_for_testing, self).__init__(
713
(self.host, self.port),
715
TestingSmartConnectionHandler)
717
def create_server(self):
    """Instantiate ``self.server_class`` with the smart-server arguments.

    The server class is called with the ``(host, port)`` address, the
    request handler class, the backing transport and the root client path,
    in that order.
    """
    address = (self.host, self.port)
    return self.server_class(address, self.request_handler_class,
                             self.backing_transport, self.root_client_path)
241
724
def start_server(self, backing_transport_server=None,
242
client_path_extra='/extra/'):
725
client_path_extra='/extra/'):
243
726
"""Set up server for testing.
245
728
:param backing_transport_server: backing server to use. If not