~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Jonathan Lange
  • Date: 2012-10-23 10:22:25 UTC
  • mto: This revision was merged to the branch mainline in revision 6571.
  • Revision ID: jml@canonical.com-20121023102225-2rg3vuygc9npii7w
NEWS update

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
 
20
import gc
20
21
import doctest
21
22
import os
22
23
import signal
42
43
from bzrlib import (
43
44
    branchbuilder,
44
45
    bzrdir,
 
46
    controldir,
45
47
    errors,
46
48
    hooks,
47
49
    lockdir,
93
95
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
94
96
 
95
97
 
96
 
class TestUnicodeFilename(tests.TestCase):
97
 
 
98
 
    def test_probe_passes(self):
99
 
        """UnicodeFilename._probe passes."""
100
 
        # We can't test much more than that because the behaviour depends
101
 
        # on the platform.
102
 
        tests.UnicodeFilename._probe()
103
 
 
104
 
 
105
98
class TestTreeShape(tests.TestCaseInTempDir):
106
99
 
107
100
    def test_unicode_paths(self):
108
 
        self.requireFeature(tests.UnicodeFilename)
 
101
        self.requireFeature(features.UnicodeFilenameFeature)
109
102
 
110
103
        filename = u'hell\u00d8'
111
104
        self.build_tree_contents([(filename, 'contents of hello')])
342
335
        server1 = "a"
343
336
        server2 = "b"
344
337
        formats = [workingtree_4.WorkingTreeFormat4(),
345
 
                   workingtree_3.WorkingTreeFormat3(),]
346
 
        scenarios = make_scenarios(server1, server2, formats)
 
338
                   workingtree_3.WorkingTreeFormat3(),
 
339
                   workingtree_4.WorkingTreeFormat6()]
 
340
        scenarios = make_scenarios(server1, server2, formats,
 
341
            remote_server='c', remote_readonly_server='d',
 
342
            remote_backing_server='e')
347
343
        self.assertEqual([
348
344
            ('WorkingTreeFormat4',
349
345
             {'bzrdir_format': formats[0]._matchingbzrdir,
354
350
             {'bzrdir_format': formats[1]._matchingbzrdir,
355
351
              'transport_readonly_server': 'b',
356
352
              'transport_server': 'a',
357
 
              'workingtree_format': formats[1]})],
358
 
            scenarios)
 
353
              'workingtree_format': formats[1]}),
 
354
            ('WorkingTreeFormat6',
 
355
             {'bzrdir_format': formats[2]._matchingbzrdir,
 
356
              'transport_readonly_server': 'b',
 
357
              'transport_server': 'a',
 
358
              'workingtree_format': formats[2]}),
 
359
            ('WorkingTreeFormat6,remote',
 
360
             {'bzrdir_format': formats[2]._matchingbzrdir,
 
361
              'repo_is_remote': True,
 
362
              'transport_readonly_server': 'd',
 
363
              'transport_server': 'c',
 
364
              'vfs_transport_factory': 'e',
 
365
              'workingtree_format': formats[2]}),
 
366
            ], scenarios)
359
367
 
360
368
 
361
369
class TestTreeScenarios(tests.TestCase):
362
370
 
363
371
    def test_scenarios(self):
364
372
        # the tree implementation scenario generator is meant to setup one
365
 
        # instance for each working tree format, and one additional instance
 
373
        # instance for each working tree format, one additional instance
366
374
        # that will use the default wt format, but create a revision tree for
367
 
        # the tests.  this means that the wt ones should have the
368
 
        # workingtree_to_test_tree attribute set to 'return_parameter' and the
369
 
        # revision one set to revision_tree_from_workingtree.
 
375
        # the tests, and one more that uses the default wt format as a
 
376
        # lightweight checkout of a remote repository.  This means that the wt
 
377
        # ones should have the workingtree_to_test_tree attribute set to
 
378
        # 'return_parameter' and the revision one set to
 
379
        # revision_tree_from_workingtree.
370
380
 
371
381
        from bzrlib.tests.per_tree import (
372
382
            _dirstate_tree_from_workingtree,
378
388
            )
379
389
        server1 = "a"
380
390
        server2 = "b"
 
391
        smart_server = test_server.SmartTCPServer_for_testing
 
392
        smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
 
393
        mem_server = memory.MemoryServer
381
394
        formats = [workingtree_4.WorkingTreeFormat4(),
382
395
                   workingtree_3.WorkingTreeFormat3(),]
383
396
        scenarios = make_scenarios(server1, server2, formats)
384
 
        self.assertEqual(7, len(scenarios))
 
397
        self.assertEqual(8, len(scenarios))
385
398
        default_wt_format = workingtree.format_registry.get_default()
386
399
        wt4_format = workingtree_4.WorkingTreeFormat4()
387
400
        wt5_format = workingtree_4.WorkingTreeFormat5()
 
401
        wt6_format = workingtree_4.WorkingTreeFormat6()
388
402
        expected_scenarios = [
389
403
            ('WorkingTreeFormat4',
390
404
             {'bzrdir_format': formats[0]._matchingbzrdir,
400
414
              'workingtree_format': formats[1],
401
415
              '_workingtree_to_test_tree': return_parameter,
402
416
             }),
 
417
            ('WorkingTreeFormat6,remote',
 
418
             {'bzrdir_format': wt6_format._matchingbzrdir,
 
419
              'repo_is_remote': True,
 
420
              'transport_readonly_server': smart_readonly_server,
 
421
              'transport_server': smart_server,
 
422
              'vfs_transport_factory': mem_server,
 
423
              'workingtree_format': wt6_format,
 
424
              '_workingtree_to_test_tree': return_parameter,
 
425
             }),
403
426
            ('RevisionTree',
404
427
             {'_workingtree_to_test_tree': revision_tree_from_workingtree,
405
428
              'bzrdir_format': default_wt_format._matchingbzrdir,
617
640
        # Guard against regression into MemoryTransport leaking
618
641
        # files to disk instead of keeping them in memory.
619
642
        self.assertFalse(osutils.lexists('dir'))
620
 
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
 
643
        dir_format = controldir.format_registry.make_bzrdir('knit')
621
644
        self.assertEqual(dir_format.repository_format.__class__,
622
645
                         the_branch.repository._format.__class__)
623
646
        self.assertEqual('Bazaar-NG Knit Repository Format 1',
627
650
    def test_dangling_locks_cause_failures(self):
628
651
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
629
652
            def test_function(self):
630
 
                t = self.get_transport('.')
 
653
                t = self.get_transport_from_path('.')
631
654
                l = lockdir.LockDir(t, 'lock')
632
655
                l.create()
633
656
                l.attempt_lock()
653
676
        # for the server
654
677
        url = self.get_readonly_url()
655
678
        url2 = self.get_readonly_url('foo/bar')
656
 
        t = transport.get_transport(url)
657
 
        t2 = transport.get_transport(url2)
 
679
        t = transport.get_transport_from_url(url)
 
680
        t2 = transport.get_transport_from_url(url2)
658
681
        self.assertIsInstance(t, ReadonlyTransportDecorator)
659
682
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
660
683
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
668
691
        url = self.get_readonly_url()
669
692
        url2 = self.get_readonly_url('foo/bar')
670
693
        # the transport returned may be any HttpTransportBase subclass
671
 
        t = transport.get_transport(url)
672
 
        t2 = transport.get_transport(url2)
 
694
        t = transport.get_transport_from_url(url)
 
695
        t2 = transport.get_transport_from_url(url2)
673
696
        self.assertIsInstance(t, HttpTransportBase)
674
697
        self.assertIsInstance(t2, HttpTransportBase)
675
698
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
686
709
        builder = self.make_branch_builder('dir')
687
710
        rev_id = builder.build_commit()
688
711
        self.assertPathExists('dir')
689
 
        a_dir = bzrdir.BzrDir.open('dir')
 
712
        a_dir = controldir.ControlDir.open('dir')
690
713
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
691
714
        a_branch = a_dir.open_branch()
692
715
        builder_branch = builder.get_branch()
713
736
class TestChrootedTest(tests.ChrootedTestCase):
714
737
 
715
738
    def test_root_is_root(self):
716
 
        t = transport.get_transport(self.get_readonly_url())
 
739
        t = transport.get_transport_from_url(self.get_readonly_url())
717
740
        url = t.base
718
741
        self.assertEqual(url, t.clone('..').base)
719
742
 
721
744
class TestProfileResult(tests.TestCase):
722
745
 
723
746
    def test_profiles_tests(self):
724
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
747
        self.requireFeature(features.lsprof_feature)
725
748
        terminal = testtools.testresult.doubles.ExtendedTestResult()
726
749
        result = tests.ProfileResult(terminal)
727
750
        class Sample(tests.TestCase):
782
805
 
783
806
    def test_lsprofiling(self):
784
807
        """Verbose test result prints lsprof statistics from test cases."""
785
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
808
        self.requireFeature(features.lsprof_feature)
786
809
        result_stream = StringIO()
787
810
        result = bzrlib.tests.VerboseTestResult(
788
811
            result_stream,
833
856
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
834
857
 
835
858
    def test_known_failure(self):
836
 
        """A KnownFailure being raised should trigger several result actions."""
 
859
        """Using knownFailure should trigger several result actions."""
837
860
        class InstrumentedTestResult(tests.ExtendedTestResult):
838
861
            def stopTestRun(self): pass
839
862
            def report_tests_starting(self): pass
842
865
        result = InstrumentedTestResult(None, None, None, None)
843
866
        class Test(tests.TestCase):
844
867
            def test_function(self):
845
 
                raise tests.KnownFailure('failed!')
 
868
                self.knownFailure('failed!')
846
869
        test = Test("test_function")
847
870
        test.run(result)
848
871
        # it should invoke 'report_known_failure'.
864
887
            descriptions=0,
865
888
            verbosity=2,
866
889
            )
867
 
        test = self.get_passing_test()
868
 
        result.startTest(test)
869
 
        prefix = len(result_stream.getvalue())
870
 
        # the err parameter has the shape:
871
 
        # (class, exception object, traceback)
872
 
        # KnownFailures dont get their tracebacks shown though, so we
873
 
        # can skip that.
874
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
875
 
        result.report_known_failure(test, err)
876
 
        output = result_stream.getvalue()[prefix:]
877
 
        lines = output.splitlines()
878
 
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
879
 
        if sys.version_info > (2, 7):
880
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
881
 
                self.assertNotEqual, lines[1], '    ')
882
 
        self.assertEqual(lines[1], '    foo')
883
 
        self.assertEqual(2, len(lines))
 
890
        _get_test("test_xfail").run(result)
 
891
        self.assertContainsRe(result_stream.getvalue(),
 
892
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
893
            "\\s*(?:Text attachment: )?reason"
 
894
            "(?:\n-+\n|: {{{)"
 
895
            "this_fails"
 
896
            "(?:\n-+\n|}}}\n)")
884
897
 
885
898
    def get_passing_test(self):
886
899
        """Return a test object that can't be run usefully."""
897
910
                self._call = test, feature
898
911
        result = InstrumentedTestResult(None, None, None, None)
899
912
        test = SampleTestCase('_test_pass')
900
 
        feature = tests.Feature()
 
913
        feature = features.Feature()
901
914
        result.startTest(test)
902
915
        result.addNotSupported(test, feature)
903
916
        # it should invoke 'report_unsupported'.
922
935
            verbosity=2,
923
936
            )
924
937
        test = self.get_passing_test()
925
 
        feature = tests.Feature()
 
938
        feature = features.Feature()
926
939
        result.startTest(test)
927
940
        prefix = len(result_stream.getvalue())
928
941
        result.report_unsupported(test, feature)
941
954
            def addNotSupported(self, test, feature):
942
955
                self._call = test, feature
943
956
        result = InstrumentedTestResult(None, None, None, None)
944
 
        feature = tests.Feature()
 
957
        feature = features.Feature()
945
958
        class Test(tests.TestCase):
946
959
            def test_function(self):
947
960
                raise tests.UnavailableFeature(feature)
966
979
    def test_strict_with_known_failure(self):
967
980
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
968
981
                                             verbosity=1)
969
 
        test = self.get_passing_test()
970
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
971
 
        result.addExpectedFailure(test, err)
 
982
        test = _get_test("test_xfail")
 
983
        test.run(result)
972
984
        self.assertFalse(result.wasStrictlySuccessful())
973
985
        self.assertEqual(None, result._extractBenchmarkTime(test))
974
986
 
1006
1018
        self.assertEquals(2, result.count)
1007
1019
 
1008
1020
 
1009
 
class TestUnicodeFilenameFeature(tests.TestCase):
1010
 
 
1011
 
    def test_probe_passes(self):
1012
 
        """UnicodeFilenameFeature._probe passes."""
1013
 
        # We can't test much more than that because the behaviour depends
1014
 
        # on the platform.
1015
 
        tests.UnicodeFilenameFeature._probe()
1016
 
 
1017
 
 
1018
1021
class TestRunner(tests.TestCase):
1019
1022
 
1020
1023
    def dummy_test(self):
1095
1098
            "FAIL: \\S+\.test_truth\n"
1096
1099
            "-+\n"
1097
1100
            "(?:.*\n)*"
1098
 
            "No absolute truth\n"
 
1101
            "\\s*(?:Text attachment: )?reason"
 
1102
            "(?:\n-+\n|: {{{)"
 
1103
            "No absolute truth"
 
1104
            "(?:\n-+\n|}}}\n)"
1099
1105
            "(?:.*\n)*"
1100
1106
            "-+\n"
1101
1107
            "Ran 1 test in .*\n"
1155
1161
        class SkippedTest(tests.TestCase):
1156
1162
 
1157
1163
            def setUp(self):
1158
 
                tests.TestCase.setUp(self)
 
1164
                super(SkippedTest, self).setUp()
1159
1165
                calls.append('setUp')
1160
1166
                self.addCleanup(self.cleanup)
1161
1167
 
1191
1197
 
1192
1198
    def test_unsupported_features_listed(self):
1193
1199
        """When unsupported features are encountered they are detailed."""
1194
 
        class Feature1(tests.Feature):
 
1200
        class Feature1(features.Feature):
1195
1201
            def _probe(self): return False
1196
 
        class Feature2(tests.Feature):
 
1202
        class Feature2(features.Feature):
1197
1203
            def _probe(self): return False
1198
1204
        # create sample tests
1199
1205
        test1 = SampleTestCase('_test_pass')
1266
1272
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1267
1273
            FailureWithUnicode("test_log_unicode"))
1268
1274
        self.assertContainsRe(out.getvalue(),
1269
 
            "Text attachment: log\n"
1270
 
            "-+\n"
1271
 
            "\d+\.\d+  \\\\u2606\n"
1272
 
            "-+\n")
 
1275
            "(?:Text attachment: )?log"
 
1276
            "(?:\n-+\n|: {{{)"
 
1277
            "\d+\.\d+  \\\\u2606"
 
1278
            "(?:\n-+\n|}}}\n)")
1273
1279
 
1274
1280
 
1275
1281
class SampleTestCase(tests.TestCase):
1476
1482
 
1477
1483
        Each self.time() call is individually and separately profiled.
1478
1484
        """
1479
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
1485
        self.requireFeature(features.lsprof_feature)
1480
1486
        # overrides the class member with an instance member so no cleanup
1481
1487
        # needed.
1482
1488
        self._gather_lsprof_in_benchmarks = True
1501
1507
        transport_server = memory.MemoryServer()
1502
1508
        transport_server.start_server()
1503
1509
        self.addCleanup(transport_server.stop_server)
1504
 
        t = transport.get_transport(transport_server.get_url())
1505
 
        bzrdir.BzrDir.create(t.base)
 
1510
        t = transport.get_transport_from_url(transport_server.get_url())
 
1511
        controldir.ControlDir.create(t.base)
1506
1512
        self.assertRaises(errors.BzrError,
1507
 
            bzrdir.BzrDir.open_from_transport, t)
 
1513
            controldir.ControlDir.open_from_transport, t)
1508
1514
        # But if we declare this as safe, we can open the bzrdir.
1509
1515
        self.permit_url(t.base)
1510
1516
        self._bzr_selftest_roots.append(t.base)
1511
 
        bzrdir.BzrDir.open_from_transport(t)
 
1517
        controldir.ControlDir.open_from_transport(t)
1512
1518
 
1513
1519
    def test_requireFeature_available(self):
1514
1520
        """self.requireFeature(available) is a no-op."""
1515
 
        class Available(tests.Feature):
 
1521
        class Available(features.Feature):
1516
1522
            def _probe(self):return True
1517
1523
        feature = Available()
1518
1524
        self.requireFeature(feature)
1519
1525
 
1520
1526
    def test_requireFeature_unavailable(self):
1521
1527
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1522
 
        class Unavailable(tests.Feature):
 
1528
        class Unavailable(features.Feature):
1523
1529
            def _probe(self):return False
1524
1530
        feature = Unavailable()
1525
1531
        self.assertRaises(tests.UnavailableFeature,
1654
1660
        class Test(tests.TestCase):
1655
1661
 
1656
1662
            def setUp(self):
1657
 
                tests.TestCase.setUp(self)
 
1663
                super(Test, self).setUp()
1658
1664
                self.orig = self.overrideAttr(obj, 'test_attr')
1659
1665
 
1660
1666
            def test_value(self):
1673
1679
        class Test(tests.TestCase):
1674
1680
 
1675
1681
            def setUp(self):
1676
 
                tests.TestCase.setUp(self)
 
1682
                super(Test, self).setUp()
1677
1683
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1678
1684
 
1679
1685
            def test_value(self):
1684
1690
        test.run(unittest.TestResult())
1685
1691
        self.assertEqual('original', obj.test_attr)
1686
1692
 
1687
 
 
1688
 
class _MissingFeature(tests.Feature):
 
1693
    def test_recordCalls(self):
 
1694
        from bzrlib.tests import test_selftest
 
1695
        calls = self.recordCalls(
 
1696
            test_selftest, '_add_numbers')
 
1697
        self.assertEqual(test_selftest._add_numbers(2, 10),
 
1698
            12)
 
1699
        self.assertEquals(calls, [((2, 10), {})])
 
1700
 
 
1701
 
 
1702
def _add_numbers(a, b):
 
1703
    return a + b
 
1704
 
 
1705
 
 
1706
class _MissingFeature(features.Feature):
1689
1707
    def _probe(self):
1690
1708
        return False
1691
1709
missing_feature = _MissingFeature()
1742
1760
        result = self._run_test('test_fail')
1743
1761
        self.assertEqual(1, len(result.failures))
1744
1762
        result_content = result.failures[0][1]
1745
 
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1763
        self.assertContainsRe(result_content,
 
1764
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1746
1765
        self.assertContainsRe(result_content, 'this was a failing test')
1747
1766
 
1748
1767
    def test_error_has_log(self):
1749
1768
        result = self._run_test('test_error')
1750
1769
        self.assertEqual(1, len(result.errors))
1751
1770
        result_content = result.errors[0][1]
1752
 
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1771
        self.assertContainsRe(result_content,
 
1772
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1753
1773
        self.assertContainsRe(result_content, 'this test errored')
1754
1774
 
1755
1775
    def test_skip_has_no_log(self):
1774
1794
        result = self._run_test('test_xfail')
1775
1795
        self.assertEqual(1, len(result.expectedFailures))
1776
1796
        result_content = result.expectedFailures[0][1]
1777
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1797
        self.assertNotContainsRe(result_content,
 
1798
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1778
1799
        self.assertNotContainsRe(result_content, 'test with expected failure')
1779
1800
 
1780
1801
    def test_unexpected_success_has_log(self):
1953
1974
    def test_make_branch_and_tree_with_format(self):
1954
1975
        # we should be able to supply a format to make_branch_and_tree
1955
1976
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1956
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
 
1977
        self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
1957
1978
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1958
1979
 
1959
1980
    def test_make_branch_and_memory_tree(self):
2037
2058
        self.assertLength(2, output.readlines())
2038
2059
 
2039
2060
    def test_lsprof_tests(self):
2040
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
2061
        self.requireFeature(features.lsprof_feature)
2041
2062
        results = []
2042
2063
        class Test(object):
2043
2064
            def __call__(test, result):
2189
2210
        self.assertNotContainsRe(content, 'test with expected failure')
2190
2211
        self.assertEqual(1, len(result.expectedFailures))
2191
2212
        result_content = result.expectedFailures[0][1]
2192
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2213
        self.assertNotContainsRe(result_content,
 
2214
            '(?m)^(?:Text attachment: )?log(?:$|: )')
2193
2215
        self.assertNotContainsRe(result_content, 'test with expected failure')
2194
2216
 
2195
2217
    def test_unexpected_success_has_log(self):
2390
2412
    """Base class for tests testing how we might run bzr."""
2391
2413
 
2392
2414
    def setUp(self):
2393
 
        tests.TestCaseWithTransport.setUp(self)
 
2415
        super(TestWithFakedStartBzrSubprocess, self).setUp()
2394
2416
        self.subprocess_calls = []
2395
2417
 
2396
2418
    def start_bzr_subprocess(self, process_args, env_changes=None,
2607
2629
        self.assertEqual('bzr: interrupted\n', result[1])
2608
2630
 
2609
2631
 
2610
 
class TestFeature(tests.TestCase):
2611
 
 
2612
 
    def test_caching(self):
2613
 
        """Feature._probe is called by the feature at most once."""
2614
 
        class InstrumentedFeature(tests.Feature):
2615
 
            def __init__(self):
2616
 
                super(InstrumentedFeature, self).__init__()
2617
 
                self.calls = []
2618
 
            def _probe(self):
2619
 
                self.calls.append('_probe')
2620
 
                return False
2621
 
        feature = InstrumentedFeature()
2622
 
        feature.available()
2623
 
        self.assertEqual(['_probe'], feature.calls)
2624
 
        feature.available()
2625
 
        self.assertEqual(['_probe'], feature.calls)
2626
 
 
2627
 
    def test_named_str(self):
2628
 
        """Feature.__str__ should thunk to feature_name()."""
2629
 
        class NamedFeature(tests.Feature):
2630
 
            def feature_name(self):
2631
 
                return 'symlinks'
2632
 
        feature = NamedFeature()
2633
 
        self.assertEqual('symlinks', str(feature))
2634
 
 
2635
 
    def test_default_str(self):
2636
 
        """Feature.__str__ should default to __class__.__name__."""
2637
 
        class NamedFeature(tests.Feature):
2638
 
            pass
2639
 
        feature = NamedFeature()
2640
 
        self.assertEqual('NamedFeature', str(feature))
2641
 
 
2642
 
 
2643
 
class TestUnavailableFeature(tests.TestCase):
2644
 
 
2645
 
    def test_access_feature(self):
2646
 
        feature = tests.Feature()
2647
 
        exception = tests.UnavailableFeature(feature)
2648
 
        self.assertIs(feature, exception.args[0])
2649
 
 
2650
 
 
2651
 
simple_thunk_feature = tests._CompatabilityThunkFeature(
2652
 
    deprecated_in((2, 1, 0)),
2653
 
    'bzrlib.tests.test_selftest',
2654
 
    'simple_thunk_feature','UnicodeFilename',
2655
 
    replacement_module='bzrlib.tests'
2656
 
    )
2657
 
 
2658
 
class Test_CompatibilityFeature(tests.TestCase):
2659
 
 
2660
 
    def test_does_thunk(self):
2661
 
        res = self.callDeprecated(
2662
 
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2663
 
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2664
 
            simple_thunk_feature.available)
2665
 
        self.assertEqual(tests.UnicodeFilename.available(), res)
2666
 
 
2667
 
 
2668
 
class TestModuleAvailableFeature(tests.TestCase):
2669
 
 
2670
 
    def test_available_module(self):
2671
 
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
2672
 
        self.assertEqual('bzrlib.tests', feature.module_name)
2673
 
        self.assertEqual('bzrlib.tests', str(feature))
2674
 
        self.assertTrue(feature.available())
2675
 
        self.assertIs(tests, feature.module)
2676
 
 
2677
 
    def test_unavailable_module(self):
2678
 
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2679
 
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2680
 
        self.assertFalse(feature.available())
2681
 
        self.assertIs(None, feature.module)
2682
 
 
2683
 
 
2684
2632
class TestSelftestFiltering(tests.TestCase):
2685
2633
 
2686
2634
    def setUp(self):
2687
 
        tests.TestCase.setUp(self)
 
2635
        super(TestSelftestFiltering, self).setUp()
2688
2636
        self.suite = TestUtil.TestSuite()
2689
2637
        self.loader = TestUtil.TestLoader()
2690
2638
        self.suite.addTest(self.loader.loadTestsFromModule(
3385
3333
        self.assertLength(1, calls)
3386
3334
 
3387
3335
 
 
3336
class _Selftest(object):
 
3337
    """Mixin for tests needing full selftest output"""
 
3338
 
 
3339
    def _inject_stream_into_subunit(self, stream):
 
3340
        """To be overridden by subclasses that run tests out of process"""
 
3341
 
 
3342
    def _run_selftest(self, **kwargs):
 
3343
        sio = StringIO()
 
3344
        self._inject_stream_into_subunit(sio)
 
3345
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
 
3346
        return sio.getvalue()
 
3347
 
 
3348
 
 
3349
class _ForkedSelftest(_Selftest):
 
3350
    """Mixin for tests needing full selftest output with forked children"""
 
3351
 
 
3352
    _test_needs_features = [features.subunit]
 
3353
 
 
3354
    def _inject_stream_into_subunit(self, stream):
 
3355
        """Monkey-patch subunit so the extra output goes to stream not stdout
 
3356
 
 
3357
        Some APIs need rewriting so this kind of bogus hackery can be replaced
 
3358
        by passing the stream param from run_tests down into ProtocolTestCase.
 
3359
        """
 
3360
        from subunit import ProtocolTestCase
 
3361
        _original_init = ProtocolTestCase.__init__
 
3362
        def _init_with_passthrough(self, *args, **kwargs):
 
3363
            _original_init(self, *args, **kwargs)
 
3364
            self._passthrough = stream
 
3365
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
 
3366
 
 
3367
    def _run_selftest(self, **kwargs):
 
3368
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
 
3369
        if getattr(os, "fork", None) is None:
 
3370
            raise tests.TestNotApplicable("Platform doesn't support forking")
 
3371
        # Make sure the fork code is actually invoked by claiming two cores
 
3372
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
 
3373
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
 
3374
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3375
 
 
3376
 
 
3377
class TestParallelFork(_ForkedSelftest, tests.TestCase):
 
3378
    """Check operation of --parallel=fork selftest option"""
 
3379
 
 
3380
    def test_error_in_child_during_fork(self):
 
3381
        """Error in a forked child during test setup should get reported"""
 
3382
        class Test(tests.TestCase):
 
3383
            def testMethod(self):
 
3384
                pass
 
3385
        # We don't care what, just break something that a child will run
 
3386
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
 
3387
        out = self._run_selftest(test_suite_factory=Test)
 
3388
        # Lines from the tracebacks of the two child processes may be mixed
 
3389
        # together due to the way subunit parses and forwards the streams,
 
3390
        # so permit extra lines between each part of the error output.
 
3391
        self.assertContainsRe(out,
 
3392
            "Traceback.*:\n"
 
3393
            "(?:.*\n)*"
 
3394
            ".+ in fork_for_tests\n"
 
3395
            "(?:.*\n)*"
 
3396
            "\s*workaround_zealous_crypto_random\(\)\n"
 
3397
            "(?:.*\n)*"
 
3398
            "TypeError:")
 
3399
 
 
3400
 
 
3401
class TestUncollectedWarnings(_Selftest, tests.TestCase):
 
3402
    """Check a test case still alive after being run emits a warning"""
 
3403
 
 
3404
    class Test(tests.TestCase):
 
3405
        def test_pass(self):
 
3406
            pass
 
3407
        def test_self_ref(self):
 
3408
            self.also_self = self.test_self_ref
 
3409
        def test_skip(self):
 
3410
            self.skip("Don't need")
 
3411
 
 
3412
    def _get_suite(self):
 
3413
        return TestUtil.TestSuite([
 
3414
            self.Test("test_pass"),
 
3415
            self.Test("test_self_ref"),
 
3416
            self.Test("test_skip"),
 
3417
            ])
 
3418
 
 
3419
    def _run_selftest_with_suite(self, **kwargs):
 
3420
        old_flags = tests.selftest_debug_flags
 
3421
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
 
3422
        gc_on = gc.isenabled()
 
3423
        if gc_on:
 
3424
            gc.disable()
 
3425
        try:
 
3426
            output = self._run_selftest(test_suite_factory=self._get_suite,
 
3427
                **kwargs)
 
3428
        finally:
 
3429
            if gc_on:
 
3430
                gc.enable()
 
3431
            tests.selftest_debug_flags = old_flags
 
3432
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
 
3433
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
 
3434
        return output
 
3435
 
 
3436
    def test_testsuite(self):
 
3437
        self._run_selftest_with_suite()
 
3438
 
 
3439
    def test_pattern(self):
 
3440
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
 
3441
        self.assertNotContainsRe(out, "test_skip")
 
3442
 
 
3443
    def test_exclude_pattern(self):
 
3444
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
 
3445
        self.assertNotContainsRe(out, "test_skip")
 
3446
 
 
3447
    def test_random_seed(self):
 
3448
        self._run_selftest_with_suite(random_seed="now")
 
3449
 
 
3450
    def test_matching_tests_first(self):
 
3451
        self._run_selftest_with_suite(matching_tests_first=True,
 
3452
            pattern="test_self_ref$")
 
3453
 
 
3454
    def test_starting_with_and_exclude(self):
 
3455
        out = self._run_selftest_with_suite(starting_with=["bt."],
 
3456
            exclude_pattern="test_skip$")
 
3457
        self.assertNotContainsRe(out, "test_skip")
 
3458
 
 
3459
    def test_additonal_decorator(self):
 
3460
        out = self._run_selftest_with_suite(
 
3461
            suite_decorators=[tests.TestDecorator])
 
3462
 
 
3463
 
 
3464
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
 
3465
    """Check warnings from tests staying alive are emitted with subunit"""
 
3466
 
 
3467
    _test_needs_features = [features.subunit]
 
3468
 
 
3469
    def _run_selftest_with_suite(self, **kwargs):
 
3470
        return TestUncollectedWarnings._run_selftest_with_suite(self,
 
3471
            runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3472
 
 
3473
 
 
3474
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
 
3475
    """Check warnings from tests staying alive are emitted when forking"""
 
3476
 
 
3477
 
3388
3478
class TestEnvironHandling(tests.TestCase):
3389
3479
 
3390
3480
    def test_overrideEnv_None_called_twice_doesnt_leak(self):