325
325
# the test cases.
326
326
self.test_case_server = test_case_server
327
327
self._home_dir = test_case_server._home_dir
329
self.is_shut_down = threading.Event()
330
# We collect the sockets/threads used by the clients so we can
331
# close/join them when shutting down
334
def get_request (self):
335
"""Get the request and client address from the socket.
337
sock, addr = self._get_request()
338
self.clients.append([sock, addr])
341
def verify_request(self, request, client_address):
    """Decide whether a freshly accepted request may be processed.

    Return True if we should proceed with this request, False if we should
    not even touch a single byte in the socket.
    """
    serving = self.serving
    if serving is None:
        # serve() has not created the event yet: refuse everything.
        return False
    # Only accept requests while the serving event is raised.
    return serving.isSet()
349
def handle_request(self):
350
request, client_address = self.get_request()
352
if self.verify_request(request, client_address):
353
self.process_request(request, client_address)
355
if self.serving is not None and self.serving.isSet():
356
self.handle_error(request, client_address)
358
# Exceptions raised while we shut down are just noise, but feel
359
# free to put a breakpoint here if you suspect something
360
# else. Such an example is the SSL handshake: it's automatic
361
# once we start processing the request but the last connection
362
# will close immediately and will not be able to correctly
365
self.close_request(request)
367
def server_bind(self):
    """Record the address the listening socket is actually bound to.

    The stock server does this itself since python 2.5, so the fix-up is
    only needed for older interpreters.
    """
    # Use sys.version_info (a tuple), not sys.version (a string): comparing
    # the string against (2, 5) is always true on python 2 and raises
    # TypeError on python 3, so the version guard never did its job.
    if sys.version_info < (2, 5):
        self.server_address = self.socket.getsockname()
373
def serve(self, started):
374
self.serving = threading.Event()
376
self.is_shut_down.clear()
377
if 'threads' in tests.selftest_debug_flags:
378
print 'Starting %r' % (self.server_address,)
379
# We are listening and ready to accept connections
381
while self.serving.isSet():
382
if 'threads' in tests.selftest_debug_flags:
383
print 'Accepting on %r' % (self.server_address,)
384
# Really a connection but the python framework is generic and
386
self.handle_request()
387
if 'threads' in tests.selftest_debug_flags:
388
print 'Closing %r' % (self.server_address,)
389
# Let's close the listening socket
391
if 'threads' in tests.selftest_debug_flags:
392
print 'Closed %r' % (self.server_address,)
393
self.is_shut_down.set()
395
def join_thread(self, thread, timeout=2):
398
# The timeout expired without joining the thread, the thread is
399
# therefore stuck and that's a failure as far as the test is
400
# concerned. We used to hang here.
401
raise AssertionError('thread %s hung' % (thread.name,))
403
def shutdown_server(self):
404
"""Stops the serve() loop.
406
Blocks until the loop has finished. This must be called while serve()
407
is running in another thread, or it will deadlock.
409
if self.serving is None:
410
# If the server wasn't properly started, there is nothing to
413
# As soon as we stop serving, no more connections are accepted except
414
# one to get out of the blocking listen.
416
# The server is listening for a last connection, let's give it:
419
last_conn = osutils.connect_socket(self.server_address)
420
except socket.error, e:
421
# But ignore connection errors as the point is to unblock the
422
# server thread, it may happen that it's not blocked or even not
423
# started (when something went wrong during test case setup
424
# leading to self.setUp() *not* being called but self.tearDown()
425
# still being called)
427
# We don't have to wait for the server to shut down to start shutting
428
# down the clients, so let's start now.
429
for c in self.clients:
430
self.shutdown_client(c)
432
# Now we wait for the thread running serve() to finish
433
self.is_shut_down.wait()
434
if last_conn is not None:
435
# Close the last connection without trying to use it. The server
436
# will not process a single byte on that socket to avoid
437
# complications (SSL starts with a handshake for example).
440
def shutdown_client(self, client):
    """Shut down the socket of one recorded client entry.

    :param client: a sequence whose first two items are the client socket
        and its address; any further items are ignored here.
    """
    # Unpack exactly two items so that a too-short entry still fails the
    # same way it always did.
    client_socket, _client_address = client[:2]
    self.shutdown_client_socket(client_socket)
444
def shutdown_client_socket(self, sock):
445
"""Properly shutdown a client socket.
447
Under some circumstances (as in bug #383920), we need to force the
448
shutdown as python delays it until gc occur otherwise and the client
451
This should be called only when no other thread is trying to use the
455
# The request process has been completed, the thread is about to
456
# die, let's shutdown the socket if we can.
457
sock.shutdown(socket.SHUT_RDWR)
459
except (socket.error, select.error), e:
460
if e[0] in (errno.EBADF, errno.ENOTCONN):
461
# Right, the socket is already down
464
print 'exception in shutdown_client_socket: %r' % (e,)
468
class TestingHTTPServer(TestingHTTPServerMixin, SocketServer.TCPServer):
330
class TestingHTTPServer(test_server.TestingTCPServer, TestingHTTPServerMixin):
470
332
def __init__(self, server_address, request_handler_class,
471
333
test_case_server):
334
test_server.TestingTCPServer.__init__(self, server_address,
335
request_handler_class)
472
336
TestingHTTPServerMixin.__init__(self, test_case_server)
473
SocketServer.TCPServer.__init__(self, server_address,
474
request_handler_class)
476
def _get_request (self):
477
return SocketServer.TCPServer.get_request(self)
479
def server_bind(self):
480
SocketServer.TCPServer.server_bind(self)
481
TestingHTTPServerMixin.server_bind(self)
484
class TestingThreadingHTTPServer(TestingHTTPServerMixin,
485
SocketServer.ThreadingTCPServer,
339
class TestingThreadingHTTPServer(test_server.TestingThreadingTCPServer,
340
TestingHTTPServerMixin):
487
341
"""A threading HTTP test server for HTTP 1.1.
489
343
Since tests can initiate several concurrent connections to the same http
490
344
server, we need an independent connection for each of them. We achieve that
491
345
by spawning a new thread for each connection.
494
347
def __init__(self, server_address, request_handler_class,
495
348
test_case_server):
349
test_server.TestingThreadingTCPServer.__init__(self, server_address,
350
request_handler_class)
496
351
TestingHTTPServerMixin.__init__(self, test_case_server)
497
SocketServer.ThreadingTCPServer.__init__(self, server_address,
498
request_handler_class)
499
# Decides how threads will act upon termination of the main
500
# process. This is prophylactic as we should not leave the threads
502
self.daemon_threads = True
504
def _get_request (self):
505
return SocketServer.ThreadingTCPServer.get_request(self)
507
def process_request_thread(self, started, request, client_address):
508
if 'threads' in tests.selftest_debug_flags:
509
print 'Processing: %s' % (threading.currentThread().name,)
511
SocketServer.ThreadingTCPServer.process_request_thread(
512
self, request, client_address)
513
# Shutdown the socket as soon as possible, the thread will be joined
514
# later if needed during server shutdown thread.
515
self.shutdown_client_socket(request)
517
def process_request(self, request, client_address):
518
"""Start a new thread to process the request."""
519
client = self.clients.pop()
520
started = threading.Event()
521
t = test_server.ThreadWithException(
523
target = self.process_request_thread,
524
args = (started, request, client_address))
525
t.name = '%s -> %s' % (client_address, self.server_address)
526
if 'threads' in tests.selftest_debug_flags:
527
print 'Thread for: %s started' % (threading.currentThread().name,)
529
self.clients.append(client)
530
if self.daemon_threads:
535
def shutdown_client(self, client):
536
TestingHTTPServerMixin.shutdown_client(self, client)
538
# The thread has been created only if the request is processed but
539
# after the connection is inited. This could happen during server
541
sock, addr, thread = client
542
if 'threads' in tests.selftest_debug_flags:
543
print 'Try joining: %s' % (thread.name,)
544
self.join_thread(thread)
546
def server_bind(self):
547
SocketServer.ThreadingTCPServer.server_bind(self)
548
TestingHTTPServerMixin.server_bind(self)
551
class HttpServer(transport.Server):
354
class HttpServer(test_server.TestingTCPServerInAThread):
552
355
"""A test server for http transports.
554
357
Subclasses can provide a specific request handler.
576
379
:param protocol_version: if specified, will override the protocol
577
380
version of the request handler.
579
transport.Server.__init__(self)
580
self.request_handler = request_handler
382
# Depending on the protocol version, we will create the appropriate
384
if protocol_version is None:
385
# Use the request handler one
386
proto_vers = request_handler.protocol_version
388
# Use our own, it will be used to override the request handler
390
proto_vers = protocol_version
391
# Get the appropriate server class for the required protocol
392
serv_cls = self.http_server_class.get(proto_vers, None)
394
raise httplib.UnknownProtocol(proto_vers)
581
395
self.host = 'localhost'
584
self.protocol_version = protocol_version
397
super(HttpServer, self).__init__((self.host, self.port),
400
self.protocol_version = proto_vers
585
401
# Allows tests to verify number of GET requests issued
586
402
self.GET_request_nb = 0
588
def create_httpd(self, serv_cls, rhandler_cls):
    """Instantiate the http server object.

    :param serv_cls: the server class (or factory) to call.
    :param rhandler_cls: accepted for the callers' benefit but not used
        here: the handler handed to the server is self.request_handler.
    :return: whatever serv_cls returns.
    """
    address = (self.host, self.port)
    handler = self.request_handler
    return serv_cls(address, handler, self)
592
return "%s(%s:%s)" % \
593
(self.__class__.__name__, self.host, self.port)
595
def _get_httpd(self):
596
if self._httpd is None:
597
rhandler = self.request_handler
598
# Depending on the protocol version, we will create the appropriate
600
if self.protocol_version is None:
601
# Use the request handler one
602
proto_vers = rhandler.protocol_version
604
# Use our own, it will be used to override the request handler
606
proto_vers = self.protocol_version
607
# Create the appropriate server for the required protocol
608
serv_cls = self.http_server_class.get(proto_vers, None)
610
raise httplib.UnknownProtocol(proto_vers)
612
self._httpd = self.create_httpd(serv_cls, rhandler)
613
# Ensure we get the right port and an updated host if needed
614
self.host, self.port = self._httpd.server_address
617
def run_server(self, started):
    """Server thread main entry point.

    :param started: passed through untouched to the server's serve().
    """
    httpd = self._get_httpd()
    # Publish the base url before entering the serving loop, using the
    # host and port as they stand once the server object exists.
    url_parts = (self._url_protocol, self.host, self.port)
    self._http_base_url = '%s://%s:%s/' % url_parts
    httpd.serve(started)
403
self._http_base_url = None
406
def create_server(self):
    """Build the server object from self.server_class.

    The class is called with the (host, port) pair, the request handler
    class and this object itself.
    """
    address = (self.host, self.port)
    handler_cls = self.request_handler_class
    return self.server_class(address, handler_cls, self)
624
410
def _get_remote_url(self, path):
625
411
path_parts = path.split(os.path.sep)
654
440
backing_transport_server)
655
441
self._home_dir = os.getcwdu()
656
442
self._local_path_parts = self._home_dir.split(os.path.sep)
657
self._http_base_url = None
660
# Create the server thread
661
started = threading.Event()
662
self._http_thread = test_server.ThreadWithException(
663
event=started, target=self.run_server, args=(started,))
664
self._http_thread.start()
665
# Wait for the server thread to start (i.e. release the lock)
667
if self._httpd is None:
668
if 'threads' in tests.selftest_debug_flags:
669
print 'Server %s:% start failed ' % (self.host, self.port)
671
self._http_thread.name = self._http_base_url
672
if 'threads' in tests.selftest_debug_flags:
673
print 'Thread started: %s' % (self._http_thread.name,)
675
# If an exception occurred during the server start, it will get raised
676
self._http_thread.join(timeout=0)
678
def stop_server(self):
679
"""See bzrlib.transport.Server.tearDown."""
680
if self._httpd is not None:
681
# The server has been started successfully, shut it down now
682
self._httpd.shutdown_server()
683
if 'threads' in tests.selftest_debug_flags:
684
print 'Try joining: %s' % (self._http_thread.name,)
685
self._httpd.join_thread(self._http_thread)
686
if 'threads' in tests.selftest_debug_flags:
687
print 'Thread joined: %s' % (self._http_thread.name,)
445
super(HttpServer, self).start_server()
446
self._http_base_url = '%s://%s:%s/' % (
447
self._url_protocol, self.host, self.port)
689
449
def get_url(self):
690
450
"""See bzrlib.transport.Server.get_url."""