~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-04-05 14:47:26 UTC
  • mfrom: (5752.2.11 2.4-windows-lfstat)
  • Revision ID: pqm@pqm.ubuntu.com-20110405144726-zi3lj2kwvjml4kx5
(jameinel) Add osutils.lstat/fstat so that even on Windows lstat(fname) ==
 fstat(open(fname).fileno()) (John A Meinel)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import doctest
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
37
42
from bzrlib import (
38
43
    branchbuilder,
39
44
    bzrdir,
40
 
    debug,
41
45
    errors,
42
46
    lockdir,
43
47
    memorytree,
44
48
    osutils,
45
 
    progress,
46
49
    remote,
47
50
    repository,
48
51
    symbol_versioning,
52
55
    )
53
56
from bzrlib.repofmt import (
54
57
    groupcompress_repo,
55
 
    pack_repo,
56
 
    weaverepo,
57
58
    )
58
59
from bzrlib.symbol_versioning import (
59
60
    deprecated_function,
64
65
    features,
65
66
    test_lsprof,
66
67
    test_server,
67
 
    test_sftp_transport,
68
68
    TestUtil,
69
69
    )
70
 
from bzrlib.trace import note
 
70
from bzrlib.trace import note, mutter
71
71
from bzrlib.transport import memory
72
 
from bzrlib.version import _get_bzr_source_tree
73
72
 
74
73
 
75
74
def _test_ids(test_suite):
77
76
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
77
 
79
78
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
79
class MetaTestLog(tests.TestCase):
93
80
 
94
81
    def test_logging(self):
100
87
            "text", "plain", {"charset": "utf8"})))
101
88
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
102
89
        self.assertThat(self.get_log(),
103
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
90
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
104
91
 
105
92
 
106
93
class TestUnicodeFilename(tests.TestCase):
122
109
        self.failUnlessExists(filename)
123
110
 
124
111
 
 
112
class TestClassesAvailable(tests.TestCase):
 
113
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
114
 
 
115
    def test_test_case(self):
 
116
        from bzrlib.tests import TestCase
 
117
 
 
118
    def test_test_loader(self):
 
119
        from bzrlib.tests import TestLoader
 
120
 
 
121
    def test_test_suite(self):
 
122
        from bzrlib.tests import TestSuite
 
123
 
 
124
 
125
125
class TestTransportScenarios(tests.TestCase):
126
126
    """A group of tests that test the transport implementation adaption core.
127
127
 
208
208
    def test_scenarios(self):
209
209
        # check that constructor parameters are passed through to the adapted
210
210
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
211
        from bzrlib.tests.per_controldir import make_scenarios
212
212
        vfs_factory = "v"
213
213
        server1 = "a"
214
214
        server2 = "b"
312
312
        from bzrlib.tests.per_interrepository import make_scenarios
313
313
        server1 = "a"
314
314
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
315
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
316
        scenarios = make_scenarios(server1, server2, formats)
317
317
        self.assertEqual([
318
318
            ('C0,str,str',
319
319
             {'repository_format': 'C1',
320
320
              'repository_format_to': 'C2',
321
321
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
322
              'transport_server': 'a',
 
323
              'extra_setup': 'C3'}),
323
324
            ('D0,str,str',
324
325
             {'repository_format': 'D1',
325
326
              'repository_format_to': 'D2',
326
327
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
328
              'transport_server': 'a',
 
329
              'extra_setup': 'D3'})],
328
330
            scenarios)
329
331
 
330
332
 
336
338
        from bzrlib.tests.per_workingtree import make_scenarios
337
339
        server1 = "a"
338
340
        server2 = "b"
339
 
        formats = [workingtree.WorkingTreeFormat2(),
 
341
        formats = [workingtree.WorkingTreeFormat4(),
340
342
                   workingtree.WorkingTreeFormat3(),]
341
343
        scenarios = make_scenarios(server1, server2, formats)
342
344
        self.assertEqual([
343
 
            ('WorkingTreeFormat2',
 
345
            ('WorkingTreeFormat4',
344
346
             {'bzrdir_format': formats[0]._matchingbzrdir,
345
347
              'transport_readonly_server': 'b',
346
348
              'transport_server': 'a',
373
375
            )
374
376
        server1 = "a"
375
377
        server2 = "b"
376
 
        formats = [workingtree.WorkingTreeFormat2(),
 
378
        formats = [workingtree.WorkingTreeFormat4(),
377
379
                   workingtree.WorkingTreeFormat3(),]
378
380
        scenarios = make_scenarios(server1, server2, formats)
379
381
        self.assertEqual(7, len(scenarios))
380
 
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
 
382
        default_wt_format = workingtree.format_registry.get_default()
381
383
        wt4_format = workingtree.WorkingTreeFormat4()
382
384
        wt5_format = workingtree.WorkingTreeFormat5()
383
385
        expected_scenarios = [
384
 
            ('WorkingTreeFormat2',
 
386
            ('WorkingTreeFormat4',
385
387
             {'bzrdir_format': formats[0]._matchingbzrdir,
386
388
              'transport_readonly_server': 'b',
387
389
              'transport_server': 'a',
447
449
        # ones to add.
448
450
        from bzrlib.tests.per_tree import (
449
451
            return_parameter,
450
 
            revision_tree_from_workingtree
451
452
            )
452
453
        from bzrlib.tests.per_intertree import (
453
454
            make_scenarios,
454
455
            )
455
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
456
        from bzrlib.workingtree import WorkingTreeFormat3, WorkingTreeFormat4
456
457
        input_test = TestInterTreeScenarios(
457
458
            "test_scenarios")
458
459
        server1 = "a"
459
460
        server2 = "b"
460
 
        format1 = WorkingTreeFormat2()
 
461
        format1 = WorkingTreeFormat4()
461
462
        format2 = WorkingTreeFormat3()
462
463
        formats = [("1", str, format1, format2, "converter1"),
463
464
            ("2", int, format2, format1, "converter2")]
554
555
    def test_make_branch_and_memory_tree_with_format(self):
555
556
        """make_branch_and_memory_tree should accept a format option."""
556
557
        format = bzrdir.BzrDirMetaFormat1()
557
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
558
        format.repository_format = repository.format_registry.get_default()
558
559
        tree = self.make_branch_and_memory_tree('dir', format=format)
559
560
        # Guard against regression into MemoryTransport leaking
560
561
        # files to disk instead of keeping them in memory.
574
575
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
575
576
        # that the format objects are used.
576
577
        format = bzrdir.BzrDirMetaFormat1()
577
 
        repo_format = weaverepo.RepositoryFormat7()
 
578
        repo_format = repository.format_registry.get_default()
578
579
        format.repository_format = repo_format
579
580
        builder = self.make_branch_builder('dir', format=format)
580
581
        the_branch = builder.get_branch()
609
610
                l.attempt_lock()
610
611
        test = TestDanglingLock('test_function')
611
612
        result = test.run()
 
613
        total_failures = result.errors + result.failures
612
614
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
615
            self.assertEqual(1, len(total_failures))
614
616
        else:
615
617
            # When _lock_check_thorough is disabled, then we don't trigger a
616
618
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
619
            self.assertEqual(0, len(total_failures))
618
620
 
619
621
 
620
622
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
623
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
624
 
623
625
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
626
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
627
        self.vfs_transport_factory = memory.MemoryServer
627
628
        self.transport_readonly_server = None
629
630
        # for the server
630
631
        url = self.get_readonly_url()
631
632
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
 
633
        t = transport.get_transport(url)
 
634
        t2 = transport.get_transport(url2)
634
635
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
636
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
636
637
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
638
 
638
639
    def test_get_readonly_url_http(self):
639
640
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
641
        from bzrlib.transport.http import HttpTransportBase
642
642
        self.transport_server = test_server.LocalURLServer
643
643
        self.transport_readonly_server = HttpServer
645
645
        url = self.get_readonly_url()
646
646
        url2 = self.get_readonly_url('foo/bar')
647
647
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
 
648
        t = transport.get_transport(url)
 
649
        t2 = transport.get_transport(url2)
650
650
        self.failUnless(isinstance(t, HttpTransportBase))
651
651
        self.failUnless(isinstance(t2, HttpTransportBase))
652
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
690
690
class TestChrootedTest(tests.ChrootedTestCase):
691
691
 
692
692
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
693
        t = transport.get_transport(self.get_readonly_url())
695
694
        url = t.base
696
695
        self.assertEqual(url, t.clone('..').base)
697
696
 
750
749
        self.check_timing(ShortDelayTestCase('test_short_delay'),
751
750
                          r"^ +[0-9]+ms$")
752
751
 
753
 
    def _patch_get_bzr_source_tree(self):
754
 
        # Reading from the actual source tree breaks isolation, but we don't
755
 
        # want to assume that that's *all* that would happen.
756
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
757
 
 
758
 
    def test_assigned_benchmark_file_stores_date(self):
759
 
        self._patch_get_bzr_source_tree()
760
 
        output = StringIO()
761
 
        result = bzrlib.tests.TextTestResult(self._log_file,
762
 
                                        descriptions=0,
763
 
                                        verbosity=1,
764
 
                                        bench_history=output
765
 
                                        )
766
 
        output_string = output.getvalue()
767
 
        # if you are wondering about the regexp please read the comment in
768
 
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
769
 
        # XXX: what comment?  -- Andrew Bennetts
770
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
771
 
 
772
 
    def test_benchhistory_records_test_times(self):
773
 
        self._patch_get_bzr_source_tree()
774
 
        result_stream = StringIO()
775
 
        result = bzrlib.tests.TextTestResult(
776
 
            self._log_file,
777
 
            descriptions=0,
778
 
            verbosity=1,
779
 
            bench_history=result_stream
780
 
            )
781
 
 
782
 
        # we want to profile a call and check that its test duration is recorded
783
 
        # make a new test instance that when run will generate a benchmark
784
 
        example_test_case = TestTestResult("_time_hello_world_encoding")
785
 
        # execute the test, which should succeed and record times
786
 
        example_test_case.run(result)
787
 
        lines = result_stream.getvalue().splitlines()
788
 
        self.assertEqual(2, len(lines))
789
 
        self.assertContainsRe(lines[1],
790
 
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
791
 
            "._time_hello_world_encoding")
792
 
 
793
752
    def _time_hello_world_encoding(self):
794
753
        """Profile two sleep calls
795
754
 
803
762
        self.requireFeature(test_lsprof.LSProfFeature)
804
763
        result_stream = StringIO()
805
764
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
765
            result_stream,
807
766
            descriptions=0,
808
767
            verbosity=2,
809
768
            )
835
794
        self.assertContainsRe(output,
836
795
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
837
796
 
 
797
    def test_uses_time_from_testtools(self):
 
798
        """Test case timings in verbose results should use testtools times"""
 
799
        import datetime
 
800
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
801
            def startTest(self, test):
 
802
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
803
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
804
            def addSuccess(self, test):
 
805
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
806
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
807
            def report_tests_starting(self): pass
 
808
        sio = StringIO()
 
809
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
810
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
811
 
838
812
    def test_known_failure(self):
839
813
        """A KnownFailure being raised should trigger several result actions."""
840
814
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
815
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
816
            def report_tests_starting(self): pass
844
817
            def report_known_failure(self, test, err=None, details=None):
845
818
                self._call = test, 'known failure'
846
819
        result = InstrumentedTestResult(None, None, None, None)
864
837
        # verbose test output formatting
865
838
        result_stream = StringIO()
866
839
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
840
            result_stream,
868
841
            descriptions=0,
869
842
            verbosity=2,
870
843
            )
880
853
        output = result_stream.getvalue()[prefix:]
881
854
        lines = output.splitlines()
882
855
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
856
        if sys.version_info > (2, 7):
 
857
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
858
                self.assertNotEqual, lines[1], '    ')
883
859
        self.assertEqual(lines[1], '    foo')
884
860
        self.assertEqual(2, len(lines))
885
861
 
893
869
        """Test the behaviour of invoking addNotSupported."""
894
870
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
871
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
872
            def report_tests_starting(self): pass
898
873
            def report_unsupported(self, test, feature):
899
874
                self._call = test, feature
900
875
        result = InstrumentedTestResult(None, None, None, None)
919
894
        # verbose test output formatting
920
895
        result_stream = StringIO()
921
896
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
897
            result_stream,
923
898
            descriptions=0,
924
899
            verbosity=2,
925
900
            )
939
914
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
915
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
916
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
917
            def report_tests_starting(self): pass
944
918
            def addNotSupported(self, test, feature):
945
919
                self._call = test, feature
946
920
        result = InstrumentedTestResult(None, None, None, None)
988
962
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
963
            calls = 0
990
964
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
965
        result = InstrumentedTestResult(None, None, None, None)
993
966
        def test_function():
994
967
            pass
996
969
        test.run(result)
997
970
        self.assertEquals(1, result.calls)
998
971
 
 
972
    def test_startTests_only_once(self):
 
973
        """With multiple tests startTests should still only be called once"""
 
974
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
975
            calls = 0
 
976
            def startTests(self): self.calls += 1
 
977
        result = InstrumentedTestResult(None, None, None, None)
 
978
        suite = unittest.TestSuite([
 
979
            unittest.FunctionTestCase(lambda: None),
 
980
            unittest.FunctionTestCase(lambda: None)])
 
981
        suite.run(result)
 
982
        self.assertEquals(1, result.calls)
 
983
        self.assertEquals(2, result.count)
 
984
 
999
985
 
1000
986
class TestUnicodeFilenameFeature(tests.TestCase):
1001
987
 
1022
1008
        because of our use of global state.
1023
1009
        """
1024
1010
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1011
        try:
1027
1012
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1013
            return testrunner.run(test)
1030
1014
        finally:
1031
1015
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1016
 
1034
1017
    def test_known_failure_failed_run(self):
1035
1018
        # run a test that generates a known failure which should be printed in
1081
1064
    def test_result_decorator(self):
1082
1065
        # decorate results
1083
1066
        calls = []
1084
 
        class LoggingDecorator(tests.ForwardingResult):
 
1067
        class LoggingDecorator(ExtendedToOriginalDecorator):
1085
1068
            def startTest(self, test):
1086
 
                tests.ForwardingResult.startTest(self, test)
 
1069
                ExtendedToOriginalDecorator.startTest(self, test)
1087
1070
                calls.append('start')
1088
1071
        test = unittest.FunctionTestCase(lambda:None)
1089
1072
        stream = StringIO()
1190
1173
            ],
1191
1174
            lines[-3:])
1192
1175
 
1193
 
    def _patch_get_bzr_source_tree(self):
1194
 
        # Reading from the actual source tree breaks isolation, but we don't
1195
 
        # want to assume that that's *all* that would happen.
1196
 
        self._get_source_tree_calls = []
1197
 
        def new_get():
1198
 
            self._get_source_tree_calls.append("called")
1199
 
            return None
1200
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
1201
 
 
1202
 
    def test_bench_history(self):
1203
 
        # tests that running the benchmark passes bench_history into
1204
 
        # the test result object. We can tell that happens if
1205
 
        # _get_bzr_source_tree is called.
1206
 
        self._patch_get_bzr_source_tree()
1207
 
        test = TestRunner('dummy_test')
1208
 
        output = StringIO()
1209
 
        runner = tests.TextTestRunner(stream=self._log_file,
1210
 
                                      bench_history=output)
1211
 
        result = self.run_test_runner(runner, test)
1212
 
        output_string = output.getvalue()
1213
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
 
        self.assertLength(1, self._get_source_tree_calls)
 
1176
    def test_verbose_test_count(self):
 
1177
        """A verbose test run reports the right test count at the start"""
 
1178
        suite = TestUtil.TestSuite([
 
1179
            unittest.FunctionTestCase(lambda:None),
 
1180
            unittest.FunctionTestCase(lambda:None)])
 
1181
        self.assertEqual(suite.countTestCases(), 2)
 
1182
        stream = StringIO()
 
1183
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1184
        # Need to use the CountingDecorator as that's what sets num_tests
 
1185
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1186
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1215
1187
 
1216
1188
    def test_startTestRun(self):
1217
1189
        """run should call result.startTestRun()"""
1218
1190
        calls = []
1219
 
        class LoggingDecorator(tests.ForwardingResult):
 
1191
        class LoggingDecorator(ExtendedToOriginalDecorator):
1220
1192
            def startTestRun(self):
1221
 
                tests.ForwardingResult.startTestRun(self)
 
1193
                ExtendedToOriginalDecorator.startTestRun(self)
1222
1194
                calls.append('startTestRun')
1223
1195
        test = unittest.FunctionTestCase(lambda:None)
1224
1196
        stream = StringIO()
1230
1202
    def test_stopTestRun(self):
1231
1203
        """run should call result.stopTestRun()"""
1232
1204
        calls = []
1233
 
        class LoggingDecorator(tests.ForwardingResult):
 
1205
        class LoggingDecorator(ExtendedToOriginalDecorator):
1234
1206
            def stopTestRun(self):
1235
 
                tests.ForwardingResult.stopTestRun(self)
 
1207
                ExtendedToOriginalDecorator.stopTestRun(self)
1236
1208
                calls.append('stopTestRun')
1237
1209
        test = unittest.FunctionTestCase(lambda:None)
1238
1210
        stream = StringIO()
1241
1213
        result = self.run_test_runner(runner, test)
1242
1214
        self.assertLength(1, calls)
1243
1215
 
 
1216
    def test_unicode_test_output_on_ascii_stream(self):
 
1217
        """Showing results should always succeed even on an ascii console"""
 
1218
        class FailureWithUnicode(tests.TestCase):
 
1219
            def test_log_unicode(self):
 
1220
                self.log(u"\u2606")
 
1221
                self.fail("Now print that log!")
 
1222
        out = StringIO()
 
1223
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1224
            lambda trace=False: "ascii")
 
1225
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1226
            FailureWithUnicode("test_log_unicode"))
 
1227
        self.assertContainsRe(out.getvalue(),
 
1228
            "Text attachment: log\n"
 
1229
            "-+\n"
 
1230
            "\d+\.\d+  \\\\u2606\n"
 
1231
            "-+\n")
 
1232
 
1244
1233
 
1245
1234
class SampleTestCase(tests.TestCase):
1246
1235
 
1421
1410
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1411
        output_stream = StringIO()
1423
1412
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1413
            output_stream,
1425
1414
            descriptions=0,
1426
1415
            verbosity=2)
1427
1416
        sample_test.run(result)
1434
1423
        # Note this test won't fail with hooks that the core library doesn't
1435
1424
        # use - but it will trigger with a plugin that adds hooks, so it's still a
1436
1425
        # useful warning in that case.
1437
 
        self.assertEqual(bzrlib.branch.BranchHooks(),
1438
 
            bzrlib.branch.Branch.hooks)
1439
 
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1426
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1427
        self.assertEqual(
 
1428
            bzrlib.smart.server.SmartServerHooks(),
1440
1429
            bzrlib.smart.server.SmartTCPServer.hooks)
1441
 
        self.assertEqual(bzrlib.commands.CommandHooks(),
1442
 
            bzrlib.commands.Command.hooks)
 
1430
        self.assertEqual(
 
1431
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1443
1432
 
1444
1433
    def test__gather_lsprof_in_benchmarks(self):
1445
1434
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1655
1644
        self.assertEqual('original', obj.test_attr)
1656
1645
 
1657
1646
 
 
1647
class _MissingFeature(tests.Feature):
 
1648
    def _probe(self):
 
1649
        return False
 
1650
missing_feature = _MissingFeature()
 
1651
 
 
1652
 
 
1653
def _get_test(name):
 
1654
    """Get an instance of a specific example test.
 
1655
 
 
1656
    We protect this in a function so that they don't auto-run in the test
 
1657
    suite.
 
1658
    """
 
1659
 
 
1660
    class ExampleTests(tests.TestCase):
 
1661
 
 
1662
        def test_fail(self):
 
1663
            mutter('this was a failing test')
 
1664
            self.fail('this test will fail')
 
1665
 
 
1666
        def test_error(self):
 
1667
            mutter('this test errored')
 
1668
            raise RuntimeError('gotcha')
 
1669
 
 
1670
        def test_missing_feature(self):
 
1671
            mutter('missing the feature')
 
1672
            self.requireFeature(missing_feature)
 
1673
 
 
1674
        def test_skip(self):
 
1675
            mutter('this test will be skipped')
 
1676
            raise tests.TestSkipped('reason')
 
1677
 
 
1678
        def test_success(self):
 
1679
            mutter('this test succeeds')
 
1680
 
 
1681
        def test_xfail(self):
 
1682
            mutter('test with expected failure')
 
1683
            self.knownFailure('this_fails')
 
1684
 
 
1685
        def test_unexpected_success(self):
 
1686
            mutter('test with unexpected success')
 
1687
            self.expectFailure('should_fail', lambda: None)
 
1688
 
 
1689
    return ExampleTests(name)
 
1690
 
 
1691
 
 
1692
class TestTestCaseLogDetails(tests.TestCase):
 
1693
 
 
1694
    def _run_test(self, test_name):
 
1695
        test = _get_test(test_name)
 
1696
        result = testtools.TestResult()
 
1697
        test.run(result)
 
1698
        return result
 
1699
 
 
1700
    def test_fail_has_log(self):
 
1701
        result = self._run_test('test_fail')
 
1702
        self.assertEqual(1, len(result.failures))
 
1703
        result_content = result.failures[0][1]
 
1704
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1705
        self.assertContainsRe(result_content, 'this was a failing test')
 
1706
 
 
1707
    def test_error_has_log(self):
 
1708
        result = self._run_test('test_error')
 
1709
        self.assertEqual(1, len(result.errors))
 
1710
        result_content = result.errors[0][1]
 
1711
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1712
        self.assertContainsRe(result_content, 'this test errored')
 
1713
 
 
1714
    def test_skip_has_no_log(self):
 
1715
        result = self._run_test('test_skip')
 
1716
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1717
        skips = result.skip_reasons['reason']
 
1718
        self.assertEqual(1, len(skips))
 
1719
        test = skips[0]
 
1720
        self.assertFalse('log' in test.getDetails())
 
1721
 
 
1722
    def test_missing_feature_has_no_log(self):
 
1723
        # testtools doesn't know about addNotSupported, so it just gets
 
1724
        # considered as a skip
 
1725
        result = self._run_test('test_missing_feature')
 
1726
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1727
        skips = result.skip_reasons[missing_feature]
 
1728
        self.assertEqual(1, len(skips))
 
1729
        test = skips[0]
 
1730
        self.assertFalse('log' in test.getDetails())
 
1731
 
 
1732
    def test_xfail_has_no_log(self):
 
1733
        result = self._run_test('test_xfail')
 
1734
        self.assertEqual(1, len(result.expectedFailures))
 
1735
        result_content = result.expectedFailures[0][1]
 
1736
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1737
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1738
 
 
1739
    def test_unexpected_success_has_log(self):
 
1740
        result = self._run_test('test_unexpected_success')
 
1741
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1742
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1743
        # expectedFailures is a list of reasons?
 
1744
        test = result.unexpectedSuccesses[0]
 
1745
        details = test.getDetails()
 
1746
        self.assertTrue('log' in details)
 
1747
 
 
1748
 
 
1749
class TestTestCloning(tests.TestCase):
 
1750
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1751
 
 
1752
    def test_cloned_testcase_does_not_share_details(self):
 
1753
        """A TestCase cloned with clone_test does not share mutable attributes
 
1754
        such as details or cleanups.
 
1755
        """
 
1756
        class Test(tests.TestCase):
 
1757
            def test_foo(self):
 
1758
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1759
        orig_test = Test('test_foo')
 
1760
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1761
        orig_test.run(unittest.TestResult())
 
1762
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1763
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1764
 
 
1765
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1766
        """Applying two levels of scenarios to a test preserves the attributes
 
1767
        added by both scenarios.
 
1768
        """
 
1769
        class Test(tests.TestCase):
 
1770
            def test_foo(self):
 
1771
                pass
 
1772
        test = Test('test_foo')
 
1773
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1774
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1775
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1776
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1777
        all_tests = list(tests.iter_suite_tests(suite))
 
1778
        self.assertLength(4, all_tests)
 
1779
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1780
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1781
 
 
1782
 
1658
1783
# NB: Don't delete this; it's not actually from 0.11!
1659
1784
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1785
def sample_deprecated_function():
1787
1912
    def test_make_branch_and_tree_with_format(self):
1788
1913
        # we should be able to supply a format to make_branch_and_tree
1789
1914
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1790
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1791
1915
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1792
1916
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1793
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1794
 
                              bzrlib.bzrdir.BzrDirFormat6)
1795
1917
 
1796
1918
    def test_make_branch_and_memory_tree(self):
1797
1919
        # we should be able to get a new branch and a mutable tree from
1814
1936
                tree.branch.repository.bzrdir.root_transport)
1815
1937
 
1816
1938
 
1817
 
class SelfTestHelper:
 
1939
class SelfTestHelper(object):
1818
1940
 
1819
1941
    def run_selftest(self, **kwargs):
1820
1942
        """Run selftest returning its output."""
1880
2002
            def __call__(test, result):
1881
2003
                test.run(result)
1882
2004
            def run(test, result):
1883
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2005
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1884
2006
                calls.append("called")
1885
2007
            def countTestCases(self):
1886
2008
                return 1
1971
2093
            load_list='missing file name', list_only=True)
1972
2094
 
1973
2095
 
 
2096
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check, per test outcome, whether the log ends up in the subunit stream.
    """

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run one sample test with the subunit runner and replay its stream.

        :param test_name: Name handed to _get_test to select the test to run.
        :return: A (content, result) tuple: content is the text of the stream
            produced by the run, result is the testtools.TestResult filled in
            by feeding that stream to subunit's ProtocolTestCase.
        """
        from subunit import ProtocolTestCase
        def build_suite():
            return TestUtil.TestSuite([_get_test(test_name)])
        stream = self.run_selftest(
            runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=build_suite)
        replayed = testtools.TestResult()
        ProtocolTestCase(stream).run(replayed)
        return stream.getvalue(), replayed

    def test_fail_has_log(self):
        stream_text, replayed = self.run_subunit_stream('test_fail')
        failure_count = len(replayed.failures)
        self.assertEqual(1, failure_count)
        self.assertContainsRe(stream_text, '(?m)^log$')
        self.assertContainsRe(stream_text, 'this test will fail')

    def test_error_has_log(self):
        stream_text, replayed = self.run_subunit_stream('test_error')
        self.assertContainsRe(stream_text, '(?m)^log$')
        self.assertContainsRe(stream_text, 'this test errored')

    def test_skip_has_no_log(self):
        stream_text, replayed = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(stream_text, '(?m)^log$')
        self.assertNotContainsRe(stream_text, 'this test will be skipped')
        reasons = replayed.skip_reasons
        self.assertEqual(['reason'], reasons.keys())
        skipped = reasons['reason']
        self.assertEqual(1, len(skipped))
        test = skipped[0]
        # The details are not preserved by RemotedTestCase, so the check
        # below cannot be made:
        ## self.assertFalse('log' in test.getDetails())

    def test_missing_feature_has_no_log(self):
        stream_text, replayed = self.run_subunit_stream(
            'test_missing_feature')
        self.assertNotContainsRe(stream_text, '(?m)^log$')
        self.assertNotContainsRe(stream_text, 'missing the feature')
        reasons = replayed.skip_reasons
        self.assertEqual(['_MissingFeature\n'], reasons.keys())
        skipped = reasons['_MissingFeature\n']
        self.assertEqual(1, len(skipped))
        test = skipped[0]
        # The details are not preserved by RemotedTestCase, so the check
        # below cannot be made:
        ## self.assertFalse('log' in test.getDetails())

    def test_xfail_has_no_log(self):
        stream_text, replayed = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(stream_text, '(?m)^log$')
        self.assertNotContainsRe(stream_text, 'test with expected failure')
        expected_failures = replayed.expectedFailures
        self.assertEqual(1, len(expected_failures))
        # Each entry is indexable; item 1 is the text checked for log traces.
        failure_text = expected_failures[0][1]
        self.assertNotContainsRe(failure_text, 'Text attachment: log')
        self.assertNotContainsRe(failure_text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        stream_text, replayed = self.run_subunit_stream(
            'test_unexpected_success')
        self.assertContainsRe(stream_text, '(?m)^log$')
        self.assertContainsRe(stream_text, 'test with unexpected success')
        unexpected_count = len(replayed.unexpectedSuccesses)
        self.expectFailure(
            'subunit treats "unexpectedSuccess"'
            ' as a plain success',
            self.assertEqual, 1, unexpected_count)
        self.assertEqual(1, len(replayed.unexpectedSuccesses))
        test = replayed.unexpectedSuccesses[0]
        # The details are not preserved by RemotedTestCase, so the check
        # below cannot be made:
        ## self.assertTrue('log' in test.getDetails())

    def test_success_has_no_log(self):
        stream_text, replayed = self.run_subunit_stream('test_success')
        self.assertEqual(1, replayed.testsRun)
        self.assertNotContainsRe(stream_text, '(?m)^log$')
        self.assertNotContainsRe(stream_text, 'this test succeeds')
 
2171
 
 
2172
 
1974
2173
class TestRunBzr(tests.TestCase):
1975
2174
 
1976
2175
    out = ''
2339
2538
            os.chdir = orig_chdir
2340
2539
        self.assertEqual(['foo', 'current'], chdirs)
2341
2540
 
 
2541
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2542
        self.get_source_path = lambda: ""
 
2543
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2544
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2545
 
2342
2546
 
2343
2547
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2548
    """Tests that really need to do things with an external bzr."""
2934
3138
        tpr.register('bar', 'bBB.aAA.rRR')
2935
3139
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
2936
3140
        self.assertThat(self.get_log(),
2937
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3141
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3142
                           doctest.ELLIPSIS))
2938
3143
 
2939
3144
    def test_get_unknown_prefix(self):
2940
3145
        tpr = self._get_registry()
2960
3165
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3166
 
2962
3167
 
 
3168
class TestThreadLeakDetection(tests.TestCase):
    """Verify that threads leaked by tests are detected and reported"""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Result on an in-memory stream that remembers reported leaks.

        Every call to _report_thread_leak is stored in self.leaks as a
        (test, leaks) pair.
        """

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            self.leaks.append((test, leaks))

    def test_testcase_without_addCleanups(self):
        """Plain unittest cases must keep working with leak detection on"""
        class Test(unittest.TestCase):
            def runTest(self):
                pass
        recorder = self.LeakRecordingResult()
        plain_case = Test()
        recorder.startTestRun()
        plain_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 0)
        self.assertEqual(recorder.leaks, [])

    def test_thread_leak(self):
        """A thread still alive after its test has run must be reported

        The inner test case starts a thread that blocks on an event. Since
        the thread is still running once the inner case is done, it has to
        be flagged as a leak. The event is set during cleanup, which lets
        the thread be joined so that nothing is really leaked.
        """
        release = threading.Event()
        leaker = threading.Thread(name="Leaker", target=release.wait)
        class Test(tests.TestCase):
            def test_leak(self):
                leaker.start()
        recorder = self.LeakRecordingResult()
        leaky_case = Test("test_leak")
        # Registration order matters: with last-in first-out cleanups the
        # event is set before the thread is joined.
        self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        leaky_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 1)
        self.assertEqual(recorder._first_thread_leaker_id, leaky_case.id())
        expected_leaks = [(leaky_case, set([leaker]))]
        self.assertEqual(recorder.leaks, expected_leaks)
        report = recorder.stream.getvalue()
        self.assertContainsString(report, "leaking threads")

    def test_multiple_leaks(self):
        """Several leaks must each be blamed on the test that caused them

        Works like the single leak test, except that one inner test method
        leaks one thread, another leaks two, and a third leaks none.
        """
        release = threading.Event()
        leaker_a = threading.Thread(name="LeakerA", target=release.wait)
        leaker_b = threading.Thread(name="LeakerB", target=release.wait)
        leaker_c = threading.Thread(name="LeakerC", target=release.wait)
        class Test(tests.TestCase):
            def test_first_leak(self):
                leaker_b.start()
            def test_second_no_leak(self):
                pass
            def test_third_leak(self):
                leaker_c.start()
                leaker_a.start()
        recorder = self.LeakRecordingResult()
        first_case = Test("test_first_leak")
        third_case = Test("test_third_leak")
        # The event is registered last so that, with last-in first-out
        # cleanups, it is set before any of the joins happen.
        self.addCleanup(leaker_a.join)
        self.addCleanup(leaker_b.join)
        self.addCleanup(leaker_c.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        suite = unittest.TestSuite(
            [first_case, Test("test_second_no_leak"), third_case])
        suite.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 2)
        self.assertEqual(recorder._first_thread_leaker_id, first_case.id())
        expected_leaks = [
            (first_case, set([leaker_b])),
            (third_case, set([leaker_a, leaker_c])),
            ]
        self.assertEqual(recorder.leaks, expected_leaks)
        report = recorder.stream.getvalue()
        self.assertContainsString(report, "leaking threads")
 
3252
 
 
3253
 
 
3254
class TestPostMortemDebugging(tests.TestCase):
    """Verify post mortem debugging for tests that fail or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result that notes where post mortem debugging would start.

        _post_mortem stores the code object of the innermost traceback frame
        in self.postcode; report_error and report_failure do nothing.
        """

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Remember the code object of the innermost traceback frame"""
            tb = tb or sys.exc_info()[2]
            if tb is None:
                return
            # Follow the chain to its last entry, where the raise happened.
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            pass

        def report_failure(self, test, err):
            pass

    def test_location_unittest_error(self):
        """An erroring unittest case must give the right traceback"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        raising_code = Test.runTest.func_code
        self.assertEqual(recorder.postcode, raising_code)

    def test_location_unittest_failure(self):
        """A failing unittest case must give the right traceback"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        raising_code = Test.runTest.func_code
        self.assertEqual(recorder.postcode, raising_code)

    def test_location_bt_error(self):
        """An erroring bzrlib.tests case must give the right traceback"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        recorder = self.TracebackRecordingResult()
        Test("test_error").run(recorder)
        raising_code = Test.test_error.func_code
        self.assertEqual(recorder.postcode, raising_code)

    def test_location_bt_failure(self):
        """A failing bzrlib.tests case must give the right traceback"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        recorder = self.TracebackRecordingResult()
        Test("test_failure").run(recorder)
        raising_code = Test.test_failure.func_code
        self.assertEqual(recorder.postcode, raising_code)

    def test_env_var_triggers_post_mortem(self):
        """pdb.post_mortem must be called only when BZR_TEST_PDB is set"""
        import pdb
        plain_result = tests.ExtendedTestResult(StringIO(), 0, 1)
        seen_tracebacks = []
        # Replace the debugger entry point so calls are recorded, not run.
        self.overrideAttr(pdb, "post_mortem", seen_tracebacks.append)
        # Variable unset: the call with 1 must not reach pdb.
        self.overrideEnv('BZR_TEST_PDB', None)
        plain_result._post_mortem(1)
        # Variable set: the call with 2 must reach pdb.
        self.overrideEnv('BZR_TEST_PDB', 'on')
        plain_result._post_mortem(2)
        self.assertEqual([2], seen_tracebacks)
 
3322
 
 
3323
 
2963
3324
class TestRunSuite(tests.TestCase):
2964
3325
 
2965
3326
    def test_runner_class(self):
2976
3337
                                                self.verbosity)
2977
3338
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3339
        self.assertLength(1, calls)
 
3340
 
 
3341
 
 
3342
class TestEnvironHandling(tests.TestCase):
    """Check that overrideEnv leaves os.environ as it found it."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        # Precondition: the variable used below must not exist beforehand.
        # (assertFalse replaces failIf, an alias deprecated since Python 2.7.)
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')
        # We use an embedded test to make sure we fix the _captureVar bug
        class Test(tests.TestCase):
            def test_me(self):
                # The first call saves the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
                # Make sure we can call it twice
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        Test('test_me').run(result)
        # Surface the embedded test's own report if it did not pass cleanly.
        if not result.wasStrictlySuccessful():
            self.fail(output.getvalue())
        # We get our value back
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3363
 
 
3364
 
 
3365
class TestIsolatedEnv(tests.TestCase):
    """Checks for the isolation of tests from os.environ.

    These tests themselves run with an os.environ that has already been
    cleaned, so they must be designed with care to avoid bootstrap
    side-effects. Starting from that clean environment makes it valid to
    assert which variables are present or absent, and the tests are built
    around such assertions.
    """

    class ScratchMonkey(tests.TestCase):

        def test_me(self):
            pass

    def test_basics(self):
        isolated = tests.isolated_environ
        # BZR_HOME is defined as None in isolated_environ, which means it is
        # not part of os.environ for a tests.TestCase.
        self.assertTrue('BZR_HOME' in isolated)
        self.assertEquals(None, isolated['BZR_HOME'])
        self.assertFalse('BZR_HOME' in os.environ)
        # LINES has a value in isolated_environ, which means it is part of
        # os.environ for a tests.TestCase.
        self.assertTrue('LINES' in isolated)
        self.assertEquals('25', isolated['LINES'])
        self.assertEquals('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # Start from BZR_HOME, known to be absent from os.environ.
        monkey = self.ScratchMonkey('test_me')
        overrides = {'BZR_HOME': 'foo'}
        tests.override_os_environ(monkey, overrides)
        self.assertEquals('foo', os.environ['BZR_HOME'])
        tests.restore_os_environ(monkey)
        self.assertFalse('BZR_HOME' in os.environ)

    def test_injecting_known_variable(self):
        # Start from LINES, known to be present in os.environ.
        monkey = self.ScratchMonkey('test_me')
        overrides = {'LINES': '42'}
        tests.override_os_environ(monkey, overrides)
        self.assertEquals('42', os.environ['LINES'])
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])

    def test_deleting_variable(self):
        # Start from LINES, known to be present in os.environ; overriding
        # with None must remove it.
        monkey = self.ScratchMonkey('test_me')
        overrides = {'LINES': None}
        tests.override_os_environ(monkey, overrides)
        self.assertTrue('LINES' not in os.environ)
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])
 
3416
 
 
3417
 
 
3418
class TestDocTestSuiteIsolation(tests.TestCase):
    """Check that `tests.DocTestSuite` isolates doc tests from os.environ.

    tests.TestCase already isolates tests from os.environ, so the clean
    environment is the starting point here. doctest.DocTestSuite serves as
    the point of comparison, to capture precisely what isolation
    tests.DocTestSuite adds.

    `tests.DocTestSuite` has to respect `tests.isolated_environ` rather than
    `os.environ`, so each test overrides the former to suit its needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a suite of type klass whose only doctest is string.

        :param klass: Suite factory, called with a test_finder keyword.
        :param string: Source text of the doctest to run.
        :return: Whatever klass returns.
        """
        class Finder(doctest.DocTestFinder):

            def find(*args, **kwargs):
                # Whatever is asked for, the one doctest parsed from string
                # is the answer.
                parser = doctest.DocTestParser()
                return [parser.get_doctest(string, {}, 'foo', 'foo.py', 0)]

        return klass(test_finder=Finder())

    def run_doctest_suite_for_string(self, klass, string):
        """Run string as a doctest in a klass suite.

        :return: A (result, output) tuple: the tests.TextTestResult used for
            the run and the StringIO it was writing to.
        """
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        self.get_doctest_suite_for_string(klass, string).run(result)
        return result, output

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the run's output, unless the doctest passes."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if result.wasStrictlySuccessful():
            return
        self.fail(output.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail, showing the run's output, if the doctest passes."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if not result.wasStrictlySuccessful():
            return
        self.fail(output.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        doctest_source = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # The plain doctest.DocTestSuite sees '25', so the doctest fails
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_source)
        # The isolated suite sees '42', so the doctest passes
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_source)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        doctest_source = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # The plain doctest.DocTestSuite sees '25', so the doctest fails
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_source)
        # The isolated suite sees None, so the doctest passes
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_source)