~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2011-06-16 11:58:04 UTC
  • mto: This revision was merged to the branch mainline in revision 5987.
  • Revision ID: jelmer@samba.org-20110616115804-7tnqon61emrbdoxm
Remove unused Tree._get_ancestors.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
import gc
21
20
import doctest
22
21
import os
23
22
import signal
30
29
from testtools import (
31
30
    ExtendedToOriginalDecorator,
32
31
    MultiTestResult,
33
 
    __version__ as testtools_version,
34
32
    )
35
33
from testtools.content import Content
36
34
from testtools.content_type import ContentType
45
43
    branchbuilder,
46
44
    bzrdir,
47
45
    errors,
48
 
    hooks,
49
46
    lockdir,
50
47
    memorytree,
51
48
    osutils,
95
92
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
96
93
 
97
94
 
 
95
class TestUnicodeFilename(tests.TestCase):
 
96
 
 
97
    def test_probe_passes(self):
 
98
        """UnicodeFilename._probe passes."""
 
99
        # We can't test much more than that because the behaviour depends
 
100
        # on the platform.
 
101
        tests.UnicodeFilename._probe()
 
102
 
 
103
 
98
104
class TestTreeShape(tests.TestCaseInTempDir):
99
105
 
100
106
    def test_unicode_paths(self):
101
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
107
        self.requireFeature(tests.UnicodeFilename)
102
108
 
103
109
        filename = u'hell\u00d8'
104
110
        self.build_tree_contents([(filename, 'contents of hello')])
620
626
    def test_dangling_locks_cause_failures(self):
621
627
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
622
628
            def test_function(self):
623
 
                t = self.get_transport_from_path('.')
 
629
                t = self.get_transport('.')
624
630
                l = lockdir.LockDir(t, 'lock')
625
631
                l.create()
626
632
                l.attempt_lock()
646
652
        # for the server
647
653
        url = self.get_readonly_url()
648
654
        url2 = self.get_readonly_url('foo/bar')
649
 
        t = transport.get_transport_from_url(url)
650
 
        t2 = transport.get_transport_from_url(url2)
 
655
        t = transport.get_transport(url)
 
656
        t2 = transport.get_transport(url2)
651
657
        self.assertIsInstance(t, ReadonlyTransportDecorator)
652
658
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
653
659
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
661
667
        url = self.get_readonly_url()
662
668
        url2 = self.get_readonly_url('foo/bar')
663
669
        # the transport returned may be any HttpTransportBase subclass
664
 
        t = transport.get_transport_from_url(url)
665
 
        t2 = transport.get_transport_from_url(url2)
 
670
        t = transport.get_transport(url)
 
671
        t2 = transport.get_transport(url2)
666
672
        self.assertIsInstance(t, HttpTransportBase)
667
673
        self.assertIsInstance(t2, HttpTransportBase)
668
674
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
706
712
class TestChrootedTest(tests.ChrootedTestCase):
707
713
 
708
714
    def test_root_is_root(self):
709
 
        t = transport.get_transport_from_url(self.get_readonly_url())
 
715
        t = transport.get_transport(self.get_readonly_url())
710
716
        url = t.base
711
717
        self.assertEqual(url, t.clone('..').base)
712
718
 
714
720
class TestProfileResult(tests.TestCase):
715
721
 
716
722
    def test_profiles_tests(self):
717
 
        self.requireFeature(features.lsprof_feature)
 
723
        self.requireFeature(test_lsprof.LSProfFeature)
718
724
        terminal = testtools.testresult.doubles.ExtendedTestResult()
719
725
        result = tests.ProfileResult(terminal)
720
726
        class Sample(tests.TestCase):
775
781
 
776
782
    def test_lsprofiling(self):
777
783
        """Verbose test result prints lsprof statistics from test cases."""
778
 
        self.requireFeature(features.lsprof_feature)
 
784
        self.requireFeature(test_lsprof.LSProfFeature)
779
785
        result_stream = StringIO()
780
786
        result = bzrlib.tests.VerboseTestResult(
781
787
            result_stream,
826
832
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
827
833
 
828
834
    def test_known_failure(self):
829
 
        """Using knownFailure should trigger several result actions."""
 
835
        """A KnownFailure being raised should trigger several result actions."""
830
836
        class InstrumentedTestResult(tests.ExtendedTestResult):
831
837
            def stopTestRun(self): pass
832
838
            def report_tests_starting(self): pass
835
841
        result = InstrumentedTestResult(None, None, None, None)
836
842
        class Test(tests.TestCase):
837
843
            def test_function(self):
838
 
                self.knownFailure('failed!')
 
844
                raise tests.KnownFailure('failed!')
839
845
        test = Test("test_function")
840
846
        test.run(result)
841
847
        # it should invoke 'report_known_failure'.
857
863
            descriptions=0,
858
864
            verbosity=2,
859
865
            )
860
 
        _get_test("test_xfail").run(result)
861
 
        self.assertContainsRe(result_stream.getvalue(),
862
 
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
863
 
            "\\s*(?:Text attachment: )?reason"
864
 
            "(?:\n-+\n|: {{{)"
865
 
            "this_fails"
866
 
            "(?:\n-+\n|}}}\n)")
 
866
        test = self.get_passing_test()
 
867
        result.startTest(test)
 
868
        prefix = len(result_stream.getvalue())
 
869
        # the err parameter has the shape:
 
870
        # (class, exception object, traceback)
 
871
        # KnownFailures dont get their tracebacks shown though, so we
 
872
        # can skip that.
 
873
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
874
        result.report_known_failure(test, err)
 
875
        output = result_stream.getvalue()[prefix:]
 
876
        lines = output.splitlines()
 
877
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
878
        if sys.version_info > (2, 7):
 
879
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
880
                self.assertNotEqual, lines[1], '    ')
 
881
        self.assertEqual(lines[1], '    foo')
 
882
        self.assertEqual(2, len(lines))
867
883
 
868
884
    def get_passing_test(self):
869
885
        """Return a test object that can't be run usefully."""
880
896
                self._call = test, feature
881
897
        result = InstrumentedTestResult(None, None, None, None)
882
898
        test = SampleTestCase('_test_pass')
883
 
        feature = features.Feature()
 
899
        feature = tests.Feature()
884
900
        result.startTest(test)
885
901
        result.addNotSupported(test, feature)
886
902
        # it should invoke 'report_unsupported'.
905
921
            verbosity=2,
906
922
            )
907
923
        test = self.get_passing_test()
908
 
        feature = features.Feature()
 
924
        feature = tests.Feature()
909
925
        result.startTest(test)
910
926
        prefix = len(result_stream.getvalue())
911
927
        result.report_unsupported(test, feature)
924
940
            def addNotSupported(self, test, feature):
925
941
                self._call = test, feature
926
942
        result = InstrumentedTestResult(None, None, None, None)
927
 
        feature = features.Feature()
 
943
        feature = tests.Feature()
928
944
        class Test(tests.TestCase):
929
945
            def test_function(self):
930
946
                raise tests.UnavailableFeature(feature)
949
965
    def test_strict_with_known_failure(self):
950
966
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
951
967
                                             verbosity=1)
952
 
        test = _get_test("test_xfail")
953
 
        test.run(result)
 
968
        test = self.get_passing_test()
 
969
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
970
        result.addExpectedFailure(test, err)
954
971
        self.assertFalse(result.wasStrictlySuccessful())
955
972
        self.assertEqual(None, result._extractBenchmarkTime(test))
956
973
 
988
1005
        self.assertEquals(2, result.count)
989
1006
 
990
1007
 
 
1008
class TestUnicodeFilenameFeature(tests.TestCase):
 
1009
 
 
1010
    def test_probe_passes(self):
 
1011
        """UnicodeFilenameFeature._probe passes."""
 
1012
        # We can't test much more than that because the behaviour depends
 
1013
        # on the platform.
 
1014
        tests.UnicodeFilenameFeature._probe()
 
1015
 
 
1016
 
991
1017
class TestRunner(tests.TestCase):
992
1018
 
993
1019
    def dummy_test(self):
1063
1089
                self.expectFailure("No absolute truth", self.assertTrue, True)
1064
1090
        runner = tests.TextTestRunner(stream=StringIO())
1065
1091
        result = self.run_test_runner(runner, Test("test_truth"))
1066
 
        if testtools_version[:3] <= (0, 9, 11):
1067
 
            self.assertContainsRe(runner.stream.getvalue(),
1068
 
                "=+\n"
1069
 
                "FAIL: \\S+\.test_truth\n"
1070
 
                "-+\n"
1071
 
                "(?:.*\n)*"
1072
 
                "No absolute truth\n"
1073
 
                "(?:.*\n)*"
1074
 
                "-+\n"
1075
 
                "Ran 1 test in .*\n"
1076
 
                "\n"
1077
 
                "FAILED \\(failures=1\\)\n\\Z")
1078
 
        else:
1079
 
            self.assertContainsRe(runner.stream.getvalue(),
1080
 
                "=+\n"
1081
 
                "FAIL: \\S+\.test_truth\n"
1082
 
                "-+\n"
1083
 
                "Empty attachments:\n"
1084
 
                "  log\n"
1085
 
                "\n"
1086
 
                "reason: {{{No absolute truth}}}\n"
1087
 
                "-+\n"
1088
 
                "Ran 1 test in .*\n"
1089
 
                "\n"
1090
 
                "FAILED \\(failures=1\\)\n\\Z")
 
1092
        self.assertContainsRe(runner.stream.getvalue(),
 
1093
            "=+\n"
 
1094
            "FAIL: \\S+\.test_truth\n"
 
1095
            "-+\n"
 
1096
            "(?:.*\n)*"
 
1097
            "No absolute truth\n"
 
1098
            "(?:.*\n)*"
 
1099
            "-+\n"
 
1100
            "Ran 1 test in .*\n"
 
1101
            "\n"
 
1102
            "FAILED \\(failures=1\\)\n\\Z")
1091
1103
 
1092
1104
    def test_result_decorator(self):
1093
1105
        # decorate results
1178
1190
 
1179
1191
    def test_unsupported_features_listed(self):
1180
1192
        """When unsupported features are encountered they are detailed."""
1181
 
        class Feature1(features.Feature):
 
1193
        class Feature1(tests.Feature):
1182
1194
            def _probe(self): return False
1183
 
        class Feature2(features.Feature):
 
1195
        class Feature2(tests.Feature):
1184
1196
            def _probe(self): return False
1185
1197
        # create sample tests
1186
1198
        test1 = SampleTestCase('_test_pass')
1252
1264
            lambda trace=False: "ascii")
1253
1265
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1254
1266
            FailureWithUnicode("test_log_unicode"))
1255
 
        if testtools_version[:3] > (0, 9, 11):
1256
 
            self.assertContainsRe(out.getvalue(), "log: {{{\d+\.\d+  \\\\u2606}}}")
1257
 
        else:
1258
 
            self.assertContainsRe(out.getvalue(),
1259
 
                "Text attachment: log\n"
1260
 
                "-+\n"
1261
 
                "\d+\.\d+  \\\\u2606\n"
1262
 
                "-+\n")
 
1267
        self.assertContainsRe(out.getvalue(),
 
1268
            "Text attachment: log\n"
 
1269
            "-+\n"
 
1270
            "\d+\.\d+  \\\\u2606\n"
 
1271
            "-+\n")
1263
1272
 
1264
1273
 
1265
1274
class SampleTestCase(tests.TestCase):
1466
1475
 
1467
1476
        Each self.time() call is individually and separately profiled.
1468
1477
        """
1469
 
        self.requireFeature(features.lsprof_feature)
 
1478
        self.requireFeature(test_lsprof.LSProfFeature)
1470
1479
        # overrides the class member with an instance member so no cleanup
1471
1480
        # needed.
1472
1481
        self._gather_lsprof_in_benchmarks = True
1491
1500
        transport_server = memory.MemoryServer()
1492
1501
        transport_server.start_server()
1493
1502
        self.addCleanup(transport_server.stop_server)
1494
 
        t = transport.get_transport_from_url(transport_server.get_url())
 
1503
        t = transport.get_transport(transport_server.get_url())
1495
1504
        bzrdir.BzrDir.create(t.base)
1496
1505
        self.assertRaises(errors.BzrError,
1497
1506
            bzrdir.BzrDir.open_from_transport, t)
1502
1511
 
1503
1512
    def test_requireFeature_available(self):
1504
1513
        """self.requireFeature(available) is a no-op."""
1505
 
        class Available(features.Feature):
 
1514
        class Available(tests.Feature):
1506
1515
            def _probe(self):return True
1507
1516
        feature = Available()
1508
1517
        self.requireFeature(feature)
1509
1518
 
1510
1519
    def test_requireFeature_unavailable(self):
1511
1520
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1512
 
        class Unavailable(features.Feature):
 
1521
        class Unavailable(tests.Feature):
1513
1522
            def _probe(self):return False
1514
1523
        feature = Unavailable()
1515
1524
        self.assertRaises(tests.UnavailableFeature,
1674
1683
        test.run(unittest.TestResult())
1675
1684
        self.assertEqual('original', obj.test_attr)
1676
1685
 
1677
 
    def test_recordCalls(self):
1678
 
        from bzrlib.tests import test_selftest
1679
 
        calls = self.recordCalls(
1680
 
            test_selftest, '_add_numbers')
1681
 
        self.assertEqual(test_selftest._add_numbers(2, 10),
1682
 
            12)
1683
 
        self.assertEquals(calls, [((2, 10), {})])
1684
 
 
1685
 
 
1686
 
def _add_numbers(a, b):
1687
 
    return a + b
1688
 
 
1689
 
 
1690
 
class _MissingFeature(features.Feature):
 
1686
 
 
1687
class _MissingFeature(tests.Feature):
1691
1688
    def _probe(self):
1692
1689
        return False
1693
1690
missing_feature = _MissingFeature()
1744
1741
        result = self._run_test('test_fail')
1745
1742
        self.assertEqual(1, len(result.failures))
1746
1743
        result_content = result.failures[0][1]
1747
 
        if testtools_version < (0, 9, 12):
1748
 
            self.assertContainsRe(result_content, 'Text attachment: log')
 
1744
        self.assertContainsRe(result_content, 'Text attachment: log')
1749
1745
        self.assertContainsRe(result_content, 'this was a failing test')
1750
1746
 
1751
1747
    def test_error_has_log(self):
1752
1748
        result = self._run_test('test_error')
1753
1749
        self.assertEqual(1, len(result.errors))
1754
1750
        result_content = result.errors[0][1]
1755
 
        if testtools_version < (0, 9, 12):
1756
 
            self.assertContainsRe(result_content, 'Text attachment: log')
 
1751
        self.assertContainsRe(result_content, 'Text attachment: log')
1757
1752
        self.assertContainsRe(result_content, 'this test errored')
1758
1753
 
1759
1754
    def test_skip_has_no_log(self):
2041
2036
        self.assertLength(2, output.readlines())
2042
2037
 
2043
2038
    def test_lsprof_tests(self):
2044
 
        self.requireFeature(features.lsprof_feature)
 
2039
        self.requireFeature(test_lsprof.LSProfFeature)
2045
2040
        results = []
2046
2041
        class Test(object):
2047
2042
            def __call__(test, result):
2510
2505
 
2511
2506
 
2512
2507
class TestStartBzrSubProcess(tests.TestCase):
2513
 
    """Stub test start_bzr_subprocess."""
2514
2508
 
2515
 
    def _subprocess_log_cleanup(self):
2516
 
        """Inhibits the base version as we don't produce a log file."""
 
2509
    def check_popen_state(self):
 
2510
        """Replace to make assertions when popen is called."""
2517
2511
 
2518
2512
    def _popen(self, *args, **kwargs):
2519
 
        """Override the base version to record the command that is run.
2520
 
 
2521
 
        From there we can ensure it is correct without spawning a real process.
2522
 
        """
 
2513
        """Record the command that is run, so that we can ensure it is correct"""
2523
2514
        self.check_popen_state()
2524
2515
        self._popen_args = args
2525
2516
        self._popen_kwargs = kwargs
2526
2517
        raise _DontSpawnProcess()
2527
2518
 
2528
 
    def check_popen_state(self):
2529
 
        """Replace to make assertions when popen is called."""
2530
 
 
2531
2519
    def test_run_bzr_subprocess_no_plugins(self):
2532
2520
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2533
2521
        command = self._popen_args[0]
2537
2525
 
2538
2526
    def test_allow_plugins(self):
2539
2527
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2540
 
                          allow_plugins=True)
 
2528
            allow_plugins=True)
2541
2529
        command = self._popen_args[0]
2542
2530
        self.assertEqual([], command[2:])
2543
2531
 
2548
2536
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2549
2537
        self.check_popen_state = check_environment
2550
2538
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2551
 
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2539
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
2552
2540
        # not set in theparent
2553
2541
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2554
2542
 
2560
2548
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2561
2549
        self.check_popen_state = check_environment
2562
2550
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2563
 
                          env_changes={'EXISTANT_ENV_VAR':None})
 
2551
            env_changes={'EXISTANT_ENV_VAR':None})
2564
2552
        # Still set in parent
2565
2553
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2566
2554
        del os.environ['EXISTANT_ENV_VAR']
2571
2559
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2572
2560
        self.check_popen_state = check_environment
2573
2561
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2574
 
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2562
            env_changes={'NON_EXISTANT_ENV_VAR':None})
2575
2563
 
2576
2564
    def test_working_dir(self):
2577
2565
        """Test that we can specify the working dir for the child"""
2580
2568
        chdirs = []
2581
2569
        def chdir(path):
2582
2570
            chdirs.append(path)
2583
 
        self.overrideAttr(os, 'chdir', chdir)
2584
 
        def getcwd():
2585
 
            return 'current'
2586
 
        self.overrideAttr(osutils, 'getcwd', getcwd)
2587
 
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2588
 
                          working_dir='foo')
 
2571
        os.chdir = chdir
 
2572
        try:
 
2573
            def getcwd():
 
2574
                return 'current'
 
2575
            osutils.getcwd = getcwd
 
2576
            try:
 
2577
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2578
                    working_dir='foo')
 
2579
            finally:
 
2580
                osutils.getcwd = orig_getcwd
 
2581
        finally:
 
2582
            os.chdir = orig_chdir
2589
2583
        self.assertEqual(['foo', 'current'], chdirs)
2590
2584
 
2591
2585
    def test_get_bzr_path_with_cwd_bzrlib(self):
2611
2605
        self.assertEqual('bzr: interrupted\n', result[1])
2612
2606
 
2613
2607
 
 
2608
class TestFeature(tests.TestCase):
 
2609
 
 
2610
    def test_caching(self):
 
2611
        """Feature._probe is called by the feature at most once."""
 
2612
        class InstrumentedFeature(tests.Feature):
 
2613
            def __init__(self):
 
2614
                super(InstrumentedFeature, self).__init__()
 
2615
                self.calls = []
 
2616
            def _probe(self):
 
2617
                self.calls.append('_probe')
 
2618
                return False
 
2619
        feature = InstrumentedFeature()
 
2620
        feature.available()
 
2621
        self.assertEqual(['_probe'], feature.calls)
 
2622
        feature.available()
 
2623
        self.assertEqual(['_probe'], feature.calls)
 
2624
 
 
2625
    def test_named_str(self):
 
2626
        """Feature.__str__ should thunk to feature_name()."""
 
2627
        class NamedFeature(tests.Feature):
 
2628
            def feature_name(self):
 
2629
                return 'symlinks'
 
2630
        feature = NamedFeature()
 
2631
        self.assertEqual('symlinks', str(feature))
 
2632
 
 
2633
    def test_default_str(self):
 
2634
        """Feature.__str__ should default to __class__.__name__."""
 
2635
        class NamedFeature(tests.Feature):
 
2636
            pass
 
2637
        feature = NamedFeature()
 
2638
        self.assertEqual('NamedFeature', str(feature))
 
2639
 
 
2640
 
 
2641
class TestUnavailableFeature(tests.TestCase):
 
2642
 
 
2643
    def test_access_feature(self):
 
2644
        feature = tests.Feature()
 
2645
        exception = tests.UnavailableFeature(feature)
 
2646
        self.assertIs(feature, exception.args[0])
 
2647
 
 
2648
 
 
2649
simple_thunk_feature = tests._CompatabilityThunkFeature(
 
2650
    deprecated_in((2, 1, 0)),
 
2651
    'bzrlib.tests.test_selftest',
 
2652
    'simple_thunk_feature','UnicodeFilename',
 
2653
    replacement_module='bzrlib.tests'
 
2654
    )
 
2655
 
 
2656
class Test_CompatibilityFeature(tests.TestCase):
 
2657
 
 
2658
    def test_does_thunk(self):
 
2659
        res = self.callDeprecated(
 
2660
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
 
2661
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
 
2662
            simple_thunk_feature.available)
 
2663
        self.assertEqual(tests.UnicodeFilename.available(), res)
 
2664
 
 
2665
 
 
2666
class TestModuleAvailableFeature(tests.TestCase):
 
2667
 
 
2668
    def test_available_module(self):
 
2669
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
 
2670
        self.assertEqual('bzrlib.tests', feature.module_name)
 
2671
        self.assertEqual('bzrlib.tests', str(feature))
 
2672
        self.assertTrue(feature.available())
 
2673
        self.assertIs(tests, feature.module)
 
2674
 
 
2675
    def test_unavailable_module(self):
 
2676
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
 
2677
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
 
2678
        self.assertFalse(feature.available())
 
2679
        self.assertIs(None, feature.module)
 
2680
 
 
2681
 
2614
2682
class TestSelftestFiltering(tests.TestCase):
2615
2683
 
2616
2684
    def setUp(self):
3315
3383
        self.assertLength(1, calls)
3316
3384
 
3317
3385
 
3318
 
class TestUncollectedWarnings(tests.TestCase):
3319
 
    """Check a test case still alive after being run emits a warning"""
3320
 
 
3321
 
    class Test(tests.TestCase):
3322
 
        def test_pass(self):
3323
 
            pass
3324
 
        def test_self_ref(self):
3325
 
            self.also_self = self.test_self_ref
3326
 
        def test_skip(self):
3327
 
            self.skip("Don't need")
3328
 
 
3329
 
    def _get_suite(self):
3330
 
        return TestUtil.TestSuite([
3331
 
            self.Test("test_pass"),
3332
 
            self.Test("test_self_ref"),
3333
 
            self.Test("test_skip"),
3334
 
            ])
3335
 
 
3336
 
    def _inject_stream_into_subunit(self, stream):
3337
 
        """To be overridden by subclasses that run tests out of process"""
3338
 
 
3339
 
    def _run_selftest_with_suite(self, **kwargs):
3340
 
        sio = StringIO()
3341
 
        self._inject_stream_into_subunit(sio)
3342
 
        old_flags = tests.selftest_debug_flags
3343
 
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3344
 
        gc_on = gc.isenabled()
3345
 
        if gc_on:
3346
 
            gc.disable()
3347
 
        try:
3348
 
            tests.selftest(test_suite_factory=self._get_suite, stream=sio,
3349
 
                stop_on_failure=False, **kwargs)
3350
 
        finally:
3351
 
            if gc_on:
3352
 
                gc.enable()
3353
 
            tests.selftest_debug_flags = old_flags
3354
 
        output = sio.getvalue()
3355
 
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3356
 
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3357
 
        return output
3358
 
 
3359
 
    def test_testsuite(self):
3360
 
        self._run_selftest_with_suite()
3361
 
 
3362
 
    def test_pattern(self):
3363
 
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3364
 
        self.assertNotContainsRe(out, "test_skip")
3365
 
 
3366
 
    def test_exclude_pattern(self):
3367
 
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3368
 
        self.assertNotContainsRe(out, "test_skip")
3369
 
 
3370
 
    def test_random_seed(self):
3371
 
        self._run_selftest_with_suite(random_seed="now")
3372
 
 
3373
 
    def test_matching_tests_first(self):
3374
 
        self._run_selftest_with_suite(matching_tests_first=True,
3375
 
            pattern="test_self_ref$")
3376
 
 
3377
 
    def test_starting_with_and_exclude(self):
3378
 
        out = self._run_selftest_with_suite(starting_with=["bt."],
3379
 
            exclude_pattern="test_skip$")
3380
 
        self.assertNotContainsRe(out, "test_skip")
3381
 
 
3382
 
    def test_additonal_decorator(self):
3383
 
        out = self._run_selftest_with_suite(
3384
 
            suite_decorators=[tests.TestDecorator])
3385
 
 
3386
 
 
3387
 
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3388
 
    """Check warnings from tests staying alive are emitted with subunit"""
3389
 
 
3390
 
    _test_needs_features = [features.subunit]
3391
 
 
3392
 
    def _run_selftest_with_suite(self, **kwargs):
3393
 
        return TestUncollectedWarnings._run_selftest_with_suite(self,
3394
 
            runner_class=tests.SubUnitBzrRunner, **kwargs)
3395
 
 
3396
 
 
3397
 
class TestUncollectedWarningsForking(TestUncollectedWarnings):
3398
 
    """Check warnings from tests staying alive are emitted when forking"""
3399
 
 
3400
 
    _test_needs_features = [features.subunit]
3401
 
 
3402
 
    def _inject_stream_into_subunit(self, stream):
3403
 
        """Monkey-patch subunit so the extra output goes to stream not stdout
3404
 
 
3405
 
        Some APIs need rewriting so this kind of bogus hackery can be replaced
3406
 
        by passing the stream param from run_tests down into ProtocolTestCase.
3407
 
        """
3408
 
        from subunit import ProtocolTestCase
3409
 
        _original_init = ProtocolTestCase.__init__
3410
 
        def _init_with_passthrough(self, *args, **kwargs):
3411
 
            _original_init(self, *args, **kwargs)
3412
 
            self._passthrough = stream
3413
 
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3414
 
 
3415
 
    def _run_selftest_with_suite(self, **kwargs):
3416
 
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3417
 
        if getattr(os, "fork", None) is None:
3418
 
            raise tests.TestNotApplicable("Platform doesn't support forking")
3419
 
        # Make sure the fork code is actually invoked by claiming two cores
3420
 
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3421
 
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3422
 
        return TestUncollectedWarnings._run_selftest_with_suite(self, **kwargs)
3423
 
 
3424
 
 
3425
3386
class TestEnvironHandling(tests.TestCase):
3426
3387
 
3427
3388
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
3598
3559
 
3599
3560
    def test_mutiple_excludes(self):
3600
3561
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3601
 
 
3602
 
 
3603
 
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3604
 
 
3605
 
    _test_needs_features = [features.subunit]
3606
 
 
3607
 
    def setUp(self):
3608
 
        super(TestCounterHooks, self).setUp()
3609
 
        class Test(tests.TestCase):
3610
 
 
3611
 
            def setUp(self):
3612
 
                super(Test, self).setUp()
3613
 
                self.hooks = hooks.Hooks()
3614
 
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3615
 
                self.install_counter_hook(self.hooks, 'myhook')
3616
 
 
3617
 
            def no_hook(self):
3618
 
                pass
3619
 
 
3620
 
            def run_hook_once(self):
3621
 
                for hook in self.hooks['myhook']:
3622
 
                    hook(self)
3623
 
 
3624
 
        self.test_class = Test
3625
 
 
3626
 
    def assertHookCalls(self, expected_calls, test_name):
3627
 
        test = self.test_class(test_name)
3628
 
        result = unittest.TestResult()
3629
 
        test.run(result)
3630
 
        self.assertTrue(hasattr(test, '_counters'))
3631
 
        self.assertTrue(test._counters.has_key('myhook'))
3632
 
        self.assertEquals(expected_calls, test._counters['myhook'])
3633
 
 
3634
 
    def test_no_hook(self):
3635
 
        self.assertHookCalls(0, 'no_hook')
3636
 
 
3637
 
    def test_run_hook_once(self):
3638
 
        tt = features.testtools
3639
 
        if tt.module.__version__ < (0, 9, 8):
3640
 
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3641
 
        self.assertHookCalls(1, 'run_hook_once')