~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_server.py

(vila) Better deprecation warnings for _CompatabilityThunkFeature based test
 features (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2010 Canonical Ltd
 
1
# Copyright (C) 2010, 2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
import errno
18
18
import socket
19
19
import SocketServer
20
 
import select
21
20
import sys
22
21
import threading
 
22
import traceback
23
23
 
24
24
 
25
25
from bzrlib import (
 
26
    cethread,
 
27
    errors,
26
28
    osutils,
27
29
    transport,
28
30
    urlutils,
212
214
    def start_server(self, backing_server=None):
213
215
        """Setup the Chroot on backing_server."""
214
216
        if backing_server is not None:
215
 
            self.backing_transport = transport.get_transport(
 
217
            self.backing_transport = transport.get_transport_from_url(
216
218
                backing_server.get_url())
217
219
        else:
218
 
            self.backing_transport = transport.get_transport('.')
 
220
            self.backing_transport = transport.get_transport_from_path('.')
219
221
        self.backing_transport.clone('added-by-filter').ensure_base()
220
222
        self.filter_func = lambda x: 'added-by-filter/' + x
221
223
        super(TestingPathFilteringServer, self).start_server()
233
235
    def start_server(self, backing_server=None):
234
236
        """Setup the Chroot on backing_server."""
235
237
        if backing_server is not None:
236
 
            self.backing_transport = transport.get_transport(
 
238
            self.backing_transport = transport.get_transport_from_url(
237
239
                backing_server.get_url())
238
240
        else:
239
 
            self.backing_transport = transport.get_transport('.')
 
241
            self.backing_transport = transport.get_transport_from_path('.')
240
242
        super(TestingChrootServer, self).start_server()
241
243
 
242
244
    def get_bogus_url(self):
243
245
        raise NotImplementedError
244
246
 
245
247
 
246
 
class ThreadWithException(threading.Thread):
247
 
    """A catching exception thread.
248
 
 
249
 
    If an exception occurs during the thread execution, it's caught and
250
 
    re-raised when the thread is joined().
251
 
    """
252
 
 
253
 
    def __init__(self, *args, **kwargs):
254
 
        # There are cases where the calling thread must wait, yet, if an
255
 
        # exception occurs, the event should be set so the caller is not
256
 
        # blocked. The main example is a calling thread that want to wait for
257
 
        # the called thread to be in a given state before continuing.
258
 
        try:
259
 
            event = kwargs.pop('event')
260
 
        except KeyError:
261
 
            # If the caller didn't pass a specific event, create our own
262
 
            event = threading.Event()
263
 
        super(ThreadWithException, self).__init__(*args, **kwargs)
264
 
        self.set_ready_event(event)
265
 
        self.exception = None
266
 
        self.ignored_exceptions = None # see set_ignored_exceptions
267
 
 
268
 
    # compatibility thunk for python-2.4 and python-2.5...
269
 
    if sys.version_info < (2, 6):
270
 
        name = property(threading.Thread.getName, threading.Thread.setName)
271
 
 
272
 
    def set_ready_event(self, event):
273
 
        """Set the ``ready`` event used to synchronize exception catching.
274
 
 
275
 
        When the thread uses an event to synchronize itself with another thread
276
 
        (setting it when the other thread can wake up from a ``wait`` call),
277
 
        the event must be set after catching an exception or the other thread
278
 
        will hang.
279
 
 
280
 
        Some threads require multiple events and should set the relevant one
281
 
        when appropriate.
282
 
        """
283
 
        self.ready = event
284
 
 
285
 
    def set_ignored_exceptions(self, ignored):
286
 
        """Declare which exceptions will be ignored.
287
 
 
288
 
        :param ignored: Can be either:
289
 
           - None: all exceptions will be raised,
290
 
           - an exception class: the instances of this class will be ignored,
291
 
           - a tuple of exception classes: the instances of any class of the
292
 
             list will be ignored,
293
 
           - a callable: that will be passed the exception object
294
 
             and should return True if the exception should be ignored
295
 
        """
296
 
        if ignored is None:
297
 
            self.ignored_exceptions = None
298
 
        elif isinstance(ignored, (Exception, tuple)):
299
 
            self.ignored_exceptions = lambda e: isinstance(e, ignored)
300
 
        else:
301
 
            self.ignored_exceptions = ignored
302
 
 
303
 
    def run(self):
304
 
        """Overrides Thread.run to capture any exception."""
305
 
        self.ready.clear()
306
 
        try:
307
 
            try:
308
 
                super(ThreadWithException, self).run()
309
 
            except:
310
 
                self.exception = sys.exc_info()
311
 
        finally:
312
 
            # Make sure the calling thread is released
313
 
            self.ready.set()
314
 
 
 
248
class TestThread(cethread.CatchingExceptionThread):
315
249
 
316
250
    def join(self, timeout=5):
317
 
        """Overrides Thread.join to raise any exception caught.
318
 
 
319
 
 
320
 
        Calling join(timeout=0) will raise the caught exception or return None
321
 
        if the thread is still alive.
 
251
        """Overrides to use a default timeout.
322
252
 
323
253
        The default timeout is set to 5 and should expire only when a thread
324
254
        serving a client connection is hung.
325
255
        """
326
 
        super(ThreadWithException, self).join(timeout)
327
 
        if self.exception is not None:
328
 
            exc_class, exc_value, exc_tb = self.exception
329
 
            self.exception = None # The exception should be raised only once
330
 
            if (self.ignored_exceptions is None
331
 
                or not self.ignored_exceptions(exc_value)):
332
 
                # Raise non ignored exceptions
333
 
                raise exc_class, exc_value, exc_tb
 
256
        super(TestThread, self).join(timeout)
334
257
        if timeout and self.isAlive():
335
258
            # The timeout expired without joining the thread, the thread is
336
259
            # therefore stucked and that's a failure as far as the test is
343
266
            sys.stderr.write('thread %s hung\n' % (self.name,))
344
267
            #raise AssertionError('thread %s hung' % (self.name,))
345
268
 
346
 
    def pending_exception(self):
347
 
        """Raise the caught exception.
348
 
 
349
 
        This does nothing if no exception occurred.
350
 
        """
351
 
        self.join(timeout=0)
352
 
 
353
 
 
354
 
class TestingTCPServerMixin:
 
269
 
 
270
class TestingTCPServerMixin(object):
355
271
    """Mixin to support running SocketServer.TCPServer in a thread.
356
272
 
357
273
    Tests are connecting from the main thread, the server has to be run in a
373
289
 
374
290
    def serve(self):
375
291
        self.serving = True
376
 
        self.stopped.clear()
377
292
        # We are listening and ready to accept connections
378
293
        self.started.set()
379
294
        try:
398
313
                self.process_request(request, client_address)
399
314
            except:
400
315
                self.handle_error(request, client_address)
401
 
                self.close_request(request)
 
316
        else:
 
317
            self.close_request(request)
402
318
 
403
319
    def get_request(self):
404
320
        return self.socket.accept()
418
334
        # The following can be used for debugging purposes, it will display the
419
335
        # exception and the traceback just when it occurs instead of waiting
420
336
        # for the thread to be joined.
421
 
 
422
337
        # SocketServer.BaseServer.handle_error(self, request, client_address)
 
338
 
 
339
        # We call close_request manually, because we are going to raise an
 
340
        # exception. The SocketServer implementation calls:
 
341
        #   handle_error(...)
 
342
        #   close_request(...)
 
343
        # But because we raise the exception, close_request will never be
 
344
        # triggered. This helps client not block waiting for a response when
 
345
        # the server gets an exception.
 
346
        self.close_request(request)
423
347
        raise
424
348
 
425
349
    def ignored_exceptions_during_shutdown(self, e):
505
429
        SocketServer.ThreadingTCPServer.__init__(self, server_address,
506
430
                                                 request_handler_class)
507
431
 
508
 
    def get_request (self):
 
432
    def get_request(self):
509
433
        """Get the request and client address from the socket."""
510
434
        sock, addr = TestingTCPServerMixin.get_request(self)
511
435
        # The thread is not create yet, it will be updated in process_request
523
447
        """Start a new thread to process the request."""
524
448
        started = threading.Event()
525
449
        stopped = threading.Event()
526
 
        t = ThreadWithException(
527
 
            event=stopped,
 
450
        t = TestThread(
 
451
            sync_event=stopped,
528
452
            name='%s -> %s' % (client_address, self.server_address),
529
 
            target = self.process_request_thread,
530
 
            args = (started, stopped, request, client_address))
 
453
            target=self.process_request_thread,
 
454
            args=(started, stopped, request, client_address))
531
455
        # Update the client description
532
456
        self.clients.pop()
533
457
        self.clients.append((request, client_address, t))
534
 
        # Propagate the exception handler since we must use the same one for
535
 
        # connections running in their own threads than TestingTCPServer.
 
458
        # Propagate the exception handler since we must use the same one as
 
459
        # TestingTCPServer for connections running in their own threads.
536
460
        t.set_ignored_exceptions(self.ignored_exceptions)
537
461
        t.start()
538
462
        started.wait()
539
463
        if debug_threads():
540
464
            sys.stderr.write('Client thread %s started\n' % (t.name,))
541
465
        # If an exception occured during the thread start, it will get raised.
 
466
        # In rare cases, an exception raised during the request processing may
 
467
        # also get caught here (see http://pad.lv/869366)
542
468
        t.pending_exception()
543
469
 
544
470
    # The following methods are called by the main thread
590
516
 
591
517
    def start_server(self):
592
518
        self.server = self.create_server()
593
 
        self._server_thread = ThreadWithException(
594
 
            event=self.server.started,
 
519
        self._server_thread = TestThread(
 
520
            sync_event=self.server.started,
595
521
            target=self.run_server)
596
522
        self._server_thread.start()
597
 
        # Wait for the server thread to start (i.e release the lock)
 
523
        # Wait for the server thread to start (i.e. release the lock)
598
524
        self.server.started.wait()
599
525
        # Get the real address, especially the port
600
526
        self.host, self.port = self.server.server_address
607
533
        self._server_thread.pending_exception()
608
534
        # From now on, we'll use a different event to ensure the server can set
609
535
        # its exception
610
 
        self._server_thread.set_ready_event(self.server.stopped)
 
536
        self._server_thread.set_sync_event(self.server.stopped)
611
537
 
612
538
    def run_server(self):
613
539
        self.server.serve()
634
560
                # server thread, it may happen that it's not blocked or even
635
561
                # not started.
636
562
                pass
637
 
            # We start shutting down the client while the server itself is
 
563
            # We start shutting down the clients while the server itself is
638
564
            # shutting down.
639
565
            self.server.stop_client_connections()
640
566
            # Now we wait for the thread running self.server.serve() to finish
675
601
    def __init__(self, request, client_address, server):
676
602
        medium.SmartServerSocketStreamMedium.__init__(
677
603
            self, request, server.backing_transport,
678
 
            server.root_client_path)
 
604
            server.root_client_path,
 
605
            timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
679
606
        request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
680
607
        SocketServer.BaseRequestHandler.__init__(self, request, client_address,
681
608
                                                 server)
682
609
 
683
610
    def handle(self):
684
 
        while not self.finished:
685
 
            server_protocol = self._build_protocol()
686
 
            self._serve_one_request(server_protocol)
687
 
 
 
611
        try:
 
612
            while not self.finished:
 
613
                server_protocol = self._build_protocol()
 
614
                self._serve_one_request(server_protocol)
 
615
        except errors.ConnectionTimeout:
 
616
            # idle connections aren't considered a failure of the server
 
617
            return
 
618
 
 
619
 
 
620
_DEFAULT_TESTING_CLIENT_TIMEOUT = 60.0
688
621
 
689
622
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
690
623
 
693
626
        TestingThreadingTCPServer.__init__(self, server_address,
694
627
                                           request_handler_class)
695
628
        server.SmartTCPServer.__init__(self, backing_transport,
696
 
                                       root_client_path)
 
629
            root_client_path, client_timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
 
630
 
697
631
    def serve(self):
698
 
        # FIXME: No test are exercising the hooks for the test server
699
 
        # -- vila 20100618
700
632
        self.run_server_started_hooks()
701
633
        try:
702
634
            TestingThreadingTCPServer.serve(self)
753
685
        self.chroot_server = ChrootServer(
754
686
            self.get_backing_transport(backing_transport_server))
755
687
        self.chroot_server.start_server()
756
 
        self.backing_transport = transport.get_transport(
 
688
        self.backing_transport = transport.get_transport_from_url(
757
689
            self.chroot_server.get_url())
758
690
        super(SmartTCPServer_for_testing, self).start_server()
759
691
 
765
697
 
766
698
    def get_backing_transport(self, backing_transport_server):
767
699
        """Get a backing transport from a server we are decorating."""
768
 
        return transport.get_transport(backing_transport_server.get_url())
 
700
        return transport.get_transport_from_url(
 
701
            backing_transport_server.get_url())
769
702
 
770
703
    def get_url(self):
771
704
        url = self.server.get_url()
782
715
    def get_backing_transport(self, backing_transport_server):
783
716
        """Get a backing transport from a server we are decorating."""
784
717
        url = 'readonly+' + backing_transport_server.get_url()
785
 
        return transport.get_transport(url)
 
718
        return transport.get_transport_from_url(url)
786
719
 
787
720
 
788
721
class SmartTCPServer_for_testing_v2_only(SmartTCPServer_for_testing):
803
736
    def get_backing_transport(self, backing_transport_server):
804
737
        """Get a backing transport from a server we are decorating."""
805
738
        url = 'readonly+' + backing_transport_server.get_url()
806
 
        return transport.get_transport(url)
807
 
 
808
 
 
809
 
 
810
 
 
 
739
        return transport.get_transport_from_url(url)