~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Martin Pool
  • Date: 2010-06-02 05:03:31 UTC
  • mto: This revision was merged to the branch mainline in revision 5279.
  • Revision ID: mbp@canonical.com-20100602050331-n2p1qt8hfsahspnv
Correct more sloppy use of the term 'Linux'

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
import doctest
 
20
from doctest import ELLIPSIS
21
21
import os
22
22
import signal
23
23
import sys
24
 
import threading
25
24
import time
26
25
import unittest
27
26
import warnings
28
27
 
29
 
from testtools import (
30
 
    ExtendedToOriginalDecorator,
31
 
    MultiTestResult,
32
 
    )
33
 
from testtools.content import Content
 
28
from testtools import MultiTestResult
34
29
from testtools.content_type import ContentType
35
30
from testtools.matchers import (
36
31
    DocTestMatches,
42
37
from bzrlib import (
43
38
    branchbuilder,
44
39
    bzrdir,
 
40
    debug,
45
41
    errors,
46
42
    lockdir,
47
43
    memorytree,
48
44
    osutils,
 
45
    progress,
49
46
    remote,
50
47
    repository,
51
48
    symbol_versioning,
55
52
    )
56
53
from bzrlib.repofmt import (
57
54
    groupcompress_repo,
 
55
    pack_repo,
58
56
    weaverepo,
59
57
    )
60
58
from bzrlib.symbol_versioning import (
66
64
    features,
67
65
    test_lsprof,
68
66
    test_server,
 
67
    test_sftp_transport,
69
68
    TestUtil,
70
69
    )
71
 
from bzrlib.trace import note, mutter
 
70
from bzrlib.trace import note
72
71
from bzrlib.transport import memory
 
72
from bzrlib.version import _get_bzr_source_tree
73
73
 
74
74
 
75
75
def _test_ids(test_suite):
77
77
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
78
 
79
79
 
 
80
class SelftestTests(tests.TestCase):
 
81
 
 
82
    def test_import_tests(self):
 
83
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
 
84
        self.assertEqual(mod.SelftestTests, SelftestTests)
 
85
 
 
86
    def test_import_test_failure(self):
 
87
        self.assertRaises(ImportError,
 
88
                          TestUtil._load_module_by_name,
 
89
                          'bzrlib.no-name-yet')
 
90
 
 
91
 
80
92
class MetaTestLog(tests.TestCase):
81
93
 
82
94
    def test_logging(self):
88
100
            "text", "plain", {"charset": "utf8"})))
89
101
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
90
102
        self.assertThat(self.get_log(),
91
 
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
103
            DocTestMatches(u"...a test message\n", ELLIPSIS))
92
104
 
93
105
 
94
106
class TestUnicodeFilename(tests.TestCase):
110
122
        self.failUnlessExists(filename)
111
123
 
112
124
 
113
 
class TestClassesAvailable(tests.TestCase):
114
 
    """As a convenience we expose Test* classes from bzrlib.tests"""
115
 
 
116
 
    def test_test_case(self):
117
 
        from bzrlib.tests import TestCase
118
 
 
119
 
    def test_test_loader(self):
120
 
        from bzrlib.tests import TestLoader
121
 
 
122
 
    def test_test_suite(self):
123
 
        from bzrlib.tests import TestSuite
124
 
 
125
 
 
126
125
class TestTransportScenarios(tests.TestCase):
127
126
    """A group of tests that test the transport implementation adaption core.
128
127
 
209
208
    def test_scenarios(self):
210
209
        # check that constructor parameters are passed through to the adapted
211
210
        # test.
212
 
        from bzrlib.tests.per_controldir import make_scenarios
 
211
        from bzrlib.tests.per_bzrdir import make_scenarios
213
212
        vfs_factory = "v"
214
213
        server1 = "a"
215
214
        server2 = "b"
313
312
        from bzrlib.tests.per_interrepository import make_scenarios
314
313
        server1 = "a"
315
314
        server2 = "b"
316
 
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
 
315
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
317
316
        scenarios = make_scenarios(server1, server2, formats)
318
317
        self.assertEqual([
319
318
            ('C0,str,str',
320
319
             {'repository_format': 'C1',
321
320
              'repository_format_to': 'C2',
322
321
              'transport_readonly_server': 'b',
323
 
              'transport_server': 'a',
324
 
              'extra_setup': 'C3'}),
 
322
              'transport_server': 'a'}),
325
323
            ('D0,str,str',
326
324
             {'repository_format': 'D1',
327
325
              'repository_format_to': 'D2',
328
326
              'transport_readonly_server': 'b',
329
 
              'transport_server': 'a',
330
 
              'extra_setup': 'D3'})],
 
327
              'transport_server': 'a'})],
331
328
            scenarios)
332
329
 
333
330
 
614
611
        result = test.run()
615
612
        total_failures = result.errors + result.failures
616
613
        if self._lock_check_thorough:
617
 
            self.assertEqual(1, len(total_failures))
 
614
            self.assertLength(1, total_failures)
618
615
        else:
619
616
            # When _lock_check_thorough is disabled, then we don't trigger a
620
617
            # failure
621
 
            self.assertEqual(0, len(total_failures))
 
618
            self.assertLength(0, total_failures)
622
619
 
623
620
 
624
621
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
625
622
    """Tests for the convenience functions TestCaseWithTransport introduces."""
626
623
 
627
624
    def test_get_readonly_url_none(self):
 
625
        from bzrlib.transport import get_transport
628
626
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
629
627
        self.vfs_transport_factory = memory.MemoryServer
630
628
        self.transport_readonly_server = None
632
630
        # for the server
633
631
        url = self.get_readonly_url()
634
632
        url2 = self.get_readonly_url('foo/bar')
635
 
        t = transport.get_transport(url)
636
 
        t2 = transport.get_transport(url2)
 
633
        t = get_transport(url)
 
634
        t2 = get_transport(url2)
637
635
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
638
636
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
639
637
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
640
638
 
641
639
    def test_get_readonly_url_http(self):
642
640
        from bzrlib.tests.http_server import HttpServer
 
641
        from bzrlib.transport import get_transport
643
642
        from bzrlib.transport.http import HttpTransportBase
644
643
        self.transport_server = test_server.LocalURLServer
645
644
        self.transport_readonly_server = HttpServer
647
646
        url = self.get_readonly_url()
648
647
        url2 = self.get_readonly_url('foo/bar')
649
648
        # the transport returned may be any HttpTransportBase subclass
650
 
        t = transport.get_transport(url)
651
 
        t2 = transport.get_transport(url2)
 
649
        t = get_transport(url)
 
650
        t2 = get_transport(url2)
652
651
        self.failUnless(isinstance(t, HttpTransportBase))
653
652
        self.failUnless(isinstance(t2, HttpTransportBase))
654
653
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
692
691
class TestChrootedTest(tests.ChrootedTestCase):
693
692
 
694
693
    def test_root_is_root(self):
695
 
        t = transport.get_transport(self.get_readonly_url())
 
694
        from bzrlib.transport import get_transport
 
695
        t = get_transport(self.get_readonly_url())
696
696
        url = t.base
697
697
        self.assertEqual(url, t.clone('..').base)
698
698
 
804
804
        self.requireFeature(test_lsprof.LSProfFeature)
805
805
        result_stream = StringIO()
806
806
        result = bzrlib.tests.VerboseTestResult(
807
 
            result_stream,
 
807
            unittest._WritelnDecorator(result_stream),
808
808
            descriptions=0,
809
809
            verbosity=2,
810
810
            )
836
836
        self.assertContainsRe(output,
837
837
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
838
838
 
839
 
    def test_uses_time_from_testtools(self):
840
 
        """Test case timings in verbose results should use testtools times"""
841
 
        import datetime
842
 
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
843
 
            def startTest(self, test):
844
 
                self.time(datetime.datetime.utcfromtimestamp(1.145))
845
 
                super(TimeAddedVerboseTestResult, self).startTest(test)
846
 
            def addSuccess(self, test):
847
 
                self.time(datetime.datetime.utcfromtimestamp(51.147))
848
 
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
849
 
            def report_tests_starting(self): pass
850
 
        sio = StringIO()
851
 
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
852
 
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
853
 
 
854
839
    def test_known_failure(self):
855
840
        """A KnownFailure being raised should trigger several result actions."""
856
841
        class InstrumentedTestResult(tests.ExtendedTestResult):
857
842
            def stopTestRun(self): pass
858
 
            def report_tests_starting(self): pass
 
843
            def startTests(self): pass
 
844
            def report_test_start(self, test): pass
859
845
            def report_known_failure(self, test, err=None, details=None):
860
846
                self._call = test, 'known failure'
861
847
        result = InstrumentedTestResult(None, None, None, None)
879
865
        # verbose test output formatting
880
866
        result_stream = StringIO()
881
867
        result = bzrlib.tests.VerboseTestResult(
882
 
            result_stream,
 
868
            unittest._WritelnDecorator(result_stream),
883
869
            descriptions=0,
884
870
            verbosity=2,
885
871
            )
895
881
        output = result_stream.getvalue()[prefix:]
896
882
        lines = output.splitlines()
897
883
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
898
 
        if sys.version_info > (2, 7):
899
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
900
 
                self.assertNotEqual, lines[1], '    ')
901
884
        self.assertEqual(lines[1], '    foo')
902
885
        self.assertEqual(2, len(lines))
903
886
 
911
894
        """Test the behaviour of invoking addNotSupported."""
912
895
        class InstrumentedTestResult(tests.ExtendedTestResult):
913
896
            def stopTestRun(self): pass
914
 
            def report_tests_starting(self): pass
 
897
            def startTests(self): pass
 
898
            def report_test_start(self, test): pass
915
899
            def report_unsupported(self, test, feature):
916
900
                self._call = test, feature
917
901
        result = InstrumentedTestResult(None, None, None, None)
936
920
        # verbose test output formatting
937
921
        result_stream = StringIO()
938
922
        result = bzrlib.tests.VerboseTestResult(
939
 
            result_stream,
 
923
            unittest._WritelnDecorator(result_stream),
940
924
            descriptions=0,
941
925
            verbosity=2,
942
926
            )
956
940
        """An UnavailableFeature being raised should invoke addNotSupported."""
957
941
        class InstrumentedTestResult(tests.ExtendedTestResult):
958
942
            def stopTestRun(self): pass
959
 
            def report_tests_starting(self): pass
 
943
            def startTests(self): pass
 
944
            def report_test_start(self, test): pass
960
945
            def addNotSupported(self, test, feature):
961
946
                self._call = test, feature
962
947
        result = InstrumentedTestResult(None, None, None, None)
1004
989
        class InstrumentedTestResult(tests.ExtendedTestResult):
1005
990
            calls = 0
1006
991
            def startTests(self): self.calls += 1
 
992
            def report_test_start(self, test): pass
1007
993
        result = InstrumentedTestResult(None, None, None, None)
1008
994
        def test_function():
1009
995
            pass
1011
997
        test.run(result)
1012
998
        self.assertEquals(1, result.calls)
1013
999
 
1014
 
    def test_startTests_only_once(self):
1015
 
        """With multiple tests startTests should still only be called once"""
1016
 
        class InstrumentedTestResult(tests.ExtendedTestResult):
1017
 
            calls = 0
1018
 
            def startTests(self): self.calls += 1
1019
 
        result = InstrumentedTestResult(None, None, None, None)
1020
 
        suite = unittest.TestSuite([
1021
 
            unittest.FunctionTestCase(lambda: None),
1022
 
            unittest.FunctionTestCase(lambda: None)])
1023
 
        suite.run(result)
1024
 
        self.assertEquals(1, result.calls)
1025
 
        self.assertEquals(2, result.count)
1026
 
 
1027
1000
 
1028
1001
class TestUnicodeFilenameFeature(tests.TestCase):
1029
1002
 
1050
1023
        because of our use of global state.
1051
1024
        """
1052
1025
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1026
        old_leak = tests.TestCase._first_thread_leaker_id
1053
1027
        try:
1054
1028
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1029
            tests.TestCase._first_thread_leaker_id = None
1055
1030
            return testrunner.run(test)
1056
1031
        finally:
1057
1032
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1033
            tests.TestCase._first_thread_leaker_id = old_leak
1058
1034
 
1059
1035
    def test_known_failure_failed_run(self):
1060
1036
        # run a test that generates a known failure which should be printed in
1106
1082
    def test_result_decorator(self):
1107
1083
        # decorate results
1108
1084
        calls = []
1109
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1085
        class LoggingDecorator(tests.ForwardingResult):
1110
1086
            def startTest(self, test):
1111
 
                ExtendedToOriginalDecorator.startTest(self, test)
 
1087
                tests.ForwardingResult.startTest(self, test)
1112
1088
                calls.append('start')
1113
1089
        test = unittest.FunctionTestCase(lambda:None)
1114
1090
        stream = StringIO()
1238
1214
        self.assertContainsRe(output_string, "--date [0-9.]+")
1239
1215
        self.assertLength(1, self._get_source_tree_calls)
1240
1216
 
1241
 
    def test_verbose_test_count(self):
1242
 
        """A verbose test run reports the right test count at the start"""
1243
 
        suite = TestUtil.TestSuite([
1244
 
            unittest.FunctionTestCase(lambda:None),
1245
 
            unittest.FunctionTestCase(lambda:None)])
1246
 
        self.assertEqual(suite.countTestCases(), 2)
1247
 
        stream = StringIO()
1248
 
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
1249
 
        # Need to use the CountingDecorator as that's what sets num_tests
1250
 
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1251
 
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1252
 
 
1253
1217
    def test_startTestRun(self):
1254
1218
        """run should call result.startTestRun()"""
1255
1219
        calls = []
1256
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1220
        class LoggingDecorator(tests.ForwardingResult):
1257
1221
            def startTestRun(self):
1258
 
                ExtendedToOriginalDecorator.startTestRun(self)
 
1222
                tests.ForwardingResult.startTestRun(self)
1259
1223
                calls.append('startTestRun')
1260
1224
        test = unittest.FunctionTestCase(lambda:None)
1261
1225
        stream = StringIO()
1267
1231
    def test_stopTestRun(self):
1268
1232
        """run should call result.stopTestRun()"""
1269
1233
        calls = []
1270
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1234
        class LoggingDecorator(tests.ForwardingResult):
1271
1235
            def stopTestRun(self):
1272
 
                ExtendedToOriginalDecorator.stopTestRun(self)
 
1236
                tests.ForwardingResult.stopTestRun(self)
1273
1237
                calls.append('stopTestRun')
1274
1238
        test = unittest.FunctionTestCase(lambda:None)
1275
1239
        stream = StringIO()
1278
1242
        result = self.run_test_runner(runner, test)
1279
1243
        self.assertLength(1, calls)
1280
1244
 
1281
 
    def test_unicode_test_output_on_ascii_stream(self):
1282
 
        """Showing results should always succeed even on an ascii console"""
1283
 
        class FailureWithUnicode(tests.TestCase):
1284
 
            def test_log_unicode(self):
1285
 
                self.log(u"\u2606")
1286
 
                self.fail("Now print that log!")
1287
 
        out = StringIO()
1288
 
        self.overrideAttr(osutils, "get_terminal_encoding",
1289
 
            lambda trace=False: "ascii")
1290
 
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1291
 
            FailureWithUnicode("test_log_unicode"))
1292
 
        self.assertContainsRe(out.getvalue(),
1293
 
            "Text attachment: log\n"
1294
 
            "-+\n"
1295
 
            "\d+\.\d+  \\\\u2606\n"
1296
 
            "-+\n")
1297
 
 
1298
1245
 
1299
1246
class SampleTestCase(tests.TestCase):
1300
1247
 
1475
1422
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1476
1423
        output_stream = StringIO()
1477
1424
        result = bzrlib.tests.VerboseTestResult(
1478
 
            output_stream,
 
1425
            unittest._WritelnDecorator(output_stream),
1479
1426
            descriptions=0,
1480
1427
            verbosity=2)
1481
1428
        sample_test.run(result)
1709
1656
        self.assertEqual('original', obj.test_attr)
1710
1657
 
1711
1658
 
1712
 
class _MissingFeature(tests.Feature):
1713
 
    def _probe(self):
1714
 
        return False
1715
 
missing_feature = _MissingFeature()
1716
 
 
1717
 
 
1718
 
def _get_test(name):
1719
 
    """Get an instance of a specific example test.
1720
 
 
1721
 
    We protect this in a function so that they don't auto-run in the test
1722
 
    suite.
1723
 
    """
1724
 
 
1725
 
    class ExampleTests(tests.TestCase):
1726
 
 
1727
 
        def test_fail(self):
1728
 
            mutter('this was a failing test')
1729
 
            self.fail('this test will fail')
1730
 
 
1731
 
        def test_error(self):
1732
 
            mutter('this test errored')
1733
 
            raise RuntimeError('gotcha')
1734
 
 
1735
 
        def test_missing_feature(self):
1736
 
            mutter('missing the feature')
1737
 
            self.requireFeature(missing_feature)
1738
 
 
1739
 
        def test_skip(self):
1740
 
            mutter('this test will be skipped')
1741
 
            raise tests.TestSkipped('reason')
1742
 
 
1743
 
        def test_success(self):
1744
 
            mutter('this test succeeds')
1745
 
 
1746
 
        def test_xfail(self):
1747
 
            mutter('test with expected failure')
1748
 
            self.knownFailure('this_fails')
1749
 
 
1750
 
        def test_unexpected_success(self):
1751
 
            mutter('test with unexpected success')
1752
 
            self.expectFailure('should_fail', lambda: None)
1753
 
 
1754
 
    return ExampleTests(name)
1755
 
 
1756
 
 
1757
 
class TestTestCaseLogDetails(tests.TestCase):
1758
 
 
1759
 
    def _run_test(self, test_name):
1760
 
        test = _get_test(test_name)
1761
 
        result = testtools.TestResult()
1762
 
        test.run(result)
1763
 
        return result
1764
 
 
1765
 
    def test_fail_has_log(self):
1766
 
        result = self._run_test('test_fail')
1767
 
        self.assertEqual(1, len(result.failures))
1768
 
        result_content = result.failures[0][1]
1769
 
        self.assertContainsRe(result_content, 'Text attachment: log')
1770
 
        self.assertContainsRe(result_content, 'this was a failing test')
1771
 
 
1772
 
    def test_error_has_log(self):
1773
 
        result = self._run_test('test_error')
1774
 
        self.assertEqual(1, len(result.errors))
1775
 
        result_content = result.errors[0][1]
1776
 
        self.assertContainsRe(result_content, 'Text attachment: log')
1777
 
        self.assertContainsRe(result_content, 'this test errored')
1778
 
 
1779
 
    def test_skip_has_no_log(self):
1780
 
        result = self._run_test('test_skip')
1781
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
1782
 
        skips = result.skip_reasons['reason']
1783
 
        self.assertEqual(1, len(skips))
1784
 
        test = skips[0]
1785
 
        self.assertFalse('log' in test.getDetails())
1786
 
 
1787
 
    def test_missing_feature_has_no_log(self):
1788
 
        # testtools doesn't know about addNotSupported, so it just gets
1789
 
        # considered as a skip
1790
 
        result = self._run_test('test_missing_feature')
1791
 
        self.assertEqual([missing_feature], result.skip_reasons.keys())
1792
 
        skips = result.skip_reasons[missing_feature]
1793
 
        self.assertEqual(1, len(skips))
1794
 
        test = skips[0]
1795
 
        self.assertFalse('log' in test.getDetails())
1796
 
 
1797
 
    def test_xfail_has_no_log(self):
1798
 
        result = self._run_test('test_xfail')
1799
 
        self.assertEqual(1, len(result.expectedFailures))
1800
 
        result_content = result.expectedFailures[0][1]
1801
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
1802
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
1803
 
 
1804
 
    def test_unexpected_success_has_log(self):
1805
 
        result = self._run_test('test_unexpected_success')
1806
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
1807
 
        # Inconsistency, unexpectedSuccesses is a list of tests,
1808
 
        # expectedFailures is a list of reasons?
1809
 
        test = result.unexpectedSuccesses[0]
1810
 
        details = test.getDetails()
1811
 
        self.assertTrue('log' in details)
1812
 
 
1813
 
 
1814
 
class TestTestCloning(tests.TestCase):
1815
 
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
1816
 
 
1817
 
    def test_cloned_testcase_does_not_share_details(self):
1818
 
        """A TestCase cloned with clone_test does not share mutable attributes
1819
 
        such as details or cleanups.
1820
 
        """
1821
 
        class Test(tests.TestCase):
1822
 
            def test_foo(self):
1823
 
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1824
 
        orig_test = Test('test_foo')
1825
 
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1826
 
        orig_test.run(unittest.TestResult())
1827
 
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1828
 
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
1829
 
 
1830
 
    def test_double_apply_scenario_preserves_first_scenario(self):
1831
 
        """Applying two levels of scenarios to a test preserves the attributes
1832
 
        added by both scenarios.
1833
 
        """
1834
 
        class Test(tests.TestCase):
1835
 
            def test_foo(self):
1836
 
                pass
1837
 
        test = Test('test_foo')
1838
 
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1839
 
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1840
 
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1841
 
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1842
 
        all_tests = list(tests.iter_suite_tests(suite))
1843
 
        self.assertLength(4, all_tests)
1844
 
        all_xys = sorted((t.x, t.y) for t in all_tests)
1845
 
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1846
 
 
1847
 
 
1848
1659
# NB: Don't delete this; it's not actually from 0.11!
1849
1660
@deprecated_function(deprecated_in((0, 11, 0)))
1850
1661
def sample_deprecated_function():
2004
1815
                tree.branch.repository.bzrdir.root_transport)
2005
1816
 
2006
1817
 
2007
 
class SelfTestHelper(object):
 
1818
class SelfTestHelper:
2008
1819
 
2009
1820
    def run_selftest(self, **kwargs):
2010
1821
        """Run selftest returning its output."""
2070
1881
            def __call__(test, result):
2071
1882
                test.run(result)
2072
1883
            def run(test, result):
2073
 
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
 
1884
                self.assertIsInstance(result, tests.ForwardingResult)
2074
1885
                calls.append("called")
2075
1886
            def countTestCases(self):
2076
1887
                return 1
2161
1972
            load_list='missing file name', list_only=True)
2162
1973
 
2163
1974
 
2164
 
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2165
 
 
2166
 
    _test_needs_features = [features.subunit]
2167
 
 
2168
 
    def run_subunit_stream(self, test_name):
2169
 
        from subunit import ProtocolTestCase
2170
 
        def factory():
2171
 
            return TestUtil.TestSuite([_get_test(test_name)])
2172
 
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2173
 
            test_suite_factory=factory)
2174
 
        test = ProtocolTestCase(stream)
2175
 
        result = testtools.TestResult()
2176
 
        test.run(result)
2177
 
        content = stream.getvalue()
2178
 
        return content, result
2179
 
 
2180
 
    def test_fail_has_log(self):
2181
 
        content, result = self.run_subunit_stream('test_fail')
2182
 
        self.assertEqual(1, len(result.failures))
2183
 
        self.assertContainsRe(content, '(?m)^log$')
2184
 
        self.assertContainsRe(content, 'this test will fail')
2185
 
 
2186
 
    def test_error_has_log(self):
2187
 
        content, result = self.run_subunit_stream('test_error')
2188
 
        self.assertContainsRe(content, '(?m)^log$')
2189
 
        self.assertContainsRe(content, 'this test errored')
2190
 
 
2191
 
    def test_skip_has_no_log(self):
2192
 
        content, result = self.run_subunit_stream('test_skip')
2193
 
        self.assertNotContainsRe(content, '(?m)^log$')
2194
 
        self.assertNotContainsRe(content, 'this test will be skipped')
2195
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
2196
 
        skips = result.skip_reasons['reason']
2197
 
        self.assertEqual(1, len(skips))
2198
 
        test = skips[0]
2199
 
        # RemotedTestCase doesn't preserve the "details"
2200
 
        ## self.assertFalse('log' in test.getDetails())
2201
 
 
2202
 
    def test_missing_feature_has_no_log(self):
2203
 
        content, result = self.run_subunit_stream('test_missing_feature')
2204
 
        self.assertNotContainsRe(content, '(?m)^log$')
2205
 
        self.assertNotContainsRe(content, 'missing the feature')
2206
 
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2207
 
        skips = result.skip_reasons['_MissingFeature\n']
2208
 
        self.assertEqual(1, len(skips))
2209
 
        test = skips[0]
2210
 
        # RemotedTestCase doesn't preserve the "details"
2211
 
        ## self.assertFalse('log' in test.getDetails())
2212
 
 
2213
 
    def test_xfail_has_no_log(self):
2214
 
        content, result = self.run_subunit_stream('test_xfail')
2215
 
        self.assertNotContainsRe(content, '(?m)^log$')
2216
 
        self.assertNotContainsRe(content, 'test with expected failure')
2217
 
        self.assertEqual(1, len(result.expectedFailures))
2218
 
        result_content = result.expectedFailures[0][1]
2219
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
2220
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
2221
 
 
2222
 
    def test_unexpected_success_has_log(self):
2223
 
        content, result = self.run_subunit_stream('test_unexpected_success')
2224
 
        self.assertContainsRe(content, '(?m)^log$')
2225
 
        self.assertContainsRe(content, 'test with unexpected success')
2226
 
        self.expectFailure('subunit treats "unexpectedSuccess"'
2227
 
                           ' as a plain success',
2228
 
            self.assertEqual, 1, len(result.unexpectedSuccesses))
2229
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
2230
 
        test = result.unexpectedSuccesses[0]
2231
 
        # RemotedTestCase doesn't preserve the "details"
2232
 
        ## self.assertTrue('log' in test.getDetails())
2233
 
 
2234
 
    def test_success_has_no_log(self):
2235
 
        content, result = self.run_subunit_stream('test_success')
2236
 
        self.assertEqual(1, result.testsRun)
2237
 
        self.assertNotContainsRe(content, '(?m)^log$')
2238
 
        self.assertNotContainsRe(content, 'this test succeeds')
2239
 
 
2240
 
 
2241
1975
class TestRunBzr(tests.TestCase):
2242
1976
 
2243
1977
    out = ''
2606
2340
            os.chdir = orig_chdir
2607
2341
        self.assertEqual(['foo', 'current'], chdirs)
2608
2342
 
2609
 
    def test_get_bzr_path_with_cwd_bzrlib(self):
2610
 
        self.get_source_path = lambda: ""
2611
 
        self.overrideAttr(os.path, "isfile", lambda path: True)
2612
 
        self.assertEqual(self.get_bzr_path(), "bzr")
2613
 
 
2614
2343
 
2615
2344
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2616
2345
    """Tests that really need to do things with an external bzr."""
3206
2935
        tpr.register('bar', 'bBB.aAA.rRR')
3207
2936
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3208
2937
        self.assertThat(self.get_log(),
3209
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3210
 
                           doctest.ELLIPSIS))
 
2938
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
3211
2939
 
3212
2940
    def test_get_unknown_prefix(self):
3213
2941
        tpr = self._get_registry()
3233
2961
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3234
2962
 
3235
2963
 
3236
 
class TestThreadLeakDetection(tests.TestCase):
    """Verify that threads left running by a test are detected and reported."""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Result that remembers every (test, leaked threads) pair reported."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            # Only the test and its leaked threads are kept; ``alive`` is
            # ignored by this recorder.
            self.leaks.append((test, leaks))

    def test_testcase_without_addCleanups(self):
        """A plain unittest.TestCase must run fine under leak detection."""
        class Test(unittest.TestCase):
            def runTest(self):
                pass
        recorder = self.LeakRecordingResult()
        plain_case = Test()
        recorder.startTestRun()
        plain_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 0)
        self.assertEqual(recorder.leaks, [])

    def test_thread_leak(self):
        """A thread outliving the test that started it is reported as a leak.

        The inner test case starts a thread that blocks waiting on an event.
        The thread is still running once the inner case has finished, so it
        should be recorded as leaked. The cleanups then set the event and join
        the thread so that nothing is leaked by this test itself.
        """
        release = threading.Event()
        leaker = threading.Thread(name="Leaker", target=release.wait)

        class Test(tests.TestCase):
            def test_leak(self):
                leaker.start()

        recorder = self.LeakRecordingResult()
        leaking_case = Test("test_leak")
        # Cleanups run in reverse order: the event is set first, which lets
        # the thread finish, and only then is the thread joined.
        self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        leaking_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 1)
        self.assertEqual(recorder._first_thread_leaker_id, leaking_case.id())
        self.assertEqual(recorder.leaks, [(leaking_case, set([leaker]))])
        self.assertContainsString(recorder.stream.getvalue(),
                                  "leaking threads")

    def test_multiple_leaks(self):
        """Each leak is blamed on the test case that caused it.

        Same idea as test_thread_leak, but the inner class has three test
        methods: the first leaks one thread, the second leaks nothing and the
        third leaks two threads.
        """
        release = threading.Event()
        thread_a = threading.Thread(name="LeakerA", target=release.wait)
        thread_b = threading.Thread(name="LeakerB", target=release.wait)
        thread_c = threading.Thread(name="LeakerC", target=release.wait)

        class Test(tests.TestCase):
            def test_first_leak(self):
                thread_b.start()

            def test_second_no_leak(self):
                pass

            def test_third_leak(self):
                thread_c.start()
                thread_a.start()

        recorder = self.LeakRecordingResult()
        first_test = Test("test_first_leak")
        third_test = Test("test_third_leak")
        # As cleanups run last-in first-out, the event is set before any of
        # the joins are attempted.
        for leaker in (thread_a, thread_b, thread_c):
            self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        suite = unittest.TestSuite()
        suite.addTests([first_test, Test("test_second_no_leak"), third_test])
        suite.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 2)
        self.assertEqual(recorder._first_thread_leaker_id, first_test.id())
        expected_leaks = [
            (first_test, set([thread_b])),
            (third_test, set([thread_a, thread_c])),
            ]
        self.assertEqual(recorder.leaks, expected_leaks)
        self.assertContainsString(recorder.stream.getvalue(),
                                  "leaking threads")
3320
 
 
3321
 
 
3322
 
class TestPostMortemDebugging(tests.TestCase):
    """Post mortem debugging must work for failing and erroring tests."""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result that records where a post mortem would have started."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Remember the code object of the innermost traceback frame.

            :param tb: Traceback to inspect; when not given (or false) the
                traceback of the exception currently being handled is used.
            """
            if not tb:
                tb = sys.exc_info()[2]
            if tb is None:
                # No exception is being handled, nothing to record.
                return
            # Walk down to the last entry of the traceback chain.
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            pass

        def report_failure(self, test, err):
            pass

    def test_location_unittest_error(self):
        """An erroring unittest case gives a traceback ending in the test"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        self.assertEqual(recorder.postcode, Test.runTest.func_code)

    def test_location_unittest_failure(self):
        """A failing unittest case gives a traceback ending in the test"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        self.assertEqual(recorder.postcode, Test.runTest.func_code)

    def test_location_bt_error(self):
        """An erroring bzrlib.tests case gives a traceback ending in the test
        """
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        recorder = self.TracebackRecordingResult()
        erroring_case = Test("test_error")
        erroring_case.run(recorder)
        self.assertEqual(recorder.postcode, Test.test_error.func_code)

    def test_location_bt_failure(self):
        """A failing bzrlib.tests case gives a traceback ending in the test"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        recorder = self.TracebackRecordingResult()
        failing_case = Test("test_failure")
        failing_case.run(recorder)
        self.assertEqual(recorder.postcode, Test.test_failure.func_code)

    def test_env_var_triggers_post_mortem(self):
        """pdb.post_mortem is called if and only if BZR_TEST_PDB is set"""
        import pdb
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
        seen = []
        # Replace the debugger entry point by a recorder of its argument.
        self.overrideAttr(pdb, "post_mortem", seen.append)
        # With the variable unset the call must not reach pdb...
        self.overrideEnv('BZR_TEST_PDB', None)
        result._post_mortem(1)
        # ... while with the variable set it must.
        self.overrideEnv('BZR_TEST_PDB', 'on')
        result._post_mortem(2)
        self.assertEqual([2], seen)
3390
 
 
3391
 
 
3392
2964
class TestRunSuite(tests.TestCase):
3393
2965
 
3394
2966
    def test_runner_class(self):
3405
2977
                                                self.verbosity)
3406
2978
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3407
2979
        self.assertLength(1, calls)
3408
 
 
3409
 
 
3410
 
class TestEnvironHandling(tests.TestCase):
    """Check that overrideEnv saves and restores environment variables."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        """Deleting a variable twice in an inner test restores the outer value.

        MYVAR is set to '42' for this test, then an embedded test overrides it
        with None twice. Once the embedded test has run, MYVAR must be '42'
        again.
        """
        # Use assertFalse rather than the deprecated failIf alias, as the
        # other environment tests in this module do.
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')
        # We use an embedded test to make sure we fix the _captureVar bug
        class Test(tests.TestCase):
            def test_me(self):
                # The first call saves the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
                # Make sure we can call it twice
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        Test('test_me').run(result)
        if not result.wasStrictlySuccessful():
            # Surface whatever the embedded test reported.
            self.fail(output.getvalue())
        # We get our value back
        self.assertEquals('42', os.environ.get('MYVAR'))
3431
 
 
3432
 
 
3433
 
class TestIsolatedEnv(tests.TestCase):
    """Test the isolation of tests from os.environ.

    These tests are themselves run isolated from os.environ, so some care is
    needed to avoid bootstrap side-effects. Each test starts with an already
    clean os.environ, which makes it possible to assert which variables are
    present or absent and to build the tests on those assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Do-nothing test case used as the target of environment overrides."""

        def test_me(self):
            pass

    def test_basics(self):
        isolated = tests.isolated_environ
        # BZR_HOME is defined as "not part of os.environ" for tests.TestCase:
        # it is listed in isolated_environ with a None value...
        self.assertTrue('BZR_HOME' in isolated)
        self.assertEquals(None, isolated['BZR_HOME'])
        # ... and therefore should not appear in os.environ.
        self.assertFalse('BZR_HOME' in os.environ)
        # LINES is defined as "part of os.environ" for tests.TestCase: it has
        # the same value in both places.
        self.assertTrue('LINES' in isolated)
        self.assertEquals('25', isolated['LINES'])
        self.assertEquals('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BZR_HOME is known to be absent from os.environ
        monkey = self.ScratchMonkey('test_me')
        overrides = {'BZR_HOME': 'foo'}
        tests.override_os_environ(monkey, overrides)
        self.assertEquals('foo', os.environ['BZR_HOME'])
        tests.restore_os_environ(monkey)
        self.assertFalse('BZR_HOME' in os.environ)

    def test_injecting_known_variable(self):
        monkey = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        overrides = {'LINES': '42'}
        tests.override_os_environ(monkey, overrides)
        self.assertEquals('42', os.environ['LINES'])
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])

    def test_deleting_variable(self):
        monkey = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ; overriding it with None
        # is expected to remove it.
        overrides = {'LINES': None}
        tests.override_os_environ(monkey, overrides)
        self.assertTrue('LINES' not in os.environ)
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])
3484
 
 
3485
 
 
3486
 
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.IsolatedDocTestSuite` isolates doctests from os.environ.

    tests.TestCase already isolates tests from os.environ, so that clean
    environment is used as the base here. To capture precisely the isolation
    added by the bzrlib suite, each scenario is also run with
    doctest.DocTestSuite for comparison.

    The suite under test is expected to respect `tests.isolated_environ`
    rather than `os.environ`, so each test overrides the former to suit its
    needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a `klass` suite whose single doctest is parsed from `string`.

        :param klass: Suite factory accepting a ``test_finder`` keyword.
        :param string: Text of the doctest to run.
        """
        class Finder(doctest.DocTestFinder):
            """Finder ignoring its arguments and yielding only `string`."""

            def find(*args, **kwargs):
                parser = doctest.DocTestParser()
                parsed = parser.get_doctest(string, {}, 'foo', 'foo.py', 0)
                return [parsed]

        return klass(test_finder=Finder())

    def run_doctest_suite_for_string(self, klass, string):
        """Run the suite built for `string`, returning (result, output)."""
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        self.get_doctest_suite_for_string(klass, string).run(result)
        return result, output

    def assertDocTestStringSucceds(self, klass, string):
        """Fail with the run output unless `string` passes under `klass`."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if result.wasStrictlySuccessful():
            return
        self.fail(output.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail with the run output if `string` passes under `klass`."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if not result.wasStrictlySuccessful():
            return
        self.fail(output.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        snippet = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, snippet)
        # tests.IsolatedDocTestSuite sees '42'
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, snippet)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        snippet = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, snippet)
        # tests.IsolatedDocTestSuite sees None
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, snippet)