~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_server.py

Merge http-leaks into sftp-leaks

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
 
17
import errno
17
18
import socket
 
19
import SocketServer
18
20
import select
19
21
import sys
20
22
import threading
21
23
 
22
24
 
23
25
from bzrlib import (
 
26
    osutils,
24
27
    transport,
25
28
    urlutils,
26
29
    )
238
241
 
239
242
    def __init__(self, *args, **kwargs):
240
243
        # There are cases where the calling thread must wait, yet, if an
241
 
        # exception occurs the event should be set so the caller is not
 
244
        # exception occurs, the event should be set so the caller is not
242
245
        # blocked. The main example is a calling thread that want to wait for
243
246
        # the called thread to be in a given state before continuing.
244
247
        try:
247
250
            # If the caller didn't pass a specific event, create our own
248
251
            event = threading.Event()
249
252
        super(ThreadWithException, self).__init__(*args, **kwargs)
 
253
        self.set_event(event)
 
254
        self.exception = None
 
255
 
 
256
    def set_event(self, event):
250
257
        self.ready = event
251
 
        self.exception = None
252
258
 
253
259
    def run(self):
254
260
        """Overrides Thread.run to capture any exception."""
255
261
        self.ready.clear()
256
262
        try:
257
 
            super(ThreadWithException, self).run()
258
 
        except Exception, e:
259
 
            self.exception = sys.exc_info()
 
263
            try:
 
264
                super(ThreadWithException, self).run()
 
265
            except:
 
266
                self.exception = sys.exc_info()
260
267
        finally:
261
268
            # Make sure the calling thread is released
262
269
            self.ready.set()
263
270
 
264
271
 
265
 
    def join(self, *args, **kwargs):
 
272
    def join(self, timeout=5):
266
273
        """Overrides Thread.join to raise any exception caught.
267
274
 
268
275
 
269
276
        Calling join(timeout=0) will raise the caught exception or return None
270
 
        is the thread is still alive.
 
277
        if the thread is still alive.
 
278
 
 
279
        The default timeout is set to 5 and should expire only when a thread
 
280
        serving a client connection is hung.
271
281
        """
272
 
        # Note that we don't care about the timeout parameter here: either the
273
 
        # thread has raised an exception and it should be raised (and join()
274
 
        # should succeed whatever the timeout is) or it's still alive which
275
 
        # means it didn't encounter an exception.
276
 
        super(ThreadWithException, self).join(*args, **kwargs)
 
282
        super(ThreadWithException, self).join(timeout)
277
283
        if self.exception is not None:
278
284
            exc_class, exc_value, exc_tb = self.exception
 
285
            self.execption = None # The exception should be raised only once
279
286
            raise exc_class, exc_value, exc_tb
 
287
        if timeout and self.isAlive():
 
288
            # The timeout expired without joining the thread, the thread is
 
289
            # therefore stucked and that's a failure as far as the test is
 
290
            # concerned. We used to hang here.
 
291
            raise AssertionError('thread %s hung' % (self.name,))
 
292
 
 
293
    def pending_exception(self):
 
294
        """Raise the caught exception.
 
295
 
 
296
        This does nothing if no exception occurred.
 
297
        """
 
298
        self.join(timeout=0)
 
299
 
 
300
 
 
301
class TestingTCPServerMixin:
 
302
    """Mixin to support running SocketServer.TCPServer in a thread.
 
303
 
 
304
    Tests are connecting from the main thread, the server has to be run in a
 
305
    separate thread.
 
306
    """
 
307
 
 
308
    # FIXME: sibling_class is a hack -- vila 20100604
 
309
    def __init__(self, sibling_class):
 
310
        self.sibling_class = sibling_class
 
311
        self.started = threading.Event()
 
312
        self.serving = threading.Event()
 
313
        self.stopped = threading.Event()
 
314
        # We collect the resources used by the clients so we can release them
 
315
        # when shutting down
 
316
        self.clients = []
 
317
 
 
318
    def server_bind(self):
 
319
        # We need to override the SocketServer bind, yet, we still want to use
 
320
        # it so we need to use the sibling class to call it explicitly
 
321
        self.sibling_class.server_bind(self)
 
322
        # The following has been fixed in 2.5 so we need to provide it for
 
323
        # older python versions.
 
324
        if sys.version < (2, 5):
 
325
            self.server_address = self.socket.getsockname()
 
326
 
 
327
    def serve(self):
 
328
        self.serving.set()
 
329
        self.stopped.clear()
 
330
        # We are listening and ready to accept connections
 
331
        self.started.set()
 
332
        while self.serving.isSet():
 
333
            # Really a connection but the python framework is generic and
 
334
            # call them requests
 
335
            self.handle_request()
 
336
        # Let's close the listening socket
 
337
        self.server_close()
 
338
        self.stopped.set()
 
339
 
 
340
    def handle_request(self):
 
341
        """Handle one request.
 
342
 
 
343
        The python version swallows some socket exceptions and we don't use
 
344
        timeout, so we override to better control the server behavior.
 
345
        """
 
346
        request, client_address = self.get_request()
 
347
        if self.verify_request(request, client_address):
 
348
            try:
 
349
                self.process_request(request, client_address)
 
350
            except:
 
351
                self.handle_error(request, client_address)
 
352
                self.close_request(request)
 
353
 
 
354
    def verify_request(self, request, client_address):
 
355
        """Verify the request.
 
356
 
 
357
        Return True if we should proceed with this request, False if we should
 
358
        not even touch a single byte in the socket ! This is useful when we
 
359
        stop the server with a dummy last connection.
 
360
        """
 
361
        return self.serving.isSet()
 
362
 
 
363
    def handle_error(self, request, client_address):
 
364
        # Stop serving and re-raise the last exception seen
 
365
        self.serving.clear()
 
366
#        self.sibling_class.handle_error(self, request, client_address)
 
367
        raise
 
368
 
 
369
    # The following methods are called by the main thread
 
370
 
 
371
    def stop_client_connections(self):
 
372
        while self.clients:
 
373
            c = self.clients.pop()
 
374
            self.shutdown_client(c)
 
375
 
 
376
    def shutdown_client_socket(self, sock):
 
377
        """Properly shutdown a client socket.
 
378
 
 
379
        Under some circumstances (as in bug #383920), we need to force the
 
380
        shutdown as python delays it until gc occur otherwise and the client
 
381
        may hang.
 
382
 
 
383
        This should be called only when no other thread is trying to use the
 
384
        socket.
 
385
        """
 
386
        try:
 
387
            # The request process has been completed, the thread is about to
 
388
            # die, let's shutdown the socket if we can.
 
389
            sock.shutdown(socket.SHUT_RDWR)
 
390
            sock.close()
 
391
        except (socket.error, select.error), e:
 
392
            if e[0] in (errno.EBADF, errno.ENOTCONN):
 
393
                # Right, the socket is already down
 
394
                pass
 
395
            else:
 
396
                raise
 
397
 
 
398
 
 
399
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
 
400
 
 
401
    def __init__(self, server_address, request_handler_class):
 
402
        TestingTCPServerMixin.__init__(self, SocketServer.TCPServer)
 
403
        SocketServer.TCPServer.__init__(self, server_address,
 
404
                                        request_handler_class)
 
405
 
 
406
    def get_request(self):
 
407
        """Get the request and client address from the socket."""
 
408
        sock, addr = self.sibling_class.get_request(self)
 
409
        self.clients.append((sock, addr))
 
410
        return sock, addr
 
411
 
 
412
    # The following methods are called by the main thread
 
413
 
 
414
    def shutdown_client(self, client):
 
415
        sock, addr = client
 
416
        self.shutdown_client_socket(sock)
 
417
 
 
418
 
 
419
 
 
420
class TestingThreadingTCPServer(TestingTCPServerMixin,
 
421
                                SocketServer.ThreadingTCPServer):
 
422
 
 
423
    def __init__(self, server_address, request_handler_class):
 
424
        TestingTCPServerMixin.__init__(self, SocketServer.ThreadingTCPServer)
 
425
        SocketServer.TCPServer.__init__(self, server_address,
 
426
                                        request_handler_class)
 
427
 
 
428
    def get_request (self):
 
429
        """Get the request and client address from the socket."""
 
430
        sock, addr = self.sibling_class.get_request(self)
 
431
        # The thread is not create yet, it will be updated in process_request
 
432
        self.clients.append((sock, addr, None))
 
433
        return sock, addr
 
434
 
 
435
    def process_request_thread(self, started, stopped, request, client_address):
 
436
        started.set()
 
437
        SocketServer.ThreadingTCPServer.process_request_thread(
 
438
            self, request, client_address)
 
439
        self.close_request(request)
 
440
        stopped.set()
 
441
 
 
442
    def process_request(self, request, client_address):
 
443
        """Start a new thread to process the request."""
 
444
        started = threading.Event()
 
445
        stopped = threading.Event()
 
446
        t = ThreadWithException(
 
447
            event=stopped,
 
448
            target = self.process_request_thread,
 
449
            args = (started, stopped, request, client_address))
 
450
        t.name = '%s -> %s' % (client_address, self.server_address)
 
451
        # Update the client description
 
452
        self.clients.pop()
 
453
        self.clients.append((request, client_address, t))
 
454
        t.start()
 
455
        started.wait()
 
456
        # If an exception occured during the thread start, it will get raised.
 
457
        t.pending_exception()
 
458
 
 
459
    # The following methods are called by the main thread
 
460
 
 
461
    def shutdown_client(self, client):
 
462
        sock, addr, t = client
 
463
        self.shutdown_client_socket(sock)
 
464
        if t is not None:
 
465
            # The thread has been created only if the request is processed but
 
466
            # after the connection is inited. This could happen during server
 
467
            # shutdown. If an exception occurred in the thread it will be
 
468
            # re-raised
 
469
            t.join()
 
470
 
 
471
 
 
472
class TestingTCPServerInAThread(transport.Server):
 
473
    """A server in a thread that re-raise thread exceptions."""
 
474
 
 
475
    def __init__(self, server_address, server_class, request_handler_class):
 
476
        self.server_class = server_class
 
477
        self.request_handler_class = request_handler_class
 
478
        self.host, self.port = server_address
 
479
        self.server = None
 
480
 
 
481
    def __repr__(self):
 
482
        return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
 
483
 
 
484
    def create_server(self):
 
485
        return self.server_class((self.host, self.port),
 
486
                                 self.request_handler_class)
 
487
 
 
488
    def start_server(self):
 
489
        self.server = self.create_server()
 
490
        self._server_thread = ThreadWithException(
 
491
            event=self.server.started, target=self.run_server)
 
492
        self._server_thread.start()
 
493
        # Wait for the server thread to start (i.e release the lock)
 
494
        self.server.started.wait()
 
495
        # Get the real address, especially the port
 
496
        self.host, self.port = self.server.server_address
 
497
        self._server_thread.name = '(%s:%s)' % (self.host, self.port)
 
498
        # If an exception occured during the server start, it will get raised,
 
499
        # otherwise, the server is blocked on its accept() call.
 
500
        self._server_thread.pending_exception()
 
501
        # From now on, we'll use a different event to ensure the server can set
 
502
        # its exception
 
503
        self._server_thread.set_event(self.server.stopped)
 
504
 
 
505
    def run_server(self):
 
506
        self.server.serve()
 
507
 
 
508
    def stop_server(self):
 
509
        if self.server is None:
 
510
            return
 
511
        try:
 
512
            # The server has been started successfully, shut it down now.  As
 
513
            # soon as we stop serving, no more connection are accepted except
 
514
            # one to get out of the blocking listen.
 
515
            self.server.serving.clear()
 
516
            # The server is listening for a last connection, let's give it:
 
517
            last_conn = None
 
518
            try:
 
519
                last_conn = osutils.connect_socket((self.host, self.port))
 
520
            except socket.error, e:
 
521
                # But ignore connection errors as the point is to unblock the
 
522
                # server thread, it may happen that it's not blocked or even
 
523
                # not started.
 
524
                pass
 
525
            # We start shutting down the client while the server itself is
 
526
            # shutting down.
 
527
            self.server.stop_client_connections()
 
528
            # Now we wait for the thread running self.server.serve() to finish
 
529
            self.server.stopped.wait()
 
530
            if last_conn is not None:
 
531
                # Close the last connection without trying to use it. The
 
532
                # server will not process a single byte on that socket to avoid
 
533
                # complications (SSL starts with a handshake for example).
 
534
                last_conn.close()
 
535
            # Check for any exception that could have occurred in the server
 
536
            # thread
 
537
            self._server_thread.join()
 
538
        finally:
 
539
            # Make sure we can be called twice safely, note that this means
 
540
            # that we will raise a single exception even if several occurred in
 
541
            # the various threads involved.
 
542
            self.server = None
280
543
 
281
544
 
282
545
class SmartTCPServer_for_testing(server.SmartTCPServer):