~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: toto at example
  • Date: 2010-09-27 19:31:45 UTC
  • mfrom: (5050.17.27 2.2)
  • mto: This revision was merged to the branch mainline in revision 5448.
  • Revision ID: toto@example.com-20100927193145-bly1dw5wjd23hiwy
Merge lp:bzr/2.2 into trunk including fixes for #644855, #646133, #632387

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
29
from testtools import MultiTestResult
 
30
from testtools.content import Content
29
31
from testtools.content_type import ContentType
30
32
from testtools.matchers import (
31
33
    DocTestMatches,
42
44
    lockdir,
43
45
    memorytree,
44
46
    osutils,
45
 
    progress,
46
47
    remote,
47
48
    repository,
48
49
    symbol_versioning,
67
68
    test_sftp_transport,
68
69
    TestUtil,
69
70
    )
70
 
from bzrlib.trace import note
 
71
from bzrlib.trace import note, mutter
71
72
from bzrlib.transport import memory
72
73
from bzrlib.version import _get_bzr_source_tree
73
74
 
122
123
        self.failUnlessExists(filename)
123
124
 
124
125
 
 
126
class TestClassesAvailable(tests.TestCase):
 
127
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
128
 
 
129
    def test_test_case(self):
 
130
        from bzrlib.tests import TestCase
 
131
 
 
132
    def test_test_loader(self):
 
133
        from bzrlib.tests import TestLoader
 
134
 
 
135
    def test_test_suite(self):
 
136
        from bzrlib.tests import TestSuite
 
137
 
 
138
 
125
139
class TestTransportScenarios(tests.TestCase):
126
140
    """A group of tests that test the transport implementation adaption core.
127
141
 
208
222
    def test_scenarios(self):
209
223
        # check that constructor parameters are passed through to the adapted
210
224
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
225
        from bzrlib.tests.per_controldir import make_scenarios
212
226
        vfs_factory = "v"
213
227
        server1 = "a"
214
228
        server2 = "b"
312
326
        from bzrlib.tests.per_interrepository import make_scenarios
313
327
        server1 = "a"
314
328
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
329
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
330
        scenarios = make_scenarios(server1, server2, formats)
317
331
        self.assertEqual([
318
332
            ('C0,str,str',
319
333
             {'repository_format': 'C1',
320
334
              'repository_format_to': 'C2',
321
335
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
336
              'transport_server': 'a',
 
337
              'extra_setup': 'C3'}),
323
338
            ('D0,str,str',
324
339
             {'repository_format': 'D1',
325
340
              'repository_format_to': 'D2',
326
341
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
342
              'transport_server': 'a',
 
343
              'extra_setup': 'D3'})],
328
344
            scenarios)
329
345
 
330
346
 
609
625
                l.attempt_lock()
610
626
        test = TestDanglingLock('test_function')
611
627
        result = test.run()
 
628
        total_failures = result.errors + result.failures
612
629
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
630
            self.assertEqual(1, len(total_failures))
614
631
        else:
615
632
            # When _lock_check_thorough is disabled, then we don't trigger a
616
633
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
634
            self.assertEqual(0, len(total_failures))
618
635
 
619
636
 
620
637
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
638
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
639
 
623
640
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
641
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
642
        self.vfs_transport_factory = memory.MemoryServer
627
643
        self.transport_readonly_server = None
629
645
        # for the server
630
646
        url = self.get_readonly_url()
631
647
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
 
648
        t = transport.get_transport(url)
 
649
        t2 = transport.get_transport(url2)
634
650
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
651
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
636
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
653
 
638
654
    def test_get_readonly_url_http(self):
639
655
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
656
        from bzrlib.transport.http import HttpTransportBase
642
657
        self.transport_server = test_server.LocalURLServer
643
658
        self.transport_readonly_server = HttpServer
645
660
        url = self.get_readonly_url()
646
661
        url2 = self.get_readonly_url('foo/bar')
647
662
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
 
663
        t = transport.get_transport(url)
 
664
        t2 = transport.get_transport(url2)
650
665
        self.failUnless(isinstance(t, HttpTransportBase))
651
666
        self.failUnless(isinstance(t2, HttpTransportBase))
652
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
690
705
class TestChrootedTest(tests.ChrootedTestCase):
691
706
 
692
707
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
708
        t = transport.get_transport(self.get_readonly_url())
695
709
        url = t.base
696
710
        self.assertEqual(url, t.clone('..').base)
697
711
 
803
817
        self.requireFeature(test_lsprof.LSProfFeature)
804
818
        result_stream = StringIO()
805
819
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
820
            result_stream,
807
821
            descriptions=0,
808
822
            verbosity=2,
809
823
            )
839
853
        """A KnownFailure being raised should trigger several result actions."""
840
854
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
855
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
856
            def report_tests_starting(self): pass
844
857
            def report_known_failure(self, test, err=None, details=None):
845
858
                self._call = test, 'known failure'
846
859
        result = InstrumentedTestResult(None, None, None, None)
864
877
        # verbose test output formatting
865
878
        result_stream = StringIO()
866
879
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
880
            result_stream,
868
881
            descriptions=0,
869
882
            verbosity=2,
870
883
            )
880
893
        output = result_stream.getvalue()[prefix:]
881
894
        lines = output.splitlines()
882
895
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
896
        if sys.version_info > (2, 7):
 
897
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
898
                self.assertNotEqual, lines[1], '    ')
883
899
        self.assertEqual(lines[1], '    foo')
884
900
        self.assertEqual(2, len(lines))
885
901
 
893
909
        """Test the behaviour of invoking addNotSupported."""
894
910
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
911
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
912
            def report_tests_starting(self): pass
898
913
            def report_unsupported(self, test, feature):
899
914
                self._call = test, feature
900
915
        result = InstrumentedTestResult(None, None, None, None)
919
934
        # verbose test output formatting
920
935
        result_stream = StringIO()
921
936
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
937
            result_stream,
923
938
            descriptions=0,
924
939
            verbosity=2,
925
940
            )
939
954
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
955
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
956
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
957
            def report_tests_starting(self): pass
944
958
            def addNotSupported(self, test, feature):
945
959
                self._call = test, feature
946
960
        result = InstrumentedTestResult(None, None, None, None)
988
1002
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
1003
            calls = 0
990
1004
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
1005
        result = InstrumentedTestResult(None, None, None, None)
993
1006
        def test_function():
994
1007
            pass
996
1009
        test.run(result)
997
1010
        self.assertEquals(1, result.calls)
998
1011
 
 
1012
    def test_startTests_only_once(self):
 
1013
        """With multiple tests startTests should still only be called once"""
 
1014
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1015
            calls = 0
 
1016
            def startTests(self): self.calls += 1
 
1017
        result = InstrumentedTestResult(None, None, None, None)
 
1018
        suite = unittest.TestSuite([
 
1019
            unittest.FunctionTestCase(lambda: None),
 
1020
            unittest.FunctionTestCase(lambda: None)])
 
1021
        suite.run(result)
 
1022
        self.assertEquals(1, result.calls)
 
1023
        self.assertEquals(2, result.count)
 
1024
 
999
1025
 
1000
1026
class TestUnicodeFilenameFeature(tests.TestCase):
1001
1027
 
1022
1048
        because of our use of global state.
1023
1049
        """
1024
1050
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1051
        try:
1027
1052
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1053
            return testrunner.run(test)
1030
1054
        finally:
1031
1055
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1056
 
1034
1057
    def test_known_failure_failed_run(self):
1035
1058
        # run a test that generates a known failure which should be printed in
1213
1236
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
1237
        self.assertLength(1, self._get_source_tree_calls)
1215
1238
 
 
1239
    def test_verbose_test_count(self):
 
1240
        """A verbose test run reports the right test count at the start"""
 
1241
        suite = TestUtil.TestSuite([
 
1242
            unittest.FunctionTestCase(lambda:None),
 
1243
            unittest.FunctionTestCase(lambda:None)])
 
1244
        self.assertEqual(suite.countTestCases(), 2)
 
1245
        stream = StringIO()
 
1246
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1247
        # Need to use the CountingDecorator as that's what sets num_tests
 
1248
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1249
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1250
 
1216
1251
    def test_startTestRun(self):
1217
1252
        """run should call result.startTestRun()"""
1218
1253
        calls = []
1421
1456
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1457
        output_stream = StringIO()
1423
1458
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1459
            output_stream,
1425
1460
            descriptions=0,
1426
1461
            verbosity=2)
1427
1462
        sample_test.run(result)
1655
1690
        self.assertEqual('original', obj.test_attr)
1656
1691
 
1657
1692
 
 
1693
class _MissingFeature(tests.Feature):
 
1694
    def _probe(self):
 
1695
        return False
 
1696
missing_feature = _MissingFeature()
 
1697
 
 
1698
 
 
1699
def _get_test(name):
 
1700
    """Get an instance of a specific example test.
 
1701
 
 
1702
    We protect this in a function so that they don't auto-run in the test
 
1703
    suite.
 
1704
    """
 
1705
 
 
1706
    class ExampleTests(tests.TestCase):
 
1707
 
 
1708
        def test_fail(self):
 
1709
            mutter('this was a failing test')
 
1710
            self.fail('this test will fail')
 
1711
 
 
1712
        def test_error(self):
 
1713
            mutter('this test errored')
 
1714
            raise RuntimeError('gotcha')
 
1715
 
 
1716
        def test_missing_feature(self):
 
1717
            mutter('missing the feature')
 
1718
            self.requireFeature(missing_feature)
 
1719
 
 
1720
        def test_skip(self):
 
1721
            mutter('this test will be skipped')
 
1722
            raise tests.TestSkipped('reason')
 
1723
 
 
1724
        def test_success(self):
 
1725
            mutter('this test succeeds')
 
1726
 
 
1727
        def test_xfail(self):
 
1728
            mutter('test with expected failure')
 
1729
            self.knownFailure('this_fails')
 
1730
 
 
1731
        def test_unexpected_success(self):
 
1732
            mutter('test with unexpected success')
 
1733
            self.expectFailure('should_fail', lambda: None)
 
1734
 
 
1735
    return ExampleTests(name)
 
1736
 
 
1737
 
 
1738
class TestTestCaseLogDetails(tests.TestCase):
 
1739
 
 
1740
    def _run_test(self, test_name):
 
1741
        test = _get_test(test_name)
 
1742
        result = testtools.TestResult()
 
1743
        test.run(result)
 
1744
        return result
 
1745
 
 
1746
    def test_fail_has_log(self):
 
1747
        result = self._run_test('test_fail')
 
1748
        self.assertEqual(1, len(result.failures))
 
1749
        result_content = result.failures[0][1]
 
1750
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1751
        self.assertContainsRe(result_content, 'this was a failing test')
 
1752
 
 
1753
    def test_error_has_log(self):
 
1754
        result = self._run_test('test_error')
 
1755
        self.assertEqual(1, len(result.errors))
 
1756
        result_content = result.errors[0][1]
 
1757
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1758
        self.assertContainsRe(result_content, 'this test errored')
 
1759
 
 
1760
    def test_skip_has_no_log(self):
 
1761
        result = self._run_test('test_skip')
 
1762
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1763
        skips = result.skip_reasons['reason']
 
1764
        self.assertEqual(1, len(skips))
 
1765
        test = skips[0]
 
1766
        self.assertFalse('log' in test.getDetails())
 
1767
 
 
1768
    def test_missing_feature_has_no_log(self):
 
1769
        # testtools doesn't know about addNotSupported, so it just gets
 
1770
        # considered as a skip
 
1771
        result = self._run_test('test_missing_feature')
 
1772
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1773
        skips = result.skip_reasons[missing_feature]
 
1774
        self.assertEqual(1, len(skips))
 
1775
        test = skips[0]
 
1776
        self.assertFalse('log' in test.getDetails())
 
1777
 
 
1778
    def test_xfail_has_no_log(self):
 
1779
        result = self._run_test('test_xfail')
 
1780
        self.assertEqual(1, len(result.expectedFailures))
 
1781
        result_content = result.expectedFailures[0][1]
 
1782
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1783
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1784
 
 
1785
    def test_unexpected_success_has_log(self):
 
1786
        result = self._run_test('test_unexpected_success')
 
1787
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1788
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1789
        # expectedFailures is a list of reasons?
 
1790
        test = result.unexpectedSuccesses[0]
 
1791
        details = test.getDetails()
 
1792
        self.assertTrue('log' in details)
 
1793
 
 
1794
 
 
1795
class TestTestCloning(tests.TestCase):
 
1796
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1797
 
 
1798
    def test_cloned_testcase_does_not_share_details(self):
 
1799
        """A TestCase cloned with clone_test does not share mutable attributes
 
1800
        such as details or cleanups.
 
1801
        """
 
1802
        class Test(tests.TestCase):
 
1803
            def test_foo(self):
 
1804
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1805
        orig_test = Test('test_foo')
 
1806
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1807
        orig_test.run(unittest.TestResult())
 
1808
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1809
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1810
 
 
1811
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1812
        """Applying two levels of scenarios to a test preserves the attributes
 
1813
        added by both scenarios.
 
1814
        """
 
1815
        class Test(tests.TestCase):
 
1816
            def test_foo(self):
 
1817
                pass
 
1818
        test = Test('test_foo')
 
1819
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1820
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1821
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1822
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1823
        all_tests = list(tests.iter_suite_tests(suite))
 
1824
        self.assertLength(4, all_tests)
 
1825
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1826
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1827
 
 
1828
 
1658
1829
# NB: Don't delete this; it's not actually from 0.11!
1659
1830
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1831
def sample_deprecated_function():
1814
1985
                tree.branch.repository.bzrdir.root_transport)
1815
1986
 
1816
1987
 
1817
 
class SelfTestHelper:
 
1988
class SelfTestHelper(object):
1818
1989
 
1819
1990
    def run_selftest(self, **kwargs):
1820
1991
        """Run selftest returning its output."""
1971
2142
            load_list='missing file name', list_only=True)
1972
2143
 
1973
2144
 
 
2145
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2146
 
 
2147
    _test_needs_features = [features.subunit]
 
2148
 
 
2149
    def run_subunit_stream(self, test_name):
 
2150
        from subunit import ProtocolTestCase
 
2151
        def factory():
 
2152
            return TestUtil.TestSuite([_get_test(test_name)])
 
2153
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2154
            test_suite_factory=factory)
 
2155
        test = ProtocolTestCase(stream)
 
2156
        result = testtools.TestResult()
 
2157
        test.run(result)
 
2158
        content = stream.getvalue()
 
2159
        return content, result
 
2160
 
 
2161
    def test_fail_has_log(self):
 
2162
        content, result = self.run_subunit_stream('test_fail')
 
2163
        self.assertEqual(1, len(result.failures))
 
2164
        self.assertContainsRe(content, '(?m)^log$')
 
2165
        self.assertContainsRe(content, 'this test will fail')
 
2166
 
 
2167
    def test_error_has_log(self):
 
2168
        content, result = self.run_subunit_stream('test_error')
 
2169
        self.assertContainsRe(content, '(?m)^log$')
 
2170
        self.assertContainsRe(content, 'this test errored')
 
2171
 
 
2172
    def test_skip_has_no_log(self):
 
2173
        content, result = self.run_subunit_stream('test_skip')
 
2174
        self.assertNotContainsRe(content, '(?m)^log$')
 
2175
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2176
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2177
        skips = result.skip_reasons['reason']
 
2178
        self.assertEqual(1, len(skips))
 
2179
        test = skips[0]
 
2180
        # RemotedTestCase doesn't preserve the "details"
 
2181
        ## self.assertFalse('log' in test.getDetails())
 
2182
 
 
2183
    def test_missing_feature_has_no_log(self):
 
2184
        content, result = self.run_subunit_stream('test_missing_feature')
 
2185
        self.assertNotContainsRe(content, '(?m)^log$')
 
2186
        self.assertNotContainsRe(content, 'missing the feature')
 
2187
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2188
        skips = result.skip_reasons['_MissingFeature\n']
 
2189
        self.assertEqual(1, len(skips))
 
2190
        test = skips[0]
 
2191
        # RemotedTestCase doesn't preserve the "details"
 
2192
        ## self.assertFalse('log' in test.getDetails())
 
2193
 
 
2194
    def test_xfail_has_no_log(self):
 
2195
        content, result = self.run_subunit_stream('test_xfail')
 
2196
        self.assertNotContainsRe(content, '(?m)^log$')
 
2197
        self.assertNotContainsRe(content, 'test with expected failure')
 
2198
        self.assertEqual(1, len(result.expectedFailures))
 
2199
        result_content = result.expectedFailures[0][1]
 
2200
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2201
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2202
 
 
2203
    def test_unexpected_success_has_log(self):
 
2204
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2205
        self.assertContainsRe(content, '(?m)^log$')
 
2206
        self.assertContainsRe(content, 'test with unexpected success')
 
2207
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2208
                           ' as a plain success',
 
2209
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2210
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2211
        test = result.unexpectedSuccesses[0]
 
2212
        # RemotedTestCase doesn't preserve the "details"
 
2213
        ## self.assertTrue('log' in test.getDetails())
 
2214
 
 
2215
    def test_success_has_no_log(self):
 
2216
        content, result = self.run_subunit_stream('test_success')
 
2217
        self.assertEqual(1, result.testsRun)
 
2218
        self.assertNotContainsRe(content, '(?m)^log$')
 
2219
        self.assertNotContainsRe(content, 'this test succeeds')
 
2220
 
 
2221
 
1974
2222
class TestRunBzr(tests.TestCase):
1975
2223
 
1976
2224
    out = ''
2339
2587
            os.chdir = orig_chdir
2340
2588
        self.assertEqual(['foo', 'current'], chdirs)
2341
2589
 
 
2590
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2591
        self.get_source_path = lambda: ""
 
2592
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2593
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2594
 
2342
2595
 
2343
2596
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2597
    """Tests that really need to do things with an external bzr."""
2960
3213
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3214
 
2962
3215
 
 
3216
class TestThreadLeakDetection(tests.TestCase):
 
3217
    """Ensure when tests leak threads we detect and report it"""
 
3218
 
 
3219
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3220
        def __init__(self):
 
3221
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3222
            self.leaks = []
 
3223
        def _report_thread_leak(self, test, leaks, alive):
 
3224
            self.leaks.append((test, leaks))
 
3225
 
 
3226
    def test_testcase_without_addCleanups(self):
 
3227
        """Check old TestCase instances don't break with leak detection"""
 
3228
        class Test(unittest.TestCase):
 
3229
            def runTest(self):
 
3230
                pass
 
3231
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3232
        result = self.LeakRecordingResult()
 
3233
        test = Test()
 
3234
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3235
        result.startTestRun()
 
3236
        test.run(result)
 
3237
        result.stopTestRun()
 
3238
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3239
        self.assertEqual(result.leaks, [])
 
3240
        
 
3241
    def test_thread_leak(self):
 
3242
        """Ensure a thread that outlives the running of a test is reported
 
3243
 
 
3244
        Uses a thread that blocks on an event, and is started by the inner
 
3245
        test case. As the thread outlives the inner case's run, it should be
 
3246
        detected as a leak, but the event is then set so that the thread can
 
3247
        be safely joined in cleanup so it's not leaked for real.
 
3248
        """
 
3249
        event = threading.Event()
 
3250
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3251
        class Test(tests.TestCase):
 
3252
            def test_leak(self):
 
3253
                thread.start()
 
3254
        result = self.LeakRecordingResult()
 
3255
        test = Test("test_leak")
 
3256
        self.addCleanup(thread.join)
 
3257
        self.addCleanup(event.set)
 
3258
        result.startTestRun()
 
3259
        test.run(result)
 
3260
        result.stopTestRun()
 
3261
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3262
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3263
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3264
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3265
 
 
3266
    def test_multiple_leaks(self):
 
3267
        """Check multiple leaks are blamed on the test cases at fault
 
3268
 
 
3269
        Same concept as the previous test, but has one inner test method that
 
3270
        leaks two threads, and one that doesn't leak at all.
 
3271
        """
 
3272
        event = threading.Event()
 
3273
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3274
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3275
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3276
        class Test(tests.TestCase):
 
3277
            def test_first_leak(self):
 
3278
                thread_b.start()
 
3279
            def test_second_no_leak(self):
 
3280
                pass
 
3281
            def test_third_leak(self):
 
3282
                thread_c.start()
 
3283
                thread_a.start()
 
3284
        result = self.LeakRecordingResult()
 
3285
        first_test = Test("test_first_leak")
 
3286
        third_test = Test("test_third_leak")
 
3287
        self.addCleanup(thread_a.join)
 
3288
        self.addCleanup(thread_b.join)
 
3289
        self.addCleanup(thread_c.join)
 
3290
        self.addCleanup(event.set)
 
3291
        result.startTestRun()
 
3292
        unittest.TestSuite(
 
3293
            [first_test, Test("test_second_no_leak"), third_test]
 
3294
            ).run(result)
 
3295
        result.stopTestRun()
 
3296
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3297
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3298
        self.assertEqual(result.leaks, [
 
3299
            (first_test, set([thread_b])),
 
3300
            (third_test, set([thread_a, thread_c]))])
 
3301
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3302
 
 
3303
 
2963
3304
class TestRunSuite(tests.TestCase):
2964
3305
 
2965
3306
    def test_runner_class(self):