~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2012-01-27 21:28:56 UTC
  • mto: This revision was merged to the branch mainline in revision 6460.
  • Revision ID: jelmer@samba.org-20120127212856-ewnjgn7fyblphcqw
Migrate mail_client to config stacks.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
 
20
import gc
20
21
import doctest
21
22
import os
22
23
import signal
93
94
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
94
95
 
95
96
 
96
 
class TestUnicodeFilename(tests.TestCase):
97
 
 
98
 
    def test_probe_passes(self):
99
 
        """UnicodeFilename._probe passes."""
100
 
        # We can't test much more than that because the behaviour depends
101
 
        # on the platform.
102
 
        tests.UnicodeFilename._probe()
103
 
 
104
 
 
105
97
class TestTreeShape(tests.TestCaseInTempDir):
106
98
 
107
99
    def test_unicode_paths(self):
108
 
        self.requireFeature(tests.UnicodeFilename)
 
100
        self.requireFeature(features.UnicodeFilenameFeature)
109
101
 
110
102
        filename = u'hell\u00d8'
111
103
        self.build_tree_contents([(filename, 'contents of hello')])
627
619
    def test_dangling_locks_cause_failures(self):
628
620
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
629
621
            def test_function(self):
630
 
                t = self.get_transport('.')
 
622
                t = self.get_transport_from_path('.')
631
623
                l = lockdir.LockDir(t, 'lock')
632
624
                l.create()
633
625
                l.attempt_lock()
653
645
        # for the server
654
646
        url = self.get_readonly_url()
655
647
        url2 = self.get_readonly_url('foo/bar')
656
 
        t = transport.get_transport(url)
657
 
        t2 = transport.get_transport(url2)
 
648
        t = transport.get_transport_from_url(url)
 
649
        t2 = transport.get_transport_from_url(url2)
658
650
        self.assertIsInstance(t, ReadonlyTransportDecorator)
659
651
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
660
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
668
660
        url = self.get_readonly_url()
669
661
        url2 = self.get_readonly_url('foo/bar')
670
662
        # the transport returned may be any HttpTransportBase subclass
671
 
        t = transport.get_transport(url)
672
 
        t2 = transport.get_transport(url2)
 
663
        t = transport.get_transport_from_url(url)
 
664
        t2 = transport.get_transport_from_url(url2)
673
665
        self.assertIsInstance(t, HttpTransportBase)
674
666
        self.assertIsInstance(t2, HttpTransportBase)
675
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
713
705
class TestChrootedTest(tests.ChrootedTestCase):
714
706
 
715
707
    def test_root_is_root(self):
716
 
        t = transport.get_transport(self.get_readonly_url())
 
708
        t = transport.get_transport_from_url(self.get_readonly_url())
717
709
        url = t.base
718
710
        self.assertEqual(url, t.clone('..').base)
719
711
 
721
713
class TestProfileResult(tests.TestCase):
722
714
 
723
715
    def test_profiles_tests(self):
724
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
716
        self.requireFeature(features.lsprof_feature)
725
717
        terminal = testtools.testresult.doubles.ExtendedTestResult()
726
718
        result = tests.ProfileResult(terminal)
727
719
        class Sample(tests.TestCase):
782
774
 
783
775
    def test_lsprofiling(self):
784
776
        """Verbose test result prints lsprof statistics from test cases."""
785
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
777
        self.requireFeature(features.lsprof_feature)
786
778
        result_stream = StringIO()
787
779
        result = bzrlib.tests.VerboseTestResult(
788
780
            result_stream,
833
825
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
834
826
 
835
827
    def test_known_failure(self):
836
 
        """A KnownFailure being raised should trigger several result actions."""
 
828
        """Using knownFailure should trigger several result actions."""
837
829
        class InstrumentedTestResult(tests.ExtendedTestResult):
838
830
            def stopTestRun(self): pass
839
831
            def report_tests_starting(self): pass
842
834
        result = InstrumentedTestResult(None, None, None, None)
843
835
        class Test(tests.TestCase):
844
836
            def test_function(self):
845
 
                raise tests.KnownFailure('failed!')
 
837
                self.knownFailure('failed!')
846
838
        test = Test("test_function")
847
839
        test.run(result)
848
840
        # it should invoke 'report_known_failure'.
864
856
            descriptions=0,
865
857
            verbosity=2,
866
858
            )
867
 
        test = self.get_passing_test()
868
 
        result.startTest(test)
869
 
        prefix = len(result_stream.getvalue())
870
 
        # the err parameter has the shape:
871
 
        # (class, exception object, traceback)
872
 
        # KnownFailures dont get their tracebacks shown though, so we
873
 
        # can skip that.
874
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
875
 
        result.report_known_failure(test, err)
876
 
        output = result_stream.getvalue()[prefix:]
877
 
        lines = output.splitlines()
878
 
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
879
 
        if sys.version_info > (2, 7):
880
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
881
 
                self.assertNotEqual, lines[1], '    ')
882
 
        self.assertEqual(lines[1], '    foo')
883
 
        self.assertEqual(2, len(lines))
 
859
        _get_test("test_xfail").run(result)
 
860
        self.assertContainsRe(result_stream.getvalue(),
 
861
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
862
            "\\s*(?:Text attachment: )?reason"
 
863
            "(?:\n-+\n|: {{{)"
 
864
            "this_fails"
 
865
            "(?:\n-+\n|}}}\n)")
884
866
 
885
867
    def get_passing_test(self):
886
868
        """Return a test object that can't be run usefully."""
897
879
                self._call = test, feature
898
880
        result = InstrumentedTestResult(None, None, None, None)
899
881
        test = SampleTestCase('_test_pass')
900
 
        feature = tests.Feature()
 
882
        feature = features.Feature()
901
883
        result.startTest(test)
902
884
        result.addNotSupported(test, feature)
903
885
        # it should invoke 'report_unsupported'.
922
904
            verbosity=2,
923
905
            )
924
906
        test = self.get_passing_test()
925
 
        feature = tests.Feature()
 
907
        feature = features.Feature()
926
908
        result.startTest(test)
927
909
        prefix = len(result_stream.getvalue())
928
910
        result.report_unsupported(test, feature)
941
923
            def addNotSupported(self, test, feature):
942
924
                self._call = test, feature
943
925
        result = InstrumentedTestResult(None, None, None, None)
944
 
        feature = tests.Feature()
 
926
        feature = features.Feature()
945
927
        class Test(tests.TestCase):
946
928
            def test_function(self):
947
929
                raise tests.UnavailableFeature(feature)
966
948
    def test_strict_with_known_failure(self):
967
949
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
968
950
                                             verbosity=1)
969
 
        test = self.get_passing_test()
970
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
971
 
        result.addExpectedFailure(test, err)
 
951
        test = _get_test("test_xfail")
 
952
        test.run(result)
972
953
        self.assertFalse(result.wasStrictlySuccessful())
973
954
        self.assertEqual(None, result._extractBenchmarkTime(test))
974
955
 
1006
987
        self.assertEquals(2, result.count)
1007
988
 
1008
989
 
1009
 
class TestUnicodeFilenameFeature(tests.TestCase):
1010
 
 
1011
 
    def test_probe_passes(self):
1012
 
        """UnicodeFilenameFeature._probe passes."""
1013
 
        # We can't test much more than that because the behaviour depends
1014
 
        # on the platform.
1015
 
        tests.UnicodeFilenameFeature._probe()
1016
 
 
1017
 
 
1018
990
class TestRunner(tests.TestCase):
1019
991
 
1020
992
    def dummy_test(self):
1194
1166
 
1195
1167
    def test_unsupported_features_listed(self):
1196
1168
        """When unsupported features are encountered they are detailed."""
1197
 
        class Feature1(tests.Feature):
 
1169
        class Feature1(features.Feature):
1198
1170
            def _probe(self): return False
1199
 
        class Feature2(tests.Feature):
 
1171
        class Feature2(features.Feature):
1200
1172
            def _probe(self): return False
1201
1173
        # create sample tests
1202
1174
        test1 = SampleTestCase('_test_pass')
1479
1451
 
1480
1452
        Each self.time() call is individually and separately profiled.
1481
1453
        """
1482
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
1454
        self.requireFeature(features.lsprof_feature)
1483
1455
        # overrides the class member with an instance member so no cleanup
1484
1456
        # needed.
1485
1457
        self._gather_lsprof_in_benchmarks = True
1504
1476
        transport_server = memory.MemoryServer()
1505
1477
        transport_server.start_server()
1506
1478
        self.addCleanup(transport_server.stop_server)
1507
 
        t = transport.get_transport(transport_server.get_url())
 
1479
        t = transport.get_transport_from_url(transport_server.get_url())
1508
1480
        bzrdir.BzrDir.create(t.base)
1509
1481
        self.assertRaises(errors.BzrError,
1510
1482
            bzrdir.BzrDir.open_from_transport, t)
1515
1487
 
1516
1488
    def test_requireFeature_available(self):
1517
1489
        """self.requireFeature(available) is a no-op."""
1518
 
        class Available(tests.Feature):
 
1490
        class Available(features.Feature):
1519
1491
            def _probe(self):return True
1520
1492
        feature = Available()
1521
1493
        self.requireFeature(feature)
1522
1494
 
1523
1495
    def test_requireFeature_unavailable(self):
1524
1496
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1525
 
        class Unavailable(tests.Feature):
 
1497
        class Unavailable(features.Feature):
1526
1498
            def _probe(self):return False
1527
1499
        feature = Unavailable()
1528
1500
        self.assertRaises(tests.UnavailableFeature,
1700
1672
    return a + b
1701
1673
 
1702
1674
 
1703
 
class _MissingFeature(tests.Feature):
 
1675
class _MissingFeature(features.Feature):
1704
1676
    def _probe(self):
1705
1677
        return False
1706
1678
missing_feature = _MissingFeature()
2055
2027
        self.assertLength(2, output.readlines())
2056
2028
 
2057
2029
    def test_lsprof_tests(self):
2058
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
2030
        self.requireFeature(features.lsprof_feature)
2059
2031
        results = []
2060
2032
        class Test(object):
2061
2033
            def __call__(test, result):
2626
2598
        self.assertEqual('bzr: interrupted\n', result[1])
2627
2599
 
2628
2600
 
2629
 
class TestFeature(tests.TestCase):
2630
 
 
2631
 
    def test_caching(self):
2632
 
        """Feature._probe is called by the feature at most once."""
2633
 
        class InstrumentedFeature(tests.Feature):
2634
 
            def __init__(self):
2635
 
                super(InstrumentedFeature, self).__init__()
2636
 
                self.calls = []
2637
 
            def _probe(self):
2638
 
                self.calls.append('_probe')
2639
 
                return False
2640
 
        feature = InstrumentedFeature()
2641
 
        feature.available()
2642
 
        self.assertEqual(['_probe'], feature.calls)
2643
 
        feature.available()
2644
 
        self.assertEqual(['_probe'], feature.calls)
2645
 
 
2646
 
    def test_named_str(self):
2647
 
        """Feature.__str__ should thunk to feature_name()."""
2648
 
        class NamedFeature(tests.Feature):
2649
 
            def feature_name(self):
2650
 
                return 'symlinks'
2651
 
        feature = NamedFeature()
2652
 
        self.assertEqual('symlinks', str(feature))
2653
 
 
2654
 
    def test_default_str(self):
2655
 
        """Feature.__str__ should default to __class__.__name__."""
2656
 
        class NamedFeature(tests.Feature):
2657
 
            pass
2658
 
        feature = NamedFeature()
2659
 
        self.assertEqual('NamedFeature', str(feature))
2660
 
 
2661
 
 
2662
 
class TestUnavailableFeature(tests.TestCase):
2663
 
 
2664
 
    def test_access_feature(self):
2665
 
        feature = tests.Feature()
2666
 
        exception = tests.UnavailableFeature(feature)
2667
 
        self.assertIs(feature, exception.args[0])
2668
 
 
2669
 
 
2670
 
simple_thunk_feature = tests._CompatabilityThunkFeature(
2671
 
    deprecated_in((2, 1, 0)),
2672
 
    'bzrlib.tests.test_selftest',
2673
 
    'simple_thunk_feature','UnicodeFilename',
2674
 
    replacement_module='bzrlib.tests'
2675
 
    )
2676
 
 
2677
 
class Test_CompatibilityFeature(tests.TestCase):
2678
 
 
2679
 
    def test_does_thunk(self):
2680
 
        res = self.callDeprecated(
2681
 
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2682
 
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2683
 
            simple_thunk_feature.available)
2684
 
        self.assertEqual(tests.UnicodeFilename.available(), res)
2685
 
 
2686
 
 
2687
 
class TestModuleAvailableFeature(tests.TestCase):
2688
 
 
2689
 
    def test_available_module(self):
2690
 
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
2691
 
        self.assertEqual('bzrlib.tests', feature.module_name)
2692
 
        self.assertEqual('bzrlib.tests', str(feature))
2693
 
        self.assertTrue(feature.available())
2694
 
        self.assertIs(tests, feature.module)
2695
 
 
2696
 
    def test_unavailable_module(self):
2697
 
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2698
 
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2699
 
        self.assertFalse(feature.available())
2700
 
        self.assertIs(None, feature.module)
2701
 
 
2702
 
 
2703
2601
class TestSelftestFiltering(tests.TestCase):
2704
2602
 
2705
2603
    def setUp(self):
3404
3302
        self.assertLength(1, calls)
3405
3303
 
3406
3304
 
 
3305
class _Selftest(object):
 
3306
    """Mixin for tests needing full selftest output"""
 
3307
 
 
3308
    def _inject_stream_into_subunit(self, stream):
 
3309
        """To be overridden by subclasses that run tests out of process"""
 
3310
 
 
3311
    def _run_selftest(self, **kwargs):
 
3312
        sio = StringIO()
 
3313
        self._inject_stream_into_subunit(sio)
 
3314
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
 
3315
        return sio.getvalue()
 
3316
 
 
3317
 
 
3318
class _ForkedSelftest(_Selftest):
 
3319
    """Mixin for tests needing full selftest output with forked children"""
 
3320
 
 
3321
    _test_needs_features = [features.subunit]
 
3322
 
 
3323
    def _inject_stream_into_subunit(self, stream):
 
3324
        """Monkey-patch subunit so the extra output goes to stream not stdout
 
3325
 
 
3326
        Some APIs need rewriting so this kind of bogus hackery can be replaced
 
3327
        by passing the stream param from run_tests down into ProtocolTestCase.
 
3328
        """
 
3329
        from subunit import ProtocolTestCase
 
3330
        _original_init = ProtocolTestCase.__init__
 
3331
        def _init_with_passthrough(self, *args, **kwargs):
 
3332
            _original_init(self, *args, **kwargs)
 
3333
            self._passthrough = stream
 
3334
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
 
3335
 
 
3336
    def _run_selftest(self, **kwargs):
 
3337
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
 
3338
        if getattr(os, "fork", None) is None:
 
3339
            raise tests.TestNotApplicable("Platform doesn't support forking")
 
3340
        # Make sure the fork code is actually invoked by claiming two cores
 
3341
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
 
3342
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
 
3343
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3344
 
 
3345
 
 
3346
class TestParallelFork(_ForkedSelftest, tests.TestCase):
 
3347
    """Check operation of --parallel=fork selftest option"""
 
3348
 
 
3349
    def test_error_in_child_during_fork(self):
 
3350
        """Error in a forked child during test setup should get reported"""
 
3351
        class Test(tests.TestCase):
 
3352
            def testMethod(self):
 
3353
                pass
 
3354
        # We don't care what, just break something that a child will run
 
3355
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
 
3356
        out = self._run_selftest(test_suite_factory=Test)
 
3357
        # Lines from the tracebacks of the two child processes may be mixed
 
3358
        # together due to the way subunit parses and forwards the streams,
 
3359
        # so permit extra lines between each part of the error output.
 
3360
        self.assertContainsRe(out,
 
3361
            "Traceback.*:\n"
 
3362
            "(?:.*\n)*"
 
3363
            ".+ in fork_for_tests\n"
 
3364
            "(?:.*\n)*"
 
3365
            "\s*workaround_zealous_crypto_random\(\)\n"
 
3366
            "(?:.*\n)*"
 
3367
            "TypeError:")
 
3368
 
 
3369
 
 
3370
class TestUncollectedWarnings(_Selftest, tests.TestCase):
 
3371
    """Check a test case still alive after being run emits a warning"""
 
3372
 
 
3373
    class Test(tests.TestCase):
 
3374
        def test_pass(self):
 
3375
            pass
 
3376
        def test_self_ref(self):
 
3377
            self.also_self = self.test_self_ref
 
3378
        def test_skip(self):
 
3379
            self.skip("Don't need")
 
3380
 
 
3381
    def _get_suite(self):
 
3382
        return TestUtil.TestSuite([
 
3383
            self.Test("test_pass"),
 
3384
            self.Test("test_self_ref"),
 
3385
            self.Test("test_skip"),
 
3386
            ])
 
3387
 
 
3388
    def _run_selftest_with_suite(self, **kwargs):
 
3389
        old_flags = tests.selftest_debug_flags
 
3390
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
 
3391
        gc_on = gc.isenabled()
 
3392
        if gc_on:
 
3393
            gc.disable()
 
3394
        try:
 
3395
            output = self._run_selftest(test_suite_factory=self._get_suite,
 
3396
                **kwargs)
 
3397
        finally:
 
3398
            if gc_on:
 
3399
                gc.enable()
 
3400
            tests.selftest_debug_flags = old_flags
 
3401
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
 
3402
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
 
3403
        return output
 
3404
 
 
3405
    def test_testsuite(self):
 
3406
        self._run_selftest_with_suite()
 
3407
 
 
3408
    def test_pattern(self):
 
3409
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
 
3410
        self.assertNotContainsRe(out, "test_skip")
 
3411
 
 
3412
    def test_exclude_pattern(self):
 
3413
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
 
3414
        self.assertNotContainsRe(out, "test_skip")
 
3415
 
 
3416
    def test_random_seed(self):
 
3417
        self._run_selftest_with_suite(random_seed="now")
 
3418
 
 
3419
    def test_matching_tests_first(self):
 
3420
        self._run_selftest_with_suite(matching_tests_first=True,
 
3421
            pattern="test_self_ref$")
 
3422
 
 
3423
    def test_starting_with_and_exclude(self):
 
3424
        out = self._run_selftest_with_suite(starting_with=["bt."],
 
3425
            exclude_pattern="test_skip$")
 
3426
        self.assertNotContainsRe(out, "test_skip")
 
3427
 
 
3428
    def test_additonal_decorator(self):
 
3429
        out = self._run_selftest_with_suite(
 
3430
            suite_decorators=[tests.TestDecorator])
 
3431
 
 
3432
 
 
3433
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
 
3434
    """Check warnings from tests staying alive are emitted with subunit"""
 
3435
 
 
3436
    _test_needs_features = [features.subunit]
 
3437
 
 
3438
    def _run_selftest_with_suite(self, **kwargs):
 
3439
        return TestUncollectedWarnings._run_selftest_with_suite(self,
 
3440
            runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3441
 
 
3442
 
 
3443
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
 
3444
    """Check warnings from tests staying alive are emitted when forking"""
 
3445
 
 
3446
 
3407
3447
class TestEnvironHandling(tests.TestCase):
3408
3448
 
3409
3449
    def test_overrideEnv_None_called_twice_doesnt_leak(self):