~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Parth Malwankar
  • Date: 2010-09-24 12:53:00 UTC
  • mfrom: (5443 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5444.
  • Revision ID: parth.malwankar@gmail.com-20100924125300-70sg2t3q03bcuqpm
Merged trunk and moved the NEWS entry to the correct section.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
29
from testtools import MultiTestResult
 
30
from testtools.content import Content
29
31
from testtools.content_type import ContentType
30
32
from testtools.matchers import (
31
33
    DocTestMatches,
42
44
    lockdir,
43
45
    memorytree,
44
46
    osutils,
45
 
    progress,
46
47
    remote,
47
48
    repository,
48
49
    symbol_versioning,
325
326
        from bzrlib.tests.per_interrepository import make_scenarios
326
327
        server1 = "a"
327
328
        server2 = "b"
328
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
329
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
329
330
        scenarios = make_scenarios(server1, server2, formats)
330
331
        self.assertEqual([
331
332
            ('C0,str,str',
332
333
             {'repository_format': 'C1',
333
334
              'repository_format_to': 'C2',
334
335
              'transport_readonly_server': 'b',
335
 
              'transport_server': 'a'}),
 
336
              'transport_server': 'a',
 
337
              'extra_setup': 'C3'}),
336
338
            ('D0,str,str',
337
339
             {'repository_format': 'D1',
338
340
              'repository_format_to': 'D2',
339
341
              'transport_readonly_server': 'b',
340
 
              'transport_server': 'a'})],
 
342
              'transport_server': 'a',
 
343
              'extra_setup': 'D3'})],
341
344
            scenarios)
342
345
 
343
346
 
850
853
        """A KnownFailure being raised should trigger several result actions."""
851
854
        class InstrumentedTestResult(tests.ExtendedTestResult):
852
855
            def stopTestRun(self): pass
853
 
            def startTests(self): pass
854
 
            def report_test_start(self, test): pass
 
856
            def report_tests_starting(self): pass
855
857
            def report_known_failure(self, test, err=None, details=None):
856
858
                self._call = test, 'known failure'
857
859
        result = InstrumentedTestResult(None, None, None, None)
907
909
        """Test the behaviour of invoking addNotSupported."""
908
910
        class InstrumentedTestResult(tests.ExtendedTestResult):
909
911
            def stopTestRun(self): pass
910
 
            def startTests(self): pass
911
 
            def report_test_start(self, test): pass
 
912
            def report_tests_starting(self): pass
912
913
            def report_unsupported(self, test, feature):
913
914
                self._call = test, feature
914
915
        result = InstrumentedTestResult(None, None, None, None)
953
954
        """An UnavailableFeature being raised should invoke addNotSupported."""
954
955
        class InstrumentedTestResult(tests.ExtendedTestResult):
955
956
            def stopTestRun(self): pass
956
 
            def startTests(self): pass
957
 
            def report_test_start(self, test): pass
 
957
            def report_tests_starting(self): pass
958
958
            def addNotSupported(self, test, feature):
959
959
                self._call = test, feature
960
960
        result = InstrumentedTestResult(None, None, None, None)
1002
1002
        class InstrumentedTestResult(tests.ExtendedTestResult):
1003
1003
            calls = 0
1004
1004
            def startTests(self): self.calls += 1
1005
 
            def report_test_start(self, test): pass
1006
1005
        result = InstrumentedTestResult(None, None, None, None)
1007
1006
        def test_function():
1008
1007
            pass
1010
1009
        test.run(result)
1011
1010
        self.assertEquals(1, result.calls)
1012
1011
 
 
1012
    def test_startTests_only_once(self):
 
1013
        """With multiple tests startTests should still only be called once"""
 
1014
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1015
            calls = 0
 
1016
            def startTests(self): self.calls += 1
 
1017
        result = InstrumentedTestResult(None, None, None, None)
 
1018
        suite = unittest.TestSuite([
 
1019
            unittest.FunctionTestCase(lambda: None),
 
1020
            unittest.FunctionTestCase(lambda: None)])
 
1021
        suite.run(result)
 
1022
        self.assertEquals(1, result.calls)
 
1023
        self.assertEquals(2, result.count)
 
1024
 
1013
1025
 
1014
1026
class TestUnicodeFilenameFeature(tests.TestCase):
1015
1027
 
1036
1048
        because of our use of global state.
1037
1049
        """
1038
1050
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1039
 
        old_leak = tests.TestCase._first_thread_leaker_id
1040
1051
        try:
1041
1052
            tests.TestCaseInTempDir.TEST_ROOT = None
1042
 
            tests.TestCase._first_thread_leaker_id = None
1043
1053
            return testrunner.run(test)
1044
1054
        finally:
1045
1055
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1046
 
            tests.TestCase._first_thread_leaker_id = old_leak
1047
1056
 
1048
1057
    def test_known_failure_failed_run(self):
1049
1058
        # run a test that generates a known failure which should be printed in
1227
1236
        self.assertContainsRe(output_string, "--date [0-9.]+")
1228
1237
        self.assertLength(1, self._get_source_tree_calls)
1229
1238
 
 
1239
    def test_verbose_test_count(self):
 
1240
        """A verbose test run reports the right test count at the start"""
 
1241
        suite = TestUtil.TestSuite([
 
1242
            unittest.FunctionTestCase(lambda:None),
 
1243
            unittest.FunctionTestCase(lambda:None)])
 
1244
        self.assertEqual(suite.countTestCases(), 2)
 
1245
        stream = StringIO()
 
1246
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1247
        # Need to use the CountingDecorator as that's what sets num_tests
 
1248
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1249
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1250
 
1230
1251
    def test_startTestRun(self):
1231
1252
        """run should call result.startTestRun()"""
1232
1253
        calls = []
1669
1690
        self.assertEqual('original', obj.test_attr)
1670
1691
 
1671
1692
 
 
1693
class TestTestCloning(tests.TestCase):
 
1694
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1695
 
 
1696
    def test_cloned_testcase_does_not_share_details(self):
 
1697
        """A TestCase cloned with clone_test does not share mutable attributes
 
1698
        such as details or cleanups.
 
1699
        """
 
1700
        class Test(tests.TestCase):
 
1701
            def test_foo(self):
 
1702
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1703
        orig_test = Test('test_foo')
 
1704
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1705
        orig_test.run(unittest.TestResult())
 
1706
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1707
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1708
 
 
1709
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1710
        """Applying two levels of scenarios to a test preserves the attributes
 
1711
        added by both scenarios.
 
1712
        """
 
1713
        class Test(tests.TestCase):
 
1714
            def test_foo(self):
 
1715
                pass
 
1716
        test = Test('test_foo')
 
1717
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1718
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1719
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1720
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1721
        all_tests = list(tests.iter_suite_tests(suite))
 
1722
        self.assertLength(4, all_tests)
 
1723
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1724
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1725
 
 
1726
 
1672
1727
# NB: Don't delete this; it's not actually from 0.11!
1673
1728
@deprecated_function(deprecated_in((0, 11, 0)))
1674
1729
def sample_deprecated_function():
2979
3034
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2980
3035
 
2981
3036
 
 
3037
class TestThreadLeakDetection(tests.TestCase):
 
3038
    """Ensure when tests leak threads we detect and report it"""
 
3039
 
 
3040
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3041
        def __init__(self):
 
3042
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3043
            self.leaks = []
 
3044
        def _report_thread_leak(self, test, leaks, alive):
 
3045
            self.leaks.append((test, leaks))
 
3046
 
 
3047
    def test_testcase_without_addCleanups(self):
 
3048
        """Check old TestCase instances don't break with leak detection"""
 
3049
        class Test(unittest.TestCase):
 
3050
            def runTest(self):
 
3051
                pass
 
3052
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3053
        result = self.LeakRecordingResult()
 
3054
        test = Test()
 
3055
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3056
        result.startTestRun()
 
3057
        test.run(result)
 
3058
        result.stopTestRun()
 
3059
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3060
        self.assertEqual(result.leaks, [])
 
3061
        
 
3062
    def test_thread_leak(self):
 
3063
        """Ensure a thread that outlives the running of a test is reported
 
3064
 
 
3065
        Uses a thread that blocks on an event, and is started by the inner
 
3066
        test case. As the thread outlives the inner case's run, it should be
 
3067
        detected as a leak, but the event is then set so that the thread can
 
3068
        be safely joined in cleanup so it's not leaked for real.
 
3069
        """
 
3070
        event = threading.Event()
 
3071
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3072
        class Test(tests.TestCase):
 
3073
            def test_leak(self):
 
3074
                thread.start()
 
3075
        result = self.LeakRecordingResult()
 
3076
        test = Test("test_leak")
 
3077
        self.addCleanup(thread.join)
 
3078
        self.addCleanup(event.set)
 
3079
        result.startTestRun()
 
3080
        test.run(result)
 
3081
        result.stopTestRun()
 
3082
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3083
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3084
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3085
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3086
 
 
3087
    def test_multiple_leaks(self):
 
3088
        """Check multiple leaks are blamed on the test cases at fault
 
3089
 
 
3090
        Same concept as the previous test, but has one inner test method that
 
3091
        leaks two threads, and one that doesn't leak at all.
 
3092
        """
 
3093
        event = threading.Event()
 
3094
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3095
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3096
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3097
        class Test(tests.TestCase):
 
3098
            def test_first_leak(self):
 
3099
                thread_b.start()
 
3100
            def test_second_no_leak(self):
 
3101
                pass
 
3102
            def test_third_leak(self):
 
3103
                thread_c.start()
 
3104
                thread_a.start()
 
3105
        result = self.LeakRecordingResult()
 
3106
        first_test = Test("test_first_leak")
 
3107
        third_test = Test("test_third_leak")
 
3108
        self.addCleanup(thread_a.join)
 
3109
        self.addCleanup(thread_b.join)
 
3110
        self.addCleanup(thread_c.join)
 
3111
        self.addCleanup(event.set)
 
3112
        result.startTestRun()
 
3113
        unittest.TestSuite(
 
3114
            [first_test, Test("test_second_no_leak"), third_test]
 
3115
            ).run(result)
 
3116
        result.stopTestRun()
 
3117
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3118
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3119
        self.assertEqual(result.leaks, [
 
3120
            (first_test, set([thread_b])),
 
3121
            (third_test, set([thread_a, thread_c]))])
 
3122
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3123
 
 
3124
 
2982
3125
class TestRunSuite(tests.TestCase):
2983
3126
 
2984
3127
    def test_runner_class(self):