~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Martin Pool
  • Date: 2010-09-10 06:35:30 UTC
  • mto: This revision was merged to the branch mainline in revision 5426.
  • Revision ID: mbp@sourcefrog.net-20100910063530-ntk3qfbtke6zmafl
Add list of confirmation ids

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
24
 
import threading
25
24
import time
26
25
import unittest
27
26
import warnings
28
27
 
29
28
from testtools import MultiTestResult
30
 
from testtools.content import Content
31
29
from testtools.content_type import ContentType
32
30
from testtools.matchers import (
33
31
    DocTestMatches,
44
42
    lockdir,
45
43
    memorytree,
46
44
    osutils,
 
45
    progress,
47
46
    remote,
48
47
    repository,
49
48
    symbol_versioning,
68
67
    test_sftp_transport,
69
68
    TestUtil,
70
69
    )
71
 
from bzrlib.trace import note, mutter
 
70
from bzrlib.trace import note
72
71
from bzrlib.transport import memory
73
72
from bzrlib.version import _get_bzr_source_tree
74
73
 
326
325
        from bzrlib.tests.per_interrepository import make_scenarios
327
326
        server1 = "a"
328
327
        server2 = "b"
329
 
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
 
328
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
330
329
        scenarios = make_scenarios(server1, server2, formats)
331
330
        self.assertEqual([
332
331
            ('C0,str,str',
333
332
             {'repository_format': 'C1',
334
333
              'repository_format_to': 'C2',
335
334
              'transport_readonly_server': 'b',
336
 
              'transport_server': 'a',
337
 
              'extra_setup': 'C3'}),
 
335
              'transport_server': 'a'}),
338
336
            ('D0,str,str',
339
337
             {'repository_format': 'D1',
340
338
              'repository_format_to': 'D2',
341
339
              'transport_readonly_server': 'b',
342
 
              'transport_server': 'a',
343
 
              'extra_setup': 'D3'})],
 
340
              'transport_server': 'a'})],
344
341
            scenarios)
345
342
 
346
343
 
627
624
        result = test.run()
628
625
        total_failures = result.errors + result.failures
629
626
        if self._lock_check_thorough:
630
 
            self.assertEqual(1, len(total_failures))
 
627
            self.assertLength(1, total_failures)
631
628
        else:
632
629
            # When _lock_check_thorough is disabled, then we don't trigger a
633
630
            # failure
634
 
            self.assertEqual(0, len(total_failures))
 
631
            self.assertLength(0, total_failures)
635
632
 
636
633
 
637
634
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
853
850
        """A KnownFailure being raised should trigger several result actions."""
854
851
        class InstrumentedTestResult(tests.ExtendedTestResult):
855
852
            def stopTestRun(self): pass
856
 
            def report_tests_starting(self): pass
 
853
            def startTests(self): pass
 
854
            def report_test_start(self, test): pass
857
855
            def report_known_failure(self, test, err=None, details=None):
858
856
                self._call = test, 'known failure'
859
857
        result = InstrumentedTestResult(None, None, None, None)
909
907
        """Test the behaviour of invoking addNotSupported."""
910
908
        class InstrumentedTestResult(tests.ExtendedTestResult):
911
909
            def stopTestRun(self): pass
912
 
            def report_tests_starting(self): pass
 
910
            def startTests(self): pass
 
911
            def report_test_start(self, test): pass
913
912
            def report_unsupported(self, test, feature):
914
913
                self._call = test, feature
915
914
        result = InstrumentedTestResult(None, None, None, None)
954
953
        """An UnavailableFeature being raised should invoke addNotSupported."""
955
954
        class InstrumentedTestResult(tests.ExtendedTestResult):
956
955
            def stopTestRun(self): pass
957
 
            def report_tests_starting(self): pass
 
956
            def startTests(self): pass
 
957
            def report_test_start(self, test): pass
958
958
            def addNotSupported(self, test, feature):
959
959
                self._call = test, feature
960
960
        result = InstrumentedTestResult(None, None, None, None)
1002
1002
        class InstrumentedTestResult(tests.ExtendedTestResult):
1003
1003
            calls = 0
1004
1004
            def startTests(self): self.calls += 1
 
1005
            def report_test_start(self, test): pass
1005
1006
        result = InstrumentedTestResult(None, None, None, None)
1006
1007
        def test_function():
1007
1008
            pass
1009
1010
        test.run(result)
1010
1011
        self.assertEquals(1, result.calls)
1011
1012
 
1012
 
    def test_startTests_only_once(self):
1013
 
        """With multiple tests startTests should still only be called once"""
1014
 
        class InstrumentedTestResult(tests.ExtendedTestResult):
1015
 
            calls = 0
1016
 
            def startTests(self): self.calls += 1
1017
 
        result = InstrumentedTestResult(None, None, None, None)
1018
 
        suite = unittest.TestSuite([
1019
 
            unittest.FunctionTestCase(lambda: None),
1020
 
            unittest.FunctionTestCase(lambda: None)])
1021
 
        suite.run(result)
1022
 
        self.assertEquals(1, result.calls)
1023
 
        self.assertEquals(2, result.count)
1024
 
 
1025
1013
 
1026
1014
class TestUnicodeFilenameFeature(tests.TestCase):
1027
1015
 
1048
1036
        because of our use of global state.
1049
1037
        """
1050
1038
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1039
        old_leak = tests.TestCase._first_thread_leaker_id
1051
1040
        try:
1052
1041
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1042
            tests.TestCase._first_thread_leaker_id = None
1053
1043
            return testrunner.run(test)
1054
1044
        finally:
1055
1045
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1046
            tests.TestCase._first_thread_leaker_id = old_leak
1056
1047
 
1057
1048
    def test_known_failure_failed_run(self):
1058
1049
        # run a test that generates a known failure which should be printed in
1236
1227
        self.assertContainsRe(output_string, "--date [0-9.]+")
1237
1228
        self.assertLength(1, self._get_source_tree_calls)
1238
1229
 
1239
 
    def test_verbose_test_count(self):
1240
 
        """A verbose test run reports the right test count at the start"""
1241
 
        suite = TestUtil.TestSuite([
1242
 
            unittest.FunctionTestCase(lambda:None),
1243
 
            unittest.FunctionTestCase(lambda:None)])
1244
 
        self.assertEqual(suite.countTestCases(), 2)
1245
 
        stream = StringIO()
1246
 
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
1247
 
        # Need to use the CountingDecorator as that's what sets num_tests
1248
 
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1249
 
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1250
 
 
1251
1230
    def test_startTestRun(self):
1252
1231
        """run should call result.startTestRun()"""
1253
1232
        calls = []
1690
1669
        self.assertEqual('original', obj.test_attr)
1691
1670
 
1692
1671
 
1693
 
class _MissingFeature(tests.Feature):
    """Feature stub whose probe always answers False."""

    def _probe(self):
        # Unconditionally report the feature as not present.
        return False


# Single shared instance of the always-missing feature.
missing_feature = _MissingFeature()
1697
 
 
1698
 
 
1699
 
def _get_test(name):
    """Get an instance of a specific example test.

    The example class is defined inside this function so that its tests
    don't auto-run in the test suite.

    :param name: Name of the ExampleTests method to instantiate.
    :return: An ExampleTests instance bound to that method.
    """

    class ExampleTests(tests.TestCase):

        def test_success(self):
            # Logs a line and passes.
            mutter('this test succeeds')

        def test_fail(self):
            # Logs a line, then fails via self.fail().
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            # Logs a line, then raises an unexpected exception.
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_skip(self):
            # Logs a line, then raises TestSkipped.
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_missing_feature(self):
            # Logs a line, then requires the always-missing feature.
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_xfail(self):
            # Logs a line, then reports a known failure.
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            # Logs a line, then expects a failure from a callable that
            # does nothing.
            mutter('test with unexpected success')
            self.expectFailure('should_fail', lambda: None)

    example = ExampleTests(name)
    return example
1736
 
 
1737
 
 
1738
 
class TestTestCaseLogDetails(tests.TestCase):
    """Check which outcomes of the example tests carry the test log.

    Each test runs one ExampleTests method against a testtools.TestResult
    and inspects what that result recorded.
    """

    def _run_test(self, test_name):
        """Run the named example test and return the populated result.

        :param test_name: Name of the ExampleTests method to run.
        :return: The testtools.TestResult the test was run against.
        """
        result = testtools.TestResult()
        _get_test(test_name).run(result)
        return result

    def test_fail_has_log(self):
        result = self._run_test('test_fail')
        self.assertLength(1, result.failures)
        # Each entry is indexable; element 1 is the text that is searched.
        failure_text = result.failures[0][1]
        self.assertContainsRe(failure_text, 'Text attachment: log')
        self.assertContainsRe(failure_text, 'this was a failing test')

    def test_error_has_log(self):
        result = self._run_test('test_error')
        self.assertLength(1, result.errors)
        error_text = result.errors[0][1]
        self.assertContainsRe(error_text, 'Text attachment: log')
        self.assertContainsRe(error_text, 'this test errored')

    def test_skip_has_no_log(self):
        result = self._run_test('test_skip')
        self.assertEqual(['reason'], result.skip_reasons.keys())
        skips = result.skip_reasons['reason']
        self.assertLength(1, skips)
        skipped_test = skips[0]
        self.assertFalse('log' in skipped_test.getDetails())

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so it just gets
        # considered as a skip
        result = self._run_test('test_missing_feature')
        self.assertEqual([missing_feature], result.skip_reasons.keys())
        skips = result.skip_reasons[missing_feature]
        self.assertLength(1, skips)
        skipped_test = skips[0]
        self.assertFalse('log' in skipped_test.getDetails())

    def test_xfail_has_no_log(self):
        result = self._run_test('test_xfail')
        self.assertLength(1, result.expectedFailures)
        xfail_text = result.expectedFailures[0][1]
        self.assertNotContainsRe(xfail_text, 'Text attachment: log')
        self.assertNotContainsRe(xfail_text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        result = self._run_test('test_unexpected_success')
        self.assertLength(1, result.unexpectedSuccesses)
        # Inconsistency, unexpectedSuccesses is a list of tests,
        # expectedFailures is a list of reasons?
        succeeded_test = result.unexpectedSuccesses[0]
        self.assertTrue('log' in succeeded_test.getDetails())
1793
 
 
1794
 
 
1795
 
class TestTestCloning(tests.TestCase):
    """Tests that test cloning of TestCases (as used by multiply_tests)."""

    def test_cloned_testcase_does_not_share_details(self):
        """A TestCase cloned with clone_test does not share mutable attributes
        such as details or cleanups.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))

        original = Test('test_foo')
        clone_id = original.id() + '(cloned)'
        clone = tests.clone_test(original, clone_id)
        # Only the original is run; the clone is inspected untouched.
        original.run(unittest.TestResult())
        original_detail = original.getDetails()['foo']
        self.assertEqual('foo', original_detail.iter_bytes())
        self.assertEqual(None, clone.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Applying two levels of scenarios to a test preserves the attributes
        added by both scenarios.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                pass

        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
        # Multiply by x first, then multiply the resulting suite by y.
        by_x = tests.multiply_tests(
            Test('test_foo'), scenarios_x, unittest.TestSuite())
        by_x_and_y = tests.multiply_tests(
            by_x, scenarios_y, unittest.TestSuite())
        all_tests = list(tests.iter_suite_tests(by_x_and_y))
        self.assertLength(4, all_tests)
        seen = [(t.x, t.y) for t in all_tests]
        seen.sort()
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], seen)
1827
 
 
1828
 
 
1829
1672
# NB: Don't delete this; it's not actually from 0.11!
1830
1673
@deprecated_function(deprecated_in((0, 11, 0)))
1831
1674
def sample_deprecated_function():
1985
1828
                tree.branch.repository.bzrdir.root_transport)
1986
1829
 
1987
1830
 
1988
 
class SelfTestHelper(object):
 
1831
class SelfTestHelper:
1989
1832
 
1990
1833
    def run_selftest(self, **kwargs):
1991
1834
        """Run selftest returning its output."""
2142
1985
            load_list='missing file name', list_only=True)
2143
1986
 
2144
1987
 
2145
 
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check which example-test outcomes put the log in a subunit stream."""

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run one example test through selftest with the subunit runner.

        :param test_name: Name of the ExampleTests method to run.
        :return: A (content, result) tuple: the stream's text, and a
            testtools.TestResult populated by replaying that stream.
        """
        from subunit import ProtocolTestCase
        def factory():
            return TestUtil.TestSuite([_get_test(test_name)])
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=factory)
        result = testtools.TestResult()
        # Replay the captured stream into a fresh result for inspection.
        ProtocolTestCase(stream).run(result)
        content = stream.getvalue()
        return content, result

    def test_fail_has_log(self):
        content, result = self.run_subunit_stream('test_fail')
        self.assertLength(1, result.failures)
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test will fail')

    def test_error_has_log(self):
        content, result = self.run_subunit_stream('test_error')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test errored')

    def test_skip_has_no_log(self):
        content, result = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test will be skipped')
        self.assertEqual(['reason'], result.skip_reasons.keys())
        self.assertLength(1, result.skip_reasons['reason'])
        # RemotedTestCase doesn't preserve the "details", so the skipped
        # test's getDetails() cannot be checked for 'log' here.

    def test_missing_feature_has_no_log(self):
        content, result = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'missing the feature')
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
        self.assertLength(1, result.skip_reasons['_MissingFeature\n'])
        # RemotedTestCase doesn't preserve the "details", so the skipped
        # test's getDetails() cannot be checked for 'log' here.

    def test_xfail_has_no_log(self):
        content, result = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'test with expected failure')
        self.assertLength(1, result.expectedFailures)
        xfail_text = result.expectedFailures[0][1]
        self.assertNotContainsRe(xfail_text, 'Text attachment: log')
        self.assertNotContainsRe(xfail_text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        content, result = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'test with unexpected success')
        # The length check is handed to expectFailure first; the plain
        # assertion after it is only reached if that check passes.
        self.expectFailure('subunit treats "unexpectedSuccess"'
                           ' as a plain success',
            self.assertEqual, 1, len(result.unexpectedSuccesses))
        self.assertLength(1, result.unexpectedSuccesses)
        # RemotedTestCase doesn't preserve the "details", so the test's
        # getDetails() cannot be checked for 'log' here.

    def test_success_has_no_log(self):
        content, result = self.run_subunit_stream('test_success')
        self.assertEqual(1, result.testsRun)
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test succeeds')
2220
 
 
2221
 
 
2222
1988
class TestRunBzr(tests.TestCase):
2223
1989
 
2224
1990
    out = ''
3213
2979
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3214
2980
 
3215
2981
 
3216
 
class TestThreadLeakDetection(tests.TestCase):
    """Ensure when tests leak threads we detect and report it"""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """ExtendedTestResult that appends each reported leak to self.leaks."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            # Keep only the test and its leaked threads; 'alive' is unused.
            self.leaks.append((test, leaks))

    def test_testcase_without_addCleanups(self):
        """Check old TestCase instances don't break with leak detection"""
        class Test(unittest.TestCase):
            addCleanup = None # for when on Python 2.7 with native addCleanup

            def runTest(self):
                pass

        recorder = self.LeakRecordingResult()
        plain_case = Test()
        self.assertIs(getattr(plain_case, "addCleanup", None), None)
        recorder.startTestRun()
        plain_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 0)
        self.assertEqual(recorder.leaks, [])

    def test_thread_leak(self):
        """Ensure a thread that outlives the running of a test is reported

        Uses a thread that blocks on an event, and is started by the inner
        test case. As the thread outlives the inner case's run, it should be
        detected as a leak, but the event is then set so that the thread can
        be safely joined in cleanup so it's not leaked for real.
        """
        release = threading.Event()
        leaker = threading.Thread(name="Leaker", target=release.wait)

        class Test(tests.TestCase):
            def test_leak(self):
                leaker.start()

        recorder = self.LeakRecordingResult()
        leaky_case = Test("test_leak")
        # The join is registered before the set, as in the docstring's plan
        # of setting the event so that the thread can then be joined.
        self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        leaky_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 1)
        self.assertEqual(recorder._first_thread_leaker_id, leaky_case.id())
        self.assertEqual(recorder.leaks, [(leaky_case, set([leaker]))])
        self.assertContainsString(recorder.stream.getvalue(),
            "leaking threads")

    def test_multiple_leaks(self):
        """Check multiple leaks are blamed on the test cases at fault

        Same concept as the previous test, but has one inner test method that
        leaks two threads, and one that doesn't leak at all.
        """
        release = threading.Event()
        thread_a = threading.Thread(name="LeakerA", target=release.wait)
        thread_b = threading.Thread(name="LeakerB", target=release.wait)
        thread_c = threading.Thread(name="LeakerC", target=release.wait)

        class Test(tests.TestCase):
            def test_first_leak(self):
                thread_b.start()

            def test_second_no_leak(self):
                pass

            def test_third_leak(self):
                thread_c.start()
                thread_a.start()

        recorder = self.LeakRecordingResult()
        first_test = Test("test_first_leak")
        third_test = Test("test_third_leak")
        for leaker in (thread_a, thread_b, thread_c):
            self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        suite = unittest.TestSuite(
            [first_test, Test("test_second_no_leak"), third_test])
        recorder.startTestRun()
        suite.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 2)
        self.assertEqual(recorder._first_thread_leaker_id, first_test.id())
        expected_leaks = [
            (first_test, set([thread_b])),
            (third_test, set([thread_a, thread_c])),
            ]
        self.assertEqual(recorder.leaks, expected_leaks)
        self.assertContainsString(recorder.stream.getvalue(),
            "leaking threads")
3302
 
 
3303
 
 
3304
2982
class TestRunSuite(tests.TestCase):
3305
2983
 
3306
2984
    def test_runner_class(self):