~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Zearin
  • Date: 2010-11-12 22:08:18 UTC
  • mto: (5570.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5572.
  • Revision ID: zearin@users.sourceforge.net-20101112220818-mb62len4zyxr8qvd
Fixed capitalization of XML and HTTP.  Fixed by hand and only where appropriate (e.g., left http://some/url lowercase, but capitalized "When making an HTTP request…").

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
42
47
    lockdir,
43
48
    memorytree,
44
49
    osutils,
45
 
    progress,
46
50
    remote,
47
51
    repository,
48
52
    symbol_versioning,
67
71
    test_sftp_transport,
68
72
    TestUtil,
69
73
    )
70
 
from bzrlib.trace import note
 
74
from bzrlib.trace import note, mutter
71
75
from bzrlib.transport import memory
72
76
from bzrlib.version import _get_bzr_source_tree
73
77
 
77
81
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
82
 
79
83
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
84
class MetaTestLog(tests.TestCase):
93
85
 
94
86
    def test_logging(self):
122
114
        self.failUnlessExists(filename)
123
115
 
124
116
 
 
117
class TestClassesAvailable(tests.TestCase):
 
118
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
119
 
 
120
    def test_test_case(self):
 
121
        from bzrlib.tests import TestCase
 
122
 
 
123
    def test_test_loader(self):
 
124
        from bzrlib.tests import TestLoader
 
125
 
 
126
    def test_test_suite(self):
 
127
        from bzrlib.tests import TestSuite
 
128
 
 
129
 
125
130
class TestTransportScenarios(tests.TestCase):
126
131
    """A group of tests that test the transport implementation adaption core.
127
132
 
208
213
    def test_scenarios(self):
209
214
        # check that constructor parameters are passed through to the adapted
210
215
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
216
        from bzrlib.tests.per_controldir import make_scenarios
212
217
        vfs_factory = "v"
213
218
        server1 = "a"
214
219
        server2 = "b"
312
317
        from bzrlib.tests.per_interrepository import make_scenarios
313
318
        server1 = "a"
314
319
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
320
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
321
        scenarios = make_scenarios(server1, server2, formats)
317
322
        self.assertEqual([
318
323
            ('C0,str,str',
319
324
             {'repository_format': 'C1',
320
325
              'repository_format_to': 'C2',
321
326
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
327
              'transport_server': 'a',
 
328
              'extra_setup': 'C3'}),
323
329
            ('D0,str,str',
324
330
             {'repository_format': 'D1',
325
331
              'repository_format_to': 'D2',
326
332
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
333
              'transport_server': 'a',
 
334
              'extra_setup': 'D3'})],
328
335
            scenarios)
329
336
 
330
337
 
609
616
                l.attempt_lock()
610
617
        test = TestDanglingLock('test_function')
611
618
        result = test.run()
 
619
        total_failures = result.errors + result.failures
612
620
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
621
            self.assertEqual(1, len(total_failures))
614
622
        else:
615
623
            # When _lock_check_thorough is disabled, then we don't trigger a
616
624
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
625
            self.assertEqual(0, len(total_failures))
618
626
 
619
627
 
620
628
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
629
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
630
 
623
631
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
632
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
633
        self.vfs_transport_factory = memory.MemoryServer
627
634
        self.transport_readonly_server = None
629
636
        # for the server
630
637
        url = self.get_readonly_url()
631
638
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
 
639
        t = transport.get_transport(url)
 
640
        t2 = transport.get_transport(url2)
634
641
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
642
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
636
643
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
644
 
638
645
    def test_get_readonly_url_http(self):
639
646
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
647
        from bzrlib.transport.http import HttpTransportBase
642
648
        self.transport_server = test_server.LocalURLServer
643
649
        self.transport_readonly_server = HttpServer
645
651
        url = self.get_readonly_url()
646
652
        url2 = self.get_readonly_url('foo/bar')
647
653
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
 
654
        t = transport.get_transport(url)
 
655
        t2 = transport.get_transport(url2)
650
656
        self.failUnless(isinstance(t, HttpTransportBase))
651
657
        self.failUnless(isinstance(t2, HttpTransportBase))
652
658
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
690
696
class TestChrootedTest(tests.ChrootedTestCase):
691
697
 
692
698
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
699
        t = transport.get_transport(self.get_readonly_url())
695
700
        url = t.base
696
701
        self.assertEqual(url, t.clone('..').base)
697
702
 
803
808
        self.requireFeature(test_lsprof.LSProfFeature)
804
809
        result_stream = StringIO()
805
810
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
811
            result_stream,
807
812
            descriptions=0,
808
813
            verbosity=2,
809
814
            )
835
840
        self.assertContainsRe(output,
836
841
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
837
842
 
 
843
    def test_uses_time_from_testtools(self):
 
844
        """Test case timings in verbose results should use testtools times"""
 
845
        import datetime
 
846
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
847
            def startTest(self, test):
 
848
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
849
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
850
            def addSuccess(self, test):
 
851
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
852
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
853
            def report_tests_starting(self): pass
 
854
        sio = StringIO()
 
855
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
856
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
857
 
838
858
    def test_known_failure(self):
839
859
        """A KnownFailure being raised should trigger several result actions."""
840
860
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
861
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
862
            def report_tests_starting(self): pass
844
863
            def report_known_failure(self, test, err=None, details=None):
845
864
                self._call = test, 'known failure'
846
865
        result = InstrumentedTestResult(None, None, None, None)
864
883
        # verbose test output formatting
865
884
        result_stream = StringIO()
866
885
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
886
            result_stream,
868
887
            descriptions=0,
869
888
            verbosity=2,
870
889
            )
880
899
        output = result_stream.getvalue()[prefix:]
881
900
        lines = output.splitlines()
882
901
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
902
        if sys.version_info > (2, 7):
 
903
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
904
                self.assertNotEqual, lines[1], '    ')
883
905
        self.assertEqual(lines[1], '    foo')
884
906
        self.assertEqual(2, len(lines))
885
907
 
893
915
        """Test the behaviour of invoking addNotSupported."""
894
916
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
917
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
918
            def report_tests_starting(self): pass
898
919
            def report_unsupported(self, test, feature):
899
920
                self._call = test, feature
900
921
        result = InstrumentedTestResult(None, None, None, None)
919
940
        # verbose test output formatting
920
941
        result_stream = StringIO()
921
942
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
943
            result_stream,
923
944
            descriptions=0,
924
945
            verbosity=2,
925
946
            )
939
960
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
961
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
962
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
963
            def report_tests_starting(self): pass
944
964
            def addNotSupported(self, test, feature):
945
965
                self._call = test, feature
946
966
        result = InstrumentedTestResult(None, None, None, None)
988
1008
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
1009
            calls = 0
990
1010
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
1011
        result = InstrumentedTestResult(None, None, None, None)
993
1012
        def test_function():
994
1013
            pass
996
1015
        test.run(result)
997
1016
        self.assertEquals(1, result.calls)
998
1017
 
 
1018
    def test_startTests_only_once(self):
 
1019
        """With multiple tests startTests should still only be called once"""
 
1020
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1021
            calls = 0
 
1022
            def startTests(self): self.calls += 1
 
1023
        result = InstrumentedTestResult(None, None, None, None)
 
1024
        suite = unittest.TestSuite([
 
1025
            unittest.FunctionTestCase(lambda: None),
 
1026
            unittest.FunctionTestCase(lambda: None)])
 
1027
        suite.run(result)
 
1028
        self.assertEquals(1, result.calls)
 
1029
        self.assertEquals(2, result.count)
 
1030
 
999
1031
 
1000
1032
class TestUnicodeFilenameFeature(tests.TestCase):
1001
1033
 
1022
1054
        because of our use of global state.
1023
1055
        """
1024
1056
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1057
        try:
1027
1058
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1059
            return testrunner.run(test)
1030
1060
        finally:
1031
1061
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1062
 
1034
1063
    def test_known_failure_failed_run(self):
1035
1064
        # run a test that generates a known failure which should be printed in
1081
1110
    def test_result_decorator(self):
1082
1111
        # decorate results
1083
1112
        calls = []
1084
 
        class LoggingDecorator(tests.ForwardingResult):
 
1113
        class LoggingDecorator(ExtendedToOriginalDecorator):
1085
1114
            def startTest(self, test):
1086
 
                tests.ForwardingResult.startTest(self, test)
 
1115
                ExtendedToOriginalDecorator.startTest(self, test)
1087
1116
                calls.append('start')
1088
1117
        test = unittest.FunctionTestCase(lambda:None)
1089
1118
        stream = StringIO()
1213
1242
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
1243
        self.assertLength(1, self._get_source_tree_calls)
1215
1244
 
 
1245
    def test_verbose_test_count(self):
 
1246
        """A verbose test run reports the right test count at the start"""
 
1247
        suite = TestUtil.TestSuite([
 
1248
            unittest.FunctionTestCase(lambda:None),
 
1249
            unittest.FunctionTestCase(lambda:None)])
 
1250
        self.assertEqual(suite.countTestCases(), 2)
 
1251
        stream = StringIO()
 
1252
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1253
        # Need to use the CountingDecorator as that's what sets num_tests
 
1254
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1255
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1256
 
1216
1257
    def test_startTestRun(self):
1217
1258
        """run should call result.startTestRun()"""
1218
1259
        calls = []
1219
 
        class LoggingDecorator(tests.ForwardingResult):
 
1260
        class LoggingDecorator(ExtendedToOriginalDecorator):
1220
1261
            def startTestRun(self):
1221
 
                tests.ForwardingResult.startTestRun(self)
 
1262
                ExtendedToOriginalDecorator.startTestRun(self)
1222
1263
                calls.append('startTestRun')
1223
1264
        test = unittest.FunctionTestCase(lambda:None)
1224
1265
        stream = StringIO()
1230
1271
    def test_stopTestRun(self):
1231
1272
        """run should call result.stopTestRun()"""
1232
1273
        calls = []
1233
 
        class LoggingDecorator(tests.ForwardingResult):
 
1274
        class LoggingDecorator(ExtendedToOriginalDecorator):
1234
1275
            def stopTestRun(self):
1235
 
                tests.ForwardingResult.stopTestRun(self)
 
1276
                ExtendedToOriginalDecorator.stopTestRun(self)
1236
1277
                calls.append('stopTestRun')
1237
1278
        test = unittest.FunctionTestCase(lambda:None)
1238
1279
        stream = StringIO()
1421
1462
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1463
        output_stream = StringIO()
1423
1464
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1465
            output_stream,
1425
1466
            descriptions=0,
1426
1467
            verbosity=2)
1427
1468
        sample_test.run(result)
1655
1696
        self.assertEqual('original', obj.test_attr)
1656
1697
 
1657
1698
 
 
1699
class _MissingFeature(tests.Feature):
 
1700
    def _probe(self):
 
1701
        return False
 
1702
missing_feature = _MissingFeature()
 
1703
 
 
1704
 
 
1705
def _get_test(name):
 
1706
    """Get an instance of a specific example test.
 
1707
 
 
1708
    We protect this in a function so that they don't auto-run in the test
 
1709
    suite.
 
1710
    """
 
1711
 
 
1712
    class ExampleTests(tests.TestCase):
 
1713
 
 
1714
        def test_fail(self):
 
1715
            mutter('this was a failing test')
 
1716
            self.fail('this test will fail')
 
1717
 
 
1718
        def test_error(self):
 
1719
            mutter('this test errored')
 
1720
            raise RuntimeError('gotcha')
 
1721
 
 
1722
        def test_missing_feature(self):
 
1723
            mutter('missing the feature')
 
1724
            self.requireFeature(missing_feature)
 
1725
 
 
1726
        def test_skip(self):
 
1727
            mutter('this test will be skipped')
 
1728
            raise tests.TestSkipped('reason')
 
1729
 
 
1730
        def test_success(self):
 
1731
            mutter('this test succeeds')
 
1732
 
 
1733
        def test_xfail(self):
 
1734
            mutter('test with expected failure')
 
1735
            self.knownFailure('this_fails')
 
1736
 
 
1737
        def test_unexpected_success(self):
 
1738
            mutter('test with unexpected success')
 
1739
            self.expectFailure('should_fail', lambda: None)
 
1740
 
 
1741
    return ExampleTests(name)
 
1742
 
 
1743
 
 
1744
class TestTestCaseLogDetails(tests.TestCase):
 
1745
 
 
1746
    def _run_test(self, test_name):
 
1747
        test = _get_test(test_name)
 
1748
        result = testtools.TestResult()
 
1749
        test.run(result)
 
1750
        return result
 
1751
 
 
1752
    def test_fail_has_log(self):
 
1753
        result = self._run_test('test_fail')
 
1754
        self.assertEqual(1, len(result.failures))
 
1755
        result_content = result.failures[0][1]
 
1756
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1757
        self.assertContainsRe(result_content, 'this was a failing test')
 
1758
 
 
1759
    def test_error_has_log(self):
 
1760
        result = self._run_test('test_error')
 
1761
        self.assertEqual(1, len(result.errors))
 
1762
        result_content = result.errors[0][1]
 
1763
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1764
        self.assertContainsRe(result_content, 'this test errored')
 
1765
 
 
1766
    def test_skip_has_no_log(self):
 
1767
        result = self._run_test('test_skip')
 
1768
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1769
        skips = result.skip_reasons['reason']
 
1770
        self.assertEqual(1, len(skips))
 
1771
        test = skips[0]
 
1772
        self.assertFalse('log' in test.getDetails())
 
1773
 
 
1774
    def test_missing_feature_has_no_log(self):
 
1775
        # testtools doesn't know about addNotSupported, so it just gets
 
1776
        # considered as a skip
 
1777
        result = self._run_test('test_missing_feature')
 
1778
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1779
        skips = result.skip_reasons[missing_feature]
 
1780
        self.assertEqual(1, len(skips))
 
1781
        test = skips[0]
 
1782
        self.assertFalse('log' in test.getDetails())
 
1783
 
 
1784
    def test_xfail_has_no_log(self):
 
1785
        result = self._run_test('test_xfail')
 
1786
        self.assertEqual(1, len(result.expectedFailures))
 
1787
        result_content = result.expectedFailures[0][1]
 
1788
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1789
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1790
 
 
1791
    def test_unexpected_success_has_log(self):
 
1792
        result = self._run_test('test_unexpected_success')
 
1793
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1794
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1795
        # expectedFailures is a list of reasons?
 
1796
        test = result.unexpectedSuccesses[0]
 
1797
        details = test.getDetails()
 
1798
        self.assertTrue('log' in details)
 
1799
 
 
1800
 
 
1801
class TestTestCloning(tests.TestCase):
 
1802
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1803
 
 
1804
    def test_cloned_testcase_does_not_share_details(self):
 
1805
        """A TestCase cloned with clone_test does not share mutable attributes
 
1806
        such as details or cleanups.
 
1807
        """
 
1808
        class Test(tests.TestCase):
 
1809
            def test_foo(self):
 
1810
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1811
        orig_test = Test('test_foo')
 
1812
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1813
        orig_test.run(unittest.TestResult())
 
1814
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1815
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1816
 
 
1817
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1818
        """Applying two levels of scenarios to a test preserves the attributes
 
1819
        added by both scenarios.
 
1820
        """
 
1821
        class Test(tests.TestCase):
 
1822
            def test_foo(self):
 
1823
                pass
 
1824
        test = Test('test_foo')
 
1825
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1826
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1827
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1828
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1829
        all_tests = list(tests.iter_suite_tests(suite))
 
1830
        self.assertLength(4, all_tests)
 
1831
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1832
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1833
 
 
1834
 
1658
1835
# NB: Don't delete this; it's not actually from 0.11!
1659
1836
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1837
def sample_deprecated_function():
1814
1991
                tree.branch.repository.bzrdir.root_transport)
1815
1992
 
1816
1993
 
1817
 
class SelfTestHelper:
 
1994
class SelfTestHelper(object):
1818
1995
 
1819
1996
    def run_selftest(self, **kwargs):
1820
1997
        """Run selftest returning its output."""
1880
2057
            def __call__(test, result):
1881
2058
                test.run(result)
1882
2059
            def run(test, result):
1883
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2060
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1884
2061
                calls.append("called")
1885
2062
            def countTestCases(self):
1886
2063
                return 1
1971
2148
            load_list='missing file name', list_only=True)
1972
2149
 
1973
2150
 
 
2151
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2152
 
 
2153
    _test_needs_features = [features.subunit]
 
2154
 
 
2155
    def run_subunit_stream(self, test_name):
 
2156
        from subunit import ProtocolTestCase
 
2157
        def factory():
 
2158
            return TestUtil.TestSuite([_get_test(test_name)])
 
2159
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2160
            test_suite_factory=factory)
 
2161
        test = ProtocolTestCase(stream)
 
2162
        result = testtools.TestResult()
 
2163
        test.run(result)
 
2164
        content = stream.getvalue()
 
2165
        return content, result
 
2166
 
 
2167
    def test_fail_has_log(self):
 
2168
        content, result = self.run_subunit_stream('test_fail')
 
2169
        self.assertEqual(1, len(result.failures))
 
2170
        self.assertContainsRe(content, '(?m)^log$')
 
2171
        self.assertContainsRe(content, 'this test will fail')
 
2172
 
 
2173
    def test_error_has_log(self):
 
2174
        content, result = self.run_subunit_stream('test_error')
 
2175
        self.assertContainsRe(content, '(?m)^log$')
 
2176
        self.assertContainsRe(content, 'this test errored')
 
2177
 
 
2178
    def test_skip_has_no_log(self):
 
2179
        content, result = self.run_subunit_stream('test_skip')
 
2180
        self.assertNotContainsRe(content, '(?m)^log$')
 
2181
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2182
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2183
        skips = result.skip_reasons['reason']
 
2184
        self.assertEqual(1, len(skips))
 
2185
        test = skips[0]
 
2186
        # RemotedTestCase doesn't preserve the "details"
 
2187
        ## self.assertFalse('log' in test.getDetails())
 
2188
 
 
2189
    def test_missing_feature_has_no_log(self):
 
2190
        content, result = self.run_subunit_stream('test_missing_feature')
 
2191
        self.assertNotContainsRe(content, '(?m)^log$')
 
2192
        self.assertNotContainsRe(content, 'missing the feature')
 
2193
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2194
        skips = result.skip_reasons['_MissingFeature\n']
 
2195
        self.assertEqual(1, len(skips))
 
2196
        test = skips[0]
 
2197
        # RemotedTestCase doesn't preserve the "details"
 
2198
        ## self.assertFalse('log' in test.getDetails())
 
2199
 
 
2200
    def test_xfail_has_no_log(self):
 
2201
        content, result = self.run_subunit_stream('test_xfail')
 
2202
        self.assertNotContainsRe(content, '(?m)^log$')
 
2203
        self.assertNotContainsRe(content, 'test with expected failure')
 
2204
        self.assertEqual(1, len(result.expectedFailures))
 
2205
        result_content = result.expectedFailures[0][1]
 
2206
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2207
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2208
 
 
2209
    def test_unexpected_success_has_log(self):
 
2210
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2211
        self.assertContainsRe(content, '(?m)^log$')
 
2212
        self.assertContainsRe(content, 'test with unexpected success')
 
2213
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2214
                           ' as a plain success',
 
2215
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2216
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2217
        test = result.unexpectedSuccesses[0]
 
2218
        # RemotedTestCase doesn't preserve the "details"
 
2219
        ## self.assertTrue('log' in test.getDetails())
 
2220
 
 
2221
    def test_success_has_no_log(self):
 
2222
        content, result = self.run_subunit_stream('test_success')
 
2223
        self.assertEqual(1, result.testsRun)
 
2224
        self.assertNotContainsRe(content, '(?m)^log$')
 
2225
        self.assertNotContainsRe(content, 'this test succeeds')
 
2226
 
 
2227
 
1974
2228
class TestRunBzr(tests.TestCase):
1975
2229
 
1976
2230
    out = ''
2339
2593
            os.chdir = orig_chdir
2340
2594
        self.assertEqual(['foo', 'current'], chdirs)
2341
2595
 
 
2596
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2597
        self.get_source_path = lambda: ""
 
2598
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2599
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2600
 
2342
2601
 
2343
2602
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2603
    """Tests that really need to do things with an external bzr."""
2960
3219
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3220
 
2962
3221
 
 
3222
class TestThreadLeakDetection(tests.TestCase):
 
3223
    """Ensure when tests leak threads we detect and report it"""
 
3224
 
 
3225
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3226
        def __init__(self):
 
3227
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3228
            self.leaks = []
 
3229
        def _report_thread_leak(self, test, leaks, alive):
 
3230
            self.leaks.append((test, leaks))
 
3231
 
 
3232
    def test_testcase_without_addCleanups(self):
 
3233
        """Check old TestCase instances don't break with leak detection"""
 
3234
        class Test(unittest.TestCase):
 
3235
            def runTest(self):
 
3236
                pass
 
3237
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3238
        result = self.LeakRecordingResult()
 
3239
        test = Test()
 
3240
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3241
        result.startTestRun()
 
3242
        test.run(result)
 
3243
        result.stopTestRun()
 
3244
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3245
        self.assertEqual(result.leaks, [])
 
3246
        
 
3247
    def test_thread_leak(self):
 
3248
        """Ensure a thread that outlives the running of a test is reported
 
3249
 
 
3250
        Uses a thread that blocks on an event, and is started by the inner
 
3251
        test case. As the thread outlives the inner case's run, it should be
 
3252
        detected as a leak, but the event is then set so that the thread can
 
3253
        be safely joined in cleanup so it's not leaked for real.
 
3254
        """
 
3255
        event = threading.Event()
 
3256
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3257
        class Test(tests.TestCase):
 
3258
            def test_leak(self):
 
3259
                thread.start()
 
3260
        result = self.LeakRecordingResult()
 
3261
        test = Test("test_leak")
 
3262
        self.addCleanup(thread.join)
 
3263
        self.addCleanup(event.set)
 
3264
        result.startTestRun()
 
3265
        test.run(result)
 
3266
        result.stopTestRun()
 
3267
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3268
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3269
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3270
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3271
 
 
3272
    def test_multiple_leaks(self):
 
3273
        """Check multiple leaks are blamed on the test cases at fault
 
3274
 
 
3275
        Same concept as the previous test, but has one inner test method that
 
3276
        leaks two threads, and one that doesn't leak at all.
 
3277
        """
 
3278
        event = threading.Event()
 
3279
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3280
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3281
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3282
        class Test(tests.TestCase):
 
3283
            def test_first_leak(self):
 
3284
                thread_b.start()
 
3285
            def test_second_no_leak(self):
 
3286
                pass
 
3287
            def test_third_leak(self):
 
3288
                thread_c.start()
 
3289
                thread_a.start()
 
3290
        result = self.LeakRecordingResult()
 
3291
        first_test = Test("test_first_leak")
 
3292
        third_test = Test("test_third_leak")
 
3293
        self.addCleanup(thread_a.join)
 
3294
        self.addCleanup(thread_b.join)
 
3295
        self.addCleanup(thread_c.join)
 
3296
        self.addCleanup(event.set)
 
3297
        result.startTestRun()
 
3298
        unittest.TestSuite(
 
3299
            [first_test, Test("test_second_no_leak"), third_test]
 
3300
            ).run(result)
 
3301
        result.stopTestRun()
 
3302
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3303
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3304
        self.assertEqual(result.leaks, [
 
3305
            (first_test, set([thread_b])),
 
3306
            (third_test, set([thread_a, thread_c]))])
 
3307
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3308
 
 
3309
 
 
3310
class TestPostMortemDebugging(tests.TestCase):
 
3311
    """Check post mortem debugging works when tests fail or error"""
 
3312
 
 
3313
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3314
        def __init__(self):
 
3315
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3316
            self.postcode = None
 
3317
        def _post_mortem(self, tb=None):
 
3318
            """Record the code object at the end of the current traceback"""
 
3319
            tb = tb or sys.exc_info()[2]
 
3320
            if tb is not None:
 
3321
                next = tb.tb_next
 
3322
                while next is not None:
 
3323
                    tb = next
 
3324
                    next = next.tb_next
 
3325
                self.postcode = tb.tb_frame.f_code
 
3326
        def report_error(self, test, err):
 
3327
            pass
 
3328
        def report_failure(self, test, err):
 
3329
            pass
 
3330
 
 
3331
    def test_location_unittest_error(self):
 
3332
        """Needs right post mortem traceback with erroring unittest case"""
 
3333
        class Test(unittest.TestCase):
 
3334
            def runTest(self):
 
3335
                raise RuntimeError
 
3336
        result = self.TracebackRecordingResult()
 
3337
        Test().run(result)
 
3338
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3339
 
 
3340
    def test_location_unittest_failure(self):
 
3341
        """Needs right post mortem traceback with failing unittest case"""
 
3342
        class Test(unittest.TestCase):
 
3343
            def runTest(self):
 
3344
                raise self.failureException
 
3345
        result = self.TracebackRecordingResult()
 
3346
        Test().run(result)
 
3347
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3348
 
 
3349
    def test_location_bt_error(self):
 
3350
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3351
        class Test(tests.TestCase):
 
3352
            def test_error(self):
 
3353
                raise RuntimeError
 
3354
        result = self.TracebackRecordingResult()
 
3355
        Test("test_error").run(result)
 
3356
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3357
 
 
3358
    def test_location_bt_failure(self):
 
3359
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3360
        class Test(tests.TestCase):
 
3361
            def test_failure(self):
 
3362
                raise self.failureException
 
3363
        result = self.TracebackRecordingResult()
 
3364
        Test("test_failure").run(result)
 
3365
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3366
 
 
3367
    def test_env_var_triggers_post_mortem(self):
 
3368
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3369
        import pdb
 
3370
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3371
        post_mortem_calls = []
 
3372
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3373
        self.addCleanup(osutils.set_or_unset_env, "BZR_TEST_PDB",
 
3374
            osutils.set_or_unset_env("BZR_TEST_PDB", None))
 
3375
        result._post_mortem(1)
 
3376
        os.environ["BZR_TEST_PDB"] = "on"
 
3377
        result._post_mortem(2)
 
3378
        self.assertEqual([2], post_mortem_calls)
 
3379
 
 
3380
 
2963
3381
class TestRunSuite(tests.TestCase):
2964
3382
 
2965
3383
    def test_runner_class(self):