213
235
def start_server(self, backing_server=None):
    """Setup the Chroot on backing_server.

    :param backing_server: Optional server providing the backing transport.
        When given, the transport is built from the URL returned by its
        ``get_url()``; when None, it is built from the path '.'.
    """
    if backing_server is not None:
        # An URL is available, so use the URL-specific factory.
        self.backing_transport = transport.get_transport_from_url(
            backing_server.get_url())
    else:
        # No backing server: fall back to the path '.'.
        self.backing_transport = transport.get_transport_from_path('.')
    super(TestingChrootServer, self).start_server()
222
244
def get_bogus_url(self):
    """Not supported by this server.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError
226
class SmartTCPServer_for_testing(server.SmartTCPServer):
248
class TestThread(cethread.CatchingExceptionThread):
    """Thread whose join() has a default timeout and reports hung threads."""

    def join(self, timeout=5):
        """Overrides to use a default timeout.

        The default timeout is set to 5 and should expire only when a thread
        serving a client connection is hung.

        :param timeout: Value passed to the parent ``join()``. When it is
            truthy and the thread is still alive afterwards, a message is
            written to stderr instead of raising.
        """
        super(TestThread, self).join(timeout)
        if timeout and self.isAlive():
            # The timeout expired without joining the thread, the thread is
            # therefore stuck and that's a failure as far as the test is
            # concerned. We used to hang here.

            # FIXME: we need to kill the thread, but as far as the test is
            # concerned, raising an assertion is too strong. On most of the
            # platforms, this doesn't occur, so just mentioning the problem is
            # enough for now -- vila 2010824
            sys.stderr.write('thread %s hung\n' % (self.name,))
            #raise AssertionError('thread %s hung' % (self.name,))
270
class TestingTCPServerMixin(object):
271
"""Mixin to support running SocketServer.TCPServer in a thread.
273
Tests are connecting from the main thread, the server has to be run in a
278
self.started = threading.Event()
280
self.stopped = threading.Event()
281
# We collect the resources used by the clients so we can release them
284
self.ignored_exceptions = None
286
def server_bind(self):
287
self.socket.bind(self.server_address)
288
self.server_address = self.socket.getsockname()
292
# We are listening and ready to accept connections
296
# Really a connection but the python framework is generic and
298
self.handle_request()
299
# Let's close the listening socket
304
def handle_request(self):
305
"""Handle one request.
307
The python version swallows some socket exceptions and we don't use
308
timeout, so we override it to better control the server behavior.
310
request, client_address = self.get_request()
311
if self.verify_request(request, client_address):
313
self.process_request(request, client_address)
315
self.handle_error(request, client_address)
317
self.close_request(request)
319
def get_request(self):
    """Accept one connection on the listening socket.

    :return: whatever ``self.socket.accept()`` returns.
    """
    return self.socket.accept()
322
def verify_request(self, request, client_address):
323
"""Verify the request.
325
Return True if we should proceed with this request, False if we should
326
not even touch a single byte in the socket ! This is useful when we
327
stop the server with a dummy last connection.
331
def handle_error(self, request, client_address):
332
# Stop serving and re-raise the last exception seen
334
# The following can be used for debugging purposes, it will display the
335
# exception and the traceback just when it occurs instead of waiting
336
# for the thread to be joined.
337
# SocketServer.BaseServer.handle_error(self, request, client_address)
339
# We call close_request manually, because we are going to raise an
340
# exception. The SocketServer implementation calls:
343
# But because we raise the exception, close_request will never be
344
# triggered. This helps client not block waiting for a response when
345
# the server gets an exception.
346
self.close_request(request)
349
def ignored_exceptions_during_shutdown(self, e):
350
if sys.platform == 'win32':
351
accepted_errnos = [errno.EBADF,
359
accepted_errnos = [errno.EBADF,
364
if isinstance(e, socket.error) and e[0] in accepted_errnos:
368
# The following methods are called by the main thread
370
def stop_client_connections(self):
372
c = self.clients.pop()
373
self.shutdown_client(c)
375
def shutdown_socket(self, sock):
376
"""Properly shutdown a socket.
378
This should be called only when no other thread is trying to use the
382
sock.shutdown(socket.SHUT_RDWR)
385
if self.ignored_exceptions(e):
390
# The following methods are called by the main thread
392
def set_ignored_exceptions(self, thread, ignored_exceptions):
    """Record the ignored-exceptions handler and install it on a thread.

    :param thread: Object exposing ``set_ignored_exceptions()``.
    :param ignored_exceptions: Handler stored on ``self.ignored_exceptions``
        and forwarded to ``thread``.
    """
    self.ignored_exceptions = ignored_exceptions
    thread.set_ignored_exceptions(self.ignored_exceptions)
396
def _pending_exception(self, thread):
397
"""Raise server uncaught exception.
399
Daughter classes can override this if they use daughter threads.
401
thread.pending_exception()
404
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
406
def __init__(self, server_address, request_handler_class):
407
TestingTCPServerMixin.__init__(self)
408
SocketServer.TCPServer.__init__(self, server_address,
409
request_handler_class)
411
def get_request(self):
412
"""Get the request and client address from the socket."""
413
sock, addr = TestingTCPServerMixin.get_request(self)
414
self.clients.append((sock, addr))
417
# The following methods are called by the main thread
419
def shutdown_client(self, client):
421
self.shutdown_socket(sock)
424
class TestingThreadingTCPServer(TestingTCPServerMixin,
425
SocketServer.ThreadingTCPServer):
427
def __init__(self, server_address, request_handler_class):
428
TestingTCPServerMixin.__init__(self)
429
SocketServer.ThreadingTCPServer.__init__(self, server_address,
430
request_handler_class)
432
def get_request(self):
433
"""Get the request and client address from the socket."""
434
sock, addr = TestingTCPServerMixin.get_request(self)
435
# The thread is not created yet, it will be updated in process_request
436
self.clients.append((sock, addr, None))
439
def process_request_thread(self, started, stopped, request, client_address):
441
SocketServer.ThreadingTCPServer.process_request_thread(
442
self, request, client_address)
443
self.close_request(request)
446
def process_request(self, request, client_address):
447
"""Start a new thread to process the request."""
448
started = threading.Event()
449
stopped = threading.Event()
452
name='%s -> %s' % (client_address, self.server_address),
453
target=self.process_request_thread,
454
args=(started, stopped, request, client_address))
455
# Update the client description
457
self.clients.append((request, client_address, t))
458
# Propagate the exception handler since we must use the same one as
459
# TestingTCPServer for connections running in their own threads.
460
t.set_ignored_exceptions(self.ignored_exceptions)
464
sys.stderr.write('Client thread %s started\n' % (t.name,))
465
# If an exception occurred during the thread start, it will get raised.
466
# In rare cases, an exception raised during the request processing may
467
# also get caught here (see http://pad.lv/869366)
468
t.pending_exception()
470
# The following methods are called by the main thread
472
def shutdown_client(self, client):
473
sock, addr, connection_thread = client
474
self.shutdown_socket(sock)
475
if connection_thread is not None:
476
# The thread has been created only if the request is processed but
477
# after the connection is inited. This could happen during server
478
# shutdown. If an exception occurred in the thread it will be
481
sys.stderr.write('Client thread %s will be joined\n'
482
% (connection_thread.name,))
483
connection_thread.join()
485
def set_ignored_exceptions(self, thread, ignored_exceptions):
    """Install the handler on the server thread and on client threads.

    :param thread: Thread passed through to the mixin implementation.
    :param ignored_exceptions: Handler to install.
    """
    TestingTCPServerMixin.set_ignored_exceptions(self, thread,
                                                 ignored_exceptions)
    # Client entries whose thread slot is still None have no thread to
    # update yet.
    for sock, addr, connection_thread in self.clients:
        if connection_thread is not None:
            connection_thread.set_ignored_exceptions(
                self.ignored_exceptions)
493
def _pending_exception(self, thread):
    """Check client connection threads, then the given thread.

    ``pending_exception()`` is called on every client thread recorded in
    ``self.clients`` before delegating to the mixin implementation.

    :param thread: Thread passed through to the mixin implementation.
    """
    for sock, addr, connection_thread in self.clients:
        if connection_thread is not None:
            connection_thread.pending_exception()
    TestingTCPServerMixin._pending_exception(self, thread)
500
class TestingTCPServerInAThread(transport.Server):
501
"""A server in a thread that re-raise thread exceptions."""
503
def __init__(self, server_address, server_class, request_handler_class):
504
self.server_class = server_class
505
self.request_handler_class = request_handler_class
506
self.host, self.port = server_address
508
self._server_thread = None
511
return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
513
def create_server(self):
    """Instantiate the server class.

    :return: ``self.server_class`` called with the ``(host, port)`` address
        and ``self.request_handler_class``.
    """
    return self.server_class((self.host, self.port),
                             self.request_handler_class)
517
def start_server(self):
518
self.server = self.create_server()
519
self._server_thread = TestThread(
520
sync_event=self.server.started,
521
target=self.run_server)
522
self._server_thread.start()
523
# Wait for the server thread to start (i.e. release the lock)
524
self.server.started.wait()
525
# Get the real address, especially the port
526
self.host, self.port = self.server.server_address
527
self._server_thread.name = self.server.server_address
529
sys.stderr.write('Server thread %s started\n'
530
% (self._server_thread.name,))
531
# If an exception occurred during the server start, it will get raised,
532
# otherwise, the server is blocked on its accept() call.
533
self._server_thread.pending_exception()
534
# From now on, we'll use a different event to ensure the server can set
536
self._server_thread.set_sync_event(self.server.stopped)
538
def run_server(self):
541
def stop_server(self):
542
if self.server is None:
545
# The server has been started successfully, shut it down now. As
546
# soon as we stop serving, no more connections are accepted except
547
# one to get out of the blocking listen.
548
self.set_ignored_exceptions(
549
self.server.ignored_exceptions_during_shutdown)
550
self.server.serving = False
552
sys.stderr.write('Server thread %s will be joined\n'
553
% (self._server_thread.name,))
554
# The server is listening for a last connection, let's give it:
557
last_conn = osutils.connect_socket((self.host, self.port))
558
except socket.error, e:
559
# But ignore connection errors as the point is to unblock the
560
# server thread, it may happen that it's not blocked or even
563
# We start shutting down the clients while the server itself is
565
self.server.stop_client_connections()
566
# Now we wait for the thread running self.server.serve() to finish
567
self.server.stopped.wait()
568
if last_conn is not None:
569
# Close the last connection without trying to use it. The
570
# server will not process a single byte on that socket to avoid
571
# complications (SSL starts with a handshake for example).
573
# Check for any exception that could have occurred in the server
576
self._server_thread.join()
578
if self.server.ignored_exceptions(e):
583
# Make sure we can be called twice safely, note that this means
584
# that we will raise a single exception even if several occurred in
585
# the various threads involved.
588
def set_ignored_exceptions(self, ignored_exceptions):
    """Install an exception handler for the server.

    :param ignored_exceptions: Handler forwarded, together with the server
        thread, to ``self.server.set_ignored_exceptions()``.
    """
    self.server.set_ignored_exceptions(self._server_thread,
                                       ignored_exceptions)
593
def pending_exception(self):
    """Raise uncaught exception in the server.

    Delegates to ``self.server._pending_exception()`` with the server
    thread.
    """
    self.server._pending_exception(self._server_thread)
598
class TestingSmartConnectionHandler(SocketServer.BaseRequestHandler,
599
medium.SmartServerSocketStreamMedium):
601
def __init__(self, request, client_address, server):
602
medium.SmartServerSocketStreamMedium.__init__(
603
self, request, server.backing_transport,
604
server.root_client_path,
605
timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
606
request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
607
SocketServer.BaseRequestHandler.__init__(self, request, client_address,
612
while not self.finished:
613
server_protocol = self._build_protocol()
614
self._serve_one_request(server_protocol)
615
except errors.ConnectionTimeout:
616
# idle connections aren't considered a failure of the server
620
_DEFAULT_TESTING_CLIENT_TIMEOUT = 60.0
622
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
624
def __init__(self, server_address, request_handler_class,
625
backing_transport, root_client_path):
626
TestingThreadingTCPServer.__init__(self, server_address,
627
request_handler_class)
628
server.SmartTCPServer.__init__(self, backing_transport,
629
root_client_path, client_timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
632
self.run_server_started_hooks()
634
TestingThreadingTCPServer.serve(self)
636
self.run_server_stopped_hooks()
639
"""Return the url of the server"""
640
return "bzr://%s:%d/" % self.server_address
643
class SmartTCPServer_for_testing(TestingTCPServerInAThread):
227
644
"""Server suitable for use by transport tests.
229
646
This server is backed by the process's cwd.
232
648
def __init__(self, thread_name_suffix=''):
233
super(SmartTCPServer_for_testing, self).__init__(None)
234
649
self.client_path_extra = None
235
650
self.thread_name_suffix = thread_name_suffix
237
def get_backing_transport(self, backing_transport_server):
238
"""Get a backing transport from a server we are decorating."""
239
return transport.get_transport(backing_transport_server.get_url())
651
self.host = '127.0.0.1'
653
super(SmartTCPServer_for_testing, self).__init__(
654
(self.host, self.port),
656
TestingSmartConnectionHandler)
658
def create_server(self):
    """Instantiate the server class with its backing transport.

    :return: ``self.server_class`` called with the ``(host, port)`` address,
        ``self.request_handler_class``, ``self.backing_transport`` and
        ``self.root_client_path``.
    """
    return self.server_class((self.host, self.port),
                             self.request_handler_class,
                             self.backing_transport,
                             self.root_client_path)
241
665
def start_server(self, backing_transport_server=None,
242
client_path_extra='/extra/'):
666
client_path_extra='/extra/'):
243
667
"""Set up server for testing.
245
669
:param backing_transport_server: backing server to use. If not