~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Vincent Ladeuil
  • Date: 2012-03-13 17:25:29 UTC
  • mfrom: (6499 +trunk)
  • mto: This revision was merged to the branch mainline in revision 6501.
  • Revision ID: v.ladeuil+lp@free.fr-20120313172529-i0suyjnepsor25i7
Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
 
20
import gc
20
21
import doctest
21
22
import os
22
23
import signal
42
43
from bzrlib import (
43
44
    branchbuilder,
44
45
    bzrdir,
 
46
    controldir,
45
47
    errors,
46
48
    hooks,
47
49
    lockdir,
93
95
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
94
96
 
95
97
 
96
 
class TestUnicodeFilename(tests.TestCase):
97
 
 
98
 
    def test_probe_passes(self):
99
 
        """UnicodeFilename._probe passes."""
100
 
        # We can't test much more than that because the behaviour depends
101
 
        # on the platform.
102
 
        tests.UnicodeFilename._probe()
103
 
 
104
 
 
105
98
class TestTreeShape(tests.TestCaseInTempDir):
106
99
 
107
100
    def test_unicode_paths(self):
108
 
        self.requireFeature(tests.UnicodeFilename)
 
101
        self.requireFeature(features.UnicodeFilenameFeature)
109
102
 
110
103
        filename = u'hell\u00d8'
111
104
        self.build_tree_contents([(filename, 'contents of hello')])
617
610
        # Guard against regression into MemoryTransport leaking
618
611
        # files to disk instead of keeping them in memory.
619
612
        self.assertFalse(osutils.lexists('dir'))
620
 
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
 
613
        dir_format = controldir.format_registry.make_bzrdir('knit')
621
614
        self.assertEqual(dir_format.repository_format.__class__,
622
615
                         the_branch.repository._format.__class__)
623
616
        self.assertEqual('Bazaar-NG Knit Repository Format 1',
627
620
    def test_dangling_locks_cause_failures(self):
628
621
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
629
622
            def test_function(self):
630
 
                t = self.get_transport('.')
 
623
                t = self.get_transport_from_path('.')
631
624
                l = lockdir.LockDir(t, 'lock')
632
625
                l.create()
633
626
                l.attempt_lock()
653
646
        # for the server
654
647
        url = self.get_readonly_url()
655
648
        url2 = self.get_readonly_url('foo/bar')
656
 
        t = transport.get_transport(url)
657
 
        t2 = transport.get_transport(url2)
 
649
        t = transport.get_transport_from_url(url)
 
650
        t2 = transport.get_transport_from_url(url2)
658
651
        self.assertIsInstance(t, ReadonlyTransportDecorator)
659
652
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
660
653
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
668
661
        url = self.get_readonly_url()
669
662
        url2 = self.get_readonly_url('foo/bar')
670
663
        # the transport returned may be any HttpTransportBase subclass
671
 
        t = transport.get_transport(url)
672
 
        t2 = transport.get_transport(url2)
 
664
        t = transport.get_transport_from_url(url)
 
665
        t2 = transport.get_transport_from_url(url2)
673
666
        self.assertIsInstance(t, HttpTransportBase)
674
667
        self.assertIsInstance(t2, HttpTransportBase)
675
668
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
686
679
        builder = self.make_branch_builder('dir')
687
680
        rev_id = builder.build_commit()
688
681
        self.assertPathExists('dir')
689
 
        a_dir = bzrdir.BzrDir.open('dir')
 
682
        a_dir = controldir.ControlDir.open('dir')
690
683
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
691
684
        a_branch = a_dir.open_branch()
692
685
        builder_branch = builder.get_branch()
713
706
class TestChrootedTest(tests.ChrootedTestCase):
714
707
 
715
708
    def test_root_is_root(self):
716
 
        t = transport.get_transport(self.get_readonly_url())
 
709
        t = transport.get_transport_from_url(self.get_readonly_url())
717
710
        url = t.base
718
711
        self.assertEqual(url, t.clone('..').base)
719
712
 
721
714
class TestProfileResult(tests.TestCase):
722
715
 
723
716
    def test_profiles_tests(self):
724
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
717
        self.requireFeature(features.lsprof_feature)
725
718
        terminal = testtools.testresult.doubles.ExtendedTestResult()
726
719
        result = tests.ProfileResult(terminal)
727
720
        class Sample(tests.TestCase):
782
775
 
783
776
    def test_lsprofiling(self):
784
777
        """Verbose test result prints lsprof statistics from test cases."""
785
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
778
        self.requireFeature(features.lsprof_feature)
786
779
        result_stream = StringIO()
787
780
        result = bzrlib.tests.VerboseTestResult(
788
781
            result_stream,
833
826
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
834
827
 
835
828
    def test_known_failure(self):
836
 
        """A KnownFailure being raised should trigger several result actions."""
 
829
        """Using knownFailure should trigger several result actions."""
837
830
        class InstrumentedTestResult(tests.ExtendedTestResult):
838
831
            def stopTestRun(self): pass
839
832
            def report_tests_starting(self): pass
842
835
        result = InstrumentedTestResult(None, None, None, None)
843
836
        class Test(tests.TestCase):
844
837
            def test_function(self):
845
 
                raise tests.KnownFailure('failed!')
 
838
                self.knownFailure('failed!')
846
839
        test = Test("test_function")
847
840
        test.run(result)
848
841
        # it should invoke 'report_known_failure'.
864
857
            descriptions=0,
865
858
            verbosity=2,
866
859
            )
867
 
        test = self.get_passing_test()
868
 
        result.startTest(test)
869
 
        prefix = len(result_stream.getvalue())
870
 
        # the err parameter has the shape:
871
 
        # (class, exception object, traceback)
872
 
        # KnownFailures dont get their tracebacks shown though, so we
873
 
        # can skip that.
874
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
875
 
        result.report_known_failure(test, err)
876
 
        output = result_stream.getvalue()[prefix:]
877
 
        lines = output.splitlines()
878
 
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
879
 
        if sys.version_info > (2, 7):
880
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
881
 
                self.assertNotEqual, lines[1], '    ')
882
 
        self.assertEqual(lines[1], '    foo')
883
 
        self.assertEqual(2, len(lines))
 
860
        _get_test("test_xfail").run(result)
 
861
        self.assertContainsRe(result_stream.getvalue(),
 
862
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
863
            "\\s*(?:Text attachment: )?reason"
 
864
            "(?:\n-+\n|: {{{)"
 
865
            "this_fails"
 
866
            "(?:\n-+\n|}}}\n)")
884
867
 
885
868
    def get_passing_test(self):
886
869
        """Return a test object that can't be run usefully."""
897
880
                self._call = test, feature
898
881
        result = InstrumentedTestResult(None, None, None, None)
899
882
        test = SampleTestCase('_test_pass')
900
 
        feature = tests.Feature()
 
883
        feature = features.Feature()
901
884
        result.startTest(test)
902
885
        result.addNotSupported(test, feature)
903
886
        # it should invoke 'report_unsupported'.
922
905
            verbosity=2,
923
906
            )
924
907
        test = self.get_passing_test()
925
 
        feature = tests.Feature()
 
908
        feature = features.Feature()
926
909
        result.startTest(test)
927
910
        prefix = len(result_stream.getvalue())
928
911
        result.report_unsupported(test, feature)
941
924
            def addNotSupported(self, test, feature):
942
925
                self._call = test, feature
943
926
        result = InstrumentedTestResult(None, None, None, None)
944
 
        feature = tests.Feature()
 
927
        feature = features.Feature()
945
928
        class Test(tests.TestCase):
946
929
            def test_function(self):
947
930
                raise tests.UnavailableFeature(feature)
966
949
    def test_strict_with_known_failure(self):
967
950
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
968
951
                                             verbosity=1)
969
 
        test = self.get_passing_test()
970
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
971
 
        result.addExpectedFailure(test, err)
 
952
        test = _get_test("test_xfail")
 
953
        test.run(result)
972
954
        self.assertFalse(result.wasStrictlySuccessful())
973
955
        self.assertEqual(None, result._extractBenchmarkTime(test))
974
956
 
1006
988
        self.assertEquals(2, result.count)
1007
989
 
1008
990
 
1009
 
class TestUnicodeFilenameFeature(tests.TestCase):
1010
 
 
1011
 
    def test_probe_passes(self):
1012
 
        """UnicodeFilenameFeature._probe passes."""
1013
 
        # We can't test much more than that because the behaviour depends
1014
 
        # on the platform.
1015
 
        tests.UnicodeFilenameFeature._probe()
1016
 
 
1017
 
 
1018
991
class TestRunner(tests.TestCase):
1019
992
 
1020
993
    def dummy_test(self):
1095
1068
            "FAIL: \\S+\.test_truth\n"
1096
1069
            "-+\n"
1097
1070
            "(?:.*\n)*"
1098
 
            "No absolute truth\n"
 
1071
            "\\s*(?:Text attachment: )?reason"
 
1072
            "(?:\n-+\n|: {{{)"
 
1073
            "No absolute truth"
 
1074
            "(?:\n-+\n|}}}\n)"
1099
1075
            "(?:.*\n)*"
1100
1076
            "-+\n"
1101
1077
            "Ran 1 test in .*\n"
1191
1167
 
1192
1168
    def test_unsupported_features_listed(self):
1193
1169
        """When unsupported features are encountered they are detailed."""
1194
 
        class Feature1(tests.Feature):
 
1170
        class Feature1(features.Feature):
1195
1171
            def _probe(self): return False
1196
 
        class Feature2(tests.Feature):
 
1172
        class Feature2(features.Feature):
1197
1173
            def _probe(self): return False
1198
1174
        # create sample tests
1199
1175
        test1 = SampleTestCase('_test_pass')
1266
1242
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1267
1243
            FailureWithUnicode("test_log_unicode"))
1268
1244
        self.assertContainsRe(out.getvalue(),
1269
 
            "Text attachment: log\n"
1270
 
            "-+\n"
1271
 
            "\d+\.\d+  \\\\u2606\n"
1272
 
            "-+\n")
 
1245
            "(?:Text attachment: )?log"
 
1246
            "(?:\n-+\n|: {{{)"
 
1247
            "\d+\.\d+  \\\\u2606"
 
1248
            "(?:\n-+\n|}}}\n)")
1273
1249
 
1274
1250
 
1275
1251
class SampleTestCase(tests.TestCase):
1476
1452
 
1477
1453
        Each self.time() call is individually and separately profiled.
1478
1454
        """
1479
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
1455
        self.requireFeature(features.lsprof_feature)
1480
1456
        # overrides the class member with an instance member so no cleanup
1481
1457
        # needed.
1482
1458
        self._gather_lsprof_in_benchmarks = True
1501
1477
        transport_server = memory.MemoryServer()
1502
1478
        transport_server.start_server()
1503
1479
        self.addCleanup(transport_server.stop_server)
1504
 
        t = transport.get_transport(transport_server.get_url())
1505
 
        bzrdir.BzrDir.create(t.base)
 
1480
        t = transport.get_transport_from_url(transport_server.get_url())
 
1481
        controldir.ControlDir.create(t.base)
1506
1482
        self.assertRaises(errors.BzrError,
1507
 
            bzrdir.BzrDir.open_from_transport, t)
 
1483
            controldir.ControlDir.open_from_transport, t)
1508
1484
        # But if we declare this as safe, we can open the bzrdir.
1509
1485
        self.permit_url(t.base)
1510
1486
        self._bzr_selftest_roots.append(t.base)
1511
 
        bzrdir.BzrDir.open_from_transport(t)
 
1487
        controldir.ControlDir.open_from_transport(t)
1512
1488
 
1513
1489
    def test_requireFeature_available(self):
1514
1490
        """self.requireFeature(available) is a no-op."""
1515
 
        class Available(tests.Feature):
 
1491
        class Available(features.Feature):
1516
1492
            def _probe(self):return True
1517
1493
        feature = Available()
1518
1494
        self.requireFeature(feature)
1519
1495
 
1520
1496
    def test_requireFeature_unavailable(self):
1521
1497
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1522
 
        class Unavailable(tests.Feature):
 
1498
        class Unavailable(features.Feature):
1523
1499
            def _probe(self):return False
1524
1500
        feature = Unavailable()
1525
1501
        self.assertRaises(tests.UnavailableFeature,
1684
1660
        test.run(unittest.TestResult())
1685
1661
        self.assertEqual('original', obj.test_attr)
1686
1662
 
1687
 
 
1688
 
class _MissingFeature(tests.Feature):
 
1663
    def test_recordCalls(self):
 
1664
        from bzrlib.tests import test_selftest
 
1665
        calls = self.recordCalls(
 
1666
            test_selftest, '_add_numbers')
 
1667
        self.assertEqual(test_selftest._add_numbers(2, 10),
 
1668
            12)
 
1669
        self.assertEquals(calls, [((2, 10), {})])
 
1670
 
 
1671
 
 
1672
def _add_numbers(a, b):
 
1673
    return a + b
 
1674
 
 
1675
 
 
1676
class _MissingFeature(features.Feature):
1689
1677
    def _probe(self):
1690
1678
        return False
1691
1679
missing_feature = _MissingFeature()
1742
1730
        result = self._run_test('test_fail')
1743
1731
        self.assertEqual(1, len(result.failures))
1744
1732
        result_content = result.failures[0][1]
1745
 
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1733
        self.assertContainsRe(result_content,
 
1734
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1746
1735
        self.assertContainsRe(result_content, 'this was a failing test')
1747
1736
 
1748
1737
    def test_error_has_log(self):
1749
1738
        result = self._run_test('test_error')
1750
1739
        self.assertEqual(1, len(result.errors))
1751
1740
        result_content = result.errors[0][1]
1752
 
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1741
        self.assertContainsRe(result_content,
 
1742
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1753
1743
        self.assertContainsRe(result_content, 'this test errored')
1754
1744
 
1755
1745
    def test_skip_has_no_log(self):
1774
1764
        result = self._run_test('test_xfail')
1775
1765
        self.assertEqual(1, len(result.expectedFailures))
1776
1766
        result_content = result.expectedFailures[0][1]
1777
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1767
        self.assertNotContainsRe(result_content,
 
1768
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1778
1769
        self.assertNotContainsRe(result_content, 'test with expected failure')
1779
1770
 
1780
1771
    def test_unexpected_success_has_log(self):
1953
1944
    def test_make_branch_and_tree_with_format(self):
1954
1945
        # we should be able to supply a format to make_branch_and_tree
1955
1946
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1956
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
 
1947
        self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
1957
1948
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1958
1949
 
1959
1950
    def test_make_branch_and_memory_tree(self):
2037
2028
        self.assertLength(2, output.readlines())
2038
2029
 
2039
2030
    def test_lsprof_tests(self):
2040
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
2031
        self.requireFeature(features.lsprof_feature)
2041
2032
        results = []
2042
2033
        class Test(object):
2043
2034
            def __call__(test, result):
2189
2180
        self.assertNotContainsRe(content, 'test with expected failure')
2190
2181
        self.assertEqual(1, len(result.expectedFailures))
2191
2182
        result_content = result.expectedFailures[0][1]
2192
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2183
        self.assertNotContainsRe(result_content,
 
2184
            '(?m)^(?:Text attachment: )?log(?:$|: )')
2193
2185
        self.assertNotContainsRe(result_content, 'test with expected failure')
2194
2186
 
2195
2187
    def test_unexpected_success_has_log(self):
2607
2599
        self.assertEqual('bzr: interrupted\n', result[1])
2608
2600
 
2609
2601
 
2610
 
class TestFeature(tests.TestCase):
2611
 
 
2612
 
    def test_caching(self):
2613
 
        """Feature._probe is called by the feature at most once."""
2614
 
        class InstrumentedFeature(tests.Feature):
2615
 
            def __init__(self):
2616
 
                super(InstrumentedFeature, self).__init__()
2617
 
                self.calls = []
2618
 
            def _probe(self):
2619
 
                self.calls.append('_probe')
2620
 
                return False
2621
 
        feature = InstrumentedFeature()
2622
 
        feature.available()
2623
 
        self.assertEqual(['_probe'], feature.calls)
2624
 
        feature.available()
2625
 
        self.assertEqual(['_probe'], feature.calls)
2626
 
 
2627
 
    def test_named_str(self):
2628
 
        """Feature.__str__ should thunk to feature_name()."""
2629
 
        class NamedFeature(tests.Feature):
2630
 
            def feature_name(self):
2631
 
                return 'symlinks'
2632
 
        feature = NamedFeature()
2633
 
        self.assertEqual('symlinks', str(feature))
2634
 
 
2635
 
    def test_default_str(self):
2636
 
        """Feature.__str__ should default to __class__.__name__."""
2637
 
        class NamedFeature(tests.Feature):
2638
 
            pass
2639
 
        feature = NamedFeature()
2640
 
        self.assertEqual('NamedFeature', str(feature))
2641
 
 
2642
 
 
2643
 
class TestUnavailableFeature(tests.TestCase):
2644
 
 
2645
 
    def test_access_feature(self):
2646
 
        feature = tests.Feature()
2647
 
        exception = tests.UnavailableFeature(feature)
2648
 
        self.assertIs(feature, exception.args[0])
2649
 
 
2650
 
 
2651
 
simple_thunk_feature = tests._CompatabilityThunkFeature(
2652
 
    deprecated_in((2, 1, 0)),
2653
 
    'bzrlib.tests.test_selftest',
2654
 
    'simple_thunk_feature','UnicodeFilename',
2655
 
    replacement_module='bzrlib.tests'
2656
 
    )
2657
 
 
2658
 
class Test_CompatibilityFeature(tests.TestCase):
2659
 
 
2660
 
    def test_does_thunk(self):
2661
 
        res = self.callDeprecated(
2662
 
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2663
 
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2664
 
            simple_thunk_feature.available)
2665
 
        self.assertEqual(tests.UnicodeFilename.available(), res)
2666
 
 
2667
 
 
2668
 
class TestModuleAvailableFeature(tests.TestCase):
2669
 
 
2670
 
    def test_available_module(self):
2671
 
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
2672
 
        self.assertEqual('bzrlib.tests', feature.module_name)
2673
 
        self.assertEqual('bzrlib.tests', str(feature))
2674
 
        self.assertTrue(feature.available())
2675
 
        self.assertIs(tests, feature.module)
2676
 
 
2677
 
    def test_unavailable_module(self):
2678
 
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2679
 
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2680
 
        self.assertFalse(feature.available())
2681
 
        self.assertIs(None, feature.module)
2682
 
 
2683
 
 
2684
2602
class TestSelftestFiltering(tests.TestCase):
2685
2603
 
2686
2604
    def setUp(self):
3385
3303
        self.assertLength(1, calls)
3386
3304
 
3387
3305
 
 
3306
class _Selftest(object):
 
3307
    """Mixin for tests needing full selftest output"""
 
3308
 
 
3309
    def _inject_stream_into_subunit(self, stream):
 
3310
        """To be overridden by subclasses that run tests out of process"""
 
3311
 
 
3312
    def _run_selftest(self, **kwargs):
 
3313
        sio = StringIO()
 
3314
        self._inject_stream_into_subunit(sio)
 
3315
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
 
3316
        return sio.getvalue()
 
3317
 
 
3318
 
 
3319
class _ForkedSelftest(_Selftest):
 
3320
    """Mixin for tests needing full selftest output with forked children"""
 
3321
 
 
3322
    _test_needs_features = [features.subunit]
 
3323
 
 
3324
    def _inject_stream_into_subunit(self, stream):
 
3325
        """Monkey-patch subunit so the extra output goes to stream not stdout
 
3326
 
 
3327
        Some APIs need rewriting so this kind of bogus hackery can be replaced
 
3328
        by passing the stream param from run_tests down into ProtocolTestCase.
 
3329
        """
 
3330
        from subunit import ProtocolTestCase
 
3331
        _original_init = ProtocolTestCase.__init__
 
3332
        def _init_with_passthrough(self, *args, **kwargs):
 
3333
            _original_init(self, *args, **kwargs)
 
3334
            self._passthrough = stream
 
3335
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
 
3336
 
 
3337
    def _run_selftest(self, **kwargs):
 
3338
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
 
3339
        if getattr(os, "fork", None) is None:
 
3340
            raise tests.TestNotApplicable("Platform doesn't support forking")
 
3341
        # Make sure the fork code is actually invoked by claiming two cores
 
3342
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
 
3343
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
 
3344
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3345
 
 
3346
 
 
3347
class TestParallelFork(_ForkedSelftest, tests.TestCase):
 
3348
    """Check operation of --parallel=fork selftest option"""
 
3349
 
 
3350
    def test_error_in_child_during_fork(self):
 
3351
        """Error in a forked child during test setup should get reported"""
 
3352
        class Test(tests.TestCase):
 
3353
            def testMethod(self):
 
3354
                pass
 
3355
        # We don't care what, just break something that a child will run
 
3356
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
 
3357
        out = self._run_selftest(test_suite_factory=Test)
 
3358
        # Lines from the tracebacks of the two child processes may be mixed
 
3359
        # together due to the way subunit parses and forwards the streams,
 
3360
        # so permit extra lines between each part of the error output.
 
3361
        self.assertContainsRe(out,
 
3362
            "Traceback.*:\n"
 
3363
            "(?:.*\n)*"
 
3364
            ".+ in fork_for_tests\n"
 
3365
            "(?:.*\n)*"
 
3366
            "\s*workaround_zealous_crypto_random\(\)\n"
 
3367
            "(?:.*\n)*"
 
3368
            "TypeError:")
 
3369
 
 
3370
 
 
3371
class TestUncollectedWarnings(_Selftest, tests.TestCase):
 
3372
    """Check a test case still alive after being run emits a warning"""
 
3373
 
 
3374
    class Test(tests.TestCase):
 
3375
        def test_pass(self):
 
3376
            pass
 
3377
        def test_self_ref(self):
 
3378
            self.also_self = self.test_self_ref
 
3379
        def test_skip(self):
 
3380
            self.skip("Don't need")
 
3381
 
 
3382
    def _get_suite(self):
 
3383
        return TestUtil.TestSuite([
 
3384
            self.Test("test_pass"),
 
3385
            self.Test("test_self_ref"),
 
3386
            self.Test("test_skip"),
 
3387
            ])
 
3388
 
 
3389
    def _run_selftest_with_suite(self, **kwargs):
 
3390
        old_flags = tests.selftest_debug_flags
 
3391
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
 
3392
        gc_on = gc.isenabled()
 
3393
        if gc_on:
 
3394
            gc.disable()
 
3395
        try:
 
3396
            output = self._run_selftest(test_suite_factory=self._get_suite,
 
3397
                **kwargs)
 
3398
        finally:
 
3399
            if gc_on:
 
3400
                gc.enable()
 
3401
            tests.selftest_debug_flags = old_flags
 
3402
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
 
3403
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
 
3404
        return output
 
3405
 
 
3406
    def test_testsuite(self):
 
3407
        self._run_selftest_with_suite()
 
3408
 
 
3409
    def test_pattern(self):
 
3410
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
 
3411
        self.assertNotContainsRe(out, "test_skip")
 
3412
 
 
3413
    def test_exclude_pattern(self):
 
3414
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
 
3415
        self.assertNotContainsRe(out, "test_skip")
 
3416
 
 
3417
    def test_random_seed(self):
 
3418
        self._run_selftest_with_suite(random_seed="now")
 
3419
 
 
3420
    def test_matching_tests_first(self):
 
3421
        self._run_selftest_with_suite(matching_tests_first=True,
 
3422
            pattern="test_self_ref$")
 
3423
 
 
3424
    def test_starting_with_and_exclude(self):
 
3425
        out = self._run_selftest_with_suite(starting_with=["bt."],
 
3426
            exclude_pattern="test_skip$")
 
3427
        self.assertNotContainsRe(out, "test_skip")
 
3428
 
 
3429
    def test_additonal_decorator(self):
 
3430
        out = self._run_selftest_with_suite(
 
3431
            suite_decorators=[tests.TestDecorator])
 
3432
 
 
3433
 
 
3434
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
 
3435
    """Check warnings from tests staying alive are emitted with subunit"""
 
3436
 
 
3437
    _test_needs_features = [features.subunit]
 
3438
 
 
3439
    def _run_selftest_with_suite(self, **kwargs):
 
3440
        return TestUncollectedWarnings._run_selftest_with_suite(self,
 
3441
            runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3442
 
 
3443
 
 
3444
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
 
3445
    """Check warnings from tests staying alive are emitted when forking"""
 
3446
 
 
3447
 
3388
3448
class TestEnvironHandling(tests.TestCase):
3389
3449
 
3390
3450
    def test_overrideEnv_None_called_twice_doesnt_leak(self):