~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: John Arbash Meinel
  • Date: 2010-08-30 21:23:49 UTC
  • mto: This revision was merged to the branch mainline in revision 5398.
  • Revision ID: john@arbash-meinel.com-20100830212349-figt9yz2cic6hy68
Remove the 'false' invocation.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
24
 
import threading
25
24
import time
26
25
import unittest
27
26
import warnings
28
27
 
29
28
from testtools import MultiTestResult
30
 
from testtools.content import Content
31
29
from testtools.content_type import ContentType
32
30
from testtools.matchers import (
33
31
    DocTestMatches,
44
42
    lockdir,
45
43
    memorytree,
46
44
    osutils,
 
45
    progress,
47
46
    remote,
48
47
    repository,
49
48
    symbol_versioning,
68
67
    test_sftp_transport,
69
68
    TestUtil,
70
69
    )
71
 
from bzrlib.trace import note, mutter
 
70
from bzrlib.trace import note
72
71
from bzrlib.transport import memory
73
72
from bzrlib.version import _get_bzr_source_tree
74
73
 
123
122
        self.failUnlessExists(filename)
124
123
 
125
124
 
126
 
class TestClassesAvailable(tests.TestCase):
127
 
    """As a convenience we expose Test* classes from bzrlib.tests"""
128
 
 
129
 
    def test_test_case(self):
130
 
        from bzrlib.tests import TestCase
131
 
 
132
 
    def test_test_loader(self):
133
 
        from bzrlib.tests import TestLoader
134
 
 
135
 
    def test_test_suite(self):
136
 
        from bzrlib.tests import TestSuite
137
 
 
138
 
 
139
125
class TestTransportScenarios(tests.TestCase):
140
126
    """A group of tests that test the transport implementation adaption core.
141
127
 
326
312
        from bzrlib.tests.per_interrepository import make_scenarios
327
313
        server1 = "a"
328
314
        server2 = "b"
329
 
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
 
315
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
330
316
        scenarios = make_scenarios(server1, server2, formats)
331
317
        self.assertEqual([
332
318
            ('C0,str,str',
333
319
             {'repository_format': 'C1',
334
320
              'repository_format_to': 'C2',
335
321
              'transport_readonly_server': 'b',
336
 
              'transport_server': 'a',
337
 
              'extra_setup': 'C3'}),
 
322
              'transport_server': 'a'}),
338
323
            ('D0,str,str',
339
324
             {'repository_format': 'D1',
340
325
              'repository_format_to': 'D2',
341
326
              'transport_readonly_server': 'b',
342
 
              'transport_server': 'a',
343
 
              'extra_setup': 'D3'})],
 
327
              'transport_server': 'a'})],
344
328
            scenarios)
345
329
 
346
330
 
627
611
        result = test.run()
628
612
        total_failures = result.errors + result.failures
629
613
        if self._lock_check_thorough:
630
 
            self.assertEqual(1, len(total_failures))
 
614
            self.assertLength(1, total_failures)
631
615
        else:
632
616
            # When _lock_check_thorough is disabled, then we don't trigger a
633
617
            # failure
634
 
            self.assertEqual(0, len(total_failures))
 
618
            self.assertLength(0, total_failures)
635
619
 
636
620
 
637
621
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
853
837
        """A KnownFailure being raised should trigger several result actions."""
854
838
        class InstrumentedTestResult(tests.ExtendedTestResult):
855
839
            def stopTestRun(self): pass
856
 
            def report_tests_starting(self): pass
 
840
            def startTests(self): pass
 
841
            def report_test_start(self, test): pass
857
842
            def report_known_failure(self, test, err=None, details=None):
858
843
                self._call = test, 'known failure'
859
844
        result = InstrumentedTestResult(None, None, None, None)
909
894
        """Test the behaviour of invoking addNotSupported."""
910
895
        class InstrumentedTestResult(tests.ExtendedTestResult):
911
896
            def stopTestRun(self): pass
912
 
            def report_tests_starting(self): pass
 
897
            def startTests(self): pass
 
898
            def report_test_start(self, test): pass
913
899
            def report_unsupported(self, test, feature):
914
900
                self._call = test, feature
915
901
        result = InstrumentedTestResult(None, None, None, None)
954
940
        """An UnavailableFeature being raised should invoke addNotSupported."""
955
941
        class InstrumentedTestResult(tests.ExtendedTestResult):
956
942
            def stopTestRun(self): pass
957
 
            def report_tests_starting(self): pass
 
943
            def startTests(self): pass
 
944
            def report_test_start(self, test): pass
958
945
            def addNotSupported(self, test, feature):
959
946
                self._call = test, feature
960
947
        result = InstrumentedTestResult(None, None, None, None)
1002
989
        class InstrumentedTestResult(tests.ExtendedTestResult):
1003
990
            calls = 0
1004
991
            def startTests(self): self.calls += 1
 
992
            def report_test_start(self, test): pass
1005
993
        result = InstrumentedTestResult(None, None, None, None)
1006
994
        def test_function():
1007
995
            pass
1009
997
        test.run(result)
1010
998
        self.assertEquals(1, result.calls)
1011
999
 
1012
 
    def test_startTests_only_once(self):
1013
 
        """With multiple tests startTests should still only be called once"""
1014
 
        class InstrumentedTestResult(tests.ExtendedTestResult):
1015
 
            calls = 0
1016
 
            def startTests(self): self.calls += 1
1017
 
        result = InstrumentedTestResult(None, None, None, None)
1018
 
        suite = unittest.TestSuite([
1019
 
            unittest.FunctionTestCase(lambda: None),
1020
 
            unittest.FunctionTestCase(lambda: None)])
1021
 
        suite.run(result)
1022
 
        self.assertEquals(1, result.calls)
1023
 
        self.assertEquals(2, result.count)
1024
 
 
1025
1000
 
1026
1001
class TestUnicodeFilenameFeature(tests.TestCase):
1027
1002
 
1048
1023
        because of our use of global state.
1049
1024
        """
1050
1025
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1026
        old_leak = tests.TestCase._first_thread_leaker_id
1051
1027
        try:
1052
1028
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1029
            tests.TestCase._first_thread_leaker_id = None
1053
1030
            return testrunner.run(test)
1054
1031
        finally:
1055
1032
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1033
            tests.TestCase._first_thread_leaker_id = old_leak
1056
1034
 
1057
1035
    def test_known_failure_failed_run(self):
1058
1036
        # run a test that generates a known failure which should be printed in
1236
1214
        self.assertContainsRe(output_string, "--date [0-9.]+")
1237
1215
        self.assertLength(1, self._get_source_tree_calls)
1238
1216
 
1239
 
    def test_verbose_test_count(self):
1240
 
        """A verbose test run reports the right test count at the start"""
1241
 
        suite = TestUtil.TestSuite([
1242
 
            unittest.FunctionTestCase(lambda:None),
1243
 
            unittest.FunctionTestCase(lambda:None)])
1244
 
        self.assertEqual(suite.countTestCases(), 2)
1245
 
        stream = StringIO()
1246
 
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
1247
 
        # Need to use the CountingDecorator as that's what sets num_tests
1248
 
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1249
 
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1250
 
 
1251
1217
    def test_startTestRun(self):
1252
1218
        """run should call result.startTestRun()"""
1253
1219
        calls = []
1690
1656
        self.assertEqual('original', obj.test_attr)
1691
1657
 
1692
1658
 
1693
 
class _MissingFeature(tests.Feature):
1694
 
    def _probe(self):
1695
 
        return False
1696
 
missing_feature = _MissingFeature()
1697
 
 
1698
 
 
1699
 
def _get_test(name):
1700
 
    """Get an instance of a specific example test.
1701
 
 
1702
 
    We protect this in a function so that they don't auto-run in the test
1703
 
    suite.
1704
 
    """
1705
 
 
1706
 
    class ExampleTests(tests.TestCase):
1707
 
 
1708
 
        def test_fail(self):
1709
 
            mutter('this was a failing test')
1710
 
            self.fail('this test will fail')
1711
 
 
1712
 
        def test_error(self):
1713
 
            mutter('this test errored')
1714
 
            raise RuntimeError('gotcha')
1715
 
 
1716
 
        def test_missing_feature(self):
1717
 
            mutter('missing the feature')
1718
 
            self.requireFeature(missing_feature)
1719
 
 
1720
 
        def test_skip(self):
1721
 
            mutter('this test will be skipped')
1722
 
            raise tests.TestSkipped('reason')
1723
 
 
1724
 
        def test_success(self):
1725
 
            mutter('this test succeeds')
1726
 
 
1727
 
        def test_xfail(self):
1728
 
            mutter('test with expected failure')
1729
 
            self.knownFailure('this_fails')
1730
 
 
1731
 
        def test_unexpected_success(self):
1732
 
            mutter('test with unexpected success')
1733
 
            self.expectFailure('should_fail', lambda: None)
1734
 
 
1735
 
    return ExampleTests(name)
1736
 
 
1737
 
 
1738
 
class TestTestCaseLogDetails(tests.TestCase):
1739
 
 
1740
 
    def _run_test(self, test_name):
1741
 
        test = _get_test(test_name)
1742
 
        result = testtools.TestResult()
1743
 
        test.run(result)
1744
 
        return result
1745
 
 
1746
 
    def test_fail_has_log(self):
1747
 
        result = self._run_test('test_fail')
1748
 
        self.assertEqual(1, len(result.failures))
1749
 
        result_content = result.failures[0][1]
1750
 
        self.assertContainsRe(result_content, 'Text attachment: log')
1751
 
        self.assertContainsRe(result_content, 'this was a failing test')
1752
 
 
1753
 
    def test_error_has_log(self):
1754
 
        result = self._run_test('test_error')
1755
 
        self.assertEqual(1, len(result.errors))
1756
 
        result_content = result.errors[0][1]
1757
 
        self.assertContainsRe(result_content, 'Text attachment: log')
1758
 
        self.assertContainsRe(result_content, 'this test errored')
1759
 
 
1760
 
    def test_skip_has_no_log(self):
1761
 
        result = self._run_test('test_skip')
1762
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
1763
 
        skips = result.skip_reasons['reason']
1764
 
        self.assertEqual(1, len(skips))
1765
 
        test = skips[0]
1766
 
        self.assertFalse('log' in test.getDetails())
1767
 
 
1768
 
    def test_missing_feature_has_no_log(self):
1769
 
        # testtools doesn't know about addNotSupported, so it just gets
1770
 
        # considered as a skip
1771
 
        result = self._run_test('test_missing_feature')
1772
 
        self.assertEqual([missing_feature], result.skip_reasons.keys())
1773
 
        skips = result.skip_reasons[missing_feature]
1774
 
        self.assertEqual(1, len(skips))
1775
 
        test = skips[0]
1776
 
        self.assertFalse('log' in test.getDetails())
1777
 
 
1778
 
    def test_xfail_has_no_log(self):
1779
 
        result = self._run_test('test_xfail')
1780
 
        self.assertEqual(1, len(result.expectedFailures))
1781
 
        result_content = result.expectedFailures[0][1]
1782
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
1783
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
1784
 
 
1785
 
    def test_unexpected_success_has_log(self):
1786
 
        result = self._run_test('test_unexpected_success')
1787
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
1788
 
        # Inconsistency, unexpectedSuccesses is a list of tests,
1789
 
        # expectedFailures is a list of reasons?
1790
 
        test = result.unexpectedSuccesses[0]
1791
 
        details = test.getDetails()
1792
 
        self.assertTrue('log' in details)
1793
 
 
1794
 
 
1795
 
class TestTestCloning(tests.TestCase):
1796
 
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
1797
 
 
1798
 
    def test_cloned_testcase_does_not_share_details(self):
1799
 
        """A TestCase cloned with clone_test does not share mutable attributes
1800
 
        such as details or cleanups.
1801
 
        """
1802
 
        class Test(tests.TestCase):
1803
 
            def test_foo(self):
1804
 
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1805
 
        orig_test = Test('test_foo')
1806
 
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1807
 
        orig_test.run(unittest.TestResult())
1808
 
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1809
 
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
1810
 
 
1811
 
    def test_double_apply_scenario_preserves_first_scenario(self):
1812
 
        """Applying two levels of scenarios to a test preserves the attributes
1813
 
        added by both scenarios.
1814
 
        """
1815
 
        class Test(tests.TestCase):
1816
 
            def test_foo(self):
1817
 
                pass
1818
 
        test = Test('test_foo')
1819
 
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1820
 
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1821
 
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1822
 
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1823
 
        all_tests = list(tests.iter_suite_tests(suite))
1824
 
        self.assertLength(4, all_tests)
1825
 
        all_xys = sorted((t.x, t.y) for t in all_tests)
1826
 
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1827
 
 
1828
 
 
1829
1659
# NB: Don't delete this; it's not actually from 0.11!
1830
1660
@deprecated_function(deprecated_in((0, 11, 0)))
1831
1661
def sample_deprecated_function():
1985
1815
                tree.branch.repository.bzrdir.root_transport)
1986
1816
 
1987
1817
 
1988
 
class SelfTestHelper(object):
 
1818
class SelfTestHelper:
1989
1819
 
1990
1820
    def run_selftest(self, **kwargs):
1991
1821
        """Run selftest returning its output."""
2142
1972
            load_list='missing file name', list_only=True)
2143
1973
 
2144
1974
 
2145
 
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2146
 
 
2147
 
    _test_needs_features = [features.subunit]
2148
 
 
2149
 
    def run_subunit_stream(self, test_name):
2150
 
        from subunit import ProtocolTestCase
2151
 
        def factory():
2152
 
            return TestUtil.TestSuite([_get_test(test_name)])
2153
 
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2154
 
            test_suite_factory=factory)
2155
 
        test = ProtocolTestCase(stream)
2156
 
        result = testtools.TestResult()
2157
 
        test.run(result)
2158
 
        content = stream.getvalue()
2159
 
        return content, result
2160
 
 
2161
 
    def test_fail_has_log(self):
2162
 
        content, result = self.run_subunit_stream('test_fail')
2163
 
        self.assertEqual(1, len(result.failures))
2164
 
        self.assertContainsRe(content, '(?m)^log$')
2165
 
        self.assertContainsRe(content, 'this test will fail')
2166
 
 
2167
 
    def test_error_has_log(self):
2168
 
        content, result = self.run_subunit_stream('test_error')
2169
 
        self.assertContainsRe(content, '(?m)^log$')
2170
 
        self.assertContainsRe(content, 'this test errored')
2171
 
 
2172
 
    def test_skip_has_no_log(self):
2173
 
        content, result = self.run_subunit_stream('test_skip')
2174
 
        self.assertNotContainsRe(content, '(?m)^log$')
2175
 
        self.assertNotContainsRe(content, 'this test will be skipped')
2176
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
2177
 
        skips = result.skip_reasons['reason']
2178
 
        self.assertEqual(1, len(skips))
2179
 
        test = skips[0]
2180
 
        # RemotedTestCase doesn't preserve the "details"
2181
 
        ## self.assertFalse('log' in test.getDetails())
2182
 
 
2183
 
    def test_missing_feature_has_no_log(self):
2184
 
        content, result = self.run_subunit_stream('test_missing_feature')
2185
 
        self.assertNotContainsRe(content, '(?m)^log$')
2186
 
        self.assertNotContainsRe(content, 'missing the feature')
2187
 
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2188
 
        skips = result.skip_reasons['_MissingFeature\n']
2189
 
        self.assertEqual(1, len(skips))
2190
 
        test = skips[0]
2191
 
        # RemotedTestCase doesn't preserve the "details"
2192
 
        ## self.assertFalse('log' in test.getDetails())
2193
 
 
2194
 
    def test_xfail_has_no_log(self):
2195
 
        content, result = self.run_subunit_stream('test_xfail')
2196
 
        self.assertNotContainsRe(content, '(?m)^log$')
2197
 
        self.assertNotContainsRe(content, 'test with expected failure')
2198
 
        self.assertEqual(1, len(result.expectedFailures))
2199
 
        result_content = result.expectedFailures[0][1]
2200
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
2201
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
2202
 
 
2203
 
    def test_unexpected_success_has_log(self):
2204
 
        content, result = self.run_subunit_stream('test_unexpected_success')
2205
 
        self.assertContainsRe(content, '(?m)^log$')
2206
 
        self.assertContainsRe(content, 'test with unexpected success')
2207
 
        self.expectFailure('subunit treats "unexpectedSuccess"'
2208
 
                           ' as a plain success',
2209
 
            self.assertEqual, 1, len(result.unexpectedSuccesses))
2210
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
2211
 
        test = result.unexpectedSuccesses[0]
2212
 
        # RemotedTestCase doesn't preserve the "details"
2213
 
        ## self.assertTrue('log' in test.getDetails())
2214
 
 
2215
 
    def test_success_has_no_log(self):
2216
 
        content, result = self.run_subunit_stream('test_success')
2217
 
        self.assertEqual(1, result.testsRun)
2218
 
        self.assertNotContainsRe(content, '(?m)^log$')
2219
 
        self.assertNotContainsRe(content, 'this test succeeds')
2220
 
 
2221
 
 
2222
1975
class TestRunBzr(tests.TestCase):
2223
1976
 
2224
1977
    out = ''
3213
2966
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3214
2967
 
3215
2968
 
3216
 
class TestThreadLeakDetection(tests.TestCase):
3217
 
    """Ensure when tests leak threads we detect and report it"""
3218
 
 
3219
 
    class LeakRecordingResult(tests.ExtendedTestResult):
3220
 
        def __init__(self):
3221
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3222
 
            self.leaks = []
3223
 
        def _report_thread_leak(self, test, leaks, alive):
3224
 
            self.leaks.append((test, leaks))
3225
 
 
3226
 
    def test_testcase_without_addCleanups(self):
3227
 
        """Check old TestCase instances don't break with leak detection"""
3228
 
        class Test(unittest.TestCase):
3229
 
            def runTest(self):
3230
 
                pass
3231
 
            addCleanup = None # for when on Python 2.7 with native addCleanup
3232
 
        result = self.LeakRecordingResult()
3233
 
        test = Test()
3234
 
        self.assertIs(getattr(test, "addCleanup", None), None)
3235
 
        result.startTestRun()
3236
 
        test.run(result)
3237
 
        result.stopTestRun()
3238
 
        self.assertEqual(result._tests_leaking_threads_count, 0)
3239
 
        self.assertEqual(result.leaks, [])
3240
 
        
3241
 
    def test_thread_leak(self):
3242
 
        """Ensure a thread that outlives the running of a test is reported
3243
 
 
3244
 
        Uses a thread that blocks on an event, and is started by the inner
3245
 
        test case. As the thread outlives the inner case's run, it should be
3246
 
        detected as a leak, but the event is then set so that the thread can
3247
 
        be safely joined in cleanup so it's not leaked for real.
3248
 
        """
3249
 
        event = threading.Event()
3250
 
        thread = threading.Thread(name="Leaker", target=event.wait)
3251
 
        class Test(tests.TestCase):
3252
 
            def test_leak(self):
3253
 
                thread.start()
3254
 
        result = self.LeakRecordingResult()
3255
 
        test = Test("test_leak")
3256
 
        self.addCleanup(thread.join)
3257
 
        self.addCleanup(event.set)
3258
 
        result.startTestRun()
3259
 
        test.run(result)
3260
 
        result.stopTestRun()
3261
 
        self.assertEqual(result._tests_leaking_threads_count, 1)
3262
 
        self.assertEqual(result._first_thread_leaker_id, test.id())
3263
 
        self.assertEqual(result.leaks, [(test, set([thread]))])
3264
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3265
 
 
3266
 
    def test_multiple_leaks(self):
3267
 
        """Check multiple leaks are blamed on the test cases at fault
3268
 
 
3269
 
        Same concept as the previous test, but has one inner test method that
3270
 
        leaks two threads, and one that doesn't leak at all.
3271
 
        """
3272
 
        event = threading.Event()
3273
 
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
3274
 
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
3275
 
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
3276
 
        class Test(tests.TestCase):
3277
 
            def test_first_leak(self):
3278
 
                thread_b.start()
3279
 
            def test_second_no_leak(self):
3280
 
                pass
3281
 
            def test_third_leak(self):
3282
 
                thread_c.start()
3283
 
                thread_a.start()
3284
 
        result = self.LeakRecordingResult()
3285
 
        first_test = Test("test_first_leak")
3286
 
        third_test = Test("test_third_leak")
3287
 
        self.addCleanup(thread_a.join)
3288
 
        self.addCleanup(thread_b.join)
3289
 
        self.addCleanup(thread_c.join)
3290
 
        self.addCleanup(event.set)
3291
 
        result.startTestRun()
3292
 
        unittest.TestSuite(
3293
 
            [first_test, Test("test_second_no_leak"), third_test]
3294
 
            ).run(result)
3295
 
        result.stopTestRun()
3296
 
        self.assertEqual(result._tests_leaking_threads_count, 2)
3297
 
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
3298
 
        self.assertEqual(result.leaks, [
3299
 
            (first_test, set([thread_b])),
3300
 
            (third_test, set([thread_a, thread_c]))])
3301
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3302
 
 
3303
 
 
3304
2969
class TestRunSuite(tests.TestCase):
3305
2970
 
3306
2971
    def test_runner_class(self):