~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_server.py

Merge cleanup into first-try

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
import errno
18
17
import socket
19
 
import SocketServer
20
18
import select
21
 
import sys
22
 
import threading
23
19
 
24
20
 
25
21
from bzrlib import (
26
 
    osutils,
27
22
    transport,
28
23
    urlutils,
29
24
    )
232
227
        raise NotImplementedError
233
228
 
234
229
 
235
 
class ThreadWithException(threading.Thread):
236
 
    """A catching exception thread.
237
 
 
238
 
    If an exception occurs during the thread execution, it's caught and
239
 
    re-raised when the thread is joined().
240
 
    """
241
 
 
242
 
    def __init__(self, *args, **kwargs):
243
 
        # There are cases where the calling thread must wait, yet, if an
244
 
        # exception occurs, the event should be set so the caller is not
245
 
        # blocked. The main example is a calling thread that want to wait for
246
 
        # the called thread to be in a given state before continuing.
247
 
        try:
248
 
            event = kwargs.pop('event')
249
 
        except KeyError:
250
 
            # If the caller didn't pass a specific event, create our own
251
 
            event = threading.Event()
252
 
        super(ThreadWithException, self).__init__(*args, **kwargs)
253
 
        self.set_event(event)
254
 
        self.exception = None
255
 
 
256
 
    def set_event(self, event):
257
 
        self.ready = event
258
 
 
259
 
    def run(self):
260
 
        """Overrides Thread.run to capture any exception."""
261
 
        self.ready.clear()
262
 
        try:
263
 
            super(ThreadWithException, self).run()
264
 
        except:
265
 
            self.exception = sys.exc_info()
266
 
        finally:
267
 
            # Make sure the calling thread is released
268
 
            self.ready.set()
269
 
 
270
 
 
271
 
    def join(self, timeout=5):
272
 
        """Overrides Thread.join to raise any exception caught.
273
 
 
274
 
 
275
 
        Calling join(timeout=0) will raise the caught exception or return None
276
 
        if the thread is still alive.
277
 
 
278
 
        The default timeout is set to 5 and should expire only when a thread
279
 
        serving a client connection is hung.
280
 
        """
281
 
        super(ThreadWithException, self).join(timeout)
282
 
        if self.exception is not None:
283
 
            exc_class, exc_value, exc_tb = self.exception
284
 
            self.execption = None # The exception should be raised only once
285
 
            raise exc_class, exc_value, exc_tb
286
 
        if timeout and self.isAlive():
287
 
            # The timeout expired without joining the thread, the thread is
288
 
            # therefore stucked and that's a failure as far as the test is
289
 
            # concerned. We used to hang here.
290
 
            raise AssertionError('thread %s hung' % (self.name,))
291
 
 
292
 
    def pending_exception(self):
293
 
        """Raise the caught exception.
294
 
 
295
 
        This does nothing if no exception occurred.
296
 
        """
297
 
        self.join(timeout=0)
298
 
 
299
 
 
300
 
class TestingTCPServerMixin:
301
 
    """Mixin to support running SocketServer.TCPServer in a thread.
302
 
 
303
 
    Tests are connecting from the main thread, the server has to be run in a
304
 
    separate thread.
305
 
    """
306
 
 
307
 
    def __init__(self, sibling_class):
308
 
        self.sibling_class = sibling_class
309
 
        self.started = threading.Event()
310
 
        self.serving = threading.Event()
311
 
        self.stopped = threading.Event()
312
 
        # We collect the resources used by the clients so we can release them
313
 
        # when shutting down
314
 
        self.clients = []
315
 
 
316
 
    def server_bind(self):
317
 
        # We need to override the SocketServer bind, yet, we still want to use
318
 
        # it so we need to use the sibling class to call it explicitly
319
 
        self.sibling_class.server_bind(self)
320
 
        # The following has been fixed in 2.5 so we need to provide it for
321
 
        # older python versions.
322
 
        if sys.version < (2, 5):
323
 
            self.server_address = self.socket.getsockname()
324
 
 
325
 
    def serve(self):
326
 
        self.serving.set()
327
 
        self.stopped.clear()
328
 
        # We are listening and ready to accept connections
329
 
        self.started.set()
330
 
        while self.serving.isSet():
331
 
            # Really a connection but the python framework is generic and
332
 
            # call them requests
333
 
            self.handle_request()
334
 
        # Let's close the listening socket
335
 
        self.server_close()
336
 
        self.stopped.set()
337
 
 
338
 
    def verify_request(self, request, client_address):
339
 
        """Verify the request.
340
 
 
341
 
        Return True if we should proceed with this request, False if we should
342
 
        not even touch a single byte in the socket ! This is useful when we
343
 
        stop the server with a dummy last connection.
344
 
        """
345
 
        return self.serving.isSet()
346
 
 
347
 
    def handle_error(self, request, client_address):
348
 
        # Stop serving and re-raise the last exception seen
349
 
        self.serving.clear()
350
 
        self.sibling_class.handle_error(self, request, client_address)
351
 
        raise
352
 
 
353
 
    # The following methods are called by the main thread
354
 
 
355
 
    def stop_client_connections(self):
356
 
        while self.clients:
357
 
            c = self.clients.pop()
358
 
            self.shutdown_client(c)
359
 
 
360
 
    def shutdown_client_socket(self, sock):
361
 
        """Properly shutdown a client socket.
362
 
 
363
 
        Under some circumstances (as in bug #383920), we need to force the
364
 
        shutdown as python delays it until gc occur otherwise and the client
365
 
        may hang.
366
 
 
367
 
        This should be called only when no other thread is trying to use the
368
 
        socket.
369
 
        """
370
 
        try:
371
 
            # The request process has been completed, the thread is about to
372
 
            # die, let's shutdown the socket if we can.
373
 
            sock.shutdown(socket.SHUT_RDWR)
374
 
            sock.close()
375
 
        except (socket.error, select.error), e:
376
 
            if e[0] in (errno.EBADF, errno.ENOTCONN):
377
 
                # Right, the socket is already down
378
 
                pass
379
 
            else:
380
 
                raise
381
 
 
382
 
 
383
 
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
384
 
 
385
 
    def __init__(self, server_address, request_handler_class):
386
 
        TestingTCPServerMixin.__init__(self, SocketServer.TCPServer)
387
 
        SocketServer.TCPServer.__init__(self, server_address,
388
 
                                        request_handler_class)
389
 
 
390
 
    def get_request(self):
391
 
        """Get the request and client address from the socket."""
392
 
        sock, addr = self.sibling_class.get_request(self)
393
 
        self.clients.append((sock, addr))
394
 
        return sock, addr
395
 
 
396
 
    # The following methods are called by the main thread
397
 
 
398
 
    def shutdown_client(self, client):
399
 
        sock, addr = client
400
 
        self.shutdown_client_socket(sock)
401
 
 
402
 
 
403
 
 
404
 
class TestingThreadingTCPServer(TestingTCPServerMixin,
405
 
                                SocketServer.ThreadingTCPServer):
406
 
 
407
 
    def __init__(self, server_address, request_handler_class):
408
 
        TestingTCPServerMixin.__init__(self, SocketServer.ThreadingTCPServer)
409
 
        SocketServer.TCPServer.__init__(self, server_address,
410
 
                                        request_handler_class)
411
 
 
412
 
    def get_request (self):
413
 
        """Get the request and client address from the socket."""
414
 
        sock, addr = self.sibling_class.get_request(self)
415
 
        # The thread is not create yet, it will be updated in process_request
416
 
        self.clients.append((sock, addr, None))
417
 
        return sock, addr
418
 
 
419
 
    def process_request_thread(self, started, stopped, request, client_address):
420
 
        started.set()
421
 
        SocketServer.ThreadingTCPServer.process_request_thread(
422
 
            self, request, client_address)
423
 
        self.close_request(request)
424
 
        stopped.set()
425
 
 
426
 
    def process_request(self, request, client_address):
427
 
        """Start a new thread to process the request."""
428
 
        started = threading.Event()
429
 
        stopped = threading.Event()
430
 
        t = ThreadWithException(
431
 
            event=stopped,
432
 
            target = self.process_request_thread,
433
 
            args = (started, stopped, request, client_address))
434
 
        t.name = '%s -> %s' % (client_address, self.server_address)
435
 
        # Update the client description
436
 
        self.clients.pop()
437
 
        self.clients.append((request, client_address, t))
438
 
        t.start()
439
 
        started.wait()
440
 
        # If an exception occured during the thread start, it will get raised.
441
 
        t.pending_exception()
442
 
 
443
 
    # The following methods are called by the main thread
444
 
 
445
 
    def shutdown_client(self, client):
446
 
        sock, addr, t = client
447
 
        self.shutdown_client_socket(sock)
448
 
        if t is not None:
449
 
            # The thread has been created only if the request is processed but
450
 
            # after the connection is inited. This could happen during server
451
 
            # shutdown. If an exception occurred in the thread it will be
452
 
            # re-raised
453
 
            t.join()
454
 
 
455
 
 
456
 
class TestingTCPServerInAThread(transport.Server):
457
 
    """A server in a thread that re-raise thread exceptions."""
458
 
 
459
 
    def __init__(self, server_address, server_class, request_handler_class):
460
 
        self.server_class = server_class
461
 
        self.request_handler_class = request_handler_class
462
 
        self.host, self.port = server_address
463
 
        self.server = None
464
 
 
465
 
    def __repr__(self):
466
 
        return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
467
 
 
468
 
    def create_server(self):
469
 
        return self.server_class((self.host, self.port),
470
 
                                 self.request_handler_class)
471
 
 
472
 
    def start_server(self):
473
 
        self.server = self.create_server()
474
 
        self._server_thread = ThreadWithException(
475
 
            event=self.server.started, target=self.run_server)
476
 
        self._server_thread.start()
477
 
        # Wait for the server thread to start (i.e release the lock)
478
 
        self.server.started.wait()
479
 
        # Get the real address, especially the port
480
 
        self.host, self.port = self.server.server_address
481
 
        self._server_thread.name = '(%s:%s)' % (self.host, self.port)
482
 
        # If an exception occured during the server start, it will get raised,
483
 
        # otherwise, the server is blocked on its accept() call.
484
 
        self._server_thread.pending_exception()
485
 
        # From now on, we'll use a different event to ensure the server can set
486
 
        # its exception
487
 
        self._server_thread.set_event(self.server.stopped)
488
 
 
489
 
    def run_server(self):
490
 
        self.server.serve()
491
 
 
492
 
    def stop_server(self):
493
 
        if self.server is None:
494
 
            return
495
 
        try:
496
 
            # The server has been started successfully, shut it down now.  As
497
 
            # soon as we stop serving, no more connection are accepted except
498
 
            # one to get out of the blocking listen.
499
 
            self.server.serving.clear()
500
 
            # The server is listening for a last connection, let's give it:
501
 
            last_conn = None
502
 
            try:
503
 
                last_conn = osutils.connect_socket((self.host, self.port))
504
 
            except socket.error, e:
505
 
                # But ignore connection errors as the point is to unblock the
506
 
                # server thread, it may happen that it's not blocked or even
507
 
                # not started.
508
 
                pass
509
 
            # We start shutting down the client while the server itself is
510
 
            # shutting down.
511
 
            self.server.stop_client_connections()
512
 
            # Now we wait for the thread running self.server.serve() to finish
513
 
            self.server.stopped.wait()
514
 
            if last_conn is not None:
515
 
                # Close the last connection without trying to use it. The
516
 
                # server will not process a single byte on that socket to avoid
517
 
                # complications (SSL starts with a handshake for example).
518
 
                last_conn.close()
519
 
            # Check for any exception that could have occurred in the server
520
 
            # thread
521
 
            self._server_thread.join()
522
 
        finally:
523
 
            # Make sure we can be called twice safely, note that this means
524
 
            # that we will raise a single exception even if several occurred in
525
 
            # the various threads involved.
526
 
            self.server = None
527
 
 
528
 
 
529
230
class SmartTCPServer_for_testing(server.SmartTCPServer):
530
231
    """Server suitable for use by transport tests.
531
232