325
333
# the tests cases.
326
334
self.test_case_server = test_case_server
327
335
self._home_dir = test_case_server._home_dir
329
self.is_shut_down = threading.Event()
330
# We collect the sockets/threads used by the clients so we can
331
# close/join them when shutting down
334
def get_request (self):
335
"""Get the request and client address from the socket.
337
sock, addr = self._get_request()
338
self.clients.append([sock, addr])
341
def verify_request(self, request, client_address):
    """Verify the request.

    Return True if we should proceed with this request, False if we should
    not even touch a single byte in the socket !

    :param request: The accepted request; not inspected here.
    :param client_address: The peer address; not inspected here.
    """
    # Only the 'serving' event decides: it must exist and be set.
    return self.serving is not None and self.serving.isSet()
349
def handle_request(self):
350
request, client_address = self.get_request()
352
if self.verify_request(request, client_address):
353
self.process_request(request, client_address)
355
if self.serving is not None and self.serving.isSet():
356
self.handle_error(request, client_address)
358
# Exceptions raised while we shut down are just noise, but feel
359
# free to put a breakpoint here if you suspect something
360
# else. Such an example is the SSL handshake: it's automatic
361
# once we start processing the request but the last connection
362
# will close immediately and will not be able to correctly
365
self.close_request(request)
367
def server_bind(self):
    """Record the bound socket address on pre-2.5 Python versions.

    On those versions ``self.server_address`` is refreshed from
    ``self.socket.getsockname()``; on later versions nothing is done here.
    """
    # The following has been fixed in 2.5 so we need to provide it for
    # older python versions.
    # sys.version is a string, so it cannot be meaningfully compared with a
    # tuple; sys.version_info is the tuple form of the interpreter version.
    if sys.version_info < (2, 5):
        self.server_address = self.socket.getsockname()
373
def serve(self, started):
374
self.serving = threading.Event()
376
self.is_shut_down.clear()
377
if 'threads' in tests.selftest_debug_flags:
378
print 'Starting %r' % (self.server_address,)
379
# We are listening and ready to accept connections
381
while self.serving.isSet():
382
if 'threads' in tests.selftest_debug_flags:
383
print 'Accepting on %r' % (self.server_address,)
384
# Really a connection but the python framework is generic and
386
self.handle_request()
387
if 'threads' in tests.selftest_debug_flags:
388
print 'Closing %r' % (self.server_address,)
389
# Let's close the listening socket
391
if 'threads' in tests.selftest_debug_flags:
392
print 'Closed %r' % (self.server_address,)
393
self.is_shut_down.set()
395
def connect_socket(self):
396
err = socket.error('getaddrinfo returns an empty list')
397
for res in socket.getaddrinfo(*self.server_address):
398
af, socktype, proto, canonname, sa = res
401
sock = socket.socket(af, socktype, proto)
405
except socket.error, err:
406
# 'err' is now the most recent error
411
def join_thread(self, thread, timeout=2):
414
# The timeout expired without joining the thread, the thread is
415
# therefore stuck and that's a failure as far as the test is
416
# concerned. We used to hang here.
417
raise AssertionError('thread %s hung' % (thread.name,))
419
def shutdown_server(self):
420
"""Stops the serve() loop.
422
Blocks until the loop has finished. This must be called while serve()
423
is running in another thread, or it will deadlock.
425
if self.serving is None:
426
# If the server wasn't properly started, there is nothing to
429
# As soon as we stop serving, no more connections are accepted except
430
# one to get out of the blocking listen.
432
# The server is listening for a last connection, let's give it:
435
last_conn = self.connect_socket()
436
except socket.error, e:
437
# But ignore connection errors as the point is to unblock the
438
# server thread, it may happen that it's not blocked or even not
439
# started (when something went wrong during test case setup
440
# leading to self.setUp() *not* being called but self.tearDown()
441
# still being called)
443
# We don't have to wait for the server to shut down to start shutting
444
# down the clients, so let's start now.
445
for c in self.clients:
446
self.shutdown_client(c)
448
# Now we wait for the thread running serve() to finish
449
self.is_shut_down.wait()
450
if last_conn is not None:
451
# Close the last connection without trying to use it. The server
452
# will not process a single byte on that socket to avoid
453
# complications (SSL starts with a handshake for example).
456
def shutdown_client(self, client):
    """Shut down the socket recorded for one client.

    :param client: A sequence whose first two items are the client socket
        and its address; any further items are ignored here.
    """
    # Slice first so entries carrying extra items still unpack to two names.
    sock, addr = client[:2]
    self.shutdown_client_socket(sock)
460
def shutdown_client_socket(self, sock):
461
"""Properly shutdown a client socket.
463
Under some circumstances (as in bug #383920), we need to force the
464
shutdown as python delays it until gc occur otherwise and the client
467
This should be called only when no other thread is trying to use the
471
# The request process has been completed, the thread is about to
472
# die, let's shutdown the socket if we can.
473
sock.shutdown(socket.SHUT_RDWR)
475
except (socket.error, select.error), e:
476
if e[0] in (errno.EBADF, errno.ENOTCONN):
477
# Right, the socket is already down
480
print 'exception in shutdown_client_socket: %r' % (e,)
484
# FIXME: TestingHTTPServerMixin shouldn't be first -- vila 20100531
485
class TestingHTTPServer(TestingHTTPServerMixin, SocketServer.TCPServer):
338
class TestingHTTPServer(test_server.TestingTCPServer, TestingHTTPServerMixin):
487
340
def __init__(self, server_address, request_handler_class,
488
341
test_case_server):
342
test_server.TestingTCPServer.__init__(self, server_address,
343
request_handler_class)
489
344
TestingHTTPServerMixin.__init__(self, test_case_server)
490
SocketServer.TCPServer.__init__(self, server_address,
491
request_handler_class)
493
def _get_request(self):
    """Return whatever SocketServer.TCPServer.get_request() returns."""
    return SocketServer.TCPServer.get_request(self)
496
def server_bind(self):
    """Bind via SocketServer.TCPServer, then run the mixin's server_bind."""
    # Order matters: the socket must be bound before the mixin step runs.
    SocketServer.TCPServer.server_bind(self)
    TestingHTTPServerMixin.server_bind(self)
501
# FIXME: TestingHTTPServerMixin shouldn't be first -- vila 20100531
502
class TestingThreadingHTTPServer(TestingHTTPServerMixin,
503
SocketServer.ThreadingTCPServer,
347
class TestingThreadingHTTPServer(test_server.TestingThreadingTCPServer,
348
TestingHTTPServerMixin):
505
349
"""A threading HTTP test server for HTTP 1.1.
507
351
Since tests can initiate several concurrent connections to the same http
508
352
server, we need an independent connection for each of them. We achieve that
509
353
by spawning a new thread for each connection.
512
355
def __init__(self, server_address, request_handler_class,
513
356
test_case_server):
357
test_server.TestingThreadingTCPServer.__init__(self, server_address,
358
request_handler_class)
514
359
TestingHTTPServerMixin.__init__(self, test_case_server)
515
SocketServer.ThreadingTCPServer.__init__(self, server_address,
516
request_handler_class)
517
# Decides how threads will act upon termination of the main
518
# process. This is prophylactic as we should not leave the threads
520
self.daemon_threads = True
522
def _get_request(self):
    """Return whatever SocketServer.ThreadingTCPServer.get_request() returns."""
    return SocketServer.ThreadingTCPServer.get_request(self)
525
def process_request_thread(self, started, request, client_address):
526
if 'threads' in tests.selftest_debug_flags:
527
print 'Processing: %s' % (threading.currentThread().name,)
529
SocketServer.ThreadingTCPServer.process_request_thread(
530
self, request, client_address)
531
# Shutdown the socket as soon as possible, the thread will be joined
532
# later if needed during server shutdown thread.
533
self.shutdown_client_socket(request)
535
def process_request(self, request, client_address):
536
"""Start a new thread to process the request."""
537
client = self.clients.pop()
538
started = threading.Event()
539
t = test_server.ThreadWithException(
541
target = self.process_request_thread,
542
args = (started, request, client_address))
543
t.name = '%s -> %s' % (client_address, self.server_address)
544
if 'threads' in tests.selftest_debug_flags:
545
print 'Thread for: %s started' % (threading.currentThread().name,)
547
self.clients.append(client)
548
if self.daemon_threads:
553
def shutdown_client(self, client):
554
TestingHTTPServerMixin.shutdown_client(self, client)
556
# The thread has been created only if the request is processed but
557
# after the connection is inited. This could happen during server
559
sock, addr, thread = client
560
if 'threads' in tests.selftest_debug_flags:
561
print 'Try joining: %s' % (thread.name,)
562
self.join_thread(thread)
564
def server_bind(self):
    """Bind via SocketServer.ThreadingTCPServer, then run the mixin's server_bind."""
    # Order matters: the socket must be bound before the mixin step runs.
    SocketServer.ThreadingTCPServer.server_bind(self)
    TestingHTTPServerMixin.server_bind(self)
569
class HttpServer(transport.Server):
362
class HttpServer(test_server.TestingTCPServerInAThread):
570
363
"""A test server for http transports.
572
365
Subclasses can provide a specific request handler.
594
387
:param protocol_version: if specified, will override the protocol
595
388
version of the request handler.
597
transport.Server.__init__(self)
598
self.request_handler = request_handler
390
# Depending on the protocol version, we will create the appropriate
392
if protocol_version is None:
393
# Use the request handler one
394
proto_vers = request_handler.protocol_version
396
# Use our own, it will be used to override the request handler
398
proto_vers = protocol_version
399
# Get the appropriate server class for the required protocol
400
serv_cls = self.http_server_class.get(proto_vers, None)
402
raise httplib.UnknownProtocol(proto_vers)
599
403
self.host = 'localhost'
602
self.protocol_version = protocol_version
405
super(HttpServer, self).__init__((self.host, self.port),
408
self.protocol_version = proto_vers
603
409
# Allows tests to verify number of GET requests issued
604
410
self.GET_request_nb = 0
606
def create_httpd(self, serv_cls, rhandler_cls):
    """Instantiate the http server object.

    :param serv_cls: Callable invoked as
        ``serv_cls((host, port), request_handler, self)``.
    :param rhandler_cls: Accepted for interface compatibility but unused;
        ``self.request_handler`` is what gets passed to ``serv_cls``.
    :return: Whatever ``serv_cls`` returns.
    """
    address = (self.host, self.port)
    return serv_cls(address, self.request_handler, self)
610
return "%s(%s:%s)" % \
611
(self.__class__.__name__, self.host, self.port)
613
def _get_httpd(self):
614
if self._httpd is None:
615
rhandler = self.request_handler
616
# Depending on the protocol version, we will create the appropriate
618
if self.protocol_version is None:
619
# Use the request handler one
620
proto_vers = rhandler.protocol_version
622
# Use our own, it will be used to override the request handler
624
proto_vers = self.protocol_version
625
# Create the appropriate server for the required protocol
626
serv_cls = self.http_server_class.get(proto_vers, None)
628
raise httplib.UnknownProtocol(proto_vers)
630
self._httpd = self.create_httpd(serv_cls, rhandler)
631
# Ensure we get the right port and an updated host if needed
632
self.host, self.port = self._httpd.server_address
635
def run_server(self, started):
    """Server thread main entry point.

    :param started: Passed through unchanged to the server's ``serve()``.
    """
    # Obtain the server before building the URL: the URL is formatted from
    # self.host and self.port as they stand after _get_httpd() returns.
    server = self._get_httpd()
    self._http_base_url = '%s://%s:%s/' % (self._url_protocol,
                                           self.host, self.port)
    server.serve(started)
411
self._http_base_url = None
414
def create_server(self):
    """Build the server object from ``self.server_class``.

    :return: ``self.server_class((host, port), request_handler_class, self)``.
    """
    address = (self.host, self.port)
    return self.server_class(address, self.request_handler_class, self)
642
418
def _get_remote_url(self, path):
643
419
path_parts = path.split(os.path.sep)
672
448
backing_transport_server)
673
449
self._home_dir = os.getcwdu()
674
450
self._local_path_parts = self._home_dir.split(os.path.sep)
675
self._http_base_url = None
678
# Create the server thread
679
started = threading.Event()
680
self._http_thread = test_server.ThreadWithException(
681
event=started, target=self.run_server, args=(started,))
682
self._http_thread.start()
683
# Wait for the server thread to start (i.e release the lock)
685
if self._httpd is None:
686
if 'threads' in tests.selftest_debug_flags:
687
print 'Server %s:% start failed ' % (self.host, self.port)
689
self._http_thread.name = self._http_base_url
690
if 'threads' in tests.selftest_debug_flags:
691
print 'Thread started: %s' % (self._http_thread.name,)
693
# If an exception occurred during the server start, it will get raised
694
self._http_thread.join(timeout=0)
696
def stop_server(self):
    """See bzrlib.transport.Server.tearDown."""
    if self._httpd is None:
        # No server object was created, so there is nothing to shut down.
        return
    # The server has been started successfully, shut it down now
    self._httpd.shutdown_server()
    trace_threads = 'threads' in tests.selftest_debug_flags
    if trace_threads:
        # Parenthesised single argument: same output under the Python 2
        # print statement as the unparenthesised form.
        print('Try joining: %s' % (self._http_thread.name,))
    self._httpd.join_thread(self._http_thread)
    if trace_threads:
        print('Thread joined: %s' % (self._http_thread.name,))
453
super(HttpServer, self).start_server()
454
self._http_base_url = '%s://%s:%s/' % (
455
self._url_protocol, self.host, self.port)
707
457
def get_url(self):
708
458
"""See bzrlib.transport.Server.get_url."""