~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-10-01 22:33:10 UTC
  • mfrom: (5452.3.1 doc-fix)
  • Revision ID: pqm@pqm.ubuntu.com-20101001223310-t8adqw9m9ogrvnlc
(jameinel) fixed link to main smart server doc from http smart server doc (dmuir)

Show diffs side-by-side

Legend: added lines

Legend: removed lines

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
29
from testtools import MultiTestResult
 
30
from testtools.content import Content
29
31
from testtools.content_type import ContentType
30
32
from testtools.matchers import (
31
33
    DocTestMatches,
42
44
    lockdir,
43
45
    memorytree,
44
46
    osutils,
45
 
    progress,
46
47
    remote,
47
48
    repository,
48
49
    symbol_versioning,
62
63
    )
63
64
from bzrlib.tests import (
64
65
    features,
65
 
    stub_sftp,
66
66
    test_lsprof,
67
67
    test_server,
68
68
    test_sftp_transport,
69
69
    TestUtil,
70
70
    )
71
 
from bzrlib.trace import note
 
71
from bzrlib.trace import note, mutter
72
72
from bzrlib.transport import memory
73
73
from bzrlib.version import _get_bzr_source_tree
74
74
 
123
123
        self.failUnlessExists(filename)
124
124
 
125
125
 
 
126
class TestClassesAvailable(tests.TestCase):
 
127
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
128
 
 
129
    def test_test_case(self):
 
130
        from bzrlib.tests import TestCase
 
131
 
 
132
    def test_test_loader(self):
 
133
        from bzrlib.tests import TestLoader
 
134
 
 
135
    def test_test_suite(self):
 
136
        from bzrlib.tests import TestSuite
 
137
 
 
138
 
126
139
class TestTransportScenarios(tests.TestCase):
127
140
    """A group of tests that test the transport implementation adaption core.
128
141
 
209
222
    def test_scenarios(self):
210
223
        # check that constructor parameters are passed through to the adapted
211
224
        # test.
212
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
225
        from bzrlib.tests.per_controldir import make_scenarios
213
226
        vfs_factory = "v"
214
227
        server1 = "a"
215
228
        server2 = "b"
313
326
        from bzrlib.tests.per_interrepository import make_scenarios
314
327
        server1 = "a"
315
328
        server2 = "b"
316
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
329
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
317
330
        scenarios = make_scenarios(server1, server2, formats)
318
331
        self.assertEqual([
319
332
            ('C0,str,str',
320
333
             {'repository_format': 'C1',
321
334
              'repository_format_to': 'C2',
322
335
              'transport_readonly_server': 'b',
323
 
              'transport_server': 'a'}),
 
336
              'transport_server': 'a',
 
337
              'extra_setup': 'C3'}),
324
338
            ('D0,str,str',
325
339
             {'repository_format': 'D1',
326
340
              'repository_format_to': 'D2',
327
341
              'transport_readonly_server': 'b',
328
 
              'transport_server': 'a'})],
 
342
              'transport_server': 'a',
 
343
              'extra_setup': 'D3'})],
329
344
            scenarios)
330
345
 
331
346
 
610
625
                l.attempt_lock()
611
626
        test = TestDanglingLock('test_function')
612
627
        result = test.run()
 
628
        total_failures = result.errors + result.failures
613
629
        if self._lock_check_thorough:
614
 
            self.assertEqual(1, len(result.errors))
 
630
            self.assertEqual(1, len(total_failures))
615
631
        else:
616
632
            # When _lock_check_thorough is disabled, then we don't trigger a
617
633
            # failure
618
 
            self.assertEqual(0, len(result.errors))
 
634
            self.assertEqual(0, len(total_failures))
619
635
 
620
636
 
621
637
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
622
638
    """Tests for the convenience functions TestCaseWithTransport introduces."""
623
639
 
624
640
    def test_get_readonly_url_none(self):
625
 
        from bzrlib.transport import get_transport
626
641
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
627
642
        self.vfs_transport_factory = memory.MemoryServer
628
643
        self.transport_readonly_server = None
630
645
        # for the server
631
646
        url = self.get_readonly_url()
632
647
        url2 = self.get_readonly_url('foo/bar')
633
 
        t = get_transport(url)
634
 
        t2 = get_transport(url2)
 
648
        t = transport.get_transport(url)
 
649
        t2 = transport.get_transport(url2)
635
650
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
636
651
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
637
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
653
 
639
654
    def test_get_readonly_url_http(self):
640
655
        from bzrlib.tests.http_server import HttpServer
641
 
        from bzrlib.transport import get_transport
642
656
        from bzrlib.transport.http import HttpTransportBase
643
657
        self.transport_server = test_server.LocalURLServer
644
658
        self.transport_readonly_server = HttpServer
646
660
        url = self.get_readonly_url()
647
661
        url2 = self.get_readonly_url('foo/bar')
648
662
        # the transport returned may be any HttpTransportBase subclass
649
 
        t = get_transport(url)
650
 
        t2 = get_transport(url2)
 
663
        t = transport.get_transport(url)
 
664
        t2 = transport.get_transport(url2)
651
665
        self.failUnless(isinstance(t, HttpTransportBase))
652
666
        self.failUnless(isinstance(t2, HttpTransportBase))
653
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
691
705
class TestChrootedTest(tests.ChrootedTestCase):
692
706
 
693
707
    def test_root_is_root(self):
694
 
        from bzrlib.transport import get_transport
695
 
        t = get_transport(self.get_readonly_url())
 
708
        t = transport.get_transport(self.get_readonly_url())
696
709
        url = t.base
697
710
        self.assertEqual(url, t.clone('..').base)
698
711
 
804
817
        self.requireFeature(test_lsprof.LSProfFeature)
805
818
        result_stream = StringIO()
806
819
        result = bzrlib.tests.VerboseTestResult(
807
 
            unittest._WritelnDecorator(result_stream),
 
820
            result_stream,
808
821
            descriptions=0,
809
822
            verbosity=2,
810
823
            )
840
853
        """A KnownFailure being raised should trigger several result actions."""
841
854
        class InstrumentedTestResult(tests.ExtendedTestResult):
842
855
            def stopTestRun(self): pass
843
 
            def startTests(self): pass
844
 
            def report_test_start(self, test): pass
 
856
            def report_tests_starting(self): pass
845
857
            def report_known_failure(self, test, err=None, details=None):
846
858
                self._call = test, 'known failure'
847
859
        result = InstrumentedTestResult(None, None, None, None)
865
877
        # verbose test output formatting
866
878
        result_stream = StringIO()
867
879
        result = bzrlib.tests.VerboseTestResult(
868
 
            unittest._WritelnDecorator(result_stream),
 
880
            result_stream,
869
881
            descriptions=0,
870
882
            verbosity=2,
871
883
            )
881
893
        output = result_stream.getvalue()[prefix:]
882
894
        lines = output.splitlines()
883
895
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
896
        if sys.version_info > (2, 7):
 
897
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
898
                self.assertNotEqual, lines[1], '    ')
884
899
        self.assertEqual(lines[1], '    foo')
885
900
        self.assertEqual(2, len(lines))
886
901
 
894
909
        """Test the behaviour of invoking addNotSupported."""
895
910
        class InstrumentedTestResult(tests.ExtendedTestResult):
896
911
            def stopTestRun(self): pass
897
 
            def startTests(self): pass
898
 
            def report_test_start(self, test): pass
 
912
            def report_tests_starting(self): pass
899
913
            def report_unsupported(self, test, feature):
900
914
                self._call = test, feature
901
915
        result = InstrumentedTestResult(None, None, None, None)
920
934
        # verbose test output formatting
921
935
        result_stream = StringIO()
922
936
        result = bzrlib.tests.VerboseTestResult(
923
 
            unittest._WritelnDecorator(result_stream),
 
937
            result_stream,
924
938
            descriptions=0,
925
939
            verbosity=2,
926
940
            )
940
954
        """An UnavailableFeature being raised should invoke addNotSupported."""
941
955
        class InstrumentedTestResult(tests.ExtendedTestResult):
942
956
            def stopTestRun(self): pass
943
 
            def startTests(self): pass
944
 
            def report_test_start(self, test): pass
 
957
            def report_tests_starting(self): pass
945
958
            def addNotSupported(self, test, feature):
946
959
                self._call = test, feature
947
960
        result = InstrumentedTestResult(None, None, None, None)
989
1002
        class InstrumentedTestResult(tests.ExtendedTestResult):
990
1003
            calls = 0
991
1004
            def startTests(self): self.calls += 1
992
 
            def report_test_start(self, test): pass
993
1005
        result = InstrumentedTestResult(None, None, None, None)
994
1006
        def test_function():
995
1007
            pass
997
1009
        test.run(result)
998
1010
        self.assertEquals(1, result.calls)
999
1011
 
 
1012
    def test_startTests_only_once(self):
 
1013
        """With multiple tests startTests should still only be called once"""
 
1014
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1015
            calls = 0
 
1016
            def startTests(self): self.calls += 1
 
1017
        result = InstrumentedTestResult(None, None, None, None)
 
1018
        suite = unittest.TestSuite([
 
1019
            unittest.FunctionTestCase(lambda: None),
 
1020
            unittest.FunctionTestCase(lambda: None)])
 
1021
        suite.run(result)
 
1022
        self.assertEquals(1, result.calls)
 
1023
        self.assertEquals(2, result.count)
 
1024
 
1000
1025
 
1001
1026
class TestUnicodeFilenameFeature(tests.TestCase):
1002
1027
 
1023
1048
        because of our use of global state.
1024
1049
        """
1025
1050
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1026
 
        old_leak = tests.TestCase._first_thread_leaker_id
1027
1051
        try:
1028
1052
            tests.TestCaseInTempDir.TEST_ROOT = None
1029
 
            tests.TestCase._first_thread_leaker_id = None
1030
1053
            return testrunner.run(test)
1031
1054
        finally:
1032
1055
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1033
 
            tests.TestCase._first_thread_leaker_id = old_leak
1034
1056
 
1035
1057
    def test_known_failure_failed_run(self):
1036
1058
        # run a test that generates a known failure which should be printed in
1214
1236
        self.assertContainsRe(output_string, "--date [0-9.]+")
1215
1237
        self.assertLength(1, self._get_source_tree_calls)
1216
1238
 
 
1239
    def test_verbose_test_count(self):
 
1240
        """A verbose test run reports the right test count at the start"""
 
1241
        suite = TestUtil.TestSuite([
 
1242
            unittest.FunctionTestCase(lambda:None),
 
1243
            unittest.FunctionTestCase(lambda:None)])
 
1244
        self.assertEqual(suite.countTestCases(), 2)
 
1245
        stream = StringIO()
 
1246
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1247
        # Need to use the CountingDecorator as that's what sets num_tests
 
1248
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1249
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1250
 
1217
1251
    def test_startTestRun(self):
1218
1252
        """run should call result.startTestRun()"""
1219
1253
        calls = []
1422
1456
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1423
1457
        output_stream = StringIO()
1424
1458
        result = bzrlib.tests.VerboseTestResult(
1425
 
            unittest._WritelnDecorator(output_stream),
 
1459
            output_stream,
1426
1460
            descriptions=0,
1427
1461
            verbosity=2)
1428
1462
        sample_test.run(result)
1656
1690
        self.assertEqual('original', obj.test_attr)
1657
1691
 
1658
1692
 
 
1693
class _MissingFeature(tests.Feature):
 
1694
    def _probe(self):
 
1695
        return False
 
1696
missing_feature = _MissingFeature()
 
1697
 
 
1698
 
 
1699
def _get_test(name):
 
1700
    """Get an instance of a specific example test.
 
1701
 
 
1702
    We protect this in a function so that they don't auto-run in the test
 
1703
    suite.
 
1704
    """
 
1705
 
 
1706
    class ExampleTests(tests.TestCase):
 
1707
 
 
1708
        def test_fail(self):
 
1709
            mutter('this was a failing test')
 
1710
            self.fail('this test will fail')
 
1711
 
 
1712
        def test_error(self):
 
1713
            mutter('this test errored')
 
1714
            raise RuntimeError('gotcha')
 
1715
 
 
1716
        def test_missing_feature(self):
 
1717
            mutter('missing the feature')
 
1718
            self.requireFeature(missing_feature)
 
1719
 
 
1720
        def test_skip(self):
 
1721
            mutter('this test will be skipped')
 
1722
            raise tests.TestSkipped('reason')
 
1723
 
 
1724
        def test_success(self):
 
1725
            mutter('this test succeeds')
 
1726
 
 
1727
        def test_xfail(self):
 
1728
            mutter('test with expected failure')
 
1729
            self.knownFailure('this_fails')
 
1730
 
 
1731
        def test_unexpected_success(self):
 
1732
            mutter('test with unexpected success')
 
1733
            self.expectFailure('should_fail', lambda: None)
 
1734
 
 
1735
    return ExampleTests(name)
 
1736
 
 
1737
 
 
1738
class TestTestCaseLogDetails(tests.TestCase):
 
1739
 
 
1740
    def _run_test(self, test_name):
 
1741
        test = _get_test(test_name)
 
1742
        result = testtools.TestResult()
 
1743
        test.run(result)
 
1744
        return result
 
1745
 
 
1746
    def test_fail_has_log(self):
 
1747
        result = self._run_test('test_fail')
 
1748
        self.assertEqual(1, len(result.failures))
 
1749
        result_content = result.failures[0][1]
 
1750
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1751
        self.assertContainsRe(result_content, 'this was a failing test')
 
1752
 
 
1753
    def test_error_has_log(self):
 
1754
        result = self._run_test('test_error')
 
1755
        self.assertEqual(1, len(result.errors))
 
1756
        result_content = result.errors[0][1]
 
1757
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1758
        self.assertContainsRe(result_content, 'this test errored')
 
1759
 
 
1760
    def test_skip_has_no_log(self):
 
1761
        result = self._run_test('test_skip')
 
1762
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1763
        skips = result.skip_reasons['reason']
 
1764
        self.assertEqual(1, len(skips))
 
1765
        test = skips[0]
 
1766
        self.assertFalse('log' in test.getDetails())
 
1767
 
 
1768
    def test_missing_feature_has_no_log(self):
 
1769
        # testtools doesn't know about addNotSupported, so it just gets
 
1770
        # considered as a skip
 
1771
        result = self._run_test('test_missing_feature')
 
1772
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1773
        skips = result.skip_reasons[missing_feature]
 
1774
        self.assertEqual(1, len(skips))
 
1775
        test = skips[0]
 
1776
        self.assertFalse('log' in test.getDetails())
 
1777
 
 
1778
    def test_xfail_has_no_log(self):
 
1779
        result = self._run_test('test_xfail')
 
1780
        self.assertEqual(1, len(result.expectedFailures))
 
1781
        result_content = result.expectedFailures[0][1]
 
1782
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1783
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1784
 
 
1785
    def test_unexpected_success_has_log(self):
 
1786
        result = self._run_test('test_unexpected_success')
 
1787
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1788
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1789
        # expectedFailures is a list of reasons?
 
1790
        test = result.unexpectedSuccesses[0]
 
1791
        details = test.getDetails()
 
1792
        self.assertTrue('log' in details)
 
1793
 
 
1794
 
 
1795
class TestTestCloning(tests.TestCase):
 
1796
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1797
 
 
1798
    def test_cloned_testcase_does_not_share_details(self):
 
1799
        """A TestCase cloned with clone_test does not share mutable attributes
 
1800
        such as details or cleanups.
 
1801
        """
 
1802
        class Test(tests.TestCase):
 
1803
            def test_foo(self):
 
1804
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1805
        orig_test = Test('test_foo')
 
1806
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1807
        orig_test.run(unittest.TestResult())
 
1808
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1809
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1810
 
 
1811
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1812
        """Applying two levels of scenarios to a test preserves the attributes
 
1813
        added by both scenarios.
 
1814
        """
 
1815
        class Test(tests.TestCase):
 
1816
            def test_foo(self):
 
1817
                pass
 
1818
        test = Test('test_foo')
 
1819
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1820
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1821
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1822
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1823
        all_tests = list(tests.iter_suite_tests(suite))
 
1824
        self.assertLength(4, all_tests)
 
1825
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1826
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1827
 
 
1828
 
1659
1829
# NB: Don't delete this; it's not actually from 0.11!
1660
1830
@deprecated_function(deprecated_in((0, 11, 0)))
1661
1831
def sample_deprecated_function():
1815
1985
                tree.branch.repository.bzrdir.root_transport)
1816
1986
 
1817
1987
 
1818
 
class SelfTestHelper:
 
1988
class SelfTestHelper(object):
1819
1989
 
1820
1990
    def run_selftest(self, **kwargs):
1821
1991
        """Run selftest returning its output."""
1947
2117
 
1948
2118
    def test_transport_sftp(self):
1949
2119
        self.requireFeature(features.paramiko)
 
2120
        from bzrlib.tests import stub_sftp
1950
2121
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
1951
2122
 
1952
2123
    def test_transport_memory(self):
1971
2142
            load_list='missing file name', list_only=True)
1972
2143
 
1973
2144
 
 
2145
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2146
 
 
2147
    _test_needs_features = [features.subunit]
 
2148
 
 
2149
    def run_subunit_stream(self, test_name):
 
2150
        from subunit import ProtocolTestCase
 
2151
        def factory():
 
2152
            return TestUtil.TestSuite([_get_test(test_name)])
 
2153
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2154
            test_suite_factory=factory)
 
2155
        test = ProtocolTestCase(stream)
 
2156
        result = testtools.TestResult()
 
2157
        test.run(result)
 
2158
        content = stream.getvalue()
 
2159
        return content, result
 
2160
 
 
2161
    def test_fail_has_log(self):
 
2162
        content, result = self.run_subunit_stream('test_fail')
 
2163
        self.assertEqual(1, len(result.failures))
 
2164
        self.assertContainsRe(content, '(?m)^log$')
 
2165
        self.assertContainsRe(content, 'this test will fail')
 
2166
 
 
2167
    def test_error_has_log(self):
 
2168
        content, result = self.run_subunit_stream('test_error')
 
2169
        self.assertContainsRe(content, '(?m)^log$')
 
2170
        self.assertContainsRe(content, 'this test errored')
 
2171
 
 
2172
    def test_skip_has_no_log(self):
 
2173
        content, result = self.run_subunit_stream('test_skip')
 
2174
        self.assertNotContainsRe(content, '(?m)^log$')
 
2175
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2176
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2177
        skips = result.skip_reasons['reason']
 
2178
        self.assertEqual(1, len(skips))
 
2179
        test = skips[0]
 
2180
        # RemotedTestCase doesn't preserve the "details"
 
2181
        ## self.assertFalse('log' in test.getDetails())
 
2182
 
 
2183
    def test_missing_feature_has_no_log(self):
 
2184
        content, result = self.run_subunit_stream('test_missing_feature')
 
2185
        self.assertNotContainsRe(content, '(?m)^log$')
 
2186
        self.assertNotContainsRe(content, 'missing the feature')
 
2187
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2188
        skips = result.skip_reasons['_MissingFeature\n']
 
2189
        self.assertEqual(1, len(skips))
 
2190
        test = skips[0]
 
2191
        # RemotedTestCase doesn't preserve the "details"
 
2192
        ## self.assertFalse('log' in test.getDetails())
 
2193
 
 
2194
    def test_xfail_has_no_log(self):
 
2195
        content, result = self.run_subunit_stream('test_xfail')
 
2196
        self.assertNotContainsRe(content, '(?m)^log$')
 
2197
        self.assertNotContainsRe(content, 'test with expected failure')
 
2198
        self.assertEqual(1, len(result.expectedFailures))
 
2199
        result_content = result.expectedFailures[0][1]
 
2200
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2201
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2202
 
 
2203
    def test_unexpected_success_has_log(self):
 
2204
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2205
        self.assertContainsRe(content, '(?m)^log$')
 
2206
        self.assertContainsRe(content, 'test with unexpected success')
 
2207
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2208
                           ' as a plain success',
 
2209
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2210
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2211
        test = result.unexpectedSuccesses[0]
 
2212
        # RemotedTestCase doesn't preserve the "details"
 
2213
        ## self.assertTrue('log' in test.getDetails())
 
2214
 
 
2215
    def test_success_has_no_log(self):
 
2216
        content, result = self.run_subunit_stream('test_success')
 
2217
        self.assertEqual(1, result.testsRun)
 
2218
        self.assertNotContainsRe(content, '(?m)^log$')
 
2219
        self.assertNotContainsRe(content, 'this test succeeds')
 
2220
 
 
2221
 
1974
2222
class TestRunBzr(tests.TestCase):
1975
2223
 
1976
2224
    out = ''
2339
2587
            os.chdir = orig_chdir
2340
2588
        self.assertEqual(['foo', 'current'], chdirs)
2341
2589
 
 
2590
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2591
        self.get_source_path = lambda: ""
 
2592
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2593
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2594
 
2342
2595
 
2343
2596
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2597
    """Tests that really need to do things with an external bzr."""
2773
3026
        # Test that a plausible list of modules to doctest is returned
2774
3027
        # by _test_suite_modules_to_doctest.
2775
3028
        test_list = tests._test_suite_modules_to_doctest()
 
3029
        if __doc__ is None:
 
3030
            # When docstrings are stripped, there are no modules to doctest
 
3031
            self.assertEqual([], test_list)
 
3032
            return
2776
3033
        self.assertSubset([
2777
3034
            'bzrlib.timestamp',
2778
3035
            ],
2795
3052
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
2796
3053
        def doctests():
2797
3054
            calls.append("modules_to_doctest")
 
3055
            if __doc__ is None:
 
3056
                return []
2798
3057
            return ['bzrlib.timestamp']
2799
3058
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
2800
3059
        expected_test_list = [
2803
3062
            ('bzrlib.tests.per_transport.TransportTests'
2804
3063
             '.test_abspath(LocalTransport,LocalURLServer)'),
2805
3064
            'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
2806
 
            # modules_to_doctest
2807
 
            'bzrlib.timestamp.format_highres_date',
2808
3065
            # plugins can't be tested that way since selftest may be run with
2809
3066
            # --no-plugins
2810
3067
            ]
 
3068
        if __doc__ is not None:
 
3069
            expected_test_list.extend([
 
3070
                # modules_to_doctest
 
3071
                'bzrlib.timestamp.format_highres_date',
 
3072
                ])
2811
3073
        suite = tests.test_suite()
2812
3074
        self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
2813
3075
            set(calls))
2951
3213
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2952
3214
 
2953
3215
 
 
3216
class TestThreadLeakDetection(tests.TestCase):
 
3217
    """Ensure when tests leak threads we detect and report it"""
 
3218
 
 
3219
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3220
        def __init__(self):
 
3221
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3222
            self.leaks = []
 
3223
        def _report_thread_leak(self, test, leaks, alive):
 
3224
            self.leaks.append((test, leaks))
 
3225
 
 
3226
    def test_testcase_without_addCleanups(self):
 
3227
        """Check old TestCase instances don't break with leak detection"""
 
3228
        class Test(unittest.TestCase):
 
3229
            def runTest(self):
 
3230
                pass
 
3231
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3232
        result = self.LeakRecordingResult()
 
3233
        test = Test()
 
3234
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3235
        result.startTestRun()
 
3236
        test.run(result)
 
3237
        result.stopTestRun()
 
3238
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3239
        self.assertEqual(result.leaks, [])
 
3240
        
 
3241
    def test_thread_leak(self):
 
3242
        """Ensure a thread that outlives the running of a test is reported
 
3243
 
 
3244
        Uses a thread that blocks on an event, and is started by the inner
 
3245
        test case. As the thread outlives the inner case's run, it should be
 
3246
        detected as a leak, but the event is then set so that the thread can
 
3247
        be safely joined in cleanup so it's not leaked for real.
 
3248
        """
 
3249
        event = threading.Event()
 
3250
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3251
        class Test(tests.TestCase):
 
3252
            def test_leak(self):
 
3253
                thread.start()
 
3254
        result = self.LeakRecordingResult()
 
3255
        test = Test("test_leak")
 
3256
        self.addCleanup(thread.join)
 
3257
        self.addCleanup(event.set)
 
3258
        result.startTestRun()
 
3259
        test.run(result)
 
3260
        result.stopTestRun()
 
3261
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3262
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3263
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3264
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3265
 
 
3266
    def test_multiple_leaks(self):
 
3267
        """Check multiple leaks are blamed on the test cases at fault
 
3268
 
 
3269
        Same concept as the previous test, but has one inner test method that
 
3270
        leaks two threads, and one that doesn't leak at all.
 
3271
        """
 
3272
        event = threading.Event()
 
3273
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3274
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3275
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3276
        class Test(tests.TestCase):
 
3277
            def test_first_leak(self):
 
3278
                thread_b.start()
 
3279
            def test_second_no_leak(self):
 
3280
                pass
 
3281
            def test_third_leak(self):
 
3282
                thread_c.start()
 
3283
                thread_a.start()
 
3284
        result = self.LeakRecordingResult()
 
3285
        first_test = Test("test_first_leak")
 
3286
        third_test = Test("test_third_leak")
 
3287
        self.addCleanup(thread_a.join)
 
3288
        self.addCleanup(thread_b.join)
 
3289
        self.addCleanup(thread_c.join)
 
3290
        self.addCleanup(event.set)
 
3291
        result.startTestRun()
 
3292
        unittest.TestSuite(
 
3293
            [first_test, Test("test_second_no_leak"), third_test]
 
3294
            ).run(result)
 
3295
        result.stopTestRun()
 
3296
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3297
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3298
        self.assertEqual(result.leaks, [
 
3299
            (first_test, set([thread_b])),
 
3300
            (third_test, set([thread_a, thread_c]))])
 
3301
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3302
 
 
3303
 
2954
3304
class TestRunSuite(tests.TestCase):
2955
3305
 
2956
3306
    def test_runner_class(self):