~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_server.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-09-01 08:02:42 UTC
  • mfrom: (5390.3.3 faster-revert-593560)
  • Revision ID: pqm@pqm.ubuntu.com-20100901080242-esg62ody4frwmy66
(spiv) Avoid repeatedly calling self.target.all_file_ids() in
 InterTree.iter_changes. (Andrew Bennetts)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2010, 2011 Canonical Ltd
 
1
# Copyright (C) 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
import errno
18
18
import socket
19
19
import SocketServer
 
20
import select
20
21
import sys
21
22
import threading
22
 
import traceback
23
23
 
24
24
 
25
25
from bzrlib import (
26
 
    cethread,
27
 
    errors,
28
26
    osutils,
29
27
    transport,
30
28
    urlutils,
214
212
    def start_server(self, backing_server=None):
215
213
        """Setup the Chroot on backing_server."""
216
214
        if backing_server is not None:
217
 
            self.backing_transport = transport.get_transport_from_url(
 
215
            self.backing_transport = transport.get_transport(
218
216
                backing_server.get_url())
219
217
        else:
220
 
            self.backing_transport = transport.get_transport_from_path('.')
 
218
            self.backing_transport = transport.get_transport('.')
221
219
        self.backing_transport.clone('added-by-filter').ensure_base()
222
220
        self.filter_func = lambda x: 'added-by-filter/' + x
223
221
        super(TestingPathFilteringServer, self).start_server()
235
233
    def start_server(self, backing_server=None):
236
234
        """Setup the Chroot on backing_server."""
237
235
        if backing_server is not None:
238
 
            self.backing_transport = transport.get_transport_from_url(
 
236
            self.backing_transport = transport.get_transport(
239
237
                backing_server.get_url())
240
238
        else:
241
 
            self.backing_transport = transport.get_transport_from_path('.')
 
239
            self.backing_transport = transport.get_transport('.')
242
240
        super(TestingChrootServer, self).start_server()
243
241
 
244
242
    def get_bogus_url(self):
245
243
        raise NotImplementedError
246
244
 
247
245
 
248
 
class TestThread(cethread.CatchingExceptionThread):
 
246
class ThreadWithException(threading.Thread):
 
247
    """A catching exception thread.
 
248
 
 
249
    If an exception occurs during the thread execution, it's caught and
 
250
    re-raised when the thread is joined().
 
251
    """
 
252
 
 
253
    def __init__(self, *args, **kwargs):
 
254
        # There are cases where the calling thread must wait, yet, if an
 
255
        # exception occurs, the event should be set so the caller is not
 
256
        # blocked. The main example is a calling thread that want to wait for
 
257
        # the called thread to be in a given state before continuing.
 
258
        try:
 
259
            event = kwargs.pop('event')
 
260
        except KeyError:
 
261
            # If the caller didn't pass a specific event, create our own
 
262
            event = threading.Event()
 
263
        super(ThreadWithException, self).__init__(*args, **kwargs)
 
264
        self.set_ready_event(event)
 
265
        self.exception = None
 
266
        self.ignored_exceptions = None # see set_ignored_exceptions
 
267
 
 
268
    # compatibility thunk for python-2.4 and python-2.5...
 
269
    if sys.version_info < (2, 6):
 
270
        name = property(threading.Thread.getName, threading.Thread.setName)
 
271
 
 
272
    def set_ready_event(self, event):
 
273
        """Set the ``ready`` event used to synchronize exception catching.
 
274
 
 
275
        When the thread uses an event to synchronize itself with another thread
 
276
        (setting it when the other thread can wake up from a ``wait`` call),
 
277
        the event must be set after catching an exception or the other thread
 
278
        will hang.
 
279
 
 
280
        Some threads require multiple events and should set the relevant one
 
281
        when appropriate.
 
282
        """
 
283
        self.ready = event
 
284
 
 
285
    def set_ignored_exceptions(self, ignored):
 
286
        """Declare which exceptions will be ignored.
 
287
 
 
288
        :param ignored: Can be either:
 
289
           - None: all exceptions will be raised,
 
290
           - an exception class: the instances of this class will be ignored,
 
291
           - a tuple of exception classes: the instances of any class of the
 
292
             list will be ignored,
 
293
           - a callable: that will be passed the exception object
 
294
             and should return True if the exception should be ignored
 
295
        """
 
296
        if ignored is None:
 
297
            self.ignored_exceptions = None
 
298
        elif isinstance(ignored, (Exception, tuple)):
 
299
            self.ignored_exceptions = lambda e: isinstance(e, ignored)
 
300
        else:
 
301
            self.ignored_exceptions = ignored
 
302
 
 
303
    def run(self):
 
304
        """Overrides Thread.run to capture any exception."""
 
305
        self.ready.clear()
 
306
        try:
 
307
            try:
 
308
                super(ThreadWithException, self).run()
 
309
            except:
 
310
                self.exception = sys.exc_info()
 
311
        finally:
 
312
            # Make sure the calling thread is released
 
313
            self.ready.set()
 
314
 
249
315
 
250
316
    def join(self, timeout=5):
251
 
        """Overrides to use a default timeout.
 
317
        """Overrides Thread.join to raise any exception caught.
 
318
 
 
319
 
 
320
        Calling join(timeout=0) will raise the caught exception or return None
 
321
        if the thread is still alive.
252
322
 
253
323
        The default timeout is set to 5 and should expire only when a thread
254
324
        serving a client connection is hung.
255
325
        """
256
 
        super(TestThread, self).join(timeout)
 
326
        super(ThreadWithException, self).join(timeout)
 
327
        if self.exception is not None:
 
328
            exc_class, exc_value, exc_tb = self.exception
 
329
            self.exception = None # The exception should be raised only once
 
330
            if (self.ignored_exceptions is None
 
331
                or not self.ignored_exceptions(exc_value)):
 
332
                # Raise non ignored exceptions
 
333
                raise exc_class, exc_value, exc_tb
257
334
        if timeout and self.isAlive():
258
335
            # The timeout expired without joining the thread, the thread is
259
336
            # therefore stucked and that's a failure as far as the test is
266
343
            sys.stderr.write('thread %s hung\n' % (self.name,))
267
344
            #raise AssertionError('thread %s hung' % (self.name,))
268
345
 
269
 
 
270
 
class TestingTCPServerMixin(object):
 
346
    def pending_exception(self):
 
347
        """Raise the caught exception.
 
348
 
 
349
        This does nothing if no exception occurred.
 
350
        """
 
351
        self.join(timeout=0)
 
352
 
 
353
 
 
354
class TestingTCPServerMixin:
271
355
    """Mixin to support running SocketServer.TCPServer in a thread.
272
356
 
273
357
    Tests are connecting from the main thread, the server has to be run in a
289
373
 
290
374
    def serve(self):
291
375
        self.serving = True
 
376
        self.stopped.clear()
292
377
        # We are listening and ready to accept connections
293
378
        self.started.set()
294
379
        try:
313
398
                self.process_request(request, client_address)
314
399
            except:
315
400
                self.handle_error(request, client_address)
316
 
        else:
317
 
            self.close_request(request)
 
401
                self.close_request(request)
318
402
 
319
403
    def get_request(self):
320
404
        return self.socket.accept()
334
418
        # The following can be used for debugging purposes, it will display the
335
419
        # exception and the traceback just when it occurs instead of waiting
336
420
        # for the thread to be joined.
 
421
 
337
422
        # SocketServer.BaseServer.handle_error(self, request, client_address)
338
 
 
339
 
        # We call close_request manually, because we are going to raise an
340
 
        # exception. The SocketServer implementation calls:
341
 
        #   handle_error(...)
342
 
        #   close_request(...)
343
 
        # But because we raise the exception, close_request will never be
344
 
        # triggered. This helps client not block waiting for a response when
345
 
        # the server gets an exception.
346
 
        self.close_request(request)
347
423
        raise
348
424
 
349
425
    def ignored_exceptions_during_shutdown(self, e):
429
505
        SocketServer.ThreadingTCPServer.__init__(self, server_address,
430
506
                                                 request_handler_class)
431
507
 
432
 
    def get_request(self):
 
508
    def get_request (self):
433
509
        """Get the request and client address from the socket."""
434
510
        sock, addr = TestingTCPServerMixin.get_request(self)
435
 
        # The thread is not created yet, it will be updated in process_request
 
511
        # The thread is not create yet, it will be updated in process_request
436
512
        self.clients.append((sock, addr, None))
437
513
        return sock, addr
438
514
 
439
 
    def process_request_thread(self, started, detached, stopped,
440
 
                               request, client_address):
 
515
    def process_request_thread(self, started, stopped, request, client_address):
441
516
        started.set()
442
 
        # We will be on our own once the server tells us we're detached
443
 
        detached.wait()
444
517
        SocketServer.ThreadingTCPServer.process_request_thread(
445
518
            self, request, client_address)
446
519
        self.close_request(request)
449
522
    def process_request(self, request, client_address):
450
523
        """Start a new thread to process the request."""
451
524
        started = threading.Event()
452
 
        detached = threading.Event()
453
525
        stopped = threading.Event()
454
 
        t = TestThread(
455
 
            sync_event=stopped,
 
526
        t = ThreadWithException(
 
527
            event=stopped,
456
528
            name='%s -> %s' % (client_address, self.server_address),
457
529
            target = self.process_request_thread,
458
 
            args = (started, detached, stopped, request, client_address))
 
530
            args = (started, stopped, request, client_address))
459
531
        # Update the client description
460
532
        self.clients.pop()
461
533
        self.clients.append((request, client_address, t))
462
 
        # Propagate the exception handler since we must use the same one as
463
 
        # TestingTCPServer for connections running in their own threads.
 
534
        # Propagate the exception handler since we must use the same one for
 
535
        # connections running in their own threads than TestingTCPServer.
464
536
        t.set_ignored_exceptions(self.ignored_exceptions)
465
537
        t.start()
466
538
        started.wait()
 
539
        if debug_threads():
 
540
            sys.stderr.write('Client thread %s started\n' % (t.name,))
467
541
        # If an exception occured during the thread start, it will get raised.
468
542
        t.pending_exception()
469
 
        if debug_threads():
470
 
            sys.stderr.write('Client thread %s started\n' % (t.name,))
471
 
        # Tell the thread, it's now on its own for exception handling.
472
 
        detached.set()
473
543
 
474
544
    # The following methods are called by the main thread
475
545
 
520
590
 
521
591
    def start_server(self):
522
592
        self.server = self.create_server()
523
 
        self._server_thread = TestThread(
524
 
            sync_event=self.server.started,
 
593
        self._server_thread = ThreadWithException(
 
594
            event=self.server.started,
525
595
            target=self.run_server)
526
596
        self._server_thread.start()
527
 
        # Wait for the server thread to start (i.e. release the lock)
 
597
        # Wait for the server thread to start (i.e release the lock)
528
598
        self.server.started.wait()
529
599
        # Get the real address, especially the port
530
600
        self.host, self.port = self.server.server_address
537
607
        self._server_thread.pending_exception()
538
608
        # From now on, we'll use a different event to ensure the server can set
539
609
        # its exception
540
 
        self._server_thread.set_sync_event(self.server.stopped)
 
610
        self._server_thread.set_ready_event(self.server.stopped)
541
611
 
542
612
    def run_server(self):
543
613
        self.server.serve()
564
634
                # server thread, it may happen that it's not blocked or even
565
635
                # not started.
566
636
                pass
567
 
            # We start shutting down the clients while the server itself is
 
637
            # We start shutting down the client while the server itself is
568
638
            # shutting down.
569
639
            self.server.stop_client_connections()
570
640
            # Now we wait for the thread running self.server.serve() to finish
605
675
    def __init__(self, request, client_address, server):
606
676
        medium.SmartServerSocketStreamMedium.__init__(
607
677
            self, request, server.backing_transport,
608
 
            server.root_client_path,
609
 
            timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
 
678
            server.root_client_path)
610
679
        request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
611
680
        SocketServer.BaseRequestHandler.__init__(self, request, client_address,
612
681
                                                 server)
613
682
 
614
683
    def handle(self):
615
 
        try:
616
 
            while not self.finished:
617
 
                server_protocol = self._build_protocol()
618
 
                self._serve_one_request(server_protocol)
619
 
        except errors.ConnectionTimeout:
620
 
            # idle connections aren't considered a failure of the server
621
 
            return
622
 
 
623
 
 
624
 
_DEFAULT_TESTING_CLIENT_TIMEOUT = 60.0
 
684
        while not self.finished:
 
685
            server_protocol = self._build_protocol()
 
686
            self._serve_one_request(server_protocol)
 
687
 
625
688
 
626
689
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
627
690
 
630
693
        TestingThreadingTCPServer.__init__(self, server_address,
631
694
                                           request_handler_class)
632
695
        server.SmartTCPServer.__init__(self, backing_transport,
633
 
            root_client_path, client_timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
634
 
 
 
696
                                       root_client_path)
635
697
    def serve(self):
 
698
        # FIXME: No test are exercising the hooks for the test server
 
699
        # -- vila 20100618
636
700
        self.run_server_started_hooks()
637
701
        try:
638
702
            TestingThreadingTCPServer.serve(self)
689
753
        self.chroot_server = ChrootServer(
690
754
            self.get_backing_transport(backing_transport_server))
691
755
        self.chroot_server.start_server()
692
 
        self.backing_transport = transport.get_transport_from_url(
 
756
        self.backing_transport = transport.get_transport(
693
757
            self.chroot_server.get_url())
694
758
        super(SmartTCPServer_for_testing, self).start_server()
695
759
 
701
765
 
702
766
    def get_backing_transport(self, backing_transport_server):
703
767
        """Get a backing transport from a server we are decorating."""
704
 
        return transport.get_transport_from_url(
705
 
            backing_transport_server.get_url())
 
768
        return transport.get_transport(backing_transport_server.get_url())
706
769
 
707
770
    def get_url(self):
708
771
        url = self.server.get_url()
719
782
    def get_backing_transport(self, backing_transport_server):
720
783
        """Get a backing transport from a server we are decorating."""
721
784
        url = 'readonly+' + backing_transport_server.get_url()
722
 
        return transport.get_transport_from_url(url)
 
785
        return transport.get_transport(url)
723
786
 
724
787
 
725
788
class SmartTCPServer_for_testing_v2_only(SmartTCPServer_for_testing):
740
803
    def get_backing_transport(self, backing_transport_server):
741
804
        """Get a backing transport from a server we are decorating."""
742
805
        url = 'readonly+' + backing_transport_server.get_url()
743
 
        return transport.get_transport_from_url(url)
 
806
        return transport.get_transport(url)
 
807
 
 
808
 
 
809
 
 
810