~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2011-01-14 00:58:16 UTC
  • mto: (5582.12.2 weave-plugin)
  • mto: This revision was merged to the branch mainline in revision 5718.
  • Revision ID: jelmer@samba.org-20110114005816-b3g5xigfiy20s29y
Fix imports.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import doctest
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
37
42
from bzrlib import (
38
43
    branchbuilder,
39
44
    bzrdir,
40
 
    debug,
41
45
    errors,
42
46
    lockdir,
43
47
    memorytree,
44
48
    osutils,
45
 
    progress,
46
49
    remote,
47
50
    repository,
48
51
    symbol_versioning,
52
55
    )
53
56
from bzrlib.repofmt import (
54
57
    groupcompress_repo,
55
 
    pack_repo,
56
 
    weaverepo,
57
58
    )
58
59
from bzrlib.symbol_versioning import (
59
60
    deprecated_function,
64
65
    features,
65
66
    test_lsprof,
66
67
    test_server,
67
 
    test_sftp_transport,
68
68
    TestUtil,
69
69
    )
70
 
from bzrlib.trace import note
 
70
from bzrlib.trace import note, mutter
71
71
from bzrlib.transport import memory
72
 
from bzrlib.version import _get_bzr_source_tree
73
72
 
74
73
 
75
74
def _test_ids(test_suite):
77
76
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
77
 
79
78
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
79
class MetaTestLog(tests.TestCase):
93
80
 
94
81
    def test_logging(self):
100
87
            "text", "plain", {"charset": "utf8"})))
101
88
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
102
89
        self.assertThat(self.get_log(),
103
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
90
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
104
91
 
105
92
 
106
93
class TestUnicodeFilename(tests.TestCase):
122
109
        self.failUnlessExists(filename)
123
110
 
124
111
 
 
112
class TestClassesAvailable(tests.TestCase):
 
113
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
114
 
 
115
    def test_test_case(self):
 
116
        from bzrlib.tests import TestCase
 
117
 
 
118
    def test_test_loader(self):
 
119
        from bzrlib.tests import TestLoader
 
120
 
 
121
    def test_test_suite(self):
 
122
        from bzrlib.tests import TestSuite
 
123
 
 
124
 
125
125
class TestTransportScenarios(tests.TestCase):
126
126
    """A group of tests that test the transport implementation adaption core.
127
127
 
312
312
        from bzrlib.tests.per_interrepository import make_scenarios
313
313
        server1 = "a"
314
314
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
315
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
316
        scenarios = make_scenarios(server1, server2, formats)
317
317
        self.assertEqual([
318
318
            ('C0,str,str',
319
319
             {'repository_format': 'C1',
320
320
              'repository_format_to': 'C2',
321
321
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
322
              'transport_server': 'a',
 
323
              'extra_setup': 'C3'}),
323
324
            ('D0,str,str',
324
325
             {'repository_format': 'D1',
325
326
              'repository_format_to': 'D2',
326
327
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
328
              'transport_server': 'a',
 
329
              'extra_setup': 'D3'})],
328
330
            scenarios)
329
331
 
330
332
 
336
338
        from bzrlib.tests.per_workingtree import make_scenarios
337
339
        server1 = "a"
338
340
        server2 = "b"
339
 
        formats = [workingtree.WorkingTreeFormat2(),
 
341
        from bzrlib.plugins.weave_fmt.workingtree import WorkingTreeFormat2
 
342
        formats = [WorkingTreeFormat2(),
340
343
                   workingtree.WorkingTreeFormat3(),]
341
344
        scenarios = make_scenarios(server1, server2, formats)
342
345
        self.assertEqual([
371
374
            return_parameter,
372
375
            revision_tree_from_workingtree
373
376
            )
 
377
        from bzrlib.plugins.weave_fmt.workingtree import WorkingTreeFormat2
374
378
        server1 = "a"
375
379
        server2 = "b"
376
 
        formats = [workingtree.WorkingTreeFormat2(),
 
380
        formats = [WorkingTreeFormat2(),
377
381
                   workingtree.WorkingTreeFormat3(),]
378
382
        scenarios = make_scenarios(server1, server2, formats)
379
383
        self.assertEqual(7, len(scenarios))
447
451
        # ones to add.
448
452
        from bzrlib.tests.per_tree import (
449
453
            return_parameter,
450
 
            revision_tree_from_workingtree
451
454
            )
452
455
        from bzrlib.tests.per_intertree import (
453
456
            make_scenarios,
454
457
            )
455
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
458
        from bzrlib.plugins.weave_fmt.workingtree import WorkingTreeFormat2
 
459
        from bzrlib.workingtree import WorkingTreeFormat3
456
460
        input_test = TestInterTreeScenarios(
457
461
            "test_scenarios")
458
462
        server1 = "a"
554
558
    def test_make_branch_and_memory_tree_with_format(self):
555
559
        """make_branch_and_memory_tree should accept a format option."""
556
560
        format = bzrdir.BzrDirMetaFormat1()
557
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
561
        format.repository_format = repository.RepositoryFormat.get_default_format()
558
562
        tree = self.make_branch_and_memory_tree('dir', format=format)
559
563
        # Guard against regression into MemoryTransport leaking
560
564
        # files to disk instead of keeping them in memory.
574
578
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
575
579
        # that the format objects are used.
576
580
        format = bzrdir.BzrDirMetaFormat1()
577
 
        repo_format = weaverepo.RepositoryFormat7()
 
581
        repo_format = repository.RepositoryFormat.get_default_format()
578
582
        format.repository_format = repo_format
579
583
        builder = self.make_branch_builder('dir', format=format)
580
584
        the_branch = builder.get_branch()
611
615
        result = test.run()
612
616
        total_failures = result.errors + result.failures
613
617
        if self._lock_check_thorough:
614
 
            self.assertLength(1, total_failures)
 
618
            self.assertEqual(1, len(total_failures))
615
619
        else:
616
620
            # When _lock_check_thorough is disabled, then we don't trigger a
617
621
            # failure
618
 
            self.assertLength(0, total_failures)
 
622
            self.assertEqual(0, len(total_failures))
619
623
 
620
624
 
621
625
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
833
837
        self.assertContainsRe(output,
834
838
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
835
839
 
 
840
    def test_uses_time_from_testtools(self):
 
841
        """Test case timings in verbose results should use testtools times"""
 
842
        import datetime
 
843
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
844
            def startTest(self, test):
 
845
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
846
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
847
            def addSuccess(self, test):
 
848
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
849
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
850
            def report_tests_starting(self): pass
 
851
        sio = StringIO()
 
852
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
853
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
854
 
836
855
    def test_known_failure(self):
837
856
        """A KnownFailure being raised should trigger several result actions."""
838
857
        class InstrumentedTestResult(tests.ExtendedTestResult):
839
858
            def stopTestRun(self): pass
840
 
            def startTests(self): pass
841
 
            def report_test_start(self, test): pass
 
859
            def report_tests_starting(self): pass
842
860
            def report_known_failure(self, test, err=None, details=None):
843
861
                self._call = test, 'known failure'
844
862
        result = InstrumentedTestResult(None, None, None, None)
894
912
        """Test the behaviour of invoking addNotSupported."""
895
913
        class InstrumentedTestResult(tests.ExtendedTestResult):
896
914
            def stopTestRun(self): pass
897
 
            def startTests(self): pass
898
 
            def report_test_start(self, test): pass
 
915
            def report_tests_starting(self): pass
899
916
            def report_unsupported(self, test, feature):
900
917
                self._call = test, feature
901
918
        result = InstrumentedTestResult(None, None, None, None)
940
957
        """An UnavailableFeature being raised should invoke addNotSupported."""
941
958
        class InstrumentedTestResult(tests.ExtendedTestResult):
942
959
            def stopTestRun(self): pass
943
 
            def startTests(self): pass
944
 
            def report_test_start(self, test): pass
 
960
            def report_tests_starting(self): pass
945
961
            def addNotSupported(self, test, feature):
946
962
                self._call = test, feature
947
963
        result = InstrumentedTestResult(None, None, None, None)
989
1005
        class InstrumentedTestResult(tests.ExtendedTestResult):
990
1006
            calls = 0
991
1007
            def startTests(self): self.calls += 1
992
 
            def report_test_start(self, test): pass
993
1008
        result = InstrumentedTestResult(None, None, None, None)
994
1009
        def test_function():
995
1010
            pass
997
1012
        test.run(result)
998
1013
        self.assertEquals(1, result.calls)
999
1014
 
 
1015
    def test_startTests_only_once(self):
 
1016
        """With multiple tests startTests should still only be called once"""
 
1017
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1018
            calls = 0
 
1019
            def startTests(self): self.calls += 1
 
1020
        result = InstrumentedTestResult(None, None, None, None)
 
1021
        suite = unittest.TestSuite([
 
1022
            unittest.FunctionTestCase(lambda: None),
 
1023
            unittest.FunctionTestCase(lambda: None)])
 
1024
        suite.run(result)
 
1025
        self.assertEquals(1, result.calls)
 
1026
        self.assertEquals(2, result.count)
 
1027
 
1000
1028
 
1001
1029
class TestUnicodeFilenameFeature(tests.TestCase):
1002
1030
 
1023
1051
        because of our use of global state.
1024
1052
        """
1025
1053
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1026
 
        old_leak = tests.TestCase._first_thread_leaker_id
1027
1054
        try:
1028
1055
            tests.TestCaseInTempDir.TEST_ROOT = None
1029
 
            tests.TestCase._first_thread_leaker_id = None
1030
1056
            return testrunner.run(test)
1031
1057
        finally:
1032
1058
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1033
 
            tests.TestCase._first_thread_leaker_id = old_leak
1034
1059
 
1035
1060
    def test_known_failure_failed_run(self):
1036
1061
        # run a test that generates a known failure which should be printed in
1082
1107
    def test_result_decorator(self):
1083
1108
        # decorate results
1084
1109
        calls = []
1085
 
        class LoggingDecorator(tests.ForwardingResult):
 
1110
        class LoggingDecorator(ExtendedToOriginalDecorator):
1086
1111
            def startTest(self, test):
1087
 
                tests.ForwardingResult.startTest(self, test)
 
1112
                ExtendedToOriginalDecorator.startTest(self, test)
1088
1113
                calls.append('start')
1089
1114
        test = unittest.FunctionTestCase(lambda:None)
1090
1115
        stream = StringIO()
1214
1239
        self.assertContainsRe(output_string, "--date [0-9.]+")
1215
1240
        self.assertLength(1, self._get_source_tree_calls)
1216
1241
 
 
1242
    def test_verbose_test_count(self):
 
1243
        """A verbose test run reports the right test count at the start"""
 
1244
        suite = TestUtil.TestSuite([
 
1245
            unittest.FunctionTestCase(lambda:None),
 
1246
            unittest.FunctionTestCase(lambda:None)])
 
1247
        self.assertEqual(suite.countTestCases(), 2)
 
1248
        stream = StringIO()
 
1249
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1250
        # Need to use the CountingDecorator as that's what sets num_tests
 
1251
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1252
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1253
 
1217
1254
    def test_startTestRun(self):
1218
1255
        """run should call result.startTestRun()"""
1219
1256
        calls = []
1220
 
        class LoggingDecorator(tests.ForwardingResult):
 
1257
        class LoggingDecorator(ExtendedToOriginalDecorator):
1221
1258
            def startTestRun(self):
1222
 
                tests.ForwardingResult.startTestRun(self)
 
1259
                ExtendedToOriginalDecorator.startTestRun(self)
1223
1260
                calls.append('startTestRun')
1224
1261
        test = unittest.FunctionTestCase(lambda:None)
1225
1262
        stream = StringIO()
1231
1268
    def test_stopTestRun(self):
1232
1269
        """run should call result.stopTestRun()"""
1233
1270
        calls = []
1234
 
        class LoggingDecorator(tests.ForwardingResult):
 
1271
        class LoggingDecorator(ExtendedToOriginalDecorator):
1235
1272
            def stopTestRun(self):
1236
 
                tests.ForwardingResult.stopTestRun(self)
 
1273
                ExtendedToOriginalDecorator.stopTestRun(self)
1237
1274
                calls.append('stopTestRun')
1238
1275
        test = unittest.FunctionTestCase(lambda:None)
1239
1276
        stream = StringIO()
1242
1279
        result = self.run_test_runner(runner, test)
1243
1280
        self.assertLength(1, calls)
1244
1281
 
 
1282
    def test_unicode_test_output_on_ascii_stream(self):
 
1283
        """Showing results should always succeed even on an ascii console"""
 
1284
        class FailureWithUnicode(tests.TestCase):
 
1285
            def test_log_unicode(self):
 
1286
                self.log(u"\u2606")
 
1287
                self.fail("Now print that log!")
 
1288
        out = StringIO()
 
1289
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1290
            lambda trace=False: "ascii")
 
1291
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1292
            FailureWithUnicode("test_log_unicode"))
 
1293
        self.assertContainsRe(out.getvalue(),
 
1294
            "Text attachment: log\n"
 
1295
            "-+\n"
 
1296
            "\d+\.\d+  \\\\u2606\n"
 
1297
            "-+\n")
 
1298
 
1245
1299
 
1246
1300
class SampleTestCase(tests.TestCase):
1247
1301
 
1656
1710
        self.assertEqual('original', obj.test_attr)
1657
1711
 
1658
1712
 
 
1713
class _MissingFeature(tests.Feature):
 
1714
    def _probe(self):
 
1715
        return False
 
1716
missing_feature = _MissingFeature()
 
1717
 
 
1718
 
 
1719
def _get_test(name):
 
1720
    """Get an instance of a specific example test.
 
1721
 
 
1722
    We protect this in a function so that they don't auto-run in the test
 
1723
    suite.
 
1724
    """
 
1725
 
 
1726
    class ExampleTests(tests.TestCase):
 
1727
 
 
1728
        def test_fail(self):
 
1729
            mutter('this was a failing test')
 
1730
            self.fail('this test will fail')
 
1731
 
 
1732
        def test_error(self):
 
1733
            mutter('this test errored')
 
1734
            raise RuntimeError('gotcha')
 
1735
 
 
1736
        def test_missing_feature(self):
 
1737
            mutter('missing the feature')
 
1738
            self.requireFeature(missing_feature)
 
1739
 
 
1740
        def test_skip(self):
 
1741
            mutter('this test will be skipped')
 
1742
            raise tests.TestSkipped('reason')
 
1743
 
 
1744
        def test_success(self):
 
1745
            mutter('this test succeeds')
 
1746
 
 
1747
        def test_xfail(self):
 
1748
            mutter('test with expected failure')
 
1749
            self.knownFailure('this_fails')
 
1750
 
 
1751
        def test_unexpected_success(self):
 
1752
            mutter('test with unexpected success')
 
1753
            self.expectFailure('should_fail', lambda: None)
 
1754
 
 
1755
    return ExampleTests(name)
 
1756
 
 
1757
 
 
1758
class TestTestCaseLogDetails(tests.TestCase):
 
1759
 
 
1760
    def _run_test(self, test_name):
 
1761
        test = _get_test(test_name)
 
1762
        result = testtools.TestResult()
 
1763
        test.run(result)
 
1764
        return result
 
1765
 
 
1766
    def test_fail_has_log(self):
 
1767
        result = self._run_test('test_fail')
 
1768
        self.assertEqual(1, len(result.failures))
 
1769
        result_content = result.failures[0][1]
 
1770
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1771
        self.assertContainsRe(result_content, 'this was a failing test')
 
1772
 
 
1773
    def test_error_has_log(self):
 
1774
        result = self._run_test('test_error')
 
1775
        self.assertEqual(1, len(result.errors))
 
1776
        result_content = result.errors[0][1]
 
1777
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1778
        self.assertContainsRe(result_content, 'this test errored')
 
1779
 
 
1780
    def test_skip_has_no_log(self):
 
1781
        result = self._run_test('test_skip')
 
1782
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1783
        skips = result.skip_reasons['reason']
 
1784
        self.assertEqual(1, len(skips))
 
1785
        test = skips[0]
 
1786
        self.assertFalse('log' in test.getDetails())
 
1787
 
 
1788
    def test_missing_feature_has_no_log(self):
 
1789
        # testtools doesn't know about addNotSupported, so it just gets
 
1790
        # considered as a skip
 
1791
        result = self._run_test('test_missing_feature')
 
1792
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1793
        skips = result.skip_reasons[missing_feature]
 
1794
        self.assertEqual(1, len(skips))
 
1795
        test = skips[0]
 
1796
        self.assertFalse('log' in test.getDetails())
 
1797
 
 
1798
    def test_xfail_has_no_log(self):
 
1799
        result = self._run_test('test_xfail')
 
1800
        self.assertEqual(1, len(result.expectedFailures))
 
1801
        result_content = result.expectedFailures[0][1]
 
1802
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1803
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1804
 
 
1805
    def test_unexpected_success_has_log(self):
 
1806
        result = self._run_test('test_unexpected_success')
 
1807
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1808
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1809
        # expectedFailures is a list of reasons?
 
1810
        test = result.unexpectedSuccesses[0]
 
1811
        details = test.getDetails()
 
1812
        self.assertTrue('log' in details)
 
1813
 
 
1814
 
 
1815
class TestTestCloning(tests.TestCase):
 
1816
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1817
 
 
1818
    def test_cloned_testcase_does_not_share_details(self):
 
1819
        """A TestCase cloned with clone_test does not share mutable attributes
 
1820
        such as details or cleanups.
 
1821
        """
 
1822
        class Test(tests.TestCase):
 
1823
            def test_foo(self):
 
1824
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1825
        orig_test = Test('test_foo')
 
1826
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1827
        orig_test.run(unittest.TestResult())
 
1828
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1829
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1830
 
 
1831
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1832
        """Applying two levels of scenarios to a test preserves the attributes
 
1833
        added by both scenarios.
 
1834
        """
 
1835
        class Test(tests.TestCase):
 
1836
            def test_foo(self):
 
1837
                pass
 
1838
        test = Test('test_foo')
 
1839
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1840
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1841
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1842
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1843
        all_tests = list(tests.iter_suite_tests(suite))
 
1844
        self.assertLength(4, all_tests)
 
1845
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1846
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1847
 
 
1848
 
1659
1849
# NB: Don't delete this; it's not actually from 0.11!
1660
1850
@deprecated_function(deprecated_in((0, 11, 0)))
1661
1851
def sample_deprecated_function():
1788
1978
    def test_make_branch_and_tree_with_format(self):
1789
1979
        # we should be able to supply a format to make_branch_and_tree
1790
1980
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1791
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
 
1981
        from bzrlib.plugins.weave_fmt.bzrdir import BzrDirFormat6
 
1982
        self.make_branch_and_tree('b', format=BzrDirFormat6())
1792
1983
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1793
1984
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1794
1985
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1795
 
                              bzrlib.bzrdir.BzrDirFormat6)
 
1986
                              BzrDirFormat6)
1796
1987
 
1797
1988
    def test_make_branch_and_memory_tree(self):
1798
1989
        # we should be able to get a new branch and a mutable tree from
1815
2006
                tree.branch.repository.bzrdir.root_transport)
1816
2007
 
1817
2008
 
1818
 
class SelfTestHelper:
 
2009
class SelfTestHelper(object):
1819
2010
 
1820
2011
    def run_selftest(self, **kwargs):
1821
2012
        """Run selftest returning its output."""
1881
2072
            def __call__(test, result):
1882
2073
                test.run(result)
1883
2074
            def run(test, result):
1884
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2075
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1885
2076
                calls.append("called")
1886
2077
            def countTestCases(self):
1887
2078
                return 1
1972
2163
            load_list='missing file name', list_only=True)
1973
2164
 
1974
2165
 
 
2166
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2167
 
 
2168
    _test_needs_features = [features.subunit]
 
2169
 
 
2170
    def run_subunit_stream(self, test_name):
 
2171
        from subunit import ProtocolTestCase
 
2172
        def factory():
 
2173
            return TestUtil.TestSuite([_get_test(test_name)])
 
2174
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2175
            test_suite_factory=factory)
 
2176
        test = ProtocolTestCase(stream)
 
2177
        result = testtools.TestResult()
 
2178
        test.run(result)
 
2179
        content = stream.getvalue()
 
2180
        return content, result
 
2181
 
 
2182
    def test_fail_has_log(self):
 
2183
        content, result = self.run_subunit_stream('test_fail')
 
2184
        self.assertEqual(1, len(result.failures))
 
2185
        self.assertContainsRe(content, '(?m)^log$')
 
2186
        self.assertContainsRe(content, 'this test will fail')
 
2187
 
 
2188
    def test_error_has_log(self):
 
2189
        content, result = self.run_subunit_stream('test_error')
 
2190
        self.assertContainsRe(content, '(?m)^log$')
 
2191
        self.assertContainsRe(content, 'this test errored')
 
2192
 
 
2193
    def test_skip_has_no_log(self):
 
2194
        content, result = self.run_subunit_stream('test_skip')
 
2195
        self.assertNotContainsRe(content, '(?m)^log$')
 
2196
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2197
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2198
        skips = result.skip_reasons['reason']
 
2199
        self.assertEqual(1, len(skips))
 
2200
        test = skips[0]
 
2201
        # RemotedTestCase doesn't preserve the "details"
 
2202
        ## self.assertFalse('log' in test.getDetails())
 
2203
 
 
2204
    def test_missing_feature_has_no_log(self):
 
2205
        content, result = self.run_subunit_stream('test_missing_feature')
 
2206
        self.assertNotContainsRe(content, '(?m)^log$')
 
2207
        self.assertNotContainsRe(content, 'missing the feature')
 
2208
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2209
        skips = result.skip_reasons['_MissingFeature\n']
 
2210
        self.assertEqual(1, len(skips))
 
2211
        test = skips[0]
 
2212
        # RemotedTestCase doesn't preserve the "details"
 
2213
        ## self.assertFalse('log' in test.getDetails())
 
2214
 
 
2215
    def test_xfail_has_no_log(self):
 
2216
        content, result = self.run_subunit_stream('test_xfail')
 
2217
        self.assertNotContainsRe(content, '(?m)^log$')
 
2218
        self.assertNotContainsRe(content, 'test with expected failure')
 
2219
        self.assertEqual(1, len(result.expectedFailures))
 
2220
        result_content = result.expectedFailures[0][1]
 
2221
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2222
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2223
 
 
2224
    def test_unexpected_success_has_log(self):
 
2225
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2226
        self.assertContainsRe(content, '(?m)^log$')
 
2227
        self.assertContainsRe(content, 'test with unexpected success')
 
2228
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2229
                           ' as a plain success',
 
2230
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2231
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2232
        test = result.unexpectedSuccesses[0]
 
2233
        # RemotedTestCase doesn't preserve the "details"
 
2234
        ## self.assertTrue('log' in test.getDetails())
 
2235
 
 
2236
    def test_success_has_no_log(self):
 
2237
        content, result = self.run_subunit_stream('test_success')
 
2238
        self.assertEqual(1, result.testsRun)
 
2239
        self.assertNotContainsRe(content, '(?m)^log$')
 
2240
        self.assertNotContainsRe(content, 'this test succeeds')
 
2241
 
 
2242
 
1975
2243
class TestRunBzr(tests.TestCase):
1976
2244
 
1977
2245
    out = ''
2940
3208
        tpr.register('bar', 'bBB.aAA.rRR')
2941
3209
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
2942
3210
        self.assertThat(self.get_log(),
2943
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3211
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3212
                           doctest.ELLIPSIS))
2944
3213
 
2945
3214
    def test_get_unknown_prefix(self):
2946
3215
        tpr = self._get_registry()
2966
3235
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2967
3236
 
2968
3237
 
 
3238
class TestThreadLeakDetection(tests.TestCase):
 
3239
    """Ensure when tests leak threads we detect and report it"""
 
3240
 
 
3241
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3242
        def __init__(self):
 
3243
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3244
            self.leaks = []
 
3245
        def _report_thread_leak(self, test, leaks, alive):
 
3246
            self.leaks.append((test, leaks))
 
3247
 
 
3248
    def test_testcase_without_addCleanups(self):
 
3249
        """Check old TestCase instances don't break with leak detection"""
 
3250
        class Test(unittest.TestCase):
 
3251
            def runTest(self):
 
3252
                pass
 
3253
        result = self.LeakRecordingResult()
 
3254
        test = Test()
 
3255
        result.startTestRun()
 
3256
        test.run(result)
 
3257
        result.stopTestRun()
 
3258
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3259
        self.assertEqual(result.leaks, [])
 
3260
        
 
3261
    def test_thread_leak(self):
 
3262
        """Ensure a thread that outlives the running of a test is reported
 
3263
 
 
3264
        Uses a thread that blocks on an event, and is started by the inner
 
3265
        test case. As the thread outlives the inner case's run, it should be
 
3266
        detected as a leak, but the event is then set so that the thread can
 
3267
        be safely joined in cleanup so it's not leaked for real.
 
3268
        """
 
3269
        event = threading.Event()
 
3270
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3271
        class Test(tests.TestCase):
 
3272
            def test_leak(self):
 
3273
                thread.start()
 
3274
        result = self.LeakRecordingResult()
 
3275
        test = Test("test_leak")
 
3276
        self.addCleanup(thread.join)
 
3277
        self.addCleanup(event.set)
 
3278
        result.startTestRun()
 
3279
        test.run(result)
 
3280
        result.stopTestRun()
 
3281
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3282
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3283
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3284
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3285
 
 
3286
    def test_multiple_leaks(self):
 
3287
        """Check multiple leaks are blamed on the test cases at fault
 
3288
 
 
3289
        Same concept as the previous test, but has one inner test method that
 
3290
        leaks two threads, and one that doesn't leak at all.
 
3291
        """
 
3292
        event = threading.Event()
 
3293
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3294
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3295
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3296
        class Test(tests.TestCase):
 
3297
            def test_first_leak(self):
 
3298
                thread_b.start()
 
3299
            def test_second_no_leak(self):
 
3300
                pass
 
3301
            def test_third_leak(self):
 
3302
                thread_c.start()
 
3303
                thread_a.start()
 
3304
        result = self.LeakRecordingResult()
 
3305
        first_test = Test("test_first_leak")
 
3306
        third_test = Test("test_third_leak")
 
3307
        self.addCleanup(thread_a.join)
 
3308
        self.addCleanup(thread_b.join)
 
3309
        self.addCleanup(thread_c.join)
 
3310
        self.addCleanup(event.set)
 
3311
        result.startTestRun()
 
3312
        unittest.TestSuite(
 
3313
            [first_test, Test("test_second_no_leak"), third_test]
 
3314
            ).run(result)
 
3315
        result.stopTestRun()
 
3316
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3317
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3318
        self.assertEqual(result.leaks, [
 
3319
            (first_test, set([thread_b])),
 
3320
            (third_test, set([thread_a, thread_c]))])
 
3321
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3322
 
 
3323
 
 
3324
class TestPostMortemDebugging(tests.TestCase):
    """Post mortem debugging must see the right frame on failure or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result that records where a post mortem session would start.

        Instead of entering a debugger, `_post_mortem` stores the code object
        of the innermost traceback frame in `postcode`.
        """

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            # Code object of the innermost frame seen by _post_mortem, if any
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Remember the code object at the innermost traceback frame"""
            if not tb:
                tb = sys.exc_info()[2]
            if tb is None:
                return
            # Walk down to the frame in which the exception was raised
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            # Reporting is deliberately a no-op for this result
            pass

        def report_failure(self, test, err):
            # Reporting is deliberately a no-op for this result
            pass

    def _postcode_after_running(self, case):
        """Run case against a recording result and return its postcode"""
        recorder = self.TracebackRecordingResult()
        case.run(recorder)
        return recorder.postcode

    def test_location_unittest_error(self):
        """An erroring unittest case is post mortemed in the test method"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        self.assertEqual(
            self._postcode_after_running(Test()), Test.runTest.func_code)

    def test_location_unittest_failure(self):
        """A failing unittest case is post mortemed in the test method"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        self.assertEqual(
            self._postcode_after_running(Test()), Test.runTest.func_code)

    def test_location_bt_error(self):
        """An erroring bzrlib.tests case is post mortemed in the test method"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        self.assertEqual(
            self._postcode_after_running(Test("test_error")),
            Test.test_error.func_code)

    def test_location_bt_failure(self):
        """A failing bzrlib.tests case is post mortemed in the test method"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        self.assertEqual(
            self._postcode_after_running(Test("test_failure")),
            Test.test_failure.func_code)

    def test_env_var_triggers_post_mortem(self):
        """pdb.post_mortem is only called while BZR_TEST_PDB is set"""
        import pdb
        plain_result = tests.ExtendedTestResult(StringIO(), 0, 1)
        seen_tracebacks = []
        # Record the argument of each pdb.post_mortem call instead of
        # starting a debugger.
        self.overrideAttr(pdb, "post_mortem", seen_tracebacks.append)
        self.overrideEnv('BZR_TEST_PDB', None)
        plain_result._post_mortem(1)
        self.overrideEnv('BZR_TEST_PDB', 'on')
        plain_result._post_mortem(2)
        self.assertEqual([2], seen_tracebacks)
 
3392
 
 
3393
 
2969
3394
class TestRunSuite(tests.TestCase):
2970
3395
 
2971
3396
    def test_runner_class(self):
2982
3407
                                                self.verbosity)
2983
3408
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2984
3409
        self.assertLength(1, calls)
 
3410
 
 
3411
 
 
3412
class TestEnvironHandling(tests.TestCase):
    """Check that overrideEnv gives back the value it replaced."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        # MYVAR is set to '42' by the outer test, then an embedded test
        # overrides it with None twice. Once the embedded test has run, the
        # outer test must see '42' again.
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')
        # We use an embedded test to make sure we fix the _captureVar bug
        class Test(tests.TestCase):
            def test_me(self):
                # The first call saves the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEqual(None, os.environ.get('MYVAR'))
                # Make sure we can call it twice
                self.overrideEnv('MYVAR', None)
                self.assertEqual(None, os.environ.get('MYVAR'))
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        Test('test_me').run(result)
        if not result.wasStrictlySuccessful():
            # Surface whatever the embedded run reported
            self.fail(output.getvalue())
        # We get our value back
        self.assertEqual('42', os.environ.get('MYVAR'))
 
3433
 
 
3434
 
 
3435
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    Since we use tests that are already isolated from os.environ, a bit of
    care should be taken when designing the tests to avoid bootstrap
    side-effects. The tests start with an already clean os.environ, which
    allows making valid assertions about which variables are present or not
    and designing tests around these assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Do-nothing test case handed to the environment helpers."""

        def test_me(self):
            pass

    def test_basics(self):
        # Make sure we know the definition of BZR_HOME: not part of os.environ
        # for tests.TestCase.
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
        self.assertEqual(None, tests.isolated_environ['BZR_HOME'])
        # Being part of isolated_environ, BZR_HOME should not appear here
        self.assertFalse('BZR_HOME' in os.environ)
        # Make sure we know the definition of LINES: part of os.environ for
        # tests.TestCase
        self.assertTrue('LINES' in tests.isolated_environ)
        self.assertEqual('25', tests.isolated_environ['LINES'])
        self.assertEqual('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BZR_HOME is known to be absent from os.environ
        test = self.ScratchMonkey('test_me')
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
        self.assertEqual('foo', os.environ['BZR_HOME'])
        # Restoring must remove the variable again
        tests.restore_os_environ(test)
        self.assertFalse('BZR_HOME' in os.environ)

    def test_injecting_known_variable(self):
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': '42'})
        self.assertEqual('42', os.environ['LINES'])
        # Restoring must bring the previous value back
        tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])

    def test_deleting_variable(self):
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': None})
        self.assertFalse('LINES' in os.environ)
        # Restoring must bring the deleted variable back
        tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])
 
3486
 
 
3487
 
 
3488
class TestDocTestSuiteIsolation(tests.TestCase):
    """Check `tests.IsolatedDocTestSuite` shields doctests from os.environ.

    Since tests.TestCase already provides an isolation from os.environ, the
    clean environment is used as a base for testing. To precisely capture the
    isolation provided by `tests.IsolatedDocTestSuite`, the same doctest is
    also run through doctest.DocTestSuite for comparison.

    The isolated suite is expected to respect `tests.isolated_environ`, not
    `os.environ`, so each test overrides `tests.isolated_environ` to suit its
    needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a suite with klass whose only doctest is parsed from string"""
        class CannedFinder(doctest.DocTestFinder):
            """Finder that ignores its arguments and returns one doctest"""

            def find(self, *args, **kwargs):
                parser = doctest.DocTestParser()
                parsed = parser.get_doctest(string, {}, 'foo', 'foo.py', 0)
                return [parsed]

        return klass(test_finder=CannedFinder())

    def run_doctest_suite_for_string(self, klass, string):
        """Run string as a doctest under klass; return (result, stream)"""
        stream = StringIO()
        outcome = tests.TextTestResult(stream, 0, 1)
        self.get_doctest_suite_for_string(klass, string).run(outcome)
        return outcome, stream

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the run's output, unless the doctest succeeds"""
        outcome, stream = self.run_doctest_suite_for_string(klass, string)
        if outcome.wasStrictlySuccessful():
            return
        self.fail(stream.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail, showing the run's output, if the doctest succeeds"""
        outcome, stream = self.run_doctest_suite_for_string(klass, string)
        if not outcome.wasStrictlySuccessful():
            return
        self.fail(stream.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        doctest_text = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # The plain doctest suite sees the '25' of the current os.environ,
        # so the doctest fails.
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_text)
        # The isolated suite sees the injected '42'.
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_text)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        doctest_text = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # The plain doctest suite sees the '25' of the current os.environ,
        # so the doctest fails.
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_text)
        # The isolated suite sees None as LINES has been removed.
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_text)