~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

(gz) Backslash escape selftest output when printing to non-unicode consoles
 (Martin [gz])

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
42
47
    lockdir,
43
48
    memorytree,
44
49
    osutils,
45
 
    progress,
46
50
    remote,
47
51
    repository,
48
52
    symbol_versioning,
63
67
from bzrlib.tests import (
64
68
    features,
65
69
    test_lsprof,
 
70
    test_server,
66
71
    test_sftp_transport,
67
72
    TestUtil,
68
73
    )
69
 
from bzrlib.trace import note
70
 
from bzrlib.transport.memory import MemoryServer, MemoryTransport
 
74
from bzrlib.trace import note, mutter
 
75
from bzrlib.transport import memory
71
76
from bzrlib.version import _get_bzr_source_tree
72
77
 
73
78
 
76
81
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
77
82
 
78
83
 
79
 
class SelftestTests(tests.TestCase):
80
 
 
81
 
    def test_import_tests(self):
82
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
83
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
84
 
 
85
 
    def test_import_test_failure(self):
86
 
        self.assertRaises(ImportError,
87
 
                          TestUtil._load_module_by_name,
88
 
                          'bzrlib.no-name-yet')
89
 
 
90
 
 
91
84
class MetaTestLog(tests.TestCase):
92
85
 
93
86
    def test_logging(self):
121
114
        self.failUnlessExists(filename)
122
115
 
123
116
 
 
117
class TestClassesAvailable(tests.TestCase):
 
118
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
119
 
 
120
    def test_test_case(self):
 
121
        from bzrlib.tests import TestCase
 
122
 
 
123
    def test_test_loader(self):
 
124
        from bzrlib.tests import TestLoader
 
125
 
 
126
    def test_test_suite(self):
 
127
        from bzrlib.tests import TestSuite
 
128
 
 
129
 
124
130
class TestTransportScenarios(tests.TestCase):
125
131
    """A group of tests that test the transport implementation adaption core.
126
132
 
207
213
    def test_scenarios(self):
208
214
        # check that constructor parameters are passed through to the adapted
209
215
        # test.
210
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
216
        from bzrlib.tests.per_controldir import make_scenarios
211
217
        vfs_factory = "v"
212
218
        server1 = "a"
213
219
        server2 = "b"
311
317
        from bzrlib.tests.per_interrepository import make_scenarios
312
318
        server1 = "a"
313
319
        server2 = "b"
314
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
320
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
315
321
        scenarios = make_scenarios(server1, server2, formats)
316
322
        self.assertEqual([
317
323
            ('C0,str,str',
318
324
             {'repository_format': 'C1',
319
325
              'repository_format_to': 'C2',
320
326
              'transport_readonly_server': 'b',
321
 
              'transport_server': 'a'}),
 
327
              'transport_server': 'a',
 
328
              'extra_setup': 'C3'}),
322
329
            ('D0,str,str',
323
330
             {'repository_format': 'D1',
324
331
              'repository_format_to': 'D2',
325
332
              'transport_readonly_server': 'b',
326
 
              'transport_server': 'a'})],
 
333
              'transport_server': 'a',
 
334
              'extra_setup': 'D3'})],
327
335
            scenarios)
328
336
 
329
337
 
608
616
                l.attempt_lock()
609
617
        test = TestDanglingLock('test_function')
610
618
        result = test.run()
 
619
        total_failures = result.errors + result.failures
611
620
        if self._lock_check_thorough:
612
 
            self.assertEqual(1, len(result.errors))
 
621
            self.assertEqual(1, len(total_failures))
613
622
        else:
614
623
            # When _lock_check_thorough is disabled, then we don't trigger a
615
624
            # failure
616
 
            self.assertEqual(0, len(result.errors))
 
625
            self.assertEqual(0, len(total_failures))
617
626
 
618
627
 
619
628
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
620
629
    """Tests for the convenience functions TestCaseWithTransport introduces."""
621
630
 
622
631
    def test_get_readonly_url_none(self):
623
 
        from bzrlib.transport import get_transport
624
 
        from bzrlib.transport.memory import MemoryServer
625
632
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
 
        self.vfs_transport_factory = MemoryServer
 
633
        self.vfs_transport_factory = memory.MemoryServer
627
634
        self.transport_readonly_server = None
628
635
        # calling get_readonly_transport() constructs a decorator on the url
629
636
        # for the server
630
637
        url = self.get_readonly_url()
631
638
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
 
639
        t = transport.get_transport(url)
 
640
        t2 = transport.get_transport(url2)
634
641
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
642
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
636
643
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
644
 
638
645
    def test_get_readonly_url_http(self):
639
646
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
 
        from bzrlib.transport.local import LocalURLServer
642
647
        from bzrlib.transport.http import HttpTransportBase
643
 
        self.transport_server = LocalURLServer
 
648
        self.transport_server = test_server.LocalURLServer
644
649
        self.transport_readonly_server = HttpServer
645
650
        # calling get_readonly_transport() gives us a HTTP server instance.
646
651
        url = self.get_readonly_url()
647
652
        url2 = self.get_readonly_url('foo/bar')
648
653
        # the transport returned may be any HttpTransportBase subclass
649
 
        t = get_transport(url)
650
 
        t2 = get_transport(url2)
 
654
        t = transport.get_transport(url)
 
655
        t2 = transport.get_transport(url2)
651
656
        self.failUnless(isinstance(t, HttpTransportBase))
652
657
        self.failUnless(isinstance(t2, HttpTransportBase))
653
658
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
677
682
 
678
683
    def setUp(self):
679
684
        super(TestTestCaseTransports, self).setUp()
680
 
        self.vfs_transport_factory = MemoryServer
 
685
        self.vfs_transport_factory = memory.MemoryServer
681
686
 
682
687
    def test_make_bzrdir_preserves_transport(self):
683
688
        t = self.get_transport()
684
689
        result_bzrdir = self.make_bzrdir('subdir')
685
690
        self.assertIsInstance(result_bzrdir.transport,
686
 
                              MemoryTransport)
 
691
                              memory.MemoryTransport)
687
692
        # should not be on disk, should only be in memory
688
693
        self.failIfExists('subdir')
689
694
 
691
696
class TestChrootedTest(tests.ChrootedTestCase):
692
697
 
693
698
    def test_root_is_root(self):
694
 
        from bzrlib.transport import get_transport
695
 
        t = get_transport(self.get_readonly_url())
 
699
        t = transport.get_transport(self.get_readonly_url())
696
700
        url = t.base
697
701
        self.assertEqual(url, t.clone('..').base)
698
702
 
804
808
        self.requireFeature(test_lsprof.LSProfFeature)
805
809
        result_stream = StringIO()
806
810
        result = bzrlib.tests.VerboseTestResult(
807
 
            unittest._WritelnDecorator(result_stream),
 
811
            result_stream,
808
812
            descriptions=0,
809
813
            verbosity=2,
810
814
            )
836
840
        self.assertContainsRe(output,
837
841
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
838
842
 
 
843
    def test_uses_time_from_testtools(self):
 
844
        """Test case timings in verbose results should use testtools times"""
 
845
        import datetime
 
846
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
847
            def startTest(self, test):
 
848
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
849
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
850
            def addSuccess(self, test):
 
851
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
852
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
853
            def report_tests_starting(self): pass
 
854
        sio = StringIO()
 
855
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
856
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
857
 
839
858
    def test_known_failure(self):
840
859
        """A KnownFailure being raised should trigger several result actions."""
841
860
        class InstrumentedTestResult(tests.ExtendedTestResult):
842
861
            def stopTestRun(self): pass
843
 
            def startTests(self): pass
844
 
            def report_test_start(self, test): pass
 
862
            def report_tests_starting(self): pass
845
863
            def report_known_failure(self, test, err=None, details=None):
846
864
                self._call = test, 'known failure'
847
865
        result = InstrumentedTestResult(None, None, None, None)
865
883
        # verbose test output formatting
866
884
        result_stream = StringIO()
867
885
        result = bzrlib.tests.VerboseTestResult(
868
 
            unittest._WritelnDecorator(result_stream),
 
886
            result_stream,
869
887
            descriptions=0,
870
888
            verbosity=2,
871
889
            )
881
899
        output = result_stream.getvalue()[prefix:]
882
900
        lines = output.splitlines()
883
901
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
902
        if sys.version_info > (2, 7):
 
903
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
904
                self.assertNotEqual, lines[1], '    ')
884
905
        self.assertEqual(lines[1], '    foo')
885
906
        self.assertEqual(2, len(lines))
886
907
 
894
915
        """Test the behaviour of invoking addNotSupported."""
895
916
        class InstrumentedTestResult(tests.ExtendedTestResult):
896
917
            def stopTestRun(self): pass
897
 
            def startTests(self): pass
898
 
            def report_test_start(self, test): pass
 
918
            def report_tests_starting(self): pass
899
919
            def report_unsupported(self, test, feature):
900
920
                self._call = test, feature
901
921
        result = InstrumentedTestResult(None, None, None, None)
920
940
        # verbose test output formatting
921
941
        result_stream = StringIO()
922
942
        result = bzrlib.tests.VerboseTestResult(
923
 
            unittest._WritelnDecorator(result_stream),
 
943
            result_stream,
924
944
            descriptions=0,
925
945
            verbosity=2,
926
946
            )
940
960
        """An UnavailableFeature being raised should invoke addNotSupported."""
941
961
        class InstrumentedTestResult(tests.ExtendedTestResult):
942
962
            def stopTestRun(self): pass
943
 
            def startTests(self): pass
944
 
            def report_test_start(self, test): pass
 
963
            def report_tests_starting(self): pass
945
964
            def addNotSupported(self, test, feature):
946
965
                self._call = test, feature
947
966
        result = InstrumentedTestResult(None, None, None, None)
989
1008
        class InstrumentedTestResult(tests.ExtendedTestResult):
990
1009
            calls = 0
991
1010
            def startTests(self): self.calls += 1
992
 
            def report_test_start(self, test): pass
993
1011
        result = InstrumentedTestResult(None, None, None, None)
994
1012
        def test_function():
995
1013
            pass
997
1015
        test.run(result)
998
1016
        self.assertEquals(1, result.calls)
999
1017
 
 
1018
    def test_startTests_only_once(self):
 
1019
        """With multiple tests startTests should still only be called once"""
 
1020
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1021
            calls = 0
 
1022
            def startTests(self): self.calls += 1
 
1023
        result = InstrumentedTestResult(None, None, None, None)
 
1024
        suite = unittest.TestSuite([
 
1025
            unittest.FunctionTestCase(lambda: None),
 
1026
            unittest.FunctionTestCase(lambda: None)])
 
1027
        suite.run(result)
 
1028
        self.assertEquals(1, result.calls)
 
1029
        self.assertEquals(2, result.count)
 
1030
 
1000
1031
 
1001
1032
class TestUnicodeFilenameFeature(tests.TestCase):
1002
1033
 
1023
1054
        because of our use of global state.
1024
1055
        """
1025
1056
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1026
 
        old_leak = tests.TestCase._first_thread_leaker_id
1027
1057
        try:
1028
1058
            tests.TestCaseInTempDir.TEST_ROOT = None
1029
 
            tests.TestCase._first_thread_leaker_id = None
1030
1059
            return testrunner.run(test)
1031
1060
        finally:
1032
1061
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1033
 
            tests.TestCase._first_thread_leaker_id = old_leak
1034
1062
 
1035
1063
    def test_known_failure_failed_run(self):
1036
1064
        # run a test that generates a known failure which should be printed in
1082
1110
    def test_result_decorator(self):
1083
1111
        # decorate results
1084
1112
        calls = []
1085
 
        class LoggingDecorator(tests.ForwardingResult):
 
1113
        class LoggingDecorator(ExtendedToOriginalDecorator):
1086
1114
            def startTest(self, test):
1087
 
                tests.ForwardingResult.startTest(self, test)
 
1115
                ExtendedToOriginalDecorator.startTest(self, test)
1088
1116
                calls.append('start')
1089
1117
        test = unittest.FunctionTestCase(lambda:None)
1090
1118
        stream = StringIO()
1214
1242
        self.assertContainsRe(output_string, "--date [0-9.]+")
1215
1243
        self.assertLength(1, self._get_source_tree_calls)
1216
1244
 
 
1245
    def test_verbose_test_count(self):
 
1246
        """A verbose test run reports the right test count at the start"""
 
1247
        suite = TestUtil.TestSuite([
 
1248
            unittest.FunctionTestCase(lambda:None),
 
1249
            unittest.FunctionTestCase(lambda:None)])
 
1250
        self.assertEqual(suite.countTestCases(), 2)
 
1251
        stream = StringIO()
 
1252
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1253
        # Need to use the CountingDecorator as that's what sets num_tests
 
1254
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1255
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1256
 
1217
1257
    def test_startTestRun(self):
1218
1258
        """run should call result.startTestRun()"""
1219
1259
        calls = []
1220
 
        class LoggingDecorator(tests.ForwardingResult):
 
1260
        class LoggingDecorator(ExtendedToOriginalDecorator):
1221
1261
            def startTestRun(self):
1222
 
                tests.ForwardingResult.startTestRun(self)
 
1262
                ExtendedToOriginalDecorator.startTestRun(self)
1223
1263
                calls.append('startTestRun')
1224
1264
        test = unittest.FunctionTestCase(lambda:None)
1225
1265
        stream = StringIO()
1231
1271
    def test_stopTestRun(self):
1232
1272
        """run should call result.stopTestRun()"""
1233
1273
        calls = []
1234
 
        class LoggingDecorator(tests.ForwardingResult):
 
1274
        class LoggingDecorator(ExtendedToOriginalDecorator):
1235
1275
            def stopTestRun(self):
1236
 
                tests.ForwardingResult.stopTestRun(self)
 
1276
                ExtendedToOriginalDecorator.stopTestRun(self)
1237
1277
                calls.append('stopTestRun')
1238
1278
        test = unittest.FunctionTestCase(lambda:None)
1239
1279
        stream = StringIO()
1242
1282
        result = self.run_test_runner(runner, test)
1243
1283
        self.assertLength(1, calls)
1244
1284
 
 
1285
    def test_unicode_test_output_on_ascii_stream(self):
 
1286
        """Showing results should always succeed even on an ascii console"""
 
1287
        class FailureWithUnicode(tests.TestCase):
 
1288
            def test_log_unicode(self):
 
1289
                self.log(u"\u2606")
 
1290
                self.fail("Now print that log!")
 
1291
        out = StringIO()
 
1292
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1293
            lambda trace=False: "ascii")
 
1294
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1295
            FailureWithUnicode("test_log_unicode"))
 
1296
        self.assertContainsRe(out.getvalue(),
 
1297
            "Text attachment: log\n"
 
1298
            "-+\n"
 
1299
            "\d+\.\d+  \\\\u2606\n"
 
1300
            "-+\n")
 
1301
 
1245
1302
 
1246
1303
class SampleTestCase(tests.TestCase):
1247
1304
 
1422
1479
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1423
1480
        output_stream = StringIO()
1424
1481
        result = bzrlib.tests.VerboseTestResult(
1425
 
            unittest._WritelnDecorator(output_stream),
 
1482
            output_stream,
1426
1483
            descriptions=0,
1427
1484
            verbosity=2)
1428
1485
        sample_test.run(result)
1469
1526
        # permitted.
1470
1527
        # Manually set one up (TestCase doesn't and shouldn't provide magic
1471
1528
        # machinery)
1472
 
        transport_server = MemoryServer()
 
1529
        transport_server = memory.MemoryServer()
1473
1530
        transport_server.start_server()
1474
1531
        self.addCleanup(transport_server.stop_server)
1475
1532
        t = transport.get_transport(transport_server.get_url())
1557
1614
            result.calls)
1558
1615
 
1559
1616
    def test_start_server_registers_url(self):
1560
 
        transport_server = MemoryServer()
 
1617
        transport_server = memory.MemoryServer()
1561
1618
        # A little strict, but unlikely to be changed soon.
1562
1619
        self.assertEqual([], self._bzr_selftest_roots)
1563
1620
        self.start_server(transport_server)
1656
1713
        self.assertEqual('original', obj.test_attr)
1657
1714
 
1658
1715
 
 
1716
class _MissingFeature(tests.Feature):
 
1717
    def _probe(self):
 
1718
        return False
 
1719
missing_feature = _MissingFeature()
 
1720
 
 
1721
 
 
1722
def _get_test(name):
 
1723
    """Get an instance of a specific example test.
 
1724
 
 
1725
    We protect this in a function so that they don't auto-run in the test
 
1726
    suite.
 
1727
    """
 
1728
 
 
1729
    class ExampleTests(tests.TestCase):
 
1730
 
 
1731
        def test_fail(self):
 
1732
            mutter('this was a failing test')
 
1733
            self.fail('this test will fail')
 
1734
 
 
1735
        def test_error(self):
 
1736
            mutter('this test errored')
 
1737
            raise RuntimeError('gotcha')
 
1738
 
 
1739
        def test_missing_feature(self):
 
1740
            mutter('missing the feature')
 
1741
            self.requireFeature(missing_feature)
 
1742
 
 
1743
        def test_skip(self):
 
1744
            mutter('this test will be skipped')
 
1745
            raise tests.TestSkipped('reason')
 
1746
 
 
1747
        def test_success(self):
 
1748
            mutter('this test succeeds')
 
1749
 
 
1750
        def test_xfail(self):
 
1751
            mutter('test with expected failure')
 
1752
            self.knownFailure('this_fails')
 
1753
 
 
1754
        def test_unexpected_success(self):
 
1755
            mutter('test with unexpected success')
 
1756
            self.expectFailure('should_fail', lambda: None)
 
1757
 
 
1758
    return ExampleTests(name)
 
1759
 
 
1760
 
 
1761
class TestTestCaseLogDetails(tests.TestCase):
 
1762
 
 
1763
    def _run_test(self, test_name):
 
1764
        test = _get_test(test_name)
 
1765
        result = testtools.TestResult()
 
1766
        test.run(result)
 
1767
        return result
 
1768
 
 
1769
    def test_fail_has_log(self):
 
1770
        result = self._run_test('test_fail')
 
1771
        self.assertEqual(1, len(result.failures))
 
1772
        result_content = result.failures[0][1]
 
1773
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1774
        self.assertContainsRe(result_content, 'this was a failing test')
 
1775
 
 
1776
    def test_error_has_log(self):
 
1777
        result = self._run_test('test_error')
 
1778
        self.assertEqual(1, len(result.errors))
 
1779
        result_content = result.errors[0][1]
 
1780
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1781
        self.assertContainsRe(result_content, 'this test errored')
 
1782
 
 
1783
    def test_skip_has_no_log(self):
 
1784
        result = self._run_test('test_skip')
 
1785
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1786
        skips = result.skip_reasons['reason']
 
1787
        self.assertEqual(1, len(skips))
 
1788
        test = skips[0]
 
1789
        self.assertFalse('log' in test.getDetails())
 
1790
 
 
1791
    def test_missing_feature_has_no_log(self):
 
1792
        # testtools doesn't know about addNotSupported, so it just gets
 
1793
        # considered as a skip
 
1794
        result = self._run_test('test_missing_feature')
 
1795
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1796
        skips = result.skip_reasons[missing_feature]
 
1797
        self.assertEqual(1, len(skips))
 
1798
        test = skips[0]
 
1799
        self.assertFalse('log' in test.getDetails())
 
1800
 
 
1801
    def test_xfail_has_no_log(self):
 
1802
        result = self._run_test('test_xfail')
 
1803
        self.assertEqual(1, len(result.expectedFailures))
 
1804
        result_content = result.expectedFailures[0][1]
 
1805
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1806
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1807
 
 
1808
    def test_unexpected_success_has_log(self):
 
1809
        result = self._run_test('test_unexpected_success')
 
1810
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1811
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1812
        # expectedFailures is a list of reasons?
 
1813
        test = result.unexpectedSuccesses[0]
 
1814
        details = test.getDetails()
 
1815
        self.assertTrue('log' in details)
 
1816
 
 
1817
 
 
1818
class TestTestCloning(tests.TestCase):
 
1819
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1820
 
 
1821
    def test_cloned_testcase_does_not_share_details(self):
 
1822
        """A TestCase cloned with clone_test does not share mutable attributes
 
1823
        such as details or cleanups.
 
1824
        """
 
1825
        class Test(tests.TestCase):
 
1826
            def test_foo(self):
 
1827
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1828
        orig_test = Test('test_foo')
 
1829
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1830
        orig_test.run(unittest.TestResult())
 
1831
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1832
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1833
 
 
1834
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1835
        """Applying two levels of scenarios to a test preserves the attributes
 
1836
        added by both scenarios.
 
1837
        """
 
1838
        class Test(tests.TestCase):
 
1839
            def test_foo(self):
 
1840
                pass
 
1841
        test = Test('test_foo')
 
1842
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1843
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1844
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1845
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1846
        all_tests = list(tests.iter_suite_tests(suite))
 
1847
        self.assertLength(4, all_tests)
 
1848
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1849
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1850
 
 
1851
 
1659
1852
# NB: Don't delete this; it's not actually from 0.11!
1660
1853
@deprecated_function(deprecated_in((0, 11, 0)))
1661
1854
def sample_deprecated_function():
1804
1997
        # make_branch_and_tree has to use local branch and repositories
1805
1998
        # when the vfs transport and local disk are colocated, even if
1806
1999
        # a different transport is in use for url generation.
1807
 
        from bzrlib.transport.fakevfat import FakeVFATServer
1808
 
        self.transport_server = FakeVFATServer
 
2000
        self.transport_server = test_server.FakeVFATServer
1809
2001
        self.assertFalse(self.get_url('t1').startswith('file://'))
1810
2002
        tree = self.make_branch_and_tree('t1')
1811
2003
        base = tree.bzrdir.root_transport.base
1816
2008
                tree.branch.repository.bzrdir.root_transport)
1817
2009
 
1818
2010
 
1819
 
class SelfTestHelper:
 
2011
class SelfTestHelper(object):
1820
2012
 
1821
2013
    def run_selftest(self, **kwargs):
1822
2014
        """Run selftest returning its output."""
1882
2074
            def __call__(test, result):
1883
2075
                test.run(result)
1884
2076
            def run(test, result):
1885
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2077
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1886
2078
                calls.append("called")
1887
2079
            def countTestCases(self):
1888
2080
                return 1
1948
2140
 
1949
2141
    def test_transport_sftp(self):
1950
2142
        self.requireFeature(features.paramiko)
1951
 
        self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
 
2143
        from bzrlib.tests import stub_sftp
 
2144
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
1952
2145
 
1953
2146
    def test_transport_memory(self):
1954
 
        self.check_transport_set(bzrlib.transport.memory.MemoryServer)
 
2147
        self.check_transport_set(memory.MemoryServer)
1955
2148
 
1956
2149
 
1957
2150
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
1972
2165
            load_list='missing file name', list_only=True)
1973
2166
 
1974
2167
 
 
2168
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2169
 
 
2170
    _test_needs_features = [features.subunit]
 
2171
 
 
2172
    def run_subunit_stream(self, test_name):
 
2173
        from subunit import ProtocolTestCase
 
2174
        def factory():
 
2175
            return TestUtil.TestSuite([_get_test(test_name)])
 
2176
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2177
            test_suite_factory=factory)
 
2178
        test = ProtocolTestCase(stream)
 
2179
        result = testtools.TestResult()
 
2180
        test.run(result)
 
2181
        content = stream.getvalue()
 
2182
        return content, result
 
2183
 
 
2184
    def test_fail_has_log(self):
 
2185
        content, result = self.run_subunit_stream('test_fail')
 
2186
        self.assertEqual(1, len(result.failures))
 
2187
        self.assertContainsRe(content, '(?m)^log$')
 
2188
        self.assertContainsRe(content, 'this test will fail')
 
2189
 
 
2190
    def test_error_has_log(self):
 
2191
        content, result = self.run_subunit_stream('test_error')
 
2192
        self.assertContainsRe(content, '(?m)^log$')
 
2193
        self.assertContainsRe(content, 'this test errored')
 
2194
 
 
2195
    def test_skip_has_no_log(self):
 
2196
        content, result = self.run_subunit_stream('test_skip')
 
2197
        self.assertNotContainsRe(content, '(?m)^log$')
 
2198
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2199
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2200
        skips = result.skip_reasons['reason']
 
2201
        self.assertEqual(1, len(skips))
 
2202
        test = skips[0]
 
2203
        # RemotedTestCase doesn't preserve the "details"
 
2204
        ## self.assertFalse('log' in test.getDetails())
 
2205
 
 
2206
    def test_missing_feature_has_no_log(self):
 
2207
        content, result = self.run_subunit_stream('test_missing_feature')
 
2208
        self.assertNotContainsRe(content, '(?m)^log$')
 
2209
        self.assertNotContainsRe(content, 'missing the feature')
 
2210
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2211
        skips = result.skip_reasons['_MissingFeature\n']
 
2212
        self.assertEqual(1, len(skips))
 
2213
        test = skips[0]
 
2214
        # RemotedTestCase doesn't preserve the "details"
 
2215
        ## self.assertFalse('log' in test.getDetails())
 
2216
 
 
2217
    def test_xfail_has_no_log(self):
 
2218
        content, result = self.run_subunit_stream('test_xfail')
 
2219
        self.assertNotContainsRe(content, '(?m)^log$')
 
2220
        self.assertNotContainsRe(content, 'test with expected failure')
 
2221
        self.assertEqual(1, len(result.expectedFailures))
 
2222
        result_content = result.expectedFailures[0][1]
 
2223
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2224
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2225
 
 
2226
    def test_unexpected_success_has_log(self):
 
2227
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2228
        self.assertContainsRe(content, '(?m)^log$')
 
2229
        self.assertContainsRe(content, 'test with unexpected success')
 
2230
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2231
                           ' as a plain success',
 
2232
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2233
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2234
        test = result.unexpectedSuccesses[0]
 
2235
        # RemotedTestCase doesn't preserve the "details"
 
2236
        ## self.assertTrue('log' in test.getDetails())
 
2237
 
 
2238
    def test_success_has_no_log(self):
 
2239
        content, result = self.run_subunit_stream('test_success')
 
2240
        self.assertEqual(1, result.testsRun)
 
2241
        self.assertNotContainsRe(content, '(?m)^log$')
 
2242
        self.assertNotContainsRe(content, 'this test succeeds')
 
2243
 
 
2244
 
1975
2245
class TestRunBzr(tests.TestCase):
1976
2246
 
1977
2247
    out = ''
2340
2610
            os.chdir = orig_chdir
2341
2611
        self.assertEqual(['foo', 'current'], chdirs)
2342
2612
 
 
2613
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2614
        self.get_source_path = lambda: ""
 
2615
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2616
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2617
 
2343
2618
 
2344
2619
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2345
2620
    """Tests that really need to do things with an external bzr."""
2400
2675
 
2401
2676
 
2402
2677
simple_thunk_feature = tests._CompatabilityThunkFeature(
2403
 
    'bzrlib.tests', 'UnicodeFilename',
2404
 
    'bzrlib.tests.test_selftest.simple_thunk_feature',
2405
 
    deprecated_in((2,1,0)))
 
2678
    deprecated_in((2, 1, 0)),
 
2679
    'bzrlib.tests.test_selftest',
 
2680
    'simple_thunk_feature','UnicodeFilename',
 
2681
    replacement_module='bzrlib.tests'
 
2682
    )
2406
2683
 
2407
2684
class Test_CompatibilityFeature(tests.TestCase):
2408
2685
 
2413
2690
            simple_thunk_feature.available)
2414
2691
        self.assertEqual(tests.UnicodeFilename.available(), res)
2415
2692
 
2416
 
        
 
2693
 
2417
2694
class TestModuleAvailableFeature(tests.TestCase):
2418
2695
 
2419
2696
    def test_available_module(self):
2620
2897
        # Running bzr in blackbox mode, normal/expected/user errors should be
2621
2898
        # caught in the regular way and turned into an error message plus exit
2622
2899
        # code.
2623
 
        transport_server = MemoryServer()
 
2900
        transport_server = memory.MemoryServer()
2624
2901
        transport_server.start_server()
2625
2902
        self.addCleanup(transport_server.stop_server)
2626
2903
        url = transport_server.get_url()
2772
3049
        # Test that a plausible list of modules to doctest is returned
2773
3050
        # by _test_suite_modules_to_doctest.
2774
3051
        test_list = tests._test_suite_modules_to_doctest()
 
3052
        if __doc__ is None:
 
3053
            # When docstrings are stripped, there are no modules to doctest
 
3054
            self.assertEqual([], test_list)
 
3055
            return
2775
3056
        self.assertSubset([
2776
3057
            'bzrlib.timestamp',
2777
3058
            ],
2794
3075
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
2795
3076
        def doctests():
2796
3077
            calls.append("modules_to_doctest")
 
3078
            if __doc__ is None:
 
3079
                return []
2797
3080
            return ['bzrlib.timestamp']
2798
3081
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
2799
3082
        expected_test_list = [
2802
3085
            ('bzrlib.tests.per_transport.TransportTests'
2803
3086
             '.test_abspath(LocalTransport,LocalURLServer)'),
2804
3087
            'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
2805
 
            # modules_to_doctest
2806
 
            'bzrlib.timestamp.format_highres_date',
2807
3088
            # plugins can't be tested that way since selftest may be run with
2808
3089
            # --no-plugins
2809
3090
            ]
 
3091
        if __doc__ is not None:
 
3092
            expected_test_list.extend([
 
3093
                # modules_to_doctest
 
3094
                'bzrlib.timestamp.format_highres_date',
 
3095
                ])
2810
3096
        suite = tests.test_suite()
2811
3097
        self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
2812
3098
            set(calls))
2950
3236
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2951
3237
 
2952
3238
 
 
3239
class TestThreadLeakDetection(tests.TestCase):
 
3240
    """Ensure when tests leak threads we detect and report it"""
 
3241
 
 
3242
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3243
        def __init__(self):
 
3244
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3245
            self.leaks = []
 
3246
        def _report_thread_leak(self, test, leaks, alive):
 
3247
            self.leaks.append((test, leaks))
 
3248
 
 
3249
    def test_testcase_without_addCleanups(self):
 
3250
        """Check old TestCase instances don't break with leak detection"""
 
3251
        class Test(unittest.TestCase):
 
3252
            def runTest(self):
 
3253
                pass
 
3254
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3255
        result = self.LeakRecordingResult()
 
3256
        test = Test()
 
3257
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3258
        result.startTestRun()
 
3259
        test.run(result)
 
3260
        result.stopTestRun()
 
3261
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3262
        self.assertEqual(result.leaks, [])
 
3263
        
 
3264
    def test_thread_leak(self):
 
3265
        """Ensure a thread that outlives the running of a test is reported
 
3266
 
 
3267
        Uses a thread that blocks on an event, and is started by the inner
 
3268
        test case. As the thread outlives the inner case's run, it should be
 
3269
        detected as a leak, but the event is then set so that the thread can
 
3270
        be safely joined in cleanup so it's not leaked for real.
 
3271
        """
 
3272
        event = threading.Event()
 
3273
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3274
        class Test(tests.TestCase):
 
3275
            def test_leak(self):
 
3276
                thread.start()
 
3277
        result = self.LeakRecordingResult()
 
3278
        test = Test("test_leak")
 
3279
        self.addCleanup(thread.join)
 
3280
        self.addCleanup(event.set)
 
3281
        result.startTestRun()
 
3282
        test.run(result)
 
3283
        result.stopTestRun()
 
3284
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3285
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3286
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3287
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3288
 
 
3289
    def test_multiple_leaks(self):
 
3290
        """Check multiple leaks are blamed on the test cases at fault
 
3291
 
 
3292
        Same concept as the previous test, but has one inner test method that
 
3293
        leaks two threads, and one that doesn't leak at all.
 
3294
        """
 
3295
        event = threading.Event()
 
3296
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3297
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3298
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3299
        class Test(tests.TestCase):
 
3300
            def test_first_leak(self):
 
3301
                thread_b.start()
 
3302
            def test_second_no_leak(self):
 
3303
                pass
 
3304
            def test_third_leak(self):
 
3305
                thread_c.start()
 
3306
                thread_a.start()
 
3307
        result = self.LeakRecordingResult()
 
3308
        first_test = Test("test_first_leak")
 
3309
        third_test = Test("test_third_leak")
 
3310
        self.addCleanup(thread_a.join)
 
3311
        self.addCleanup(thread_b.join)
 
3312
        self.addCleanup(thread_c.join)
 
3313
        self.addCleanup(event.set)
 
3314
        result.startTestRun()
 
3315
        unittest.TestSuite(
 
3316
            [first_test, Test("test_second_no_leak"), third_test]
 
3317
            ).run(result)
 
3318
        result.stopTestRun()
 
3319
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3320
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3321
        self.assertEqual(result.leaks, [
 
3322
            (first_test, set([thread_b])),
 
3323
            (third_test, set([thread_a, thread_c]))])
 
3324
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3325
 
 
3326
 
 
3327
class TestPostMortemDebugging(tests.TestCase):
 
3328
    """Check post mortem debugging works when tests fail or error"""
 
3329
 
 
3330
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3331
        def __init__(self):
 
3332
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3333
            self.postcode = None
 
3334
        def _post_mortem(self, tb=None):
 
3335
            """Record the code object at the end of the current traceback"""
 
3336
            tb = tb or sys.exc_info()[2]
 
3337
            if tb is not None:
 
3338
                next = tb.tb_next
 
3339
                while next is not None:
 
3340
                    tb = next
 
3341
                    next = next.tb_next
 
3342
                self.postcode = tb.tb_frame.f_code
 
3343
        def report_error(self, test, err):
 
3344
            pass
 
3345
        def report_failure(self, test, err):
 
3346
            pass
 
3347
 
 
3348
    def test_location_unittest_error(self):
 
3349
        """Needs right post mortem traceback with erroring unittest case"""
 
3350
        class Test(unittest.TestCase):
 
3351
            def runTest(self):
 
3352
                raise RuntimeError
 
3353
        result = self.TracebackRecordingResult()
 
3354
        Test().run(result)
 
3355
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3356
 
 
3357
    def test_location_unittest_failure(self):
 
3358
        """Needs right post mortem traceback with failing unittest case"""
 
3359
        class Test(unittest.TestCase):
 
3360
            def runTest(self):
 
3361
                raise self.failureException
 
3362
        result = self.TracebackRecordingResult()
 
3363
        Test().run(result)
 
3364
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3365
 
 
3366
    def test_location_bt_error(self):
 
3367
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3368
        class Test(tests.TestCase):
 
3369
            def test_error(self):
 
3370
                raise RuntimeError
 
3371
        result = self.TracebackRecordingResult()
 
3372
        Test("test_error").run(result)
 
3373
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3374
 
 
3375
    def test_location_bt_failure(self):
 
3376
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3377
        class Test(tests.TestCase):
 
3378
            def test_failure(self):
 
3379
                raise self.failureException
 
3380
        result = self.TracebackRecordingResult()
 
3381
        Test("test_failure").run(result)
 
3382
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3383
 
 
3384
    def test_env_var_triggers_post_mortem(self):
 
3385
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3386
        import pdb
 
3387
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3388
        post_mortem_calls = []
 
3389
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3390
        self.addCleanup(osutils.set_or_unset_env, "BZR_TEST_PDB",
 
3391
            osutils.set_or_unset_env("BZR_TEST_PDB", None))
 
3392
        result._post_mortem(1)
 
3393
        os.environ["BZR_TEST_PDB"] = "on"
 
3394
        result._post_mortem(2)
 
3395
        self.assertEqual([2], post_mortem_calls)
 
3396
 
 
3397
 
2953
3398
class TestRunSuite(tests.TestCase):
2954
3399
 
2955
3400
    def test_runner_class(self):