~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Patch Queue Manager
  • Date: 2013-05-20 17:46:29 UTC
  • mfrom: (6573.1.1 bzr)
  • Revision ID: pqm@pqm.ubuntu.com-20130520174629-dp7zujtuclvomuzd
(jameinel) Fix CVE 2013-2009. Avoid allowing multiple wildcards in a single
 SSL cert hostname segment. (Andrew Starr-Bochicchio)

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
 
20
import gc
20
21
import doctest
21
22
import os
22
23
import signal
29
30
from testtools import (
30
31
    ExtendedToOriginalDecorator,
31
32
    MultiTestResult,
32
 
    __version__ as testtools_version,
33
33
    )
34
34
from testtools.content import Content
35
35
from testtools.content_type import ContentType
43
43
from bzrlib import (
44
44
    branchbuilder,
45
45
    bzrdir,
 
46
    controldir,
46
47
    errors,
47
48
    hooks,
48
49
    lockdir,
334
335
        server1 = "a"
335
336
        server2 = "b"
336
337
        formats = [workingtree_4.WorkingTreeFormat4(),
337
 
                   workingtree_3.WorkingTreeFormat3(),]
338
 
        scenarios = make_scenarios(server1, server2, formats)
 
338
                   workingtree_3.WorkingTreeFormat3(),
 
339
                   workingtree_4.WorkingTreeFormat6()]
 
340
        scenarios = make_scenarios(server1, server2, formats,
 
341
            remote_server='c', remote_readonly_server='d',
 
342
            remote_backing_server='e')
339
343
        self.assertEqual([
340
344
            ('WorkingTreeFormat4',
341
345
             {'bzrdir_format': formats[0]._matchingbzrdir,
346
350
             {'bzrdir_format': formats[1]._matchingbzrdir,
347
351
              'transport_readonly_server': 'b',
348
352
              'transport_server': 'a',
349
 
              'workingtree_format': formats[1]})],
350
 
            scenarios)
 
353
              'workingtree_format': formats[1]}),
 
354
            ('WorkingTreeFormat6',
 
355
             {'bzrdir_format': formats[2]._matchingbzrdir,
 
356
              'transport_readonly_server': 'b',
 
357
              'transport_server': 'a',
 
358
              'workingtree_format': formats[2]}),
 
359
            ('WorkingTreeFormat6,remote',
 
360
             {'bzrdir_format': formats[2]._matchingbzrdir,
 
361
              'repo_is_remote': True,
 
362
              'transport_readonly_server': 'd',
 
363
              'transport_server': 'c',
 
364
              'vfs_transport_factory': 'e',
 
365
              'workingtree_format': formats[2]}),
 
366
            ], scenarios)
351
367
 
352
368
 
353
369
class TestTreeScenarios(tests.TestCase):
354
370
 
355
371
    def test_scenarios(self):
356
372
        # the tree implementation scenario generator is meant to setup one
357
 
        # instance for each working tree format, and one additional instance
 
373
        # instance for each working tree format, one additional instance
358
374
        # that will use the default wt format, but create a revision tree for
359
 
        # the tests.  this means that the wt ones should have the
360
 
        # workingtree_to_test_tree attribute set to 'return_parameter' and the
361
 
        # revision one set to revision_tree_from_workingtree.
 
375
        # the tests, and one more that uses the default wt format as a
 
376
        # lightweight checkout of a remote repository.  This means that the wt
 
377
        # ones should have the workingtree_to_test_tree attribute set to
 
378
        # 'return_parameter' and the revision one set to
 
379
        # revision_tree_from_workingtree.
362
380
 
363
381
        from bzrlib.tests.per_tree import (
364
382
            _dirstate_tree_from_workingtree,
370
388
            )
371
389
        server1 = "a"
372
390
        server2 = "b"
 
391
        smart_server = test_server.SmartTCPServer_for_testing
 
392
        smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
 
393
        mem_server = memory.MemoryServer
373
394
        formats = [workingtree_4.WorkingTreeFormat4(),
374
395
                   workingtree_3.WorkingTreeFormat3(),]
375
396
        scenarios = make_scenarios(server1, server2, formats)
376
 
        self.assertEqual(7, len(scenarios))
 
397
        self.assertEqual(8, len(scenarios))
377
398
        default_wt_format = workingtree.format_registry.get_default()
378
399
        wt4_format = workingtree_4.WorkingTreeFormat4()
379
400
        wt5_format = workingtree_4.WorkingTreeFormat5()
 
401
        wt6_format = workingtree_4.WorkingTreeFormat6()
380
402
        expected_scenarios = [
381
403
            ('WorkingTreeFormat4',
382
404
             {'bzrdir_format': formats[0]._matchingbzrdir,
392
414
              'workingtree_format': formats[1],
393
415
              '_workingtree_to_test_tree': return_parameter,
394
416
             }),
 
417
            ('WorkingTreeFormat6,remote',
 
418
             {'bzrdir_format': wt6_format._matchingbzrdir,
 
419
              'repo_is_remote': True,
 
420
              'transport_readonly_server': smart_readonly_server,
 
421
              'transport_server': smart_server,
 
422
              'vfs_transport_factory': mem_server,
 
423
              'workingtree_format': wt6_format,
 
424
              '_workingtree_to_test_tree': return_parameter,
 
425
             }),
395
426
            ('RevisionTree',
396
427
             {'_workingtree_to_test_tree': revision_tree_from_workingtree,
397
428
              'bzrdir_format': default_wt_format._matchingbzrdir,
609
640
        # Guard against regression into MemoryTransport leaking
610
641
        # files to disk instead of keeping them in memory.
611
642
        self.assertFalse(osutils.lexists('dir'))
612
 
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
 
643
        dir_format = controldir.format_registry.make_bzrdir('knit')
613
644
        self.assertEqual(dir_format.repository_format.__class__,
614
645
                         the_branch.repository._format.__class__)
615
646
        self.assertEqual('Bazaar-NG Knit Repository Format 1',
678
709
        builder = self.make_branch_builder('dir')
679
710
        rev_id = builder.build_commit()
680
711
        self.assertPathExists('dir')
681
 
        a_dir = bzrdir.BzrDir.open('dir')
 
712
        a_dir = controldir.ControlDir.open('dir')
682
713
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
683
714
        a_branch = a_dir.open_branch()
684
715
        builder_branch = builder.get_branch()
1062
1093
                self.expectFailure("No absolute truth", self.assertTrue, True)
1063
1094
        runner = tests.TextTestRunner(stream=StringIO())
1064
1095
        result = self.run_test_runner(runner, Test("test_truth"))
1065
 
        if testtools_version[:3] <= (0, 9, 11):
1066
 
            self.assertContainsRe(runner.stream.getvalue(),
1067
 
                "=+\n"
1068
 
                "FAIL: \\S+\.test_truth\n"
1069
 
                "-+\n"
1070
 
                "(?:.*\n)*"
1071
 
                "No absolute truth\n"
1072
 
                "(?:.*\n)*"
1073
 
                "-+\n"
1074
 
                "Ran 1 test in .*\n"
1075
 
                "\n"
1076
 
                "FAILED \\(failures=1\\)\n\\Z")
1077
 
        else:
1078
 
            self.assertContainsRe(runner.stream.getvalue(),
1079
 
                "=+\n"
1080
 
                "FAIL: \\S+\.test_truth\n"
1081
 
                "-+\n"
1082
 
                "Empty attachments:\n"
1083
 
                "  log\n"
1084
 
                "\n"
1085
 
                "reason: {{{No absolute truth}}}\n"
1086
 
                "-+\n"
1087
 
                "Ran 1 test in .*\n"
1088
 
                "\n"
1089
 
                "FAILED \\(failures=1\\)\n\\Z")
 
1096
        self.assertContainsRe(runner.stream.getvalue(),
 
1097
            "=+\n"
 
1098
            "FAIL: \\S+\.test_truth\n"
 
1099
            "-+\n"
 
1100
            "(?:.*\n)*"
 
1101
            "\\s*(?:Text attachment: )?reason"
 
1102
            "(?:\n-+\n|: {{{)"
 
1103
            "No absolute truth"
 
1104
            "(?:\n-+\n|}}}\n)"
 
1105
            "(?:.*\n)*"
 
1106
            "-+\n"
 
1107
            "Ran 1 test in .*\n"
 
1108
            "\n"
 
1109
            "FAILED \\(failures=1\\)\n\\Z")
1090
1110
 
1091
1111
    def test_result_decorator(self):
1092
1112
        # decorate results
1141
1161
        class SkippedTest(tests.TestCase):
1142
1162
 
1143
1163
            def setUp(self):
1144
 
                tests.TestCase.setUp(self)
 
1164
                super(SkippedTest, self).setUp()
1145
1165
                calls.append('setUp')
1146
1166
                self.addCleanup(self.cleanup)
1147
1167
 
1251
1271
            lambda trace=False: "ascii")
1252
1272
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1253
1273
            FailureWithUnicode("test_log_unicode"))
1254
 
        if testtools_version[:3] > (0, 9, 11):
1255
 
            self.assertContainsRe(out.getvalue(), "log: {{{\d+\.\d+  \\\\u2606}}}")
1256
 
        else:
1257
 
            self.assertContainsRe(out.getvalue(),
1258
 
                "Text attachment: log\n"
1259
 
                "-+\n"
1260
 
                "\d+\.\d+  \\\\u2606\n"
1261
 
                "-+\n")
 
1274
        self.assertContainsRe(out.getvalue(),
 
1275
            "(?:Text attachment: )?log"
 
1276
            "(?:\n-+\n|: {{{)"
 
1277
            "\d+\.\d+  \\\\u2606"
 
1278
            "(?:\n-+\n|}}}\n)")
1262
1279
 
1263
1280
 
1264
1281
class SampleTestCase(tests.TestCase):
1491
1508
        transport_server.start_server()
1492
1509
        self.addCleanup(transport_server.stop_server)
1493
1510
        t = transport.get_transport_from_url(transport_server.get_url())
1494
 
        bzrdir.BzrDir.create(t.base)
 
1511
        controldir.ControlDir.create(t.base)
1495
1512
        self.assertRaises(errors.BzrError,
1496
 
            bzrdir.BzrDir.open_from_transport, t)
 
1513
            controldir.ControlDir.open_from_transport, t)
1497
1514
        # But if we declare this as safe, we can open the bzrdir.
1498
1515
        self.permit_url(t.base)
1499
1516
        self._bzr_selftest_roots.append(t.base)
1500
 
        bzrdir.BzrDir.open_from_transport(t)
 
1517
        controldir.ControlDir.open_from_transport(t)
1501
1518
 
1502
1519
    def test_requireFeature_available(self):
1503
1520
        """self.requireFeature(available) is a no-op."""
1643
1660
        class Test(tests.TestCase):
1644
1661
 
1645
1662
            def setUp(self):
1646
 
                tests.TestCase.setUp(self)
 
1663
                super(Test, self).setUp()
1647
1664
                self.orig = self.overrideAttr(obj, 'test_attr')
1648
1665
 
1649
1666
            def test_value(self):
1662
1679
        class Test(tests.TestCase):
1663
1680
 
1664
1681
            def setUp(self):
1665
 
                tests.TestCase.setUp(self)
 
1682
                super(Test, self).setUp()
1666
1683
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1667
1684
 
1668
1685
            def test_value(self):
1743
1760
        result = self._run_test('test_fail')
1744
1761
        self.assertEqual(1, len(result.failures))
1745
1762
        result_content = result.failures[0][1]
1746
 
        if testtools_version < (0, 9, 12):
1747
 
            self.assertContainsRe(result_content, 'Text attachment: log')
 
1763
        self.assertContainsRe(result_content,
 
1764
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1748
1765
        self.assertContainsRe(result_content, 'this was a failing test')
1749
1766
 
1750
1767
    def test_error_has_log(self):
1751
1768
        result = self._run_test('test_error')
1752
1769
        self.assertEqual(1, len(result.errors))
1753
1770
        result_content = result.errors[0][1]
1754
 
        if testtools_version < (0, 9, 12):
1755
 
            self.assertContainsRe(result_content, 'Text attachment: log')
 
1771
        self.assertContainsRe(result_content,
 
1772
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1756
1773
        self.assertContainsRe(result_content, 'this test errored')
1757
1774
 
1758
1775
    def test_skip_has_no_log(self):
1777
1794
        result = self._run_test('test_xfail')
1778
1795
        self.assertEqual(1, len(result.expectedFailures))
1779
1796
        result_content = result.expectedFailures[0][1]
1780
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1797
        self.assertNotContainsRe(result_content,
 
1798
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1781
1799
        self.assertNotContainsRe(result_content, 'test with expected failure')
1782
1800
 
1783
1801
    def test_unexpected_success_has_log(self):
1956
1974
    def test_make_branch_and_tree_with_format(self):
1957
1975
        # we should be able to supply a format to make_branch_and_tree
1958
1976
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1959
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
 
1977
        self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
1960
1978
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1961
1979
 
1962
1980
    def test_make_branch_and_memory_tree(self):
2192
2210
        self.assertNotContainsRe(content, 'test with expected failure')
2193
2211
        self.assertEqual(1, len(result.expectedFailures))
2194
2212
        result_content = result.expectedFailures[0][1]
2195
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2213
        self.assertNotContainsRe(result_content,
 
2214
            '(?m)^(?:Text attachment: )?log(?:$|: )')
2196
2215
        self.assertNotContainsRe(result_content, 'test with expected failure')
2197
2216
 
2198
2217
    def test_unexpected_success_has_log(self):
2393
2412
    """Base class for tests testing how we might run bzr."""
2394
2413
 
2395
2414
    def setUp(self):
2396
 
        tests.TestCaseWithTransport.setUp(self)
 
2415
        super(TestWithFakedStartBzrSubprocess, self).setUp()
2397
2416
        self.subprocess_calls = []
2398
2417
 
2399
2418
    def start_bzr_subprocess(self, process_args, env_changes=None,
2613
2632
class TestSelftestFiltering(tests.TestCase):
2614
2633
 
2615
2634
    def setUp(self):
2616
 
        tests.TestCase.setUp(self)
 
2635
        super(TestSelftestFiltering, self).setUp()
2617
2636
        self.suite = TestUtil.TestSuite()
2618
2637
        self.loader = TestUtil.TestLoader()
2619
2638
        self.suite.addTest(self.loader.loadTestsFromModule(
3314
3333
        self.assertLength(1, calls)
3315
3334
 
3316
3335
 
 
3336
class _Selftest(object):
 
3337
    """Mixin for tests needing full selftest output"""
 
3338
 
 
3339
    def _inject_stream_into_subunit(self, stream):
 
3340
        """To be overridden by subclasses that run tests out of process"""
 
3341
 
 
3342
    def _run_selftest(self, **kwargs):
 
3343
        sio = StringIO()
 
3344
        self._inject_stream_into_subunit(sio)
 
3345
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
 
3346
        return sio.getvalue()
 
3347
 
 
3348
 
 
3349
class _ForkedSelftest(_Selftest):
 
3350
    """Mixin for tests needing full selftest output with forked children"""
 
3351
 
 
3352
    _test_needs_features = [features.subunit]
 
3353
 
 
3354
    def _inject_stream_into_subunit(self, stream):
 
3355
        """Monkey-patch subunit so the extra output goes to stream not stdout
 
3356
 
 
3357
        Some APIs need rewriting so this kind of bogus hackery can be replaced
 
3358
        by passing the stream param from run_tests down into ProtocolTestCase.
 
3359
        """
 
3360
        from subunit import ProtocolTestCase
 
3361
        _original_init = ProtocolTestCase.__init__
 
3362
        def _init_with_passthrough(self, *args, **kwargs):
 
3363
            _original_init(self, *args, **kwargs)
 
3364
            self._passthrough = stream
 
3365
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
 
3366
 
 
3367
    def _run_selftest(self, **kwargs):
 
3368
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
 
3369
        if getattr(os, "fork", None) is None:
 
3370
            raise tests.TestNotApplicable("Platform doesn't support forking")
 
3371
        # Make sure the fork code is actually invoked by claiming two cores
 
3372
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
 
3373
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
 
3374
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3375
 
 
3376
 
 
3377
class TestParallelFork(_ForkedSelftest, tests.TestCase):
 
3378
    """Check operation of --parallel=fork selftest option"""
 
3379
 
 
3380
    def test_error_in_child_during_fork(self):
 
3381
        """Error in a forked child during test setup should get reported"""
 
3382
        class Test(tests.TestCase):
 
3383
            def testMethod(self):
 
3384
                pass
 
3385
        # We don't care what, just break something that a child will run
 
3386
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
 
3387
        out = self._run_selftest(test_suite_factory=Test)
 
3388
        # Lines from the tracebacks of the two child processes may be mixed
 
3389
        # together due to the way subunit parses and forwards the streams,
 
3390
        # so permit extra lines between each part of the error output.
 
3391
        self.assertContainsRe(out,
 
3392
            "Traceback.*:\n"
 
3393
            "(?:.*\n)*"
 
3394
            ".+ in fork_for_tests\n"
 
3395
            "(?:.*\n)*"
 
3396
            "\s*workaround_zealous_crypto_random\(\)\n"
 
3397
            "(?:.*\n)*"
 
3398
            "TypeError:")
 
3399
 
 
3400
 
 
3401
class TestUncollectedWarnings(_Selftest, tests.TestCase):
 
3402
    """Check a test case still alive after being run emits a warning"""
 
3403
 
 
3404
    class Test(tests.TestCase):
 
3405
        def test_pass(self):
 
3406
            pass
 
3407
        def test_self_ref(self):
 
3408
            self.also_self = self.test_self_ref
 
3409
        def test_skip(self):
 
3410
            self.skip("Don't need")
 
3411
 
 
3412
    def _get_suite(self):
 
3413
        return TestUtil.TestSuite([
 
3414
            self.Test("test_pass"),
 
3415
            self.Test("test_self_ref"),
 
3416
            self.Test("test_skip"),
 
3417
            ])
 
3418
 
 
3419
    def _run_selftest_with_suite(self, **kwargs):
 
3420
        old_flags = tests.selftest_debug_flags
 
3421
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
 
3422
        gc_on = gc.isenabled()
 
3423
        if gc_on:
 
3424
            gc.disable()
 
3425
        try:
 
3426
            output = self._run_selftest(test_suite_factory=self._get_suite,
 
3427
                **kwargs)
 
3428
        finally:
 
3429
            if gc_on:
 
3430
                gc.enable()
 
3431
            tests.selftest_debug_flags = old_flags
 
3432
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
 
3433
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
 
3434
        return output
 
3435
 
 
3436
    def test_testsuite(self):
 
3437
        self._run_selftest_with_suite()
 
3438
 
 
3439
    def test_pattern(self):
 
3440
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
 
3441
        self.assertNotContainsRe(out, "test_skip")
 
3442
 
 
3443
    def test_exclude_pattern(self):
 
3444
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
 
3445
        self.assertNotContainsRe(out, "test_skip")
 
3446
 
 
3447
    def test_random_seed(self):
 
3448
        self._run_selftest_with_suite(random_seed="now")
 
3449
 
 
3450
    def test_matching_tests_first(self):
 
3451
        self._run_selftest_with_suite(matching_tests_first=True,
 
3452
            pattern="test_self_ref$")
 
3453
 
 
3454
    def test_starting_with_and_exclude(self):
 
3455
        out = self._run_selftest_with_suite(starting_with=["bt."],
 
3456
            exclude_pattern="test_skip$")
 
3457
        self.assertNotContainsRe(out, "test_skip")
 
3458
 
 
3459
    def test_additonal_decorator(self):
 
3460
        out = self._run_selftest_with_suite(
 
3461
            suite_decorators=[tests.TestDecorator])
 
3462
 
 
3463
 
 
3464
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
 
3465
    """Check warnings from tests staying alive are emitted with subunit"""
 
3466
 
 
3467
    _test_needs_features = [features.subunit]
 
3468
 
 
3469
    def _run_selftest_with_suite(self, **kwargs):
 
3470
        return TestUncollectedWarnings._run_selftest_with_suite(self,
 
3471
            runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3472
 
 
3473
 
 
3474
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
 
3475
    """Check warnings from tests staying alive are emitted when forking"""
 
3476
 
 
3477
 
3317
3478
class TestEnvironHandling(tests.TestCase):
3318
3479
 
3319
3480
    def test_overrideEnv_None_called_twice_doesnt_leak(self):