~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_server.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-08-30 07:47:38 UTC
  • mfrom: (5394.2.1 integration2)
  • Revision ID: pqm@pqm.ubuntu.com-20100830074738-ymqwum541fi8b4sr
(vila) Fix most of the leaking tests (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2010 Canonical Ltd
 
1
# Copyright (C) 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
 
17
import errno
 
18
import socket
 
19
import SocketServer
 
20
import select
 
21
import sys
 
22
import threading
 
23
 
 
24
 
17
25
from bzrlib import (
 
26
    osutils,
18
27
    transport,
19
28
    urlutils,
20
29
    )
22
31
    chroot,
23
32
    pathfilter,
24
33
    )
25
 
from bzrlib.smart import server
 
34
from bzrlib.smart import (
 
35
    medium,
 
36
    server,
 
37
    )
 
38
 
 
39
 
 
40
def debug_threads():
 
41
    # FIXME: There is a dependency loop between bzrlib.tests and
 
42
    # bzrlib.tests.test_server that needs to be fixed. In the mean time
 
43
    # defining this function is enough for our needs. -- vila 20100611
 
44
    from bzrlib import tests
 
45
    return 'threads' in tests.selftest_debug_flags
26
46
 
27
47
 
28
48
class TestServer(transport.Server):
223
243
        raise NotImplementedError
224
244
 
225
245
 
226
 
class SmartTCPServer_for_testing(server.SmartTCPServer):
 
246
class ThreadWithException(threading.Thread):
 
247
    """A catching exception thread.
 
248
 
 
249
    If an exception occurs during the thread execution, it's caught and
 
250
    re-raised when the thread is joined().
 
251
    """
 
252
 
 
253
    def __init__(self, *args, **kwargs):
 
254
        # There are cases where the calling thread must wait, yet, if an
 
255
        # exception occurs, the event should be set so the caller is not
 
256
        # blocked. The main example is a calling thread that want to wait for
 
257
        # the called thread to be in a given state before continuing.
 
258
        try:
 
259
            event = kwargs.pop('event')
 
260
        except KeyError:
 
261
            # If the caller didn't pass a specific event, create our own
 
262
            event = threading.Event()
 
263
        super(ThreadWithException, self).__init__(*args, **kwargs)
 
264
        self.set_ready_event(event)
 
265
        self.exception = None
 
266
        self.ignored_exceptions = None # see set_ignored_exceptions
 
267
 
 
268
    # compatibility thunk for python-2.4 and python-2.5...
 
269
    if sys.version_info < (2, 6):
 
270
        name = property(threading.Thread.getName, threading.Thread.setName)
 
271
 
 
272
    def set_ready_event(self, event):
 
273
        """Set the ``ready`` event used to synchronize exception catching.
 
274
 
 
275
        When the thread uses an event to synchronize itself with another thread
 
276
        (setting it when the other thread can wake up from a ``wait`` call),
 
277
        the event must be set after catching an exception or the other thread
 
278
        will hang.
 
279
 
 
280
        Some threads require multiple events and should set the relevant one
 
281
        when appropriate.
 
282
        """
 
283
        self.ready = event
 
284
 
 
285
    def set_ignored_exceptions(self, ignored):
 
286
        """Declare which exceptions will be ignored.
 
287
 
 
288
        :param ignored: Can be either:
 
289
           - None: all exceptions will be raised,
 
290
           - an exception class: the instances of this class will be ignored,
 
291
           - a tuple of exception classes: the instances of any class of the
 
292
             list will be ignored,
 
293
           - a callable: that will be passed the exception object
 
294
             and should return True if the exception should be ignored
 
295
        """
 
296
        if ignored is None:
 
297
            self.ignored_exceptions = None
 
298
        elif isinstance(ignored, (Exception, tuple)):
 
299
            self.ignored_exceptions = lambda e: isinstance(e, ignored)
 
300
        else:
 
301
            self.ignored_exceptions = ignored
 
302
 
 
303
    def run(self):
 
304
        """Overrides Thread.run to capture any exception."""
 
305
        self.ready.clear()
 
306
        try:
 
307
            try:
 
308
                super(ThreadWithException, self).run()
 
309
            except:
 
310
                self.exception = sys.exc_info()
 
311
        finally:
 
312
            # Make sure the calling thread is released
 
313
            self.ready.set()
 
314
 
 
315
 
 
316
    def join(self, timeout=5):
 
317
        """Overrides Thread.join to raise any exception caught.
 
318
 
 
319
 
 
320
        Calling join(timeout=0) will raise the caught exception or return None
 
321
        if the thread is still alive.
 
322
 
 
323
        The default timeout is set to 5 and should expire only when a thread
 
324
        serving a client connection is hung.
 
325
        """
 
326
        super(ThreadWithException, self).join(timeout)
 
327
        if self.exception is not None:
 
328
            exc_class, exc_value, exc_tb = self.exception
 
329
            self.exception = None # The exception should be raised only once
 
330
            if (self.ignored_exceptions is None
 
331
                or not self.ignored_exceptions(exc_value)):
 
332
                # Raise non ignored exceptions
 
333
                raise exc_class, exc_value, exc_tb
 
334
        if timeout and self.isAlive():
 
335
            # The timeout expired without joining the thread, the thread is
 
336
            # therefore stucked and that's a failure as far as the test is
 
337
            # concerned. We used to hang here.
 
338
 
 
339
            # FIXME: we need to kill the thread, but as far as the test is
 
340
            # concerned, raising an assertion is too strong. On most of the
 
341
            # platforms, this doesn't occur, so just mentioning the problem is
 
342
            # enough for now -- vila 2010824
 
343
            sys.stderr.write('thread %s hung\n' % (self.name,))
 
344
            #raise AssertionError('thread %s hung' % (self.name,))
 
345
 
 
346
    def pending_exception(self):
 
347
        """Raise the caught exception.
 
348
 
 
349
        This does nothing if no exception occurred.
 
350
        """
 
351
        self.join(timeout=0)
 
352
 
 
353
 
 
354
class TestingTCPServerMixin:
 
355
    """Mixin to support running SocketServer.TCPServer in a thread.
 
356
 
 
357
    Tests are connecting from the main thread, the server has to be run in a
 
358
    separate thread.
 
359
    """
 
360
 
 
361
    def __init__(self):
 
362
        self.started = threading.Event()
 
363
        self.serving = None
 
364
        self.stopped = threading.Event()
 
365
        # We collect the resources used by the clients so we can release them
 
366
        # when shutting down
 
367
        self.clients = []
 
368
        self.ignored_exceptions = None
 
369
 
 
370
    def server_bind(self):
 
371
        self.socket.bind(self.server_address)
 
372
        self.server_address = self.socket.getsockname()
 
373
 
 
374
    def serve(self):
 
375
        self.serving = True
 
376
        self.stopped.clear()
 
377
        # We are listening and ready to accept connections
 
378
        self.started.set()
 
379
        try:
 
380
            while self.serving:
 
381
                # Really a connection but the python framework is generic and
 
382
                # call them requests
 
383
                self.handle_request()
 
384
            # Let's close the listening socket
 
385
            self.server_close()
 
386
        finally:
 
387
            self.stopped.set()
 
388
 
 
389
    def handle_request(self):
 
390
        """Handle one request.
 
391
 
 
392
        The python version swallows some socket exceptions and we don't use
 
393
        timeout, so we override it to better control the server behavior.
 
394
        """
 
395
        request, client_address = self.get_request()
 
396
        if self.verify_request(request, client_address):
 
397
            try:
 
398
                self.process_request(request, client_address)
 
399
            except:
 
400
                self.handle_error(request, client_address)
 
401
                self.close_request(request)
 
402
 
 
403
    def get_request(self):
 
404
        return self.socket.accept()
 
405
 
 
406
    def verify_request(self, request, client_address):
 
407
        """Verify the request.
 
408
 
 
409
        Return True if we should proceed with this request, False if we should
 
410
        not even touch a single byte in the socket ! This is useful when we
 
411
        stop the server with a dummy last connection.
 
412
        """
 
413
        return self.serving
 
414
 
 
415
    def handle_error(self, request, client_address):
 
416
        # Stop serving and re-raise the last exception seen
 
417
        self.serving = False
 
418
        # The following can be used for debugging purposes, it will display the
 
419
        # exception and the traceback just when it occurs instead of waiting
 
420
        # for the thread to be joined.
 
421
 
 
422
        # SocketServer.BaseServer.handle_error(self, request, client_address)
 
423
        raise
 
424
 
 
425
    def ignored_exceptions_during_shutdown(self, e):
 
426
        if sys.platform == 'win32':
 
427
            accepted_errnos = [errno.EBADF, errno.WSAEBADF, errno.WSAENOTCONN,
 
428
                               errno.WSAECONNRESET, errno.WSAESHUTDOWN]
 
429
        else:
 
430
            accepted_errnos = [errno.EBADF, errno.ENOTCONN, errno.ECONNRESET]
 
431
        if isinstance(e, socket.error) and e[0] in accepted_errnos:
 
432
            return True
 
433
        return False
 
434
 
 
435
    # The following methods are called by the main thread
 
436
 
 
437
    def stop_client_connections(self):
 
438
        while self.clients:
 
439
            c = self.clients.pop()
 
440
            self.shutdown_client(c)
 
441
 
 
442
    def shutdown_socket(self, sock):
 
443
        """Properly shutdown a socket.
 
444
 
 
445
        This should be called only when no other thread is trying to use the
 
446
        socket.
 
447
        """
 
448
        try:
 
449
            sock.shutdown(socket.SHUT_RDWR)
 
450
            sock.close()
 
451
        except Exception, e:
 
452
            if self.ignored_exceptions(e):
 
453
                pass
 
454
            else:
 
455
                raise
 
456
 
 
457
    # The following methods are called by the main thread
 
458
 
 
459
    def set_ignored_exceptions(self, thread, ignored_exceptions):
 
460
        self.ignored_exceptions = ignored_exceptions
 
461
        thread.set_ignored_exceptions(self.ignored_exceptions)
 
462
 
 
463
    def _pending_exception(self, thread):
 
464
        """Raise server uncaught exception.
 
465
 
 
466
        Daughter classes can override this if they use daughter threads.
 
467
        """
 
468
        thread.pending_exception()
 
469
 
 
470
 
 
471
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
 
472
 
 
473
    def __init__(self, server_address, request_handler_class):
 
474
        TestingTCPServerMixin.__init__(self)
 
475
        SocketServer.TCPServer.__init__(self, server_address,
 
476
                                        request_handler_class)
 
477
 
 
478
    def get_request(self):
 
479
        """Get the request and client address from the socket."""
 
480
        sock, addr = TestingTCPServerMixin.get_request(self)
 
481
        self.clients.append((sock, addr))
 
482
        return sock, addr
 
483
 
 
484
    # The following methods are called by the main thread
 
485
 
 
486
    def shutdown_client(self, client):
 
487
        sock, addr = client
 
488
        self.shutdown_socket(sock)
 
489
 
 
490
 
 
491
class TestingThreadingTCPServer(TestingTCPServerMixin,
 
492
                                SocketServer.ThreadingTCPServer):
 
493
 
 
494
    def __init__(self, server_address, request_handler_class):
 
495
        TestingTCPServerMixin.__init__(self)
 
496
        SocketServer.ThreadingTCPServer.__init__(self, server_address,
 
497
                                                 request_handler_class)
 
498
 
 
499
    def get_request (self):
 
500
        """Get the request and client address from the socket."""
 
501
        sock, addr = TestingTCPServerMixin.get_request(self)
 
502
        # The thread is not create yet, it will be updated in process_request
 
503
        self.clients.append((sock, addr, None))
 
504
        return sock, addr
 
505
 
 
506
    def process_request_thread(self, started, stopped, request, client_address):
 
507
        started.set()
 
508
        SocketServer.ThreadingTCPServer.process_request_thread(
 
509
            self, request, client_address)
 
510
        self.close_request(request)
 
511
        stopped.set()
 
512
 
 
513
    def process_request(self, request, client_address):
 
514
        """Start a new thread to process the request."""
 
515
        started = threading.Event()
 
516
        stopped = threading.Event()
 
517
        t = ThreadWithException(
 
518
            event=stopped,
 
519
            name='%s -> %s' % (client_address, self.server_address),
 
520
            target = self.process_request_thread,
 
521
            args = (started, stopped, request, client_address))
 
522
        # Update the client description
 
523
        self.clients.pop()
 
524
        self.clients.append((request, client_address, t))
 
525
        # Propagate the exception handler since we must use the same one for
 
526
        # connections running in their own threads than TestingTCPServer.
 
527
        t.set_ignored_exceptions(self.ignored_exceptions)
 
528
        t.start()
 
529
        started.wait()
 
530
        if debug_threads():
 
531
            sys.stderr.write('Client thread %s started\n' % (t.name,))
 
532
        # If an exception occured during the thread start, it will get raised.
 
533
        t.pending_exception()
 
534
 
 
535
    # The following methods are called by the main thread
 
536
 
 
537
    def shutdown_client(self, client):
 
538
        sock, addr, connection_thread = client
 
539
        self.shutdown_socket(sock)
 
540
        if connection_thread is not None:
 
541
            # The thread has been created only if the request is processed but
 
542
            # after the connection is inited. This could happen during server
 
543
            # shutdown. If an exception occurred in the thread it will be
 
544
            # re-raised
 
545
            if debug_threads():
 
546
                sys.stderr.write('Client thread %s will be joined\n'
 
547
                                 % (connection_thread.name,))
 
548
            connection_thread.join()
 
549
 
 
550
    def set_ignored_exceptions(self, thread, ignored_exceptions):
 
551
        TestingTCPServerMixin.set_ignored_exceptions(self, thread,
 
552
                                                     ignored_exceptions)
 
553
        for sock, addr, connection_thread in self.clients:
 
554
            if connection_thread is not None:
 
555
                connection_thread.set_ignored_exceptions(
 
556
                    self.ignored_exceptions)
 
557
 
 
558
    def _pending_exception(self, thread):
 
559
        for sock, addr, connection_thread in self.clients:
 
560
            if connection_thread is not None:
 
561
                connection_thread.pending_exception()
 
562
        TestingTCPServerMixin._pending_exception(self, thread)
 
563
 
 
564
 
 
565
class TestingTCPServerInAThread(transport.Server):
 
566
    """A server in a thread that re-raise thread exceptions."""
 
567
 
 
568
    def __init__(self, server_address, server_class, request_handler_class):
 
569
        self.server_class = server_class
 
570
        self.request_handler_class = request_handler_class
 
571
        self.host, self.port = server_address
 
572
        self.server = None
 
573
        self._server_thread = None
 
574
 
 
575
    def __repr__(self):
 
576
        return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
 
577
 
 
578
    def create_server(self):
 
579
        return self.server_class((self.host, self.port),
 
580
                                 self.request_handler_class)
 
581
 
 
582
    def start_server(self):
 
583
        self.server = self.create_server()
 
584
        self._server_thread = ThreadWithException(
 
585
            event=self.server.started,
 
586
            target=self.run_server)
 
587
        self._server_thread.start()
 
588
        # Wait for the server thread to start (i.e release the lock)
 
589
        self.server.started.wait()
 
590
        # Get the real address, especially the port
 
591
        self.host, self.port = self.server.server_address
 
592
        self._server_thread.name = self.server.server_address
 
593
        if debug_threads():
 
594
            sys.stderr.write('Server thread %s started\n'
 
595
                             % (self._server_thread.name,))
 
596
        # If an exception occured during the server start, it will get raised,
 
597
        # otherwise, the server is blocked on its accept() call.
 
598
        self._server_thread.pending_exception()
 
599
        # From now on, we'll use a different event to ensure the server can set
 
600
        # its exception
 
601
        self._server_thread.set_ready_event(self.server.stopped)
 
602
 
 
603
    def run_server(self):
 
604
        self.server.serve()
 
605
 
 
606
    def stop_server(self):
 
607
        if self.server is None:
 
608
            return
 
609
        try:
 
610
            # The server has been started successfully, shut it down now.  As
 
611
            # soon as we stop serving, no more connection are accepted except
 
612
            # one to get out of the blocking listen.
 
613
            self.set_ignored_exceptions(
 
614
                self.server.ignored_exceptions_during_shutdown)
 
615
            self.server.serving = False
 
616
            if debug_threads():
 
617
                sys.stderr.write('Server thread %s will be joined\n'
 
618
                                 % (self._server_thread.name,))
 
619
            # The server is listening for a last connection, let's give it:
 
620
            last_conn = None
 
621
            try:
 
622
                last_conn = osutils.connect_socket((self.host, self.port))
 
623
            except socket.error, e:
 
624
                # But ignore connection errors as the point is to unblock the
 
625
                # server thread, it may happen that it's not blocked or even
 
626
                # not started.
 
627
                pass
 
628
            # We start shutting down the client while the server itself is
 
629
            # shutting down.
 
630
            self.server.stop_client_connections()
 
631
            # Now we wait for the thread running self.server.serve() to finish
 
632
            self.server.stopped.wait()
 
633
            if last_conn is not None:
 
634
                # Close the last connection without trying to use it. The
 
635
                # server will not process a single byte on that socket to avoid
 
636
                # complications (SSL starts with a handshake for example).
 
637
                last_conn.close()
 
638
            # Check for any exception that could have occurred in the server
 
639
            # thread
 
640
            try:
 
641
                self._server_thread.join()
 
642
            except Exception, e:
 
643
                if self.server.ignored_exceptions(e):
 
644
                    pass
 
645
                else:
 
646
                    raise
 
647
        finally:
 
648
            # Make sure we can be called twice safely, note that this means
 
649
            # that we will raise a single exception even if several occurred in
 
650
            # the various threads involved.
 
651
            self.server = None
 
652
 
 
653
    def set_ignored_exceptions(self, ignored_exceptions):
 
654
        """Install an exception handler for the server."""
 
655
        self.server.set_ignored_exceptions(self._server_thread,
 
656
                                           ignored_exceptions)
 
657
 
 
658
    def pending_exception(self):
 
659
        """Raise uncaught exception in the server."""
 
660
        self.server._pending_exception(self._server_thread)
 
661
 
 
662
 
 
663
class TestingSmartConnectionHandler(SocketServer.BaseRequestHandler,
 
664
                                    medium.SmartServerSocketStreamMedium):
 
665
 
 
666
    def __init__(self, request, client_address, server):
 
667
        medium.SmartServerSocketStreamMedium.__init__(
 
668
            self, request, server.backing_transport,
 
669
            server.root_client_path)
 
670
        request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
671
        SocketServer.BaseRequestHandler.__init__(self, request, client_address,
 
672
                                                 server)
 
673
 
 
674
    def handle(self):
 
675
        while not self.finished:
 
676
            server_protocol = self._build_protocol()
 
677
            self._serve_one_request(server_protocol)
 
678
 
 
679
 
 
680
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
 
681
 
 
682
    def __init__(self, server_address, request_handler_class,
 
683
                 backing_transport, root_client_path):
 
684
        TestingThreadingTCPServer.__init__(self, server_address,
 
685
                                           request_handler_class)
 
686
        server.SmartTCPServer.__init__(self, backing_transport,
 
687
                                       root_client_path)
 
688
    def serve(self):
 
689
        # FIXME: No test are exercising the hooks for the test server
 
690
        # -- vila 20100618
 
691
        self.run_server_started_hooks()
 
692
        try:
 
693
            TestingThreadingTCPServer.serve(self)
 
694
        finally:
 
695
            self.run_server_stopped_hooks()
 
696
 
 
697
    def get_url(self):
 
698
        """Return the url of the server"""
 
699
        return "bzr://%s:%d/" % self.server_address
 
700
 
 
701
 
 
702
class SmartTCPServer_for_testing(TestingTCPServerInAThread):
227
703
    """Server suitable for use by transport tests.
228
704
 
229
705
    This server is backed by the process's cwd.
230
706
    """
231
 
 
232
707
    def __init__(self, thread_name_suffix=''):
233
 
        super(SmartTCPServer_for_testing, self).__init__(None)
234
708
        self.client_path_extra = None
235
709
        self.thread_name_suffix = thread_name_suffix
236
 
 
237
 
    def get_backing_transport(self, backing_transport_server):
238
 
        """Get a backing transport from a server we are decorating."""
239
 
        return transport.get_transport(backing_transport_server.get_url())
 
710
        self.host = '127.0.0.1'
 
711
        self.port = 0
 
712
        super(SmartTCPServer_for_testing, self).__init__(
 
713
                (self.host, self.port),
 
714
                TestingSmartServer,
 
715
                TestingSmartConnectionHandler)
 
716
 
 
717
    def create_server(self):
 
718
        return self.server_class((self.host, self.port),
 
719
                                 self.request_handler_class,
 
720
                                 self.backing_transport,
 
721
                                 self.root_client_path)
 
722
 
240
723
 
241
724
    def start_server(self, backing_transport_server=None,
242
 
              client_path_extra='/extra/'):
 
725
                     client_path_extra='/extra/'):
243
726
        """Set up server for testing.
244
727
 
245
728
        :param backing_transport_server: backing server to use.  If not
254
737
        """
255
738
        if not client_path_extra.startswith('/'):
256
739
            raise ValueError(client_path_extra)
 
740
        self.root_client_path = self.client_path_extra = client_path_extra
257
741
        from bzrlib.transport.chroot import ChrootServer
258
742
        if backing_transport_server is None:
259
743
            backing_transport_server = LocalURLServer()
262
746
        self.chroot_server.start_server()
263
747
        self.backing_transport = transport.get_transport(
264
748
            self.chroot_server.get_url())
265
 
        self.root_client_path = self.client_path_extra = client_path_extra
266
 
        self.start_background_thread(self.thread_name_suffix)
 
749
        super(SmartTCPServer_for_testing, self).start_server()
267
750
 
268
751
    def stop_server(self):
269
 
        self.stop_background_thread()
270
 
        self.chroot_server.stop_server()
 
752
        try:
 
753
            super(SmartTCPServer_for_testing, self).stop_server()
 
754
        finally:
 
755
            self.chroot_server.stop_server()
 
756
 
 
757
    def get_backing_transport(self, backing_transport_server):
 
758
        """Get a backing transport from a server we are decorating."""
 
759
        return transport.get_transport(backing_transport_server.get_url())
271
760
 
272
761
    def get_url(self):
273
 
        url = super(SmartTCPServer_for_testing, self).get_url()
 
762
        url = self.server.get_url()
274
763
        return url[:-1] + self.client_path_extra
275
764
 
276
765
    def get_bogus_url(self):