~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_test_server.py

merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2010 Canonical Ltd
 
1
# Copyright (C) 2010, 2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
    tests,
25
25
    )
26
26
from bzrlib.tests import test_server
27
 
 
28
 
 
29
 
def load_tests(basic_tests, module, loader):
30
 
    suite = loader.suiteClass()
31
 
    server_tests, remaining_tests = tests.split_suite_by_condition(
32
 
        basic_tests, tests.condition_isinstance(TestTCPServerInAThread))
33
 
    server_scenarios = [ (name, {'server_class': getattr(test_server, name)})
34
 
                         for name in
35
 
                         ('TestingTCPServer', 'TestingThreadingTCPServer')]
36
 
    tests.multiply_tests(server_tests, server_scenarios, suite)
37
 
    suite.addTest(remaining_tests)
38
 
    return suite
39
 
 
40
 
 
41
 
class TestThreadWithException(tests.TestCase):
42
 
 
43
 
    def test_start_and_join_smoke_test(self):
44
 
        def do_nothing():
45
 
            pass
46
 
 
47
 
        tt = test_server.ThreadWithException(target=do_nothing)
48
 
        tt.start()
49
 
        tt.join()
50
 
 
51
 
    def test_exception_is_re_raised(self):
52
 
        class MyException(Exception):
53
 
            pass
54
 
 
55
 
        def raise_my_exception():
56
 
            raise MyException()
57
 
 
58
 
        tt = test_server.ThreadWithException(target=raise_my_exception)
59
 
        tt.start()
60
 
        self.assertRaises(MyException, tt.join)
61
 
 
62
 
    def test_join_when_no_exception(self):
63
 
        resume = threading.Event()
64
 
        class MyException(Exception):
65
 
            pass
66
 
 
67
 
        def raise_my_exception():
68
 
            # Wait for the test to tell us to resume
69
 
            resume.wait()
70
 
            # Now we can raise
71
 
            raise MyException()
72
 
 
73
 
        tt = test_server.ThreadWithException(target=raise_my_exception)
74
 
        tt.start()
75
 
        tt.join(timeout=0)
76
 
        self.assertIs(None, tt.exception)
77
 
        resume.set()
78
 
        self.assertRaises(MyException, tt.join)
 
27
from bzrlib.tests.scenarios import load_tests_apply_scenarios
 
28
 
 
29
 
 
30
load_tests = load_tests_apply_scenarios
79
31
 
80
32
 
81
33
class TCPClient(object):
129
81
 
130
82
class TestTCPServerInAThread(tests.TestCase):
131
83
 
 
84
    scenarios = [ 
 
85
        (name, {'server_class': getattr(test_server, name)})
 
86
        for name in
 
87
        ('TestingTCPServer', 'TestingThreadingTCPServer')]
 
88
 
132
89
    # Set by load_tests()
133
90
    server_class = None
134
91
 
224
181
 
225
182
            def handle_connection(self):
226
183
                req = self.rfile.readline()
227
 
                threading.currentThread().set_ready_event(sync)
 
184
                threading.currentThread().set_sync_event(sync)
228
185
                raise FailToRespond()
229
186
 
230
187
        server = self.get_server(
246
203
            def handle(self):
247
204
                # We want to sync with the thread that is serving the
248
205
                # connection.
249
 
                threading.currentThread().set_ready_event(sync)
 
206
                threading.currentThread().set_sync_event(sync)
250
207
                raise CantServe()
251
208
 
252
209
        server = self.get_server(