~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_server.py

  • Committer: Martin Pool
  • Date: 2011-01-20 23:07:25 UTC
  • mfrom: (5626 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5630.
  • Revision ID: mbp@canonical.com-20110120230725-12l7ltnko5x3fgnz
merge news

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 
23
23
 
24
24
from bzrlib import (
25
 
    cethread,
26
25
    osutils,
27
26
    transport,
28
27
    urlutils,
212
211
    def start_server(self, backing_server=None):
213
212
        """Setup the Chroot on backing_server."""
214
213
        if backing_server is not None:
215
 
            self.backing_transport = transport.get_transport_from_url(
 
214
            self.backing_transport = transport.get_transport(
216
215
                backing_server.get_url())
217
216
        else:
218
 
            self.backing_transport = transport.get_transport_from_path('.')
 
217
            self.backing_transport = transport.get_transport('.')
219
218
        self.backing_transport.clone('added-by-filter').ensure_base()
220
219
        self.filter_func = lambda x: 'added-by-filter/' + x
221
220
        super(TestingPathFilteringServer, self).start_server()
233
232
    def start_server(self, backing_server=None):
234
233
        """Setup the Chroot on backing_server."""
235
234
        if backing_server is not None:
236
 
            self.backing_transport = transport.get_transport_from_url(
 
235
            self.backing_transport = transport.get_transport(
237
236
                backing_server.get_url())
238
237
        else:
239
 
            self.backing_transport = transport.get_transport_from_path('.')
 
238
            self.backing_transport = transport.get_transport('.')
240
239
        super(TestingChrootServer, self).start_server()
241
240
 
242
241
    def get_bogus_url(self):
243
242
        raise NotImplementedError
244
243
 
245
244
 
246
 
class TestThread(cethread.CatchingExceptionThread):
 
245
class ThreadWithException(threading.Thread):
 
246
    """A catching exception thread.
 
247
 
 
248
    If an exception occurs during the thread execution, it's caught and
 
249
    re-raised when the thread is joined().
 
250
    """
 
251
 
 
252
    def __init__(self, *args, **kwargs):
 
253
        # There are cases where the calling thread must wait, yet, if an
 
254
        # exception occurs, the event should be set so the caller is not
 
255
        # blocked. The main example is a calling thread that wants to wait for
 
256
        # the called thread to be in a given state before continuing.
 
257
        try:
 
258
            event = kwargs.pop('event')
 
259
        except KeyError:
 
260
            # If the caller didn't pass a specific event, create our own
 
261
            event = threading.Event()
 
262
        super(ThreadWithException, self).__init__(*args, **kwargs)
 
263
        self.set_ready_event(event)
 
264
        self.exception = None
 
265
        self.ignored_exceptions = None # see set_ignored_exceptions
 
266
 
 
267
    # compatibility thunk for python-2.4 and python-2.5...
 
268
    if sys.version_info < (2, 6):
 
269
        name = property(threading.Thread.getName, threading.Thread.setName)
 
270
 
 
271
    def set_ready_event(self, event):
 
272
        """Set the ``ready`` event used to synchronize exception catching.
 
273
 
 
274
        When the thread uses an event to synchronize itself with another thread
 
275
        (setting it when the other thread can wake up from a ``wait`` call),
 
276
        the event must be set after catching an exception or the other thread
 
277
        will hang.
 
278
 
 
279
        Some threads require multiple events and should set the relevant one
 
280
        when appropriate.
 
281
        """
 
282
        self.ready = event
 
283
 
 
284
    def set_ignored_exceptions(self, ignored):
 
285
        """Declare which exceptions will be ignored.
 
286
 
 
287
        :param ignored: Can be either:
 
288
           - None: all exceptions will be raised,
 
289
           - an exception class: the instances of this class will be ignored,
 
290
           - a tuple of exception classes: the instances of any class of the
 
291
             tuple will be ignored,
 
292
           - a callable: that will be passed the exception object
 
293
             and should return True if the exception should be ignored
 
294
        """
 
295
        if ignored is None:
 
296
            self.ignored_exceptions = None
 
297
        elif isinstance(ignored, (Exception, tuple)):
 
298
            self.ignored_exceptions = lambda e: isinstance(e, ignored)
 
299
        else:
 
300
            self.ignored_exceptions = ignored
 
301
 
 
302
    def run(self):
 
303
        """Overrides Thread.run to capture any exception."""
 
304
        self.ready.clear()
 
305
        try:
 
306
            try:
 
307
                super(ThreadWithException, self).run()
 
308
            except:
 
309
                self.exception = sys.exc_info()
 
310
        finally:
 
311
            # Make sure the calling thread is released
 
312
            self.ready.set()
 
313
 
247
314
 
248
315
    def join(self, timeout=5):
249
 
        """Overrides to use a default timeout.
 
316
        """Overrides Thread.join to raise any exception caught.
 
317
 
 
318
 
 
319
        Calling join(timeout=0) will raise the caught exception or return None
 
320
        if the thread is still alive.
250
321
 
251
322
        The default timeout is set to 5 and should expire only when a thread
252
323
        serving a client connection is hung.
253
324
        """
254
 
        super(TestThread, self).join(timeout)
 
325
        super(ThreadWithException, self).join(timeout)
 
326
        if self.exception is not None:
 
327
            exc_class, exc_value, exc_tb = self.exception
 
328
            self.exception = None # The exception should be raised only once
 
329
            if (self.ignored_exceptions is None
 
330
                or not self.ignored_exceptions(exc_value)):
 
331
                # Raise non ignored exceptions
 
332
                raise exc_class, exc_value, exc_tb
255
333
        if timeout and self.isAlive():
256
334
            # The timeout expired without joining the thread, the thread is
257
335
            # therefore stuck and that's a failure as far as the test is
264
342
            sys.stderr.write('thread %s hung\n' % (self.name,))
265
343
            #raise AssertionError('thread %s hung' % (self.name,))
266
344
 
267
 
 
268
 
class TestingTCPServerMixin(object):
 
345
    def pending_exception(self):
 
346
        """Raise the caught exception.
 
347
 
 
348
        This does nothing if no exception occurred.
 
349
        """
 
350
        self.join(timeout=0)
 
351
 
 
352
 
 
353
class TestingTCPServerMixin:
269
354
    """Mixin to support running SocketServer.TCPServer in a thread.
270
355
 
271
356
    Tests are connecting from the main thread, the server has to be run in a
437
522
        """Start a new thread to process the request."""
438
523
        started = threading.Event()
439
524
        stopped = threading.Event()
440
 
        t = TestThread(
441
 
            sync_event=stopped,
 
525
        t = ThreadWithException(
 
526
            event=stopped,
442
527
            name='%s -> %s' % (client_address, self.server_address),
443
528
            target = self.process_request_thread,
444
529
            args = (started, stopped, request, client_address))
504
589
 
505
590
    def start_server(self):
506
591
        self.server = self.create_server()
507
 
        self._server_thread = TestThread(
508
 
            sync_event=self.server.started,
 
592
        self._server_thread = ThreadWithException(
 
593
            event=self.server.started,
509
594
            target=self.run_server)
510
595
        self._server_thread.start()
511
596
        # Wait for the server thread to start (i.e. release the lock)
521
606
        self._server_thread.pending_exception()
522
607
        # From now on, we'll use a different event to ensure the server can set
523
608
        # its exception
524
 
        self._server_thread.set_sync_event(self.server.stopped)
 
609
        self._server_thread.set_ready_event(self.server.stopped)
525
610
 
526
611
    def run_server(self):
527
612
        self.server.serve()
589
674
    def __init__(self, request, client_address, server):
590
675
        medium.SmartServerSocketStreamMedium.__init__(
591
676
            self, request, server.backing_transport,
592
 
            server.root_client_path,
593
 
            timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
 
677
            server.root_client_path)
594
678
        request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
595
679
        SocketServer.BaseRequestHandler.__init__(self, request, client_address,
596
680
                                                 server)
601
685
            self._serve_one_request(server_protocol)
602
686
 
603
687
 
604
 
_DEFAULT_TESTING_CLIENT_TIMEOUT = 4.0
605
 
 
606
688
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
607
689
 
608
690
    def __init__(self, server_address, request_handler_class,
610
692
        TestingThreadingTCPServer.__init__(self, server_address,
611
693
                                           request_handler_class)
612
694
        server.SmartTCPServer.__init__(self, backing_transport,
613
 
            root_client_path, client_timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
614
 
 
 
695
                                       root_client_path)
615
696
    def serve(self):
616
697
        self.run_server_started_hooks()
617
698
        try:
669
750
        self.chroot_server = ChrootServer(
670
751
            self.get_backing_transport(backing_transport_server))
671
752
        self.chroot_server.start_server()
672
 
        self.backing_transport = transport.get_transport_from_url(
 
753
        self.backing_transport = transport.get_transport(
673
754
            self.chroot_server.get_url())
674
755
        super(SmartTCPServer_for_testing, self).start_server()
675
756
 
681
762
 
682
763
    def get_backing_transport(self, backing_transport_server):
683
764
        """Get a backing transport from a server we are decorating."""
684
 
        return transport.get_transport_from_url(
685
 
            backing_transport_server.get_url())
 
765
        return transport.get_transport(backing_transport_server.get_url())
686
766
 
687
767
    def get_url(self):
688
768
        url = self.server.get_url()
699
779
    def get_backing_transport(self, backing_transport_server):
700
780
        """Get a backing transport from a server we are decorating."""
701
781
        url = 'readonly+' + backing_transport_server.get_url()
702
 
        return transport.get_transport_from_url(url)
 
782
        return transport.get_transport(url)
703
783
 
704
784
 
705
785
class SmartTCPServer_for_testing_v2_only(SmartTCPServer_for_testing):
720
800
    def get_backing_transport(self, backing_transport_server):
721
801
        """Get a backing transport from a server we are decorating."""
722
802
        url = 'readonly+' + backing_transport_server.get_url()
723
 
        return transport.get_transport_from_url(url)
 
803
        return transport.get_transport(url)