1
# Copyright (C) 2011, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25
class TestCatchingExceptionThread(tests.TestCase):
27
def test_start_and_join_smoke_test(self):
31
tt = cethread.CatchingExceptionThread(target=do_nothing)
35
def test_exception_is_re_raised(self):
36
class MyException(Exception):
39
def raise_my_exception():
42
tt = cethread.CatchingExceptionThread(target=raise_my_exception)
44
self.assertRaises(MyException, tt.join)
46
def test_join_around_exception(self):
47
resume = threading.Event()
48
class MyException(Exception):
51
def raise_my_exception():
52
# Wait for the test to tell us to resume
57
tt = cethread.CatchingExceptionThread(target=raise_my_exception)
60
self.assertIs(None, tt.exception)
62
self.assertRaises(MyException, tt.join)
64
def test_sync_event(self):
65
control = threading.Event()
66
in_thread = threading.Event()
67
class MyException(Exception):
70
def raise_my_exception():
71
# Wait for the test to tell us to resume
76
tt = cethread.CatchingExceptionThread(target=raise_my_exception,
80
self.assertIs(None, tt.exception)
81
self.assertIs(in_thread, tt.sync_event)
83
self.assertRaises(MyException, tt.join)
84
self.assertEqual(True, tt.sync_event.isSet())
86
def test_switch_and_set(self):
87
"""Caller can precisely control a thread."""
88
control1 = threading.Event()
89
control2 = threading.Event()
90
control3 = threading.Event()
92
class TestThread(cethread.CatchingExceptionThread):
95
super(TestThread, self).__init__(target=self.step_by_step)
96
self.current_step = 'starting'
97
self.step1 = threading.Event()
98
self.set_sync_event(self.step1)
99
self.step2 = threading.Event()
100
self.final = threading.Event()
102
def step_by_step(self):
104
self.current_step = 'step1'
105
self.switch_and_set(self.step2)
107
self.current_step = 'step2'
108
self.switch_and_set(self.final)
110
self.current_step = 'done'
114
self.assertEqual('starting', tt.current_step)
117
self.assertEqual('step1', tt.current_step)
120
self.assertEqual('step2', tt.current_step)
122
# We don't wait on tt.final
124
self.assertEqual('done', tt.current_step)
126
def test_exception_while_switch_and_set(self):
127
control1 = threading.Event()
129
class MyException(Exception):
132
class TestThread(cethread.CatchingExceptionThread):
134
def __init__(self, *args, **kwargs):
135
self.step1 = threading.Event()
136
self.step2 = threading.Event()
137
super(TestThread, self).__init__(target=self.step_by_step,
138
sync_event=self.step1)
139
self.current_step = 'starting'
140
self.set_sync_event(self.step1)
142
def step_by_step(self):
144
self.current_step = 'step1'
145
self.switch_and_set(self.step2)
147
def set_sync_event(self, event):
148
# We force an exception while trying to set step2
149
if event is self.step2:
151
super(TestThread, self).set_sync_event(event)
155
self.assertEqual('starting', tt.current_step)
157
# We now wait on step1 which will be set when catching the exception
159
self.assertRaises(MyException, tt.pending_exception)
160
self.assertIs(tt.step1, tt.sync_event)
161
self.assertTrue(tt.step1.isSet())