850
851
"""A KnownFailure being raised should trigger several result actions."""
851
852
# Test double built on tests.ExtendedTestResult: the run/report hooks below
# are stubbed to `pass`, and report_known_failure stores what it was called
# with in self._call (presumably so the enclosing test can inspect it).
# NOTE(review): the bare numbers between statements look like line-number
# residue from a diff view, and both startTests/report_test_start and
# report_tests_starting are present -- presumably two revisions are
# interleaved here; confirm against the real file.
class InstrumentedTestResult(tests.ExtendedTestResult):
852
853
def stopTestRun(self): pass
853
def startTests(self): pass
854
def report_test_start(self, test): pass
854
def report_tests_starting(self): pass
855
855
# Records (test, 'known failure'); err and details are accepted but unused.
def report_known_failure(self, test, err=None, details=None):
856
856
self._call = test, 'known failure'
857
857
result = InstrumentedTestResult(None, None, None, None)
907
907
"""Test the behaviour of invoking addNotSupported."""
908
908
# Test double built on tests.ExtendedTestResult: the run/report hooks below
# are stubbed to `pass`, and report_unsupported stores its arguments in
# self._call as a (test, feature) tuple.
# NOTE(review): the bare numbers between statements look like line-number
# residue from a diff view, and both startTests/report_test_start and
# report_tests_starting are present -- presumably two revisions are
# interleaved here; confirm against the real file.
class InstrumentedTestResult(tests.ExtendedTestResult):
909
909
def stopTestRun(self): pass
910
def startTests(self): pass
911
def report_test_start(self, test): pass
910
def report_tests_starting(self): pass
912
911
# Records which test and which feature the report hook was invoked with.
def report_unsupported(self, test, feature):
913
912
self._call = test, feature
914
913
result = InstrumentedTestResult(None, None, None, None)
953
952
"""An UnavailableFeature being raised should invoke addNotSupported."""
954
953
# Test double built on tests.ExtendedTestResult: the run/report hooks below
# are stubbed to `pass`, and addNotSupported is overridden to store its
# arguments in self._call as a (test, feature) tuple instead of whatever the
# base class does.
# NOTE(review): the bare numbers between statements look like line-number
# residue from a diff view, and both startTests/report_test_start and
# report_tests_starting are present -- presumably two revisions are
# interleaved here; confirm against the real file.
class InstrumentedTestResult(tests.ExtendedTestResult):
955
954
def stopTestRun(self): pass
956
def startTests(self): pass
957
def report_test_start(self, test): pass
955
def report_tests_starting(self): pass
958
956
# Records which test and which feature addNotSupported was invoked with.
def addNotSupported(self, test, feature):
959
957
self._call = test, feature
960
958
result = InstrumentedTestResult(None, None, None, None)
1010
1007
test.run(result)
1011
1008
self.assertEquals(1, result.calls)
1010
# Builds a two-case suite and asserts that the instrumented result saw
# startTests exactly once (result.calls == 1) while counting both tests
# (result.count == 2).
# NOTE(review): the embedded numbering skips 1013 and 1019, so the
# initialisation of `calls` and the statement that runs `suite` against
# `result` are not visible in this excerpt -- confirm against the real file.
def test_startTests_only_once(self):
1011
"""With multiple tests startTests should still only be called once"""
1012
# Counts startTests invocations on the result object.
class InstrumentedTestResult(tests.ExtendedTestResult):
1014
def startTests(self): self.calls += 1
1015
result = InstrumentedTestResult(None, None, None, None)
1016
# Two no-op function test cases are enough to exercise "multiple tests".
suite = unittest.TestSuite([
1017
unittest.FunctionTestCase(lambda: None),
1018
unittest.FunctionTestCase(lambda: None)])
1020
self.assertEquals(1, result.calls)
1021
self.assertEquals(2, result.count)
1014
1024
class TestUnicodeFilenameFeature(tests.TestCase):
2979
2986
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2989
class TestThreadLeakDetection(tests.TestCase):
2990
"""Ensure when tests leak threads we detect and report it"""
2992
# Result subclass used by the leak-detection tests: it overrides
# _report_thread_leak to append a (test, leaks) pair to self.leaks rather
# than doing whatever the base class does; the `alive` argument is ignored.
# NOTE(review): the embedded numbering skips 2993 and 2995, so the method
# header around the base-class __init__ call and the initialisation of
# self.leaks are not visible in this excerpt.
class LeakRecordingResult(tests.ExtendedTestResult):
2994
# Base initialiser gets a fresh StringIO() followed by 0 and 1
# (presumably stream, descriptions, verbosity -- TODO confirm).
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
2996
def _report_thread_leak(self, test, leaks, alive):
2997
self.leaks.append((test, leaks))
2999
# Runs a plain unittest.TestCase whose addCleanup attribute is forced to
# None through a LeakRecordingResult, then asserts that no leak was counted
# (result._tests_leaking_threads_count == 0) and none was recorded
# (result.leaks == []).
# NOTE(review): the embedded numbering skips 3002-3003, 3006 and 3009, so
# the inner test method, the creation of `test` and the call that runs it
# are not visible in this excerpt.
def test_testcase_without_addCleanups(self):
3000
"""Check old TestCase instances don't break with leak detection"""
3001
class Test(unittest.TestCase):
3004
addCleanup = None # for when on Python 2.7 with native addCleanup
3005
result = self.LeakRecordingResult()
3007
# Sanity check: the inner case really has no usable addCleanup.
self.assertIs(getattr(test, "addCleanup", None), None)
3008
result.startTestRun()
3010
result.stopTestRun()
3011
self.assertEqual(result._tests_leaking_threads_count, 0)
3012
self.assertEqual(result.leaks, [])
3014
# Creates a "Leaker" thread whose target is event.wait, runs an inner test
# case through a LeakRecordingResult, and asserts that exactly one leaking
# test was counted, that it is identified by test.id(), that result.leaks
# equals [(test, set([thread]))], and that the result stream contains
# "leaking threads".
# Cleanups are registered as thread.join first and event.set second --
# presumably they run in reverse registration order, so the event is set
# before the join; confirm for tests.TestCase.
# NOTE(review): the embedded numbering skips 3021, 3026 and 3032, so the
# docstring terminator, the body of the inner test_leak and the call that
# runs `test` are not visible in this excerpt.
def test_thread_leak(self):
3015
"""Ensure a thread that outlives the running of a test is reported
3017
Uses a thread that blocks on an event, and is started by the inner
3018
test case. As the thread outlives the inner case's run, it should be
3019
detected as a leak, but the event is then set so that the thread can
3020
be safely joined in cleanup so it's not leaked for real.
3022
event = threading.Event()
3023
thread = threading.Thread(name="Leaker", target=event.wait)
3024
class Test(tests.TestCase):
3025
def test_leak(self):
3027
result = self.LeakRecordingResult()
3028
test = Test("test_leak")
3029
self.addCleanup(thread.join)
3030
self.addCleanup(event.set)
3031
result.startTestRun()
3033
result.stopTestRun()
3034
self.assertEqual(result._tests_leaking_threads_count, 1)
3035
self.assertEqual(result._first_thread_leaker_id, test.id())
3036
self.assertEqual(result.leaks, [(test, set([thread]))])
3037
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3039
# Creates three threads (LeakerA/B/C) that all block on one shared event and
# an inner TestCase with three methods, then asserts that two tests were
# counted as leaking, that the first leaker id is first_test.id(), that
# result.leaks attributes thread_b to first_test and thread_a plus thread_c
# to third_test, and that the result stream contains "leaking threads".
# Cleanups are registered as the three joins first and event.set last --
# presumably they run in reverse registration order, so the event is set
# before any join; confirm for tests.TestCase.
# NOTE(review): the embedded numbering skips several lines (for example
# 3044, 3051, 3053, 3055-3056, 3065 and 3067), so the docstring terminator,
# the bodies of the inner test methods and the statements that wrap and run
# the list of tests are not visible in this excerpt.
def test_multiple_leaks(self):
3040
"""Check multiple leaks are blamed on the test cases at fault
3042
Same concept as the previous test, but has one inner test method that
3043
leaks two threads, and one that doesn't leak at all.
3045
event = threading.Event()
3046
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3047
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3048
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3049
class Test(tests.TestCase):
3050
def test_first_leak(self):
3052
def test_second_no_leak(self):
3054
def test_third_leak(self):
3057
result = self.LeakRecordingResult()
3058
first_test = Test("test_first_leak")
3059
third_test = Test("test_third_leak")
3060
self.addCleanup(thread_a.join)
3061
self.addCleanup(thread_b.join)
3062
self.addCleanup(thread_c.join)
3063
self.addCleanup(event.set)
3064
result.startTestRun()
3066
[first_test, Test("test_second_no_leak"), third_test]
3068
result.stopTestRun()
3069
self.assertEqual(result._tests_leaking_threads_count, 2)
3070
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3071
self.assertEqual(result.leaks, [
3072
(first_test, set([thread_b])),
3073
(third_test, set([thread_a, thread_c]))])
3074
self.assertContainsString(result.stream.getvalue(), "leaking threads")
2982
3077
class TestRunSuite(tests.TestCase):
2984
3079
def test_runner_class(self):