~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2010-12-20 11:57:14 UTC
  • mto: This revision was merged to the branch mainline in revision 5577.
  • Revision ID: jelmer@samba.org-20101220115714-2ru3hfappjweeg7q
Don't use no-plugins.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
42
47
    lockdir,
43
48
    memorytree,
44
49
    osutils,
45
 
    progress,
46
50
    remote,
47
51
    repository,
48
52
    symbol_versioning,
67
71
    test_sftp_transport,
68
72
    TestUtil,
69
73
    )
70
 
from bzrlib.trace import note
 
74
from bzrlib.trace import note, mutter
71
75
from bzrlib.transport import memory
72
76
from bzrlib.version import _get_bzr_source_tree
73
77
 
77
81
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
82
 
79
83
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
84
class MetaTestLog(tests.TestCase):
93
85
 
94
86
    def test_logging(self):
122
114
        self.failUnlessExists(filename)
123
115
 
124
116
 
 
117
class TestClassesAvailable(tests.TestCase):
 
118
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
119
 
 
120
    def test_test_case(self):
 
121
        from bzrlib.tests import TestCase
 
122
 
 
123
    def test_test_loader(self):
 
124
        from bzrlib.tests import TestLoader
 
125
 
 
126
    def test_test_suite(self):
 
127
        from bzrlib.tests import TestSuite
 
128
 
 
129
 
125
130
class TestTransportScenarios(tests.TestCase):
126
131
    """A group of tests that test the transport implementation adaption core.
127
132
 
312
317
        from bzrlib.tests.per_interrepository import make_scenarios
313
318
        server1 = "a"
314
319
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
320
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
321
        scenarios = make_scenarios(server1, server2, formats)
317
322
        self.assertEqual([
318
323
            ('C0,str,str',
319
324
             {'repository_format': 'C1',
320
325
              'repository_format_to': 'C2',
321
326
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
327
              'transport_server': 'a',
 
328
              'extra_setup': 'C3'}),
323
329
            ('D0,str,str',
324
330
             {'repository_format': 'D1',
325
331
              'repository_format_to': 'D2',
326
332
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
333
              'transport_server': 'a',
 
334
              'extra_setup': 'D3'})],
328
335
            scenarios)
329
336
 
330
337
 
611
618
        result = test.run()
612
619
        total_failures = result.errors + result.failures
613
620
        if self._lock_check_thorough:
614
 
            self.assertLength(1, total_failures)
 
621
            self.assertEqual(1, len(total_failures))
615
622
        else:
616
623
            # When _lock_check_thorough is disabled, then we don't trigger a
617
624
            # failure
618
 
            self.assertLength(0, total_failures)
 
625
            self.assertEqual(0, len(total_failures))
619
626
 
620
627
 
621
628
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
833
840
        self.assertContainsRe(output,
834
841
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
835
842
 
 
843
    def test_uses_time_from_testtools(self):
 
844
        """Test case timings in verbose results should use testtools times"""
 
845
        import datetime
 
846
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
847
            def startTest(self, test):
 
848
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
849
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
850
            def addSuccess(self, test):
 
851
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
852
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
853
            def report_tests_starting(self): pass
 
854
        sio = StringIO()
 
855
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
856
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
857
 
836
858
    def test_known_failure(self):
837
859
        """A KnownFailure being raised should trigger several result actions."""
838
860
        class InstrumentedTestResult(tests.ExtendedTestResult):
839
861
            def stopTestRun(self): pass
840
 
            def startTests(self): pass
841
 
            def report_test_start(self, test): pass
 
862
            def report_tests_starting(self): pass
842
863
            def report_known_failure(self, test, err=None, details=None):
843
864
                self._call = test, 'known failure'
844
865
        result = InstrumentedTestResult(None, None, None, None)
894
915
        """Test the behaviour of invoking addNotSupported."""
895
916
        class InstrumentedTestResult(tests.ExtendedTestResult):
896
917
            def stopTestRun(self): pass
897
 
            def startTests(self): pass
898
 
            def report_test_start(self, test): pass
 
918
            def report_tests_starting(self): pass
899
919
            def report_unsupported(self, test, feature):
900
920
                self._call = test, feature
901
921
        result = InstrumentedTestResult(None, None, None, None)
940
960
        """An UnavailableFeature being raised should invoke addNotSupported."""
941
961
        class InstrumentedTestResult(tests.ExtendedTestResult):
942
962
            def stopTestRun(self): pass
943
 
            def startTests(self): pass
944
 
            def report_test_start(self, test): pass
 
963
            def report_tests_starting(self): pass
945
964
            def addNotSupported(self, test, feature):
946
965
                self._call = test, feature
947
966
        result = InstrumentedTestResult(None, None, None, None)
989
1008
        class InstrumentedTestResult(tests.ExtendedTestResult):
990
1009
            calls = 0
991
1010
            def startTests(self): self.calls += 1
992
 
            def report_test_start(self, test): pass
993
1011
        result = InstrumentedTestResult(None, None, None, None)
994
1012
        def test_function():
995
1013
            pass
997
1015
        test.run(result)
998
1016
        self.assertEquals(1, result.calls)
999
1017
 
 
1018
    def test_startTests_only_once(self):
 
1019
        """With multiple tests startTests should still only be called once"""
 
1020
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1021
            calls = 0
 
1022
            def startTests(self): self.calls += 1
 
1023
        result = InstrumentedTestResult(None, None, None, None)
 
1024
        suite = unittest.TestSuite([
 
1025
            unittest.FunctionTestCase(lambda: None),
 
1026
            unittest.FunctionTestCase(lambda: None)])
 
1027
        suite.run(result)
 
1028
        self.assertEquals(1, result.calls)
 
1029
        self.assertEquals(2, result.count)
 
1030
 
1000
1031
 
1001
1032
class TestUnicodeFilenameFeature(tests.TestCase):
1002
1033
 
1023
1054
        because of our use of global state.
1024
1055
        """
1025
1056
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1026
 
        old_leak = tests.TestCase._first_thread_leaker_id
1027
1057
        try:
1028
1058
            tests.TestCaseInTempDir.TEST_ROOT = None
1029
 
            tests.TestCase._first_thread_leaker_id = None
1030
1059
            return testrunner.run(test)
1031
1060
        finally:
1032
1061
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1033
 
            tests.TestCase._first_thread_leaker_id = old_leak
1034
1062
 
1035
1063
    def test_known_failure_failed_run(self):
1036
1064
        # run a test that generates a known failure which should be printed in
1082
1110
    def test_result_decorator(self):
1083
1111
        # decorate results
1084
1112
        calls = []
1085
 
        class LoggingDecorator(tests.ForwardingResult):
 
1113
        class LoggingDecorator(ExtendedToOriginalDecorator):
1086
1114
            def startTest(self, test):
1087
 
                tests.ForwardingResult.startTest(self, test)
 
1115
                ExtendedToOriginalDecorator.startTest(self, test)
1088
1116
                calls.append('start')
1089
1117
        test = unittest.FunctionTestCase(lambda:None)
1090
1118
        stream = StringIO()
1214
1242
        self.assertContainsRe(output_string, "--date [0-9.]+")
1215
1243
        self.assertLength(1, self._get_source_tree_calls)
1216
1244
 
 
1245
    def test_verbose_test_count(self):
 
1246
        """A verbose test run reports the right test count at the start"""
 
1247
        suite = TestUtil.TestSuite([
 
1248
            unittest.FunctionTestCase(lambda:None),
 
1249
            unittest.FunctionTestCase(lambda:None)])
 
1250
        self.assertEqual(suite.countTestCases(), 2)
 
1251
        stream = StringIO()
 
1252
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1253
        # Need to use the CountingDecorator as that's what sets num_tests
 
1254
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1255
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1256
 
1217
1257
    def test_startTestRun(self):
1218
1258
        """run should call result.startTestRun()"""
1219
1259
        calls = []
1220
 
        class LoggingDecorator(tests.ForwardingResult):
 
1260
        class LoggingDecorator(ExtendedToOriginalDecorator):
1221
1261
            def startTestRun(self):
1222
 
                tests.ForwardingResult.startTestRun(self)
 
1262
                ExtendedToOriginalDecorator.startTestRun(self)
1223
1263
                calls.append('startTestRun')
1224
1264
        test = unittest.FunctionTestCase(lambda:None)
1225
1265
        stream = StringIO()
1231
1271
    def test_stopTestRun(self):
1232
1272
        """run should call result.stopTestRun()"""
1233
1273
        calls = []
1234
 
        class LoggingDecorator(tests.ForwardingResult):
 
1274
        class LoggingDecorator(ExtendedToOriginalDecorator):
1235
1275
            def stopTestRun(self):
1236
 
                tests.ForwardingResult.stopTestRun(self)
 
1276
                ExtendedToOriginalDecorator.stopTestRun(self)
1237
1277
                calls.append('stopTestRun')
1238
1278
        test = unittest.FunctionTestCase(lambda:None)
1239
1279
        stream = StringIO()
1242
1282
        result = self.run_test_runner(runner, test)
1243
1283
        self.assertLength(1, calls)
1244
1284
 
 
1285
    def test_unicode_test_output_on_ascii_stream(self):
 
1286
        """Showing results should always succeed even on an ascii console"""
 
1287
        class FailureWithUnicode(tests.TestCase):
 
1288
            def test_log_unicode(self):
 
1289
                self.log(u"\u2606")
 
1290
                self.fail("Now print that log!")
 
1291
        out = StringIO()
 
1292
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1293
            lambda trace=False: "ascii")
 
1294
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1295
            FailureWithUnicode("test_log_unicode"))
 
1296
        self.assertContainsRe(out.getvalue(),
 
1297
            "Text attachment: log\n"
 
1298
            "-+\n"
 
1299
            "\d+\.\d+  \\\\u2606\n"
 
1300
            "-+\n")
 
1301
 
1245
1302
 
1246
1303
class SampleTestCase(tests.TestCase):
1247
1304
 
1656
1713
        self.assertEqual('original', obj.test_attr)
1657
1714
 
1658
1715
 
 
1716
class _MissingFeature(tests.Feature):
 
1717
    def _probe(self):
 
1718
        return False
 
1719
missing_feature = _MissingFeature()
 
1720
 
 
1721
 
 
1722
def _get_test(name):
 
1723
    """Get an instance of a specific example test.
 
1724
 
 
1725
    We protect this in a function so that they don't auto-run in the test
 
1726
    suite.
 
1727
    """
 
1728
 
 
1729
    class ExampleTests(tests.TestCase):
 
1730
 
 
1731
        def test_fail(self):
 
1732
            mutter('this was a failing test')
 
1733
            self.fail('this test will fail')
 
1734
 
 
1735
        def test_error(self):
 
1736
            mutter('this test errored')
 
1737
            raise RuntimeError('gotcha')
 
1738
 
 
1739
        def test_missing_feature(self):
 
1740
            mutter('missing the feature')
 
1741
            self.requireFeature(missing_feature)
 
1742
 
 
1743
        def test_skip(self):
 
1744
            mutter('this test will be skipped')
 
1745
            raise tests.TestSkipped('reason')
 
1746
 
 
1747
        def test_success(self):
 
1748
            mutter('this test succeeds')
 
1749
 
 
1750
        def test_xfail(self):
 
1751
            mutter('test with expected failure')
 
1752
            self.knownFailure('this_fails')
 
1753
 
 
1754
        def test_unexpected_success(self):
 
1755
            mutter('test with unexpected success')
 
1756
            self.expectFailure('should_fail', lambda: None)
 
1757
 
 
1758
    return ExampleTests(name)
 
1759
 
 
1760
 
 
1761
class TestTestCaseLogDetails(tests.TestCase):
 
1762
 
 
1763
    def _run_test(self, test_name):
 
1764
        test = _get_test(test_name)
 
1765
        result = testtools.TestResult()
 
1766
        test.run(result)
 
1767
        return result
 
1768
 
 
1769
    def test_fail_has_log(self):
 
1770
        result = self._run_test('test_fail')
 
1771
        self.assertEqual(1, len(result.failures))
 
1772
        result_content = result.failures[0][1]
 
1773
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1774
        self.assertContainsRe(result_content, 'this was a failing test')
 
1775
 
 
1776
    def test_error_has_log(self):
 
1777
        result = self._run_test('test_error')
 
1778
        self.assertEqual(1, len(result.errors))
 
1779
        result_content = result.errors[0][1]
 
1780
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1781
        self.assertContainsRe(result_content, 'this test errored')
 
1782
 
 
1783
    def test_skip_has_no_log(self):
 
1784
        result = self._run_test('test_skip')
 
1785
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1786
        skips = result.skip_reasons['reason']
 
1787
        self.assertEqual(1, len(skips))
 
1788
        test = skips[0]
 
1789
        self.assertFalse('log' in test.getDetails())
 
1790
 
 
1791
    def test_missing_feature_has_no_log(self):
 
1792
        # testtools doesn't know about addNotSupported, so it just gets
 
1793
        # considered as a skip
 
1794
        result = self._run_test('test_missing_feature')
 
1795
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1796
        skips = result.skip_reasons[missing_feature]
 
1797
        self.assertEqual(1, len(skips))
 
1798
        test = skips[0]
 
1799
        self.assertFalse('log' in test.getDetails())
 
1800
 
 
1801
    def test_xfail_has_no_log(self):
 
1802
        result = self._run_test('test_xfail')
 
1803
        self.assertEqual(1, len(result.expectedFailures))
 
1804
        result_content = result.expectedFailures[0][1]
 
1805
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1806
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1807
 
 
1808
    def test_unexpected_success_has_log(self):
 
1809
        result = self._run_test('test_unexpected_success')
 
1810
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1811
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1812
        # expectedFailures is a list of reasons?
 
1813
        test = result.unexpectedSuccesses[0]
 
1814
        details = test.getDetails()
 
1815
        self.assertTrue('log' in details)
 
1816
 
 
1817
 
 
1818
class TestTestCloning(tests.TestCase):
 
1819
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1820
 
 
1821
    def test_cloned_testcase_does_not_share_details(self):
 
1822
        """A TestCase cloned with clone_test does not share mutable attributes
 
1823
        such as details or cleanups.
 
1824
        """
 
1825
        class Test(tests.TestCase):
 
1826
            def test_foo(self):
 
1827
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1828
        orig_test = Test('test_foo')
 
1829
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1830
        orig_test.run(unittest.TestResult())
 
1831
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1832
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1833
 
 
1834
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1835
        """Applying two levels of scenarios to a test preserves the attributes
 
1836
        added by both scenarios.
 
1837
        """
 
1838
        class Test(tests.TestCase):
 
1839
            def test_foo(self):
 
1840
                pass
 
1841
        test = Test('test_foo')
 
1842
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1843
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1844
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1845
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1846
        all_tests = list(tests.iter_suite_tests(suite))
 
1847
        self.assertLength(4, all_tests)
 
1848
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1849
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1850
 
 
1851
 
1659
1852
# NB: Don't delete this; it's not actually from 0.11!
1660
1853
@deprecated_function(deprecated_in((0, 11, 0)))
1661
1854
def sample_deprecated_function():
1815
2008
                tree.branch.repository.bzrdir.root_transport)
1816
2009
 
1817
2010
 
1818
 
class SelfTestHelper:
 
2011
class SelfTestHelper(object):
1819
2012
 
1820
2013
    def run_selftest(self, **kwargs):
1821
2014
        """Run selftest returning its output."""
1881
2074
            def __call__(test, result):
1882
2075
                test.run(result)
1883
2076
            def run(test, result):
1884
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2077
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1885
2078
                calls.append("called")
1886
2079
            def countTestCases(self):
1887
2080
                return 1
1972
2165
            load_list='missing file name', list_only=True)
1973
2166
 
1974
2167
 
 
2168
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2169
 
 
2170
    _test_needs_features = [features.subunit]
 
2171
 
 
2172
    def run_subunit_stream(self, test_name):
 
2173
        from subunit import ProtocolTestCase
 
2174
        def factory():
 
2175
            return TestUtil.TestSuite([_get_test(test_name)])
 
2176
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2177
            test_suite_factory=factory)
 
2178
        test = ProtocolTestCase(stream)
 
2179
        result = testtools.TestResult()
 
2180
        test.run(result)
 
2181
        content = stream.getvalue()
 
2182
        return content, result
 
2183
 
 
2184
    def test_fail_has_log(self):
 
2185
        content, result = self.run_subunit_stream('test_fail')
 
2186
        self.assertEqual(1, len(result.failures))
 
2187
        self.assertContainsRe(content, '(?m)^log$')
 
2188
        self.assertContainsRe(content, 'this test will fail')
 
2189
 
 
2190
    def test_error_has_log(self):
 
2191
        content, result = self.run_subunit_stream('test_error')
 
2192
        self.assertContainsRe(content, '(?m)^log$')
 
2193
        self.assertContainsRe(content, 'this test errored')
 
2194
 
 
2195
    def test_skip_has_no_log(self):
 
2196
        content, result = self.run_subunit_stream('test_skip')
 
2197
        self.assertNotContainsRe(content, '(?m)^log$')
 
2198
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2199
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2200
        skips = result.skip_reasons['reason']
 
2201
        self.assertEqual(1, len(skips))
 
2202
        test = skips[0]
 
2203
        # RemotedTestCase doesn't preserve the "details"
 
2204
        ## self.assertFalse('log' in test.getDetails())
 
2205
 
 
2206
    def test_missing_feature_has_no_log(self):
 
2207
        content, result = self.run_subunit_stream('test_missing_feature')
 
2208
        self.assertNotContainsRe(content, '(?m)^log$')
 
2209
        self.assertNotContainsRe(content, 'missing the feature')
 
2210
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2211
        skips = result.skip_reasons['_MissingFeature\n']
 
2212
        self.assertEqual(1, len(skips))
 
2213
        test = skips[0]
 
2214
        # RemotedTestCase doesn't preserve the "details"
 
2215
        ## self.assertFalse('log' in test.getDetails())
 
2216
 
 
2217
    def test_xfail_has_no_log(self):
 
2218
        content, result = self.run_subunit_stream('test_xfail')
 
2219
        self.assertNotContainsRe(content, '(?m)^log$')
 
2220
        self.assertNotContainsRe(content, 'test with expected failure')
 
2221
        self.assertEqual(1, len(result.expectedFailures))
 
2222
        result_content = result.expectedFailures[0][1]
 
2223
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2224
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2225
 
 
2226
    def test_unexpected_success_has_log(self):
 
2227
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2228
        self.assertContainsRe(content, '(?m)^log$')
 
2229
        self.assertContainsRe(content, 'test with unexpected success')
 
2230
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2231
                           ' as a plain success',
 
2232
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2233
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2234
        test = result.unexpectedSuccesses[0]
 
2235
        # RemotedTestCase doesn't preserve the "details"
 
2236
        ## self.assertTrue('log' in test.getDetails())
 
2237
 
 
2238
    def test_success_has_no_log(self):
 
2239
        content, result = self.run_subunit_stream('test_success')
 
2240
        self.assertEqual(1, result.testsRun)
 
2241
        self.assertNotContainsRe(content, '(?m)^log$')
 
2242
        self.assertNotContainsRe(content, 'this test succeeds')
 
2243
 
 
2244
 
1975
2245
class TestRunBzr(tests.TestCase):
1976
2246
 
1977
2247
    out = ''
2966
3236
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2967
3237
 
2968
3238
 
 
3239
class TestThreadLeakDetection(tests.TestCase):
 
3240
    """Ensure when tests leak threads we detect and report it"""
 
3241
 
 
3242
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3243
        def __init__(self):
 
3244
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3245
            self.leaks = []
 
3246
        def _report_thread_leak(self, test, leaks, alive):
 
3247
            self.leaks.append((test, leaks))
 
3248
 
 
3249
    def test_testcase_without_addCleanups(self):
 
3250
        """Check old TestCase instances don't break with leak detection"""
 
3251
        class Test(unittest.TestCase):
 
3252
            def runTest(self):
 
3253
                pass
 
3254
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3255
        result = self.LeakRecordingResult()
 
3256
        test = Test()
 
3257
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3258
        result.startTestRun()
 
3259
        test.run(result)
 
3260
        result.stopTestRun()
 
3261
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3262
        self.assertEqual(result.leaks, [])
 
3263
        
 
3264
    def test_thread_leak(self):
 
3265
        """Ensure a thread that outlives the running of a test is reported
 
3266
 
 
3267
        Uses a thread that blocks on an event, and is started by the inner
 
3268
        test case. As the thread outlives the inner case's run, it should be
 
3269
        detected as a leak, but the event is then set so that the thread can
 
3270
        be safely joined in cleanup so it's not leaked for real.
 
3271
        """
 
3272
        event = threading.Event()
 
3273
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3274
        class Test(tests.TestCase):
 
3275
            def test_leak(self):
 
3276
                thread.start()
 
3277
        result = self.LeakRecordingResult()
 
3278
        test = Test("test_leak")
 
3279
        self.addCleanup(thread.join)
 
3280
        self.addCleanup(event.set)
 
3281
        result.startTestRun()
 
3282
        test.run(result)
 
3283
        result.stopTestRun()
 
3284
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3285
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3286
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3287
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3288
 
 
3289
    def test_multiple_leaks(self):
 
3290
        """Check multiple leaks are blamed on the test cases at fault
 
3291
 
 
3292
        Same concept as the previous test, but has one inner test method that
 
3293
        leaks two threads, and one that doesn't leak at all.
 
3294
        """
 
3295
        event = threading.Event()
 
3296
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3297
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3298
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3299
        class Test(tests.TestCase):
 
3300
            def test_first_leak(self):
 
3301
                thread_b.start()
 
3302
            def test_second_no_leak(self):
 
3303
                pass
 
3304
            def test_third_leak(self):
 
3305
                thread_c.start()
 
3306
                thread_a.start()
 
3307
        result = self.LeakRecordingResult()
 
3308
        first_test = Test("test_first_leak")
 
3309
        third_test = Test("test_third_leak")
 
3310
        self.addCleanup(thread_a.join)
 
3311
        self.addCleanup(thread_b.join)
 
3312
        self.addCleanup(thread_c.join)
 
3313
        self.addCleanup(event.set)
 
3314
        result.startTestRun()
 
3315
        unittest.TestSuite(
 
3316
            [first_test, Test("test_second_no_leak"), third_test]
 
3317
            ).run(result)
 
3318
        result.stopTestRun()
 
3319
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3320
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3321
        self.assertEqual(result.leaks, [
 
3322
            (first_test, set([thread_b])),
 
3323
            (third_test, set([thread_a, thread_c]))])
 
3324
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3325
 
 
3326
 
 
3327
class TestPostMortemDebugging(tests.TestCase):
 
3328
    """Check post mortem debugging works when tests fail or error"""
 
3329
 
 
3330
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3331
        def __init__(self):
 
3332
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3333
            self.postcode = None
 
3334
        def _post_mortem(self, tb=None):
 
3335
            """Record the code object at the end of the current traceback"""
 
3336
            tb = tb or sys.exc_info()[2]
 
3337
            if tb is not None:
 
3338
                next = tb.tb_next
 
3339
                while next is not None:
 
3340
                    tb = next
 
3341
                    next = next.tb_next
 
3342
                self.postcode = tb.tb_frame.f_code
 
3343
        def report_error(self, test, err):
 
3344
            pass
 
3345
        def report_failure(self, test, err):
 
3346
            pass
 
3347
 
 
3348
    def test_location_unittest_error(self):
 
3349
        """Needs right post mortem traceback with erroring unittest case"""
 
3350
        class Test(unittest.TestCase):
 
3351
            def runTest(self):
 
3352
                raise RuntimeError
 
3353
        result = self.TracebackRecordingResult()
 
3354
        Test().run(result)
 
3355
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3356
 
 
3357
    def test_location_unittest_failure(self):
 
3358
        """Needs right post mortem traceback with failing unittest case"""
 
3359
        class Test(unittest.TestCase):
 
3360
            def runTest(self):
 
3361
                raise self.failureException
 
3362
        result = self.TracebackRecordingResult()
 
3363
        Test().run(result)
 
3364
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3365
 
 
3366
    def test_location_bt_error(self):
 
3367
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3368
        class Test(tests.TestCase):
 
3369
            def test_error(self):
 
3370
                raise RuntimeError
 
3371
        result = self.TracebackRecordingResult()
 
3372
        Test("test_error").run(result)
 
3373
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3374
 
 
3375
    def test_location_bt_failure(self):
 
3376
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3377
        class Test(tests.TestCase):
 
3378
            def test_failure(self):
 
3379
                raise self.failureException
 
3380
        result = self.TracebackRecordingResult()
 
3381
        Test("test_failure").run(result)
 
3382
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3383
 
 
3384
    def test_env_var_triggers_post_mortem(self):
 
3385
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3386
        import pdb
 
3387
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3388
        post_mortem_calls = []
 
3389
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3390
        self.addCleanup(osutils.set_or_unset_env, "BZR_TEST_PDB",
 
3391
            osutils.set_or_unset_env("BZR_TEST_PDB", None))
 
3392
        result._post_mortem(1)
 
3393
        os.environ["BZR_TEST_PDB"] = "on"
 
3394
        result._post_mortem(2)
 
3395
        self.assertEqual([2], post_mortem_calls)
 
3396
 
 
3397
 
2969
3398
class TestRunSuite(tests.TestCase):
2970
3399
 
2971
3400
    def test_runner_class(self):