~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_cethread.py

Initial stab at repository format support.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2010, 2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
import threading
18
 
 
19
 
from bzrlib import (
20
 
    cethread,
21
 
    tests,
22
 
    )
23
 
 
24
 
 
25
 
class TestCatchingExceptionThread(tests.TestCase):
26
 
 
27
 
    def test_start_and_join_smoke_test(self):
28
 
        def do_nothing():
29
 
            pass
30
 
 
31
 
        tt = cethread.CatchingExceptionThread(target=do_nothing)
32
 
        tt.start()
33
 
        tt.join()
34
 
 
35
 
    def test_exception_is_re_raised(self):
36
 
        class MyException(Exception):
37
 
            pass
38
 
 
39
 
        def raise_my_exception():
40
 
            raise MyException()
41
 
 
42
 
        tt = cethread.CatchingExceptionThread(target=raise_my_exception)
43
 
        tt.start()
44
 
        self.assertRaises(MyException, tt.join)
45
 
 
46
 
    def test_join_around_exception(self):
47
 
        resume = threading.Event()
48
 
        class MyException(Exception):
49
 
            pass
50
 
 
51
 
        def raise_my_exception():
52
 
            # Wait for the test to tell us to resume
53
 
            resume.wait()
54
 
            # Now we can raise
55
 
            raise MyException()
56
 
 
57
 
        tt = cethread.CatchingExceptionThread(target=raise_my_exception)
58
 
        tt.start()
59
 
        tt.join(timeout=0)
60
 
        self.assertIs(None, tt.exception)
61
 
        resume.set()
62
 
        self.assertRaises(MyException, tt.join)
63
 
 
64
 
    def test_sync_event(self):
65
 
        control = threading.Event()
66
 
        in_thread = threading.Event()
67
 
        class MyException(Exception):
68
 
            pass
69
 
 
70
 
        def raise_my_exception():
71
 
            # Wait for the test to tell us to resume
72
 
            control.wait()
73
 
            # Now we can raise
74
 
            raise MyException()
75
 
 
76
 
        tt = cethread.CatchingExceptionThread(target=raise_my_exception,
77
 
                                            sync_event=in_thread)
78
 
        tt.start()
79
 
        tt.join(timeout=0)
80
 
        self.assertIs(None, tt.exception)
81
 
        self.assertIs(in_thread, tt.sync_event)
82
 
        control.set()
83
 
        self.assertRaises(MyException, tt.join)
84
 
        self.assertEquals(True, tt.sync_event.isSet())
85
 
 
86
 
    def test_switch_and_set(self):
87
 
        """Caller can precisely control a thread."""
88
 
        control1 = threading.Event()
89
 
        control2 = threading.Event()
90
 
        control3 = threading.Event()
91
 
 
92
 
        class TestThread(cethread.CatchingExceptionThread):
93
 
 
94
 
            def __init__(self):
95
 
                super(TestThread, self).__init__(target=self.step_by_step)
96
 
                self.current_step = 'starting'
97
 
                self.step1 = threading.Event()
98
 
                self.set_sync_event(self.step1)
99
 
                self.step2 = threading.Event()
100
 
                self.final = threading.Event()
101
 
 
102
 
            def step_by_step(self):
103
 
                control1.wait()
104
 
                self.current_step = 'step1'
105
 
                self.switch_and_set(self.step2)
106
 
                control2.wait()
107
 
                self.current_step = 'step2'
108
 
                self.switch_and_set(self.final)
109
 
                control3.wait()
110
 
                self.current_step = 'done'
111
 
 
112
 
        tt = TestThread()
113
 
        tt.start()
114
 
        self.assertEquals('starting', tt.current_step)
115
 
        control1.set()
116
 
        tt.step1.wait()
117
 
        self.assertEquals('step1', tt.current_step)
118
 
        control2.set()
119
 
        tt.step2.wait()
120
 
        self.assertEquals('step2', tt.current_step)
121
 
        control3.set()
122
 
        # We don't wait on tt.final
123
 
        tt.join()
124
 
        self.assertEquals('done', tt.current_step)
125
 
 
126
 
    def test_exception_while_switch_and_set(self):
127
 
        control1 = threading.Event()
128
 
 
129
 
        class MyException(Exception):
130
 
            pass
131
 
 
132
 
        class TestThread(cethread.CatchingExceptionThread):
133
 
 
134
 
            def __init__(self, *args, **kwargs):
135
 
                self.step1 = threading.Event()
136
 
                self.step2 = threading.Event()
137
 
                super(TestThread, self).__init__(target=self.step_by_step,
138
 
                                                 sync_event=self.step1)
139
 
                self.current_step = 'starting'
140
 
                self.set_sync_event(self.step1)
141
 
 
142
 
            def step_by_step(self):
143
 
                control1.wait()
144
 
                self.current_step = 'step1'
145
 
                self.switch_and_set(self.step2)
146
 
 
147
 
            def set_sync_event(self, event):
148
 
                # We force an exception while trying to set step2
149
 
                if event is self.step2:
150
 
                    raise MyException()
151
 
                super(TestThread, self).set_sync_event(event)
152
 
 
153
 
        tt = TestThread()
154
 
        tt.start()
155
 
        self.assertEquals('starting', tt.current_step)
156
 
        control1.set()
157
 
        # We now wait on step1 which will be set when catching the exception
158
 
        tt.step1.wait()
159
 
        self.assertRaises(MyException, tt.pending_exception)
160
 
        self.assertIs(tt.step1, tt.sync_event)
161
 
        self.assertTrue(tt.step1.isSet())