213
235
def start_server(self, backing_server=None):
214
236
"""Setup the Chroot on backing_server."""
215
237
if backing_server is not None:
216
self.backing_transport = transport.get_transport(
238
self.backing_transport = transport.get_transport_from_url(
217
239
backing_server.get_url())
219
self.backing_transport = transport.get_transport('.')
241
self.backing_transport = transport.get_transport_from_path('.')
220
242
super(TestingChrootServer, self).start_server()
222
244
def get_bogus_url(self):
223
245
raise NotImplementedError
226
class SmartTCPServer_for_testing(server.SmartTCPServer):
248
class TestThread(cethread.CatchingExceptionThread):
250
def join(self, timeout=5):
251
"""Overrides to use a default timeout.
253
The default timeout is set to 5 and should expire only when a thread
254
serving a client connection is hung.
256
super(TestThread, self).join(timeout)
257
if timeout and self.isAlive():
258
# The timeout expired without joining the thread, the thread is
259
# therefore stucked and that's a failure as far as the test is
260
# concerned. We used to hang here.
262
# FIXME: we need to kill the thread, but as far as the test is
263
# concerned, raising an assertion is too strong. On most of the
264
# platforms, this doesn't occur, so just mentioning the problem is
265
# enough for now -- vila 2010824
266
sys.stderr.write('thread %s hung\n' % (self.name,))
267
#raise AssertionError('thread %s hung' % (self.name,))
270
class TestingTCPServerMixin(object):
271
"""Mixin to support running SocketServer.TCPServer in a thread.
273
Tests are connecting from the main thread, the server has to be run in a
278
self.started = threading.Event()
280
self.stopped = threading.Event()
281
# We collect the resources used by the clients so we can release them
284
self.ignored_exceptions = None
286
def server_bind(self):
287
self.socket.bind(self.server_address)
288
self.server_address = self.socket.getsockname()
293
# We are listening and ready to accept connections
297
# Really a connection but the python framework is generic and
299
self.handle_request()
300
# Let's close the listening socket
305
def handle_request(self):
306
"""Handle one request.
308
The python version swallows some socket exceptions and we don't use
309
timeout, so we override it to better control the server behavior.
311
request, client_address = self.get_request()
312
if self.verify_request(request, client_address):
314
self.process_request(request, client_address)
316
self.handle_error(request, client_address)
318
self.close_request(request)
320
def get_request(self):
321
return self.socket.accept()
323
def verify_request(self, request, client_address):
324
"""Verify the request.
326
Return True if we should proceed with this request, False if we should
327
not even touch a single byte in the socket ! This is useful when we
328
stop the server with a dummy last connection.
332
def handle_error(self, request, client_address):
333
# Stop serving and re-raise the last exception seen
335
# The following can be used for debugging purposes, it will display the
336
# exception and the traceback just when it occurs instead of waiting
337
# for the thread to be joined.
338
# SocketServer.BaseServer.handle_error(self, request, client_address)
340
# We call close_request manually, because we are going to raise an
341
# exception. The SocketServer implementation calls:
344
# But because we raise the exception, close_request will never be
345
# triggered. This helps client not block waiting for a response when
346
# the server gets an exception.
347
self.close_request(request)
350
def ignored_exceptions_during_shutdown(self, e):
351
if sys.platform == 'win32':
352
accepted_errnos = [errno.EBADF,
360
accepted_errnos = [errno.EBADF,
365
if isinstance(e, socket.error) and e[0] in accepted_errnos:
369
# The following methods are called by the main thread
371
def stop_client_connections(self):
373
c = self.clients.pop()
374
self.shutdown_client(c)
376
def shutdown_socket(self, sock):
377
"""Properly shutdown a socket.
379
This should be called only when no other thread is trying to use the
383
sock.shutdown(socket.SHUT_RDWR)
386
if self.ignored_exceptions(e):
391
# The following methods are called by the main thread
393
def set_ignored_exceptions(self, thread, ignored_exceptions):
394
self.ignored_exceptions = ignored_exceptions
395
thread.set_ignored_exceptions(self.ignored_exceptions)
397
def _pending_exception(self, thread):
398
"""Raise server uncaught exception.
400
Daughter classes can override this if they use daughter threads.
402
thread.pending_exception()
405
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
407
def __init__(self, server_address, request_handler_class):
408
TestingTCPServerMixin.__init__(self)
409
SocketServer.TCPServer.__init__(self, server_address,
410
request_handler_class)
412
def get_request(self):
413
"""Get the request and client address from the socket."""
414
sock, addr = TestingTCPServerMixin.get_request(self)
415
self.clients.append((sock, addr))
418
# The following methods are called by the main thread
420
def shutdown_client(self, client):
422
self.shutdown_socket(sock)
425
class TestingThreadingTCPServer(TestingTCPServerMixin,
426
SocketServer.ThreadingTCPServer):
428
def __init__(self, server_address, request_handler_class):
429
TestingTCPServerMixin.__init__(self)
430
SocketServer.ThreadingTCPServer.__init__(self, server_address,
431
request_handler_class)
433
def get_request(self):
434
"""Get the request and client address from the socket."""
435
sock, addr = TestingTCPServerMixin.get_request(self)
436
# The thread is not create yet, it will be updated in process_request
437
self.clients.append((sock, addr, None))
440
def process_request_thread(self, started, stopped, request, client_address):
442
SocketServer.ThreadingTCPServer.process_request_thread(
443
self, request, client_address)
444
self.close_request(request)
447
def process_request(self, request, client_address):
448
"""Start a new thread to process the request."""
449
started = threading.Event()
450
stopped = threading.Event()
453
name='%s -> %s' % (client_address, self.server_address),
454
target=self.process_request_thread,
455
args=(started, stopped, request, client_address))
456
# Update the client description
458
self.clients.append((request, client_address, t))
459
# Propagate the exception handler since we must use the same one as
460
# TestingTCPServer for connections running in their own threads.
461
t.set_ignored_exceptions(self.ignored_exceptions)
465
sys.stderr.write('Client thread %s started\n' % (t.name,))
466
# If an exception occured during the thread start, it will get raised.
467
t.pending_exception()
469
# The following methods are called by the main thread
471
def shutdown_client(self, client):
472
sock, addr, connection_thread = client
473
self.shutdown_socket(sock)
474
if connection_thread is not None:
475
# The thread has been created only if the request is processed but
476
# after the connection is inited. This could happen during server
477
# shutdown. If an exception occurred in the thread it will be
480
sys.stderr.write('Client thread %s will be joined\n'
481
% (connection_thread.name,))
482
connection_thread.join()
484
def set_ignored_exceptions(self, thread, ignored_exceptions):
485
TestingTCPServerMixin.set_ignored_exceptions(self, thread,
487
for sock, addr, connection_thread in self.clients:
488
if connection_thread is not None:
489
connection_thread.set_ignored_exceptions(
490
self.ignored_exceptions)
492
def _pending_exception(self, thread):
493
for sock, addr, connection_thread in self.clients:
494
if connection_thread is not None:
495
connection_thread.pending_exception()
496
TestingTCPServerMixin._pending_exception(self, thread)
499
class TestingTCPServerInAThread(transport.Server):
500
"""A server in a thread that re-raise thread exceptions."""
502
def __init__(self, server_address, server_class, request_handler_class):
503
self.server_class = server_class
504
self.request_handler_class = request_handler_class
505
self.host, self.port = server_address
507
self._server_thread = None
510
return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
512
def create_server(self):
513
return self.server_class((self.host, self.port),
514
self.request_handler_class)
516
def start_server(self):
517
self.server = self.create_server()
518
self._server_thread = TestThread(
519
sync_event=self.server.started,
520
target=self.run_server)
521
self._server_thread.start()
522
# Wait for the server thread to start (i.e release the lock)
523
self.server.started.wait()
524
# Get the real address, especially the port
525
self.host, self.port = self.server.server_address
526
self._server_thread.name = self.server.server_address
528
sys.stderr.write('Server thread %s started\n'
529
% (self._server_thread.name,))
530
# If an exception occured during the server start, it will get raised,
531
# otherwise, the server is blocked on its accept() call.
532
self._server_thread.pending_exception()
533
# From now on, we'll use a different event to ensure the server can set
535
self._server_thread.set_sync_event(self.server.stopped)
537
def run_server(self):
540
def stop_server(self):
541
if self.server is None:
544
# The server has been started successfully, shut it down now. As
545
# soon as we stop serving, no more connection are accepted except
546
# one to get out of the blocking listen.
547
self.set_ignored_exceptions(
548
self.server.ignored_exceptions_during_shutdown)
549
self.server.serving = False
551
sys.stderr.write('Server thread %s will be joined\n'
552
% (self._server_thread.name,))
553
# The server is listening for a last connection, let's give it:
556
last_conn = osutils.connect_socket((self.host, self.port))
557
except socket.error, e:
558
# But ignore connection errors as the point is to unblock the
559
# server thread, it may happen that it's not blocked or even
562
# We start shutting down the clients while the server itself is
564
self.server.stop_client_connections()
565
# Now we wait for the thread running self.server.serve() to finish
566
self.server.stopped.wait()
567
if last_conn is not None:
568
# Close the last connection without trying to use it. The
569
# server will not process a single byte on that socket to avoid
570
# complications (SSL starts with a handshake for example).
572
# Check for any exception that could have occurred in the server
575
self._server_thread.join()
577
if self.server.ignored_exceptions(e):
582
# Make sure we can be called twice safely, note that this means
583
# that we will raise a single exception even if several occurred in
584
# the various threads involved.
587
def set_ignored_exceptions(self, ignored_exceptions):
588
"""Install an exception handler for the server."""
589
self.server.set_ignored_exceptions(self._server_thread,
592
def pending_exception(self):
593
"""Raise uncaught exception in the server."""
594
self.server._pending_exception(self._server_thread)
597
class TestingSmartConnectionHandler(SocketServer.BaseRequestHandler,
598
medium.SmartServerSocketStreamMedium):
600
def __init__(self, request, client_address, server):
601
medium.SmartServerSocketStreamMedium.__init__(
602
self, request, server.backing_transport,
603
server.root_client_path,
604
timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
605
request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
606
SocketServer.BaseRequestHandler.__init__(self, request, client_address,
611
while not self.finished:
612
server_protocol = self._build_protocol()
613
self._serve_one_request(server_protocol)
614
except errors.ConnectionTimeout:
615
# idle connections aren't considered a failure of the server
619
_DEFAULT_TESTING_CLIENT_TIMEOUT = 4.0
621
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
623
def __init__(self, server_address, request_handler_class,
624
backing_transport, root_client_path):
625
TestingThreadingTCPServer.__init__(self, server_address,
626
request_handler_class)
627
server.SmartTCPServer.__init__(self, backing_transport,
628
root_client_path, client_timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
631
self.run_server_started_hooks()
633
TestingThreadingTCPServer.serve(self)
635
self.run_server_stopped_hooks()
638
"""Return the url of the server"""
639
return "bzr://%s:%d/" % self.server_address
642
class SmartTCPServer_for_testing(TestingTCPServerInAThread):
227
643
"""Server suitable for use by transport tests.
229
645
This server is backed by the process's cwd.
232
647
def __init__(self, thread_name_suffix=''):
233
super(SmartTCPServer_for_testing, self).__init__(None)
234
648
self.client_path_extra = None
235
649
self.thread_name_suffix = thread_name_suffix
237
def get_backing_transport(self, backing_transport_server):
238
"""Get a backing transport from a server we are decorating."""
239
return transport.get_transport(backing_transport_server.get_url())
650
self.host = '127.0.0.1'
652
super(SmartTCPServer_for_testing, self).__init__(
653
(self.host, self.port),
655
TestingSmartConnectionHandler)
657
def create_server(self):
658
return self.server_class((self.host, self.port),
659
self.request_handler_class,
660
self.backing_transport,
661
self.root_client_path)
241
664
def start_server(self, backing_transport_server=None,
242
client_path_extra='/extra/'):
665
client_path_extra='/extra/'):
243
666
"""Set up server for testing.
245
668
:param backing_transport_server: backing server to use. If not