~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Parth Malwankar
  • Date: 2010-05-05 14:02:53 UTC
  • mto: This revision was merged to the branch mainline in revision 5213.
  • Revision ID: parth.malwankar@gmail.com-20100505140253-fqdiwllq4o4htbsg
added comment to init/init-repo pass tests for lacking whoami.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
24
 
import threading
25
24
import time
26
25
import unittest
27
26
import warnings
28
27
 
29
28
from testtools import MultiTestResult
30
 
from testtools.content import Content
31
29
from testtools.content_type import ContentType
32
30
from testtools.matchers import (
33
31
    DocTestMatches,
44
42
    lockdir,
45
43
    memorytree,
46
44
    osutils,
 
45
    progress,
47
46
    remote,
48
47
    repository,
49
48
    symbol_versioning,
68
67
    test_sftp_transport,
69
68
    TestUtil,
70
69
    )
71
 
from bzrlib.trace import note, mutter
 
70
from bzrlib.trace import note
72
71
from bzrlib.transport import memory
73
72
from bzrlib.version import _get_bzr_source_tree
74
73
 
123
122
        self.failUnlessExists(filename)
124
123
 
125
124
 
126
 
class TestClassesAvailable(tests.TestCase):
127
 
    """As a convenience we expose Test* classes from bzrlib.tests"""
128
 
 
129
 
    def test_test_case(self):
130
 
        from bzrlib.tests import TestCase
131
 
 
132
 
    def test_test_loader(self):
133
 
        from bzrlib.tests import TestLoader
134
 
 
135
 
    def test_test_suite(self):
136
 
        from bzrlib.tests import TestSuite
137
 
 
138
 
 
139
125
class TestTransportScenarios(tests.TestCase):
140
126
    """A group of tests that test the transport implementation adaption core.
141
127
 
222
208
    def test_scenarios(self):
223
209
        # check that constructor parameters are passed through to the adapted
224
210
        # test.
225
 
        from bzrlib.tests.per_controldir import make_scenarios
 
211
        from bzrlib.tests.per_bzrdir import make_scenarios
226
212
        vfs_factory = "v"
227
213
        server1 = "a"
228
214
        server2 = "b"
326
312
        from bzrlib.tests.per_interrepository import make_scenarios
327
313
        server1 = "a"
328
314
        server2 = "b"
329
 
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
 
315
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
330
316
        scenarios = make_scenarios(server1, server2, formats)
331
317
        self.assertEqual([
332
318
            ('C0,str,str',
333
319
             {'repository_format': 'C1',
334
320
              'repository_format_to': 'C2',
335
321
              'transport_readonly_server': 'b',
336
 
              'transport_server': 'a',
337
 
              'extra_setup': 'C3'}),
 
322
              'transport_server': 'a'}),
338
323
            ('D0,str,str',
339
324
             {'repository_format': 'D1',
340
325
              'repository_format_to': 'D2',
341
326
              'transport_readonly_server': 'b',
342
 
              'transport_server': 'a',
343
 
              'extra_setup': 'D3'})],
 
327
              'transport_server': 'a'})],
344
328
            scenarios)
345
329
 
346
330
 
625
609
                l.attempt_lock()
626
610
        test = TestDanglingLock('test_function')
627
611
        result = test.run()
628
 
        total_failures = result.errors + result.failures
629
612
        if self._lock_check_thorough:
630
 
            self.assertEqual(1, len(total_failures))
 
613
            self.assertEqual(1, len(result.errors))
631
614
        else:
632
615
            # When _lock_check_thorough is disabled, then we don't trigger a
633
616
            # failure
634
 
            self.assertEqual(0, len(total_failures))
 
617
            self.assertEqual(0, len(result.errors))
635
618
 
636
619
 
637
620
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
638
621
    """Tests for the convenience functions TestCaseWithTransport introduces."""
639
622
 
640
623
    def test_get_readonly_url_none(self):
 
624
        from bzrlib.transport import get_transport
641
625
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
642
626
        self.vfs_transport_factory = memory.MemoryServer
643
627
        self.transport_readonly_server = None
645
629
        # for the server
646
630
        url = self.get_readonly_url()
647
631
        url2 = self.get_readonly_url('foo/bar')
648
 
        t = transport.get_transport(url)
649
 
        t2 = transport.get_transport(url2)
 
632
        t = get_transport(url)
 
633
        t2 = get_transport(url2)
650
634
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
651
635
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
652
636
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
653
637
 
654
638
    def test_get_readonly_url_http(self):
655
639
        from bzrlib.tests.http_server import HttpServer
 
640
        from bzrlib.transport import get_transport
656
641
        from bzrlib.transport.http import HttpTransportBase
657
642
        self.transport_server = test_server.LocalURLServer
658
643
        self.transport_readonly_server = HttpServer
660
645
        url = self.get_readonly_url()
661
646
        url2 = self.get_readonly_url('foo/bar')
662
647
        # the transport returned may be any HttpTransportBase subclass
663
 
        t = transport.get_transport(url)
664
 
        t2 = transport.get_transport(url2)
 
648
        t = get_transport(url)
 
649
        t2 = get_transport(url2)
665
650
        self.failUnless(isinstance(t, HttpTransportBase))
666
651
        self.failUnless(isinstance(t2, HttpTransportBase))
667
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
705
690
class TestChrootedTest(tests.ChrootedTestCase):
706
691
 
707
692
    def test_root_is_root(self):
708
 
        t = transport.get_transport(self.get_readonly_url())
 
693
        from bzrlib.transport import get_transport
 
694
        t = get_transport(self.get_readonly_url())
709
695
        url = t.base
710
696
        self.assertEqual(url, t.clone('..').base)
711
697
 
817
803
        self.requireFeature(test_lsprof.LSProfFeature)
818
804
        result_stream = StringIO()
819
805
        result = bzrlib.tests.VerboseTestResult(
820
 
            result_stream,
 
806
            unittest._WritelnDecorator(result_stream),
821
807
            descriptions=0,
822
808
            verbosity=2,
823
809
            )
853
839
        """A KnownFailure being raised should trigger several result actions."""
854
840
        class InstrumentedTestResult(tests.ExtendedTestResult):
855
841
            def stopTestRun(self): pass
856
 
            def report_tests_starting(self): pass
 
842
            def startTests(self): pass
 
843
            def report_test_start(self, test): pass
857
844
            def report_known_failure(self, test, err=None, details=None):
858
845
                self._call = test, 'known failure'
859
846
        result = InstrumentedTestResult(None, None, None, None)
877
864
        # verbose test output formatting
878
865
        result_stream = StringIO()
879
866
        result = bzrlib.tests.VerboseTestResult(
880
 
            result_stream,
 
867
            unittest._WritelnDecorator(result_stream),
881
868
            descriptions=0,
882
869
            verbosity=2,
883
870
            )
893
880
        output = result_stream.getvalue()[prefix:]
894
881
        lines = output.splitlines()
895
882
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
896
 
        if sys.version_info > (2, 7):
897
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
898
 
                self.assertNotEqual, lines[1], '    ')
899
883
        self.assertEqual(lines[1], '    foo')
900
884
        self.assertEqual(2, len(lines))
901
885
 
909
893
        """Test the behaviour of invoking addNotSupported."""
910
894
        class InstrumentedTestResult(tests.ExtendedTestResult):
911
895
            def stopTestRun(self): pass
912
 
            def report_tests_starting(self): pass
 
896
            def startTests(self): pass
 
897
            def report_test_start(self, test): pass
913
898
            def report_unsupported(self, test, feature):
914
899
                self._call = test, feature
915
900
        result = InstrumentedTestResult(None, None, None, None)
934
919
        # verbose test output formatting
935
920
        result_stream = StringIO()
936
921
        result = bzrlib.tests.VerboseTestResult(
937
 
            result_stream,
 
922
            unittest._WritelnDecorator(result_stream),
938
923
            descriptions=0,
939
924
            verbosity=2,
940
925
            )
954
939
        """An UnavailableFeature being raised should invoke addNotSupported."""
955
940
        class InstrumentedTestResult(tests.ExtendedTestResult):
956
941
            def stopTestRun(self): pass
957
 
            def report_tests_starting(self): pass
 
942
            def startTests(self): pass
 
943
            def report_test_start(self, test): pass
958
944
            def addNotSupported(self, test, feature):
959
945
                self._call = test, feature
960
946
        result = InstrumentedTestResult(None, None, None, None)
1002
988
        class InstrumentedTestResult(tests.ExtendedTestResult):
1003
989
            calls = 0
1004
990
            def startTests(self): self.calls += 1
 
991
            def report_test_start(self, test): pass
1005
992
        result = InstrumentedTestResult(None, None, None, None)
1006
993
        def test_function():
1007
994
            pass
1009
996
        test.run(result)
1010
997
        self.assertEquals(1, result.calls)
1011
998
 
1012
 
    def test_startTests_only_once(self):
1013
 
        """With multiple tests startTests should still only be called once"""
1014
 
        class InstrumentedTestResult(tests.ExtendedTestResult):
1015
 
            calls = 0
1016
 
            def startTests(self): self.calls += 1
1017
 
        result = InstrumentedTestResult(None, None, None, None)
1018
 
        suite = unittest.TestSuite([
1019
 
            unittest.FunctionTestCase(lambda: None),
1020
 
            unittest.FunctionTestCase(lambda: None)])
1021
 
        suite.run(result)
1022
 
        self.assertEquals(1, result.calls)
1023
 
        self.assertEquals(2, result.count)
1024
 
 
1025
999
 
1026
1000
class TestUnicodeFilenameFeature(tests.TestCase):
1027
1001
 
1048
1022
        because of our use of global state.
1049
1023
        """
1050
1024
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1025
        old_leak = tests.TestCase._first_thread_leaker_id
1051
1026
        try:
1052
1027
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1028
            tests.TestCase._first_thread_leaker_id = None
1053
1029
            return testrunner.run(test)
1054
1030
        finally:
1055
1031
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1032
            tests.TestCase._first_thread_leaker_id = old_leak
1056
1033
 
1057
1034
    def test_known_failure_failed_run(self):
1058
1035
        # run a test that generates a known failure which should be printed in
1236
1213
        self.assertContainsRe(output_string, "--date [0-9.]+")
1237
1214
        self.assertLength(1, self._get_source_tree_calls)
1238
1215
 
1239
 
    def test_verbose_test_count(self):
1240
 
        """A verbose test run reports the right test count at the start"""
1241
 
        suite = TestUtil.TestSuite([
1242
 
            unittest.FunctionTestCase(lambda:None),
1243
 
            unittest.FunctionTestCase(lambda:None)])
1244
 
        self.assertEqual(suite.countTestCases(), 2)
1245
 
        stream = StringIO()
1246
 
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
1247
 
        # Need to use the CountingDecorator as that's what sets num_tests
1248
 
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1249
 
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1250
 
 
1251
1216
    def test_startTestRun(self):
1252
1217
        """run should call result.startTestRun()"""
1253
1218
        calls = []
1456
1421
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1457
1422
        output_stream = StringIO()
1458
1423
        result = bzrlib.tests.VerboseTestResult(
1459
 
            output_stream,
 
1424
            unittest._WritelnDecorator(output_stream),
1460
1425
            descriptions=0,
1461
1426
            verbosity=2)
1462
1427
        sample_test.run(result)
1690
1655
        self.assertEqual('original', obj.test_attr)
1691
1656
 
1692
1657
 
1693
 
class _MissingFeature(tests.Feature):
1694
 
    def _probe(self):
1695
 
        return False
1696
 
missing_feature = _MissingFeature()
1697
 
 
1698
 
 
1699
 
def _get_test(name):
1700
 
    """Get an instance of a specific example test.
1701
 
 
1702
 
    We protect this in a function so that they don't auto-run in the test
1703
 
    suite.
1704
 
    """
1705
 
 
1706
 
    class ExampleTests(tests.TestCase):
1707
 
 
1708
 
        def test_fail(self):
1709
 
            mutter('this was a failing test')
1710
 
            self.fail('this test will fail')
1711
 
 
1712
 
        def test_error(self):
1713
 
            mutter('this test errored')
1714
 
            raise RuntimeError('gotcha')
1715
 
 
1716
 
        def test_missing_feature(self):
1717
 
            mutter('missing the feature')
1718
 
            self.requireFeature(missing_feature)
1719
 
 
1720
 
        def test_skip(self):
1721
 
            mutter('this test will be skipped')
1722
 
            raise tests.TestSkipped('reason')
1723
 
 
1724
 
        def test_success(self):
1725
 
            mutter('this test succeeds')
1726
 
 
1727
 
        def test_xfail(self):
1728
 
            mutter('test with expected failure')
1729
 
            self.knownFailure('this_fails')
1730
 
 
1731
 
        def test_unexpected_success(self):
1732
 
            mutter('test with unexpected success')
1733
 
            self.expectFailure('should_fail', lambda: None)
1734
 
 
1735
 
    return ExampleTests(name)
1736
 
 
1737
 
 
1738
 
class TestTestCaseLogDetails(tests.TestCase):
1739
 
 
1740
 
    def _run_test(self, test_name):
1741
 
        test = _get_test(test_name)
1742
 
        result = testtools.TestResult()
1743
 
        test.run(result)
1744
 
        return result
1745
 
 
1746
 
    def test_fail_has_log(self):
1747
 
        result = self._run_test('test_fail')
1748
 
        self.assertEqual(1, len(result.failures))
1749
 
        result_content = result.failures[0][1]
1750
 
        self.assertContainsRe(result_content, 'Text attachment: log')
1751
 
        self.assertContainsRe(result_content, 'this was a failing test')
1752
 
 
1753
 
    def test_error_has_log(self):
1754
 
        result = self._run_test('test_error')
1755
 
        self.assertEqual(1, len(result.errors))
1756
 
        result_content = result.errors[0][1]
1757
 
        self.assertContainsRe(result_content, 'Text attachment: log')
1758
 
        self.assertContainsRe(result_content, 'this test errored')
1759
 
 
1760
 
    def test_skip_has_no_log(self):
1761
 
        result = self._run_test('test_skip')
1762
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
1763
 
        skips = result.skip_reasons['reason']
1764
 
        self.assertEqual(1, len(skips))
1765
 
        test = skips[0]
1766
 
        self.assertFalse('log' in test.getDetails())
1767
 
 
1768
 
    def test_missing_feature_has_no_log(self):
1769
 
        # testtools doesn't know about addNotSupported, so it just gets
1770
 
        # considered as a skip
1771
 
        result = self._run_test('test_missing_feature')
1772
 
        self.assertEqual([missing_feature], result.skip_reasons.keys())
1773
 
        skips = result.skip_reasons[missing_feature]
1774
 
        self.assertEqual(1, len(skips))
1775
 
        test = skips[0]
1776
 
        self.assertFalse('log' in test.getDetails())
1777
 
 
1778
 
    def test_xfail_has_no_log(self):
1779
 
        result = self._run_test('test_xfail')
1780
 
        self.assertEqual(1, len(result.expectedFailures))
1781
 
        result_content = result.expectedFailures[0][1]
1782
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
1783
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
1784
 
 
1785
 
    def test_unexpected_success_has_log(self):
1786
 
        result = self._run_test('test_unexpected_success')
1787
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
1788
 
        # Inconsistency, unexpectedSuccesses is a list of tests,
1789
 
        # expectedFailures is a list of reasons?
1790
 
        test = result.unexpectedSuccesses[0]
1791
 
        details = test.getDetails()
1792
 
        self.assertTrue('log' in details)
1793
 
 
1794
 
 
1795
 
class TestTestCloning(tests.TestCase):
1796
 
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
1797
 
 
1798
 
    def test_cloned_testcase_does_not_share_details(self):
1799
 
        """A TestCase cloned with clone_test does not share mutable attributes
1800
 
        such as details or cleanups.
1801
 
        """
1802
 
        class Test(tests.TestCase):
1803
 
            def test_foo(self):
1804
 
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1805
 
        orig_test = Test('test_foo')
1806
 
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1807
 
        orig_test.run(unittest.TestResult())
1808
 
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1809
 
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
1810
 
 
1811
 
    def test_double_apply_scenario_preserves_first_scenario(self):
1812
 
        """Applying two levels of scenarios to a test preserves the attributes
1813
 
        added by both scenarios.
1814
 
        """
1815
 
        class Test(tests.TestCase):
1816
 
            def test_foo(self):
1817
 
                pass
1818
 
        test = Test('test_foo')
1819
 
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1820
 
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1821
 
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1822
 
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1823
 
        all_tests = list(tests.iter_suite_tests(suite))
1824
 
        self.assertLength(4, all_tests)
1825
 
        all_xys = sorted((t.x, t.y) for t in all_tests)
1826
 
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1827
 
 
1828
 
 
1829
1658
# NB: Don't delete this; it's not actually from 0.11!
1830
1659
@deprecated_function(deprecated_in((0, 11, 0)))
1831
1660
def sample_deprecated_function():
1985
1814
                tree.branch.repository.bzrdir.root_transport)
1986
1815
 
1987
1816
 
1988
 
class SelfTestHelper(object):
 
1817
class SelfTestHelper:
1989
1818
 
1990
1819
    def run_selftest(self, **kwargs):
1991
1820
        """Run selftest returning its output."""
2142
1971
            load_list='missing file name', list_only=True)
2143
1972
 
2144
1973
 
2145
 
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2146
 
 
2147
 
    _test_needs_features = [features.subunit]
2148
 
 
2149
 
    def run_subunit_stream(self, test_name):
2150
 
        from subunit import ProtocolTestCase
2151
 
        def factory():
2152
 
            return TestUtil.TestSuite([_get_test(test_name)])
2153
 
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2154
 
            test_suite_factory=factory)
2155
 
        test = ProtocolTestCase(stream)
2156
 
        result = testtools.TestResult()
2157
 
        test.run(result)
2158
 
        content = stream.getvalue()
2159
 
        return content, result
2160
 
 
2161
 
    def test_fail_has_log(self):
2162
 
        content, result = self.run_subunit_stream('test_fail')
2163
 
        self.assertEqual(1, len(result.failures))
2164
 
        self.assertContainsRe(content, '(?m)^log$')
2165
 
        self.assertContainsRe(content, 'this test will fail')
2166
 
 
2167
 
    def test_error_has_log(self):
2168
 
        content, result = self.run_subunit_stream('test_error')
2169
 
        self.assertContainsRe(content, '(?m)^log$')
2170
 
        self.assertContainsRe(content, 'this test errored')
2171
 
 
2172
 
    def test_skip_has_no_log(self):
2173
 
        content, result = self.run_subunit_stream('test_skip')
2174
 
        self.assertNotContainsRe(content, '(?m)^log$')
2175
 
        self.assertNotContainsRe(content, 'this test will be skipped')
2176
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
2177
 
        skips = result.skip_reasons['reason']
2178
 
        self.assertEqual(1, len(skips))
2179
 
        test = skips[0]
2180
 
        # RemotedTestCase doesn't preserve the "details"
2181
 
        ## self.assertFalse('log' in test.getDetails())
2182
 
 
2183
 
    def test_missing_feature_has_no_log(self):
2184
 
        content, result = self.run_subunit_stream('test_missing_feature')
2185
 
        self.assertNotContainsRe(content, '(?m)^log$')
2186
 
        self.assertNotContainsRe(content, 'missing the feature')
2187
 
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2188
 
        skips = result.skip_reasons['_MissingFeature\n']
2189
 
        self.assertEqual(1, len(skips))
2190
 
        test = skips[0]
2191
 
        # RemotedTestCase doesn't preserve the "details"
2192
 
        ## self.assertFalse('log' in test.getDetails())
2193
 
 
2194
 
    def test_xfail_has_no_log(self):
2195
 
        content, result = self.run_subunit_stream('test_xfail')
2196
 
        self.assertNotContainsRe(content, '(?m)^log$')
2197
 
        self.assertNotContainsRe(content, 'test with expected failure')
2198
 
        self.assertEqual(1, len(result.expectedFailures))
2199
 
        result_content = result.expectedFailures[0][1]
2200
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
2201
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
2202
 
 
2203
 
    def test_unexpected_success_has_log(self):
2204
 
        content, result = self.run_subunit_stream('test_unexpected_success')
2205
 
        self.assertContainsRe(content, '(?m)^log$')
2206
 
        self.assertContainsRe(content, 'test with unexpected success')
2207
 
        self.expectFailure('subunit treats "unexpectedSuccess"'
2208
 
                           ' as a plain success',
2209
 
            self.assertEqual, 1, len(result.unexpectedSuccesses))
2210
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
2211
 
        test = result.unexpectedSuccesses[0]
2212
 
        # RemotedTestCase doesn't preserve the "details"
2213
 
        ## self.assertTrue('log' in test.getDetails())
2214
 
 
2215
 
    def test_success_has_no_log(self):
2216
 
        content, result = self.run_subunit_stream('test_success')
2217
 
        self.assertEqual(1, result.testsRun)
2218
 
        self.assertNotContainsRe(content, '(?m)^log$')
2219
 
        self.assertNotContainsRe(content, 'this test succeeds')
2220
 
 
2221
 
 
2222
1974
class TestRunBzr(tests.TestCase):
2223
1975
 
2224
1976
    out = ''
2587
2339
            os.chdir = orig_chdir
2588
2340
        self.assertEqual(['foo', 'current'], chdirs)
2589
2341
 
2590
 
    def test_get_bzr_path_with_cwd_bzrlib(self):
2591
 
        self.get_source_path = lambda: ""
2592
 
        self.overrideAttr(os.path, "isfile", lambda path: True)
2593
 
        self.assertEqual(self.get_bzr_path(), "bzr")
2594
 
 
2595
2342
 
2596
2343
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2597
2344
    """Tests that really need to do things with an external bzr."""
3213
2960
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3214
2961
 
3215
2962
 
3216
 
class TestThreadLeakDetection(tests.TestCase):
3217
 
    """Ensure when tests leak threads we detect and report it"""
3218
 
 
3219
 
    class LeakRecordingResult(tests.ExtendedTestResult):
3220
 
        def __init__(self):
3221
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3222
 
            self.leaks = []
3223
 
        def _report_thread_leak(self, test, leaks, alive):
3224
 
            self.leaks.append((test, leaks))
3225
 
 
3226
 
    def test_testcase_without_addCleanups(self):
3227
 
        """Check old TestCase instances don't break with leak detection"""
3228
 
        class Test(unittest.TestCase):
3229
 
            def runTest(self):
3230
 
                pass
3231
 
            addCleanup = None # for when on Python 2.7 with native addCleanup
3232
 
        result = self.LeakRecordingResult()
3233
 
        test = Test()
3234
 
        self.assertIs(getattr(test, "addCleanup", None), None)
3235
 
        result.startTestRun()
3236
 
        test.run(result)
3237
 
        result.stopTestRun()
3238
 
        self.assertEqual(result._tests_leaking_threads_count, 0)
3239
 
        self.assertEqual(result.leaks, [])
3240
 
        
3241
 
    def test_thread_leak(self):
3242
 
        """Ensure a thread that outlives the running of a test is reported
3243
 
 
3244
 
        Uses a thread that blocks on an event, and is started by the inner
3245
 
        test case. As the thread outlives the inner case's run, it should be
3246
 
        detected as a leak, but the event is then set so that the thread can
3247
 
        be safely joined in cleanup so it's not leaked for real.
3248
 
        """
3249
 
        event = threading.Event()
3250
 
        thread = threading.Thread(name="Leaker", target=event.wait)
3251
 
        class Test(tests.TestCase):
3252
 
            def test_leak(self):
3253
 
                thread.start()
3254
 
        result = self.LeakRecordingResult()
3255
 
        test = Test("test_leak")
3256
 
        self.addCleanup(thread.join)
3257
 
        self.addCleanup(event.set)
3258
 
        result.startTestRun()
3259
 
        test.run(result)
3260
 
        result.stopTestRun()
3261
 
        self.assertEqual(result._tests_leaking_threads_count, 1)
3262
 
        self.assertEqual(result._first_thread_leaker_id, test.id())
3263
 
        self.assertEqual(result.leaks, [(test, set([thread]))])
3264
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3265
 
 
3266
 
    def test_multiple_leaks(self):
3267
 
        """Check multiple leaks are blamed on the test cases at fault
3268
 
 
3269
 
        Same concept as the previous test, but has one inner test method that
3270
 
        leaks two threads, and one that doesn't leak at all.
3271
 
        """
3272
 
        event = threading.Event()
3273
 
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
3274
 
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
3275
 
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
3276
 
        class Test(tests.TestCase):
3277
 
            def test_first_leak(self):
3278
 
                thread_b.start()
3279
 
            def test_second_no_leak(self):
3280
 
                pass
3281
 
            def test_third_leak(self):
3282
 
                thread_c.start()
3283
 
                thread_a.start()
3284
 
        result = self.LeakRecordingResult()
3285
 
        first_test = Test("test_first_leak")
3286
 
        third_test = Test("test_third_leak")
3287
 
        self.addCleanup(thread_a.join)
3288
 
        self.addCleanup(thread_b.join)
3289
 
        self.addCleanup(thread_c.join)
3290
 
        self.addCleanup(event.set)
3291
 
        result.startTestRun()
3292
 
        unittest.TestSuite(
3293
 
            [first_test, Test("test_second_no_leak"), third_test]
3294
 
            ).run(result)
3295
 
        result.stopTestRun()
3296
 
        self.assertEqual(result._tests_leaking_threads_count, 2)
3297
 
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
3298
 
        self.assertEqual(result.leaks, [
3299
 
            (first_test, set([thread_b])),
3300
 
            (third_test, set([thread_a, thread_c]))])
3301
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3302
 
 
3303
 
 
3304
2963
class TestRunSuite(tests.TestCase):
3305
2964
 
3306
2965
    def test_runner_class(self):