~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

(gz) Cleanup and test selftest thread leak detection (Martin [gz])

Show diffs side-by-side

added

removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
850
851
        """A KnownFailure being raised should trigger several result actions."""
851
852
        class InstrumentedTestResult(tests.ExtendedTestResult):
852
853
            def stopTestRun(self): pass
853
 
            def startTests(self): pass
854
 
            def report_test_start(self, test): pass
 
854
            def report_tests_starting(self): pass
855
855
            def report_known_failure(self, test, err=None, details=None):
856
856
                self._call = test, 'known failure'
857
857
        result = InstrumentedTestResult(None, None, None, None)
907
907
        """Test the behaviour of invoking addNotSupported."""
908
908
        class InstrumentedTestResult(tests.ExtendedTestResult):
909
909
            def stopTestRun(self): pass
910
 
            def startTests(self): pass
911
 
            def report_test_start(self, test): pass
 
910
            def report_tests_starting(self): pass
912
911
            def report_unsupported(self, test, feature):
913
912
                self._call = test, feature
914
913
        result = InstrumentedTestResult(None, None, None, None)
953
952
        """An UnavailableFeature being raised should invoke addNotSupported."""
954
953
        class InstrumentedTestResult(tests.ExtendedTestResult):
955
954
            def stopTestRun(self): pass
956
 
            def startTests(self): pass
957
 
            def report_test_start(self, test): pass
 
955
            def report_tests_starting(self): pass
958
956
            def addNotSupported(self, test, feature):
959
957
                self._call = test, feature
960
958
        result = InstrumentedTestResult(None, None, None, None)
1002
1000
        class InstrumentedTestResult(tests.ExtendedTestResult):
1003
1001
            calls = 0
1004
1002
            def startTests(self): self.calls += 1
1005
 
            def report_test_start(self, test): pass
1006
1003
        result = InstrumentedTestResult(None, None, None, None)
1007
1004
        def test_function():
1008
1005
            pass
1010
1007
        test.run(result)
1011
1008
        self.assertEquals(1, result.calls)
1012
1009
 
 
1010
    def test_startTests_only_once(self):
 
1011
        """With multiple tests startTests should still only be called once"""
 
1012
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1013
            calls = 0
 
1014
            def startTests(self): self.calls += 1
 
1015
        result = InstrumentedTestResult(None, None, None, None)
 
1016
        suite = unittest.TestSuite([
 
1017
            unittest.FunctionTestCase(lambda: None),
 
1018
            unittest.FunctionTestCase(lambda: None)])
 
1019
        suite.run(result)
 
1020
        self.assertEquals(1, result.calls)
 
1021
        self.assertEquals(2, result.count)
 
1022
 
1013
1023
 
1014
1024
class TestUnicodeFilenameFeature(tests.TestCase):
1015
1025
 
1036
1046
        because of our use of global state.
1037
1047
        """
1038
1048
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1039
 
        old_leak = tests.TestCase._first_thread_leaker_id
1040
1049
        try:
1041
1050
            tests.TestCaseInTempDir.TEST_ROOT = None
1042
 
            tests.TestCase._first_thread_leaker_id = None
1043
1051
            return testrunner.run(test)
1044
1052
        finally:
1045
1053
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1046
 
            tests.TestCase._first_thread_leaker_id = old_leak
1047
1054
 
1048
1055
    def test_known_failure_failed_run(self):
1049
1056
        # run a test that generates a known failure which should be printed in
2979
2986
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2980
2987
 
2981
2988
 
 
2989
class TestThreadLeakDetection(tests.TestCase):
 
2990
    """Ensure when tests leak threads we detect and report it"""
 
2991
 
 
2992
    class LeakRecordingResult(tests.ExtendedTestResult):
 
2993
        def __init__(self):
 
2994
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
2995
            self.leaks = []
 
2996
        def _report_thread_leak(self, test, leaks, alive):
 
2997
            self.leaks.append((test, leaks))
 
2998
 
 
2999
    def test_testcase_without_addCleanups(self):
 
3000
        """Check old TestCase instances don't break with leak detection"""
 
3001
        class Test(unittest.TestCase):
 
3002
            def runTest(self):
 
3003
                pass
 
3004
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3005
        result = self.LeakRecordingResult()
 
3006
        test = Test()
 
3007
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3008
        result.startTestRun()
 
3009
        test.run(result)
 
3010
        result.stopTestRun()
 
3011
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3012
        self.assertEqual(result.leaks, [])
 
3013
        
 
3014
    def test_thread_leak(self):
 
3015
        """Ensure a thread that outlives the running of a test is reported
 
3016
 
 
3017
        Uses a thread that blocks on an event, and is started by the inner
 
3018
        test case. As the thread outlives the inner case's run, it should be
 
3019
        detected as a leak, but the event is then set so that the thread can
 
3020
        be safely joined in cleanup so it's not leaked for real.
 
3021
        """
 
3022
        event = threading.Event()
 
3023
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3024
        class Test(tests.TestCase):
 
3025
            def test_leak(self):
 
3026
                thread.start()
 
3027
        result = self.LeakRecordingResult()
 
3028
        test = Test("test_leak")
 
3029
        self.addCleanup(thread.join)
 
3030
        self.addCleanup(event.set)
 
3031
        result.startTestRun()
 
3032
        test.run(result)
 
3033
        result.stopTestRun()
 
3034
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3035
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3036
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3037
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3038
 
 
3039
    def test_multiple_leaks(self):
 
3040
        """Check multiple leaks are blamed on the test cases at fault
 
3041
 
 
3042
        Same concept as the previous test, but has one inner test method that
 
3043
        leaks two threads, and one that doesn't leak at all.
 
3044
        """
 
3045
        event = threading.Event()
 
3046
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3047
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3048
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3049
        class Test(tests.TestCase):
 
3050
            def test_first_leak(self):
 
3051
                thread_b.start()
 
3052
            def test_second_no_leak(self):
 
3053
                pass
 
3054
            def test_third_leak(self):
 
3055
                thread_c.start()
 
3056
                thread_a.start()
 
3057
        result = self.LeakRecordingResult()
 
3058
        first_test = Test("test_first_leak")
 
3059
        third_test = Test("test_third_leak")
 
3060
        self.addCleanup(thread_a.join)
 
3061
        self.addCleanup(thread_b.join)
 
3062
        self.addCleanup(thread_c.join)
 
3063
        self.addCleanup(event.set)
 
3064
        result.startTestRun()
 
3065
        unittest.TestSuite(
 
3066
            [first_test, Test("test_second_no_leak"), third_test]
 
3067
            ).run(result)
 
3068
        result.stopTestRun()
 
3069
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3070
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3071
        self.assertEqual(result.leaks, [
 
3072
            (first_test, set([thread_b])),
 
3073
            (third_test, set([thread_a, thread_c]))])
 
3074
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3075
 
 
3076
 
2982
3077
class TestRunSuite(tests.TestCase):
2983
3078
 
2984
3079
    def test_runner_class(self):