~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: John Arbash Meinel
  • Date: 2011-05-11 11:35:28 UTC
  • mto: This revision was merged to the branch mainline in revision 5851.
  • Revision ID: john@arbash-meinel.com-20110511113528-qepibuwxicjrbb2h
Break compatibility with python <2.6.

This includes auditing the code for places where we were doing
explicit 'sys.version' checks and removing them as appropriate.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import doctest
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
37
42
from bzrlib import (
38
43
    branchbuilder,
39
44
    bzrdir,
40
 
    debug,
41
45
    errors,
42
46
    lockdir,
43
47
    memorytree,
44
48
    osutils,
45
 
    progress,
46
49
    remote,
47
50
    repository,
48
51
    symbol_versioning,
49
52
    tests,
50
53
    transport,
51
54
    workingtree,
 
55
    workingtree_3,
 
56
    workingtree_4,
52
57
    )
53
58
from bzrlib.repofmt import (
54
59
    groupcompress_repo,
55
 
    pack_repo,
56
 
    weaverepo,
57
60
    )
58
61
from bzrlib.symbol_versioning import (
59
62
    deprecated_function,
64
67
    features,
65
68
    test_lsprof,
66
69
    test_server,
67
 
    test_sftp_transport,
68
70
    TestUtil,
69
71
    )
70
 
from bzrlib.trace import note
 
72
from bzrlib.trace import note, mutter
71
73
from bzrlib.transport import memory
72
 
from bzrlib.version import _get_bzr_source_tree
73
74
 
74
75
 
75
76
def _test_ids(test_suite):
77
78
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
79
 
79
80
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
81
class MetaTestLog(tests.TestCase):
93
82
 
94
83
    def test_logging(self):
100
89
            "text", "plain", {"charset": "utf8"})))
101
90
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
102
91
        self.assertThat(self.get_log(),
103
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
92
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
104
93
 
105
94
 
106
95
class TestUnicodeFilename(tests.TestCase):
119
108
 
120
109
        filename = u'hell\u00d8'
121
110
        self.build_tree_contents([(filename, 'contents of hello')])
122
 
        self.failUnlessExists(filename)
 
111
        self.assertPathExists(filename)
 
112
 
 
113
 
 
114
class TestClassesAvailable(tests.TestCase):
 
115
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
116
 
 
117
    def test_test_case(self):
 
118
        from bzrlib.tests import TestCase
 
119
 
 
120
    def test_test_loader(self):
 
121
        from bzrlib.tests import TestLoader
 
122
 
 
123
    def test_test_suite(self):
 
124
        from bzrlib.tests import TestSuite
123
125
 
124
126
 
125
127
class TestTransportScenarios(tests.TestCase):
312
314
        from bzrlib.tests.per_interrepository import make_scenarios
313
315
        server1 = "a"
314
316
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
317
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
318
        scenarios = make_scenarios(server1, server2, formats)
317
319
        self.assertEqual([
318
320
            ('C0,str,str',
319
321
             {'repository_format': 'C1',
320
322
              'repository_format_to': 'C2',
321
323
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
324
              'transport_server': 'a',
 
325
              'extra_setup': 'C3'}),
323
326
            ('D0,str,str',
324
327
             {'repository_format': 'D1',
325
328
              'repository_format_to': 'D2',
326
329
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
330
              'transport_server': 'a',
 
331
              'extra_setup': 'D3'})],
328
332
            scenarios)
329
333
 
330
334
 
336
340
        from bzrlib.tests.per_workingtree import make_scenarios
337
341
        server1 = "a"
338
342
        server2 = "b"
339
 
        formats = [workingtree.WorkingTreeFormat2(),
340
 
                   workingtree.WorkingTreeFormat3(),]
 
343
        formats = [workingtree_4.WorkingTreeFormat4(),
 
344
                   workingtree_3.WorkingTreeFormat3(),]
341
345
        scenarios = make_scenarios(server1, server2, formats)
342
346
        self.assertEqual([
343
 
            ('WorkingTreeFormat2',
 
347
            ('WorkingTreeFormat4',
344
348
             {'bzrdir_format': formats[0]._matchingbzrdir,
345
349
              'transport_readonly_server': 'b',
346
350
              'transport_server': 'a',
373
377
            )
374
378
        server1 = "a"
375
379
        server2 = "b"
376
 
        formats = [workingtree.WorkingTreeFormat2(),
377
 
                   workingtree.WorkingTreeFormat3(),]
 
380
        formats = [workingtree_4.WorkingTreeFormat4(),
 
381
                   workingtree_3.WorkingTreeFormat3(),]
378
382
        scenarios = make_scenarios(server1, server2, formats)
379
383
        self.assertEqual(7, len(scenarios))
380
 
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
381
 
        wt4_format = workingtree.WorkingTreeFormat4()
382
 
        wt5_format = workingtree.WorkingTreeFormat5()
 
384
        default_wt_format = workingtree.format_registry.get_default()
 
385
        wt4_format = workingtree_4.WorkingTreeFormat4()
 
386
        wt5_format = workingtree_4.WorkingTreeFormat5()
383
387
        expected_scenarios = [
384
 
            ('WorkingTreeFormat2',
 
388
            ('WorkingTreeFormat4',
385
389
             {'bzrdir_format': formats[0]._matchingbzrdir,
386
390
              'transport_readonly_server': 'b',
387
391
              'transport_server': 'a',
447
451
        # ones to add.
448
452
        from bzrlib.tests.per_tree import (
449
453
            return_parameter,
450
 
            revision_tree_from_workingtree
451
454
            )
452
455
        from bzrlib.tests.per_intertree import (
453
456
            make_scenarios,
454
457
            )
455
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
458
        from bzrlib.workingtree_3 import WorkingTreeFormat3
 
459
        from bzrlib.workingtree_4 import WorkingTreeFormat4
456
460
        input_test = TestInterTreeScenarios(
457
461
            "test_scenarios")
458
462
        server1 = "a"
459
463
        server2 = "b"
460
 
        format1 = WorkingTreeFormat2()
 
464
        format1 = WorkingTreeFormat4()
461
465
        format2 = WorkingTreeFormat3()
462
466
        formats = [("1", str, format1, format2, "converter1"),
463
467
            ("2", int, format2, format1, "converter2")]
509
513
        self.assertRaises(AssertionError, self.assertEqualStat,
510
514
            os.lstat("foo"), os.lstat("longname"))
511
515
 
 
516
    def test_failUnlessExists(self):
 
517
        """Deprecated failUnlessExists and failIfExists"""
 
518
        self.applyDeprecated(
 
519
            deprecated_in((2, 4)),
 
520
            self.failUnlessExists, '.')
 
521
        self.build_tree(['foo/', 'foo/bar'])
 
522
        self.applyDeprecated(
 
523
            deprecated_in((2, 4)),
 
524
            self.failUnlessExists, 'foo/bar')
 
525
        self.applyDeprecated(
 
526
            deprecated_in((2, 4)),
 
527
            self.failIfExists, 'foo/foo')
 
528
 
 
529
    def test_assertPathExists(self):
 
530
        self.assertPathExists('.')
 
531
        self.build_tree(['foo/', 'foo/bar'])
 
532
        self.assertPathExists('foo/bar')
 
533
        self.assertPathDoesNotExist('foo/foo')
 
534
 
512
535
 
513
536
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
514
537
 
548
571
        tree = self.make_branch_and_memory_tree('dir')
549
572
        # Guard against regression into MemoryTransport leaking
550
573
        # files to disk instead of keeping them in memory.
551
 
        self.failIf(osutils.lexists('dir'))
 
574
        self.assertFalse(osutils.lexists('dir'))
552
575
        self.assertIsInstance(tree, memorytree.MemoryTree)
553
576
 
554
577
    def test_make_branch_and_memory_tree_with_format(self):
555
578
        """make_branch_and_memory_tree should accept a format option."""
556
579
        format = bzrdir.BzrDirMetaFormat1()
557
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
580
        format.repository_format = repository.format_registry.get_default()
558
581
        tree = self.make_branch_and_memory_tree('dir', format=format)
559
582
        # Guard against regression into MemoryTransport leaking
560
583
        # files to disk instead of keeping them in memory.
561
 
        self.failIf(osutils.lexists('dir'))
 
584
        self.assertFalse(osutils.lexists('dir'))
562
585
        self.assertIsInstance(tree, memorytree.MemoryTree)
563
586
        self.assertEqual(format.repository_format.__class__,
564
587
            tree.branch.repository._format.__class__)
568
591
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
569
592
        # Guard against regression into MemoryTransport leaking
570
593
        # files to disk instead of keeping them in memory.
571
 
        self.failIf(osutils.lexists('dir'))
 
594
        self.assertFalse(osutils.lexists('dir'))
572
595
 
573
596
    def test_make_branch_builder_with_format(self):
574
597
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
575
598
        # that the format objects are used.
576
599
        format = bzrdir.BzrDirMetaFormat1()
577
 
        repo_format = weaverepo.RepositoryFormat7()
 
600
        repo_format = repository.format_registry.get_default()
578
601
        format.repository_format = repo_format
579
602
        builder = self.make_branch_builder('dir', format=format)
580
603
        the_branch = builder.get_branch()
581
604
        # Guard against regression into MemoryTransport leaking
582
605
        # files to disk instead of keeping them in memory.
583
 
        self.failIf(osutils.lexists('dir'))
 
606
        self.assertFalse(osutils.lexists('dir'))
584
607
        self.assertEqual(format.repository_format.__class__,
585
608
                         the_branch.repository._format.__class__)
586
609
        self.assertEqual(repo_format.get_format_string(),
592
615
        the_branch = builder.get_branch()
593
616
        # Guard against regression into MemoryTransport leaking
594
617
        # files to disk instead of keeping them in memory.
595
 
        self.failIf(osutils.lexists('dir'))
 
618
        self.assertFalse(osutils.lexists('dir'))
596
619
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
597
620
        self.assertEqual(dir_format.repository_format.__class__,
598
621
                         the_branch.repository._format.__class__)
611
634
        result = test.run()
612
635
        total_failures = result.errors + result.failures
613
636
        if self._lock_check_thorough:
614
 
            self.assertLength(1, total_failures)
 
637
            self.assertEqual(1, len(total_failures))
615
638
        else:
616
639
            # When _lock_check_thorough is disabled, then we don't trigger a
617
640
            # failure
618
 
            self.assertLength(0, total_failures)
 
641
            self.assertEqual(0, len(total_failures))
619
642
 
620
643
 
621
644
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
631
654
        url2 = self.get_readonly_url('foo/bar')
632
655
        t = transport.get_transport(url)
633
656
        t2 = transport.get_transport(url2)
634
 
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
 
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
 
657
        self.assertIsInstance(t, ReadonlyTransportDecorator)
 
658
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
659
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
660
 
638
661
    def test_get_readonly_url_http(self):
646
669
        # the transport returned may be any HttpTransportBase subclass
647
670
        t = transport.get_transport(url)
648
671
        t2 = transport.get_transport(url2)
649
 
        self.failUnless(isinstance(t, HttpTransportBase))
650
 
        self.failUnless(isinstance(t2, HttpTransportBase))
 
672
        self.assertIsInstance(t, HttpTransportBase)
 
673
        self.assertIsInstance(t2, HttpTransportBase)
651
674
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
652
675
 
653
676
    def test_is_directory(self):
661
684
    def test_make_branch_builder(self):
662
685
        builder = self.make_branch_builder('dir')
663
686
        rev_id = builder.build_commit()
664
 
        self.failUnlessExists('dir')
 
687
        self.assertPathExists('dir')
665
688
        a_dir = bzrdir.BzrDir.open('dir')
666
689
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
667
690
        a_branch = a_dir.open_branch()
683
706
        self.assertIsInstance(result_bzrdir.transport,
684
707
                              memory.MemoryTransport)
685
708
        # should not be on disk, should only be in memory
686
 
        self.failIfExists('subdir')
 
709
        self.assertPathDoesNotExist('subdir')
687
710
 
688
711
 
689
712
class TestChrootedTest(tests.ChrootedTestCase):
748
771
        self.check_timing(ShortDelayTestCase('test_short_delay'),
749
772
                          r"^ +[0-9]+ms$")
750
773
 
751
 
    def _patch_get_bzr_source_tree(self):
752
 
        # Reading from the actual source tree breaks isolation, but we don't
753
 
        # want to assume that thats *all* that would happen.
754
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
755
 
 
756
 
    def test_assigned_benchmark_file_stores_date(self):
757
 
        self._patch_get_bzr_source_tree()
758
 
        output = StringIO()
759
 
        result = bzrlib.tests.TextTestResult(self._log_file,
760
 
                                        descriptions=0,
761
 
                                        verbosity=1,
762
 
                                        bench_history=output
763
 
                                        )
764
 
        output_string = output.getvalue()
765
 
        # if you are wondering about the regexp please read the comment in
766
 
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
767
 
        # XXX: what comment?  -- Andrew Bennetts
768
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
769
 
 
770
 
    def test_benchhistory_records_test_times(self):
771
 
        self._patch_get_bzr_source_tree()
772
 
        result_stream = StringIO()
773
 
        result = bzrlib.tests.TextTestResult(
774
 
            self._log_file,
775
 
            descriptions=0,
776
 
            verbosity=1,
777
 
            bench_history=result_stream
778
 
            )
779
 
 
780
 
        # we want profile a call and check that its test duration is recorded
781
 
        # make a new test instance that when run will generate a benchmark
782
 
        example_test_case = TestTestResult("_time_hello_world_encoding")
783
 
        # execute the test, which should succeed and record times
784
 
        example_test_case.run(result)
785
 
        lines = result_stream.getvalue().splitlines()
786
 
        self.assertEqual(2, len(lines))
787
 
        self.assertContainsRe(lines[1],
788
 
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
789
 
            "._time_hello_world_encoding")
790
 
 
791
774
    def _time_hello_world_encoding(self):
792
775
        """Profile two sleep calls
793
776
 
833
816
        self.assertContainsRe(output,
834
817
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
835
818
 
 
819
    def test_uses_time_from_testtools(self):
 
820
        """Test case timings in verbose results should use testtools times"""
 
821
        import datetime
 
822
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
823
            def startTest(self, test):
 
824
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
825
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
826
            def addSuccess(self, test):
 
827
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
828
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
829
            def report_tests_starting(self): pass
 
830
        sio = StringIO()
 
831
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
832
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
833
 
836
834
    def test_known_failure(self):
837
835
        """A KnownFailure being raised should trigger several result actions."""
838
836
        class InstrumentedTestResult(tests.ExtendedTestResult):
839
837
            def stopTestRun(self): pass
840
 
            def startTests(self): pass
841
 
            def report_test_start(self, test): pass
 
838
            def report_tests_starting(self): pass
842
839
            def report_known_failure(self, test, err=None, details=None):
843
840
                self._call = test, 'known failure'
844
841
        result = InstrumentedTestResult(None, None, None, None)
894
891
        """Test the behaviour of invoking addNotSupported."""
895
892
        class InstrumentedTestResult(tests.ExtendedTestResult):
896
893
            def stopTestRun(self): pass
897
 
            def startTests(self): pass
898
 
            def report_test_start(self, test): pass
 
894
            def report_tests_starting(self): pass
899
895
            def report_unsupported(self, test, feature):
900
896
                self._call = test, feature
901
897
        result = InstrumentedTestResult(None, None, None, None)
940
936
        """An UnavailableFeature being raised should invoke addNotSupported."""
941
937
        class InstrumentedTestResult(tests.ExtendedTestResult):
942
938
            def stopTestRun(self): pass
943
 
            def startTests(self): pass
944
 
            def report_test_start(self, test): pass
 
939
            def report_tests_starting(self): pass
945
940
            def addNotSupported(self, test, feature):
946
941
                self._call = test, feature
947
942
        result = InstrumentedTestResult(None, None, None, None)
989
984
        class InstrumentedTestResult(tests.ExtendedTestResult):
990
985
            calls = 0
991
986
            def startTests(self): self.calls += 1
992
 
            def report_test_start(self, test): pass
993
987
        result = InstrumentedTestResult(None, None, None, None)
994
988
        def test_function():
995
989
            pass
997
991
        test.run(result)
998
992
        self.assertEquals(1, result.calls)
999
993
 
 
994
    def test_startTests_only_once(self):
 
995
        """With multiple tests startTests should still only be called once"""
 
996
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
997
            calls = 0
 
998
            def startTests(self): self.calls += 1
 
999
        result = InstrumentedTestResult(None, None, None, None)
 
1000
        suite = unittest.TestSuite([
 
1001
            unittest.FunctionTestCase(lambda: None),
 
1002
            unittest.FunctionTestCase(lambda: None)])
 
1003
        suite.run(result)
 
1004
        self.assertEquals(1, result.calls)
 
1005
        self.assertEquals(2, result.count)
 
1006
 
1000
1007
 
1001
1008
class TestUnicodeFilenameFeature(tests.TestCase):
1002
1009
 
1023
1030
        because of our use of global state.
1024
1031
        """
1025
1032
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1026
 
        old_leak = tests.TestCase._first_thread_leaker_id
1027
1033
        try:
1028
1034
            tests.TestCaseInTempDir.TEST_ROOT = None
1029
 
            tests.TestCase._first_thread_leaker_id = None
1030
1035
            return testrunner.run(test)
1031
1036
        finally:
1032
1037
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1033
 
            tests.TestCase._first_thread_leaker_id = old_leak
1034
1038
 
1035
1039
    def test_known_failure_failed_run(self):
1036
1040
        # run a test that generates a known failure which should be printed in
1082
1086
    def test_result_decorator(self):
1083
1087
        # decorate results
1084
1088
        calls = []
1085
 
        class LoggingDecorator(tests.ForwardingResult):
 
1089
        class LoggingDecorator(ExtendedToOriginalDecorator):
1086
1090
            def startTest(self, test):
1087
 
                tests.ForwardingResult.startTest(self, test)
 
1091
                ExtendedToOriginalDecorator.startTest(self, test)
1088
1092
                calls.append('start')
1089
1093
        test = unittest.FunctionTestCase(lambda:None)
1090
1094
        stream = StringIO()
1191
1195
            ],
1192
1196
            lines[-3:])
1193
1197
 
1194
 
    def _patch_get_bzr_source_tree(self):
1195
 
        # Reading from the actual source tree breaks isolation, but we don't
1196
 
        # want to assume that thats *all* that would happen.
1197
 
        self._get_source_tree_calls = []
1198
 
        def new_get():
1199
 
            self._get_source_tree_calls.append("called")
1200
 
            return None
1201
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
1202
 
 
1203
 
    def test_bench_history(self):
1204
 
        # tests that the running the benchmark passes bench_history into
1205
 
        # the test result object. We can tell that happens if
1206
 
        # _get_bzr_source_tree is called.
1207
 
        self._patch_get_bzr_source_tree()
1208
 
        test = TestRunner('dummy_test')
1209
 
        output = StringIO()
1210
 
        runner = tests.TextTestRunner(stream=self._log_file,
1211
 
                                      bench_history=output)
1212
 
        result = self.run_test_runner(runner, test)
1213
 
        output_string = output.getvalue()
1214
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
1215
 
        self.assertLength(1, self._get_source_tree_calls)
 
1198
    def test_verbose_test_count(self):
 
1199
        """A verbose test run reports the right test count at the start"""
 
1200
        suite = TestUtil.TestSuite([
 
1201
            unittest.FunctionTestCase(lambda:None),
 
1202
            unittest.FunctionTestCase(lambda:None)])
 
1203
        self.assertEqual(suite.countTestCases(), 2)
 
1204
        stream = StringIO()
 
1205
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1206
        # Need to use the CountingDecorator as that's what sets num_tests
 
1207
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1208
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1216
1209
 
1217
1210
    def test_startTestRun(self):
1218
1211
        """run should call result.startTestRun()"""
1219
1212
        calls = []
1220
 
        class LoggingDecorator(tests.ForwardingResult):
 
1213
        class LoggingDecorator(ExtendedToOriginalDecorator):
1221
1214
            def startTestRun(self):
1222
 
                tests.ForwardingResult.startTestRun(self)
 
1215
                ExtendedToOriginalDecorator.startTestRun(self)
1223
1216
                calls.append('startTestRun')
1224
1217
        test = unittest.FunctionTestCase(lambda:None)
1225
1218
        stream = StringIO()
1231
1224
    def test_stopTestRun(self):
1232
1225
        """run should call result.stopTestRun()"""
1233
1226
        calls = []
1234
 
        class LoggingDecorator(tests.ForwardingResult):
 
1227
        class LoggingDecorator(ExtendedToOriginalDecorator):
1235
1228
            def stopTestRun(self):
1236
 
                tests.ForwardingResult.stopTestRun(self)
 
1229
                ExtendedToOriginalDecorator.stopTestRun(self)
1237
1230
                calls.append('stopTestRun')
1238
1231
        test = unittest.FunctionTestCase(lambda:None)
1239
1232
        stream = StringIO()
1242
1235
        result = self.run_test_runner(runner, test)
1243
1236
        self.assertLength(1, calls)
1244
1237
 
 
1238
    def test_unicode_test_output_on_ascii_stream(self):
 
1239
        """Showing results should always succeed even on an ascii console"""
 
1240
        class FailureWithUnicode(tests.TestCase):
 
1241
            def test_log_unicode(self):
 
1242
                self.log(u"\u2606")
 
1243
                self.fail("Now print that log!")
 
1244
        out = StringIO()
 
1245
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1246
            lambda trace=False: "ascii")
 
1247
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1248
            FailureWithUnicode("test_log_unicode"))
 
1249
        self.assertContainsRe(out.getvalue(),
 
1250
            "Text attachment: log\n"
 
1251
            "-+\n"
 
1252
            "\d+\.\d+  \\\\u2606\n"
 
1253
            "-+\n")
 
1254
 
1245
1255
 
1246
1256
class SampleTestCase(tests.TestCase):
1247
1257
 
1435
1445
        # Note this test won't fail with hooks that the core library doesn't
1436
1446
        # use - but it trigger with a plugin that adds hooks, so its still a
1437
1447
        # useful warning in that case.
1438
 
        self.assertEqual(bzrlib.branch.BranchHooks(),
1439
 
            bzrlib.branch.Branch.hooks)
1440
 
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1448
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1449
        self.assertEqual(
 
1450
            bzrlib.smart.server.SmartServerHooks(),
1441
1451
            bzrlib.smart.server.SmartTCPServer.hooks)
1442
 
        self.assertEqual(bzrlib.commands.CommandHooks(),
1443
 
            bzrlib.commands.Command.hooks)
 
1452
        self.assertEqual(
 
1453
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1444
1454
 
1445
1455
    def test__gather_lsprof_in_benchmarks(self):
1446
1456
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1656
1666
        self.assertEqual('original', obj.test_attr)
1657
1667
 
1658
1668
 
 
1669
class _MissingFeature(tests.Feature):
 
1670
    def _probe(self):
 
1671
        return False
 
1672
missing_feature = _MissingFeature()
 
1673
 
 
1674
 
 
1675
def _get_test(name):
 
1676
    """Get an instance of a specific example test.
 
1677
 
 
1678
    We protect this in a function so that they don't auto-run in the test
 
1679
    suite.
 
1680
    """
 
1681
 
 
1682
    class ExampleTests(tests.TestCase):
 
1683
 
 
1684
        def test_fail(self):
 
1685
            mutter('this was a failing test')
 
1686
            self.fail('this test will fail')
 
1687
 
 
1688
        def test_error(self):
 
1689
            mutter('this test errored')
 
1690
            raise RuntimeError('gotcha')
 
1691
 
 
1692
        def test_missing_feature(self):
 
1693
            mutter('missing the feature')
 
1694
            self.requireFeature(missing_feature)
 
1695
 
 
1696
        def test_skip(self):
 
1697
            mutter('this test will be skipped')
 
1698
            raise tests.TestSkipped('reason')
 
1699
 
 
1700
        def test_success(self):
 
1701
            mutter('this test succeeds')
 
1702
 
 
1703
        def test_xfail(self):
 
1704
            mutter('test with expected failure')
 
1705
            self.knownFailure('this_fails')
 
1706
 
 
1707
        def test_unexpected_success(self):
 
1708
            mutter('test with unexpected success')
 
1709
            self.expectFailure('should_fail', lambda: None)
 
1710
 
 
1711
    return ExampleTests(name)
 
1712
 
 
1713
 
 
1714
class TestTestCaseLogDetails(tests.TestCase):
 
1715
 
 
1716
    def _run_test(self, test_name):
 
1717
        test = _get_test(test_name)
 
1718
        result = testtools.TestResult()
 
1719
        test.run(result)
 
1720
        return result
 
1721
 
 
1722
    def test_fail_has_log(self):
 
1723
        result = self._run_test('test_fail')
 
1724
        self.assertEqual(1, len(result.failures))
 
1725
        result_content = result.failures[0][1]
 
1726
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1727
        self.assertContainsRe(result_content, 'this was a failing test')
 
1728
 
 
1729
    def test_error_has_log(self):
 
1730
        result = self._run_test('test_error')
 
1731
        self.assertEqual(1, len(result.errors))
 
1732
        result_content = result.errors[0][1]
 
1733
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1734
        self.assertContainsRe(result_content, 'this test errored')
 
1735
 
 
1736
    def test_skip_has_no_log(self):
 
1737
        result = self._run_test('test_skip')
 
1738
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1739
        skips = result.skip_reasons['reason']
 
1740
        self.assertEqual(1, len(skips))
 
1741
        test = skips[0]
 
1742
        self.assertFalse('log' in test.getDetails())
 
1743
 
 
1744
    def test_missing_feature_has_no_log(self):
 
1745
        # testtools doesn't know about addNotSupported, so it just gets
 
1746
        # considered as a skip
 
1747
        result = self._run_test('test_missing_feature')
 
1748
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1749
        skips = result.skip_reasons[missing_feature]
 
1750
        self.assertEqual(1, len(skips))
 
1751
        test = skips[0]
 
1752
        self.assertFalse('log' in test.getDetails())
 
1753
 
 
1754
    def test_xfail_has_no_log(self):
 
1755
        result = self._run_test('test_xfail')
 
1756
        self.assertEqual(1, len(result.expectedFailures))
 
1757
        result_content = result.expectedFailures[0][1]
 
1758
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1759
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1760
 
 
1761
    def test_unexpected_success_has_log(self):
 
1762
        result = self._run_test('test_unexpected_success')
 
1763
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1764
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1765
        # expectedFailures is a list of reasons?
 
1766
        test = result.unexpectedSuccesses[0]
 
1767
        details = test.getDetails()
 
1768
        self.assertTrue('log' in details)
 
1769
 
 
1770
 
 
1771
class TestTestCloning(tests.TestCase):
 
1772
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1773
 
 
1774
    def test_cloned_testcase_does_not_share_details(self):
 
1775
        """A TestCase cloned with clone_test does not share mutable attributes
 
1776
        such as details or cleanups.
 
1777
        """
 
1778
        class Test(tests.TestCase):
 
1779
            def test_foo(self):
 
1780
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1781
        orig_test = Test('test_foo')
 
1782
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1783
        orig_test.run(unittest.TestResult())
 
1784
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1785
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1786
 
 
1787
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1788
        """Applying two levels of scenarios to a test preserves the attributes
 
1789
        added by both scenarios.
 
1790
        """
 
1791
        class Test(tests.TestCase):
 
1792
            def test_foo(self):
 
1793
                pass
 
1794
        test = Test('test_foo')
 
1795
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1796
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1797
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1798
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1799
        all_tests = list(tests.iter_suite_tests(suite))
 
1800
        self.assertLength(4, all_tests)
 
1801
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1802
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1803
 
 
1804
 
1659
1805
# NB: Don't delete this; it's not actually from 0.11!
1660
1806
@deprecated_function(deprecated_in((0, 11, 0)))
1661
1807
def sample_deprecated_function():
1788
1934
    def test_make_branch_and_tree_with_format(self):
1789
1935
        # we should be able to supply a format to make_branch_and_tree
1790
1936
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1791
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1792
1937
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1793
1938
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1794
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1795
 
                              bzrlib.bzrdir.BzrDirFormat6)
1796
1939
 
1797
1940
    def test_make_branch_and_memory_tree(self):
1798
1941
        # we should be able to get a new branch and a mutable tree from
1815
1958
                tree.branch.repository.bzrdir.root_transport)
1816
1959
 
1817
1960
 
1818
 
class SelfTestHelper:
 
1961
class SelfTestHelper(object):
1819
1962
 
1820
1963
    def run_selftest(self, **kwargs):
1821
1964
        """Run selftest returning its output."""
1881
2024
            def __call__(test, result):
1882
2025
                test.run(result)
1883
2026
            def run(test, result):
1884
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2027
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1885
2028
                calls.append("called")
1886
2029
            def countTestCases(self):
1887
2030
                return 1
1972
2115
            load_list='missing file name', list_only=True)
1973
2116
 
1974
2117
 
 
2118
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2119
 
 
2120
    _test_needs_features = [features.subunit]
 
2121
 
 
2122
    def run_subunit_stream(self, test_name):
 
2123
        from subunit import ProtocolTestCase
 
2124
        def factory():
 
2125
            return TestUtil.TestSuite([_get_test(test_name)])
 
2126
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2127
            test_suite_factory=factory)
 
2128
        test = ProtocolTestCase(stream)
 
2129
        result = testtools.TestResult()
 
2130
        test.run(result)
 
2131
        content = stream.getvalue()
 
2132
        return content, result
 
2133
 
 
2134
    def test_fail_has_log(self):
 
2135
        content, result = self.run_subunit_stream('test_fail')
 
2136
        self.assertEqual(1, len(result.failures))
 
2137
        self.assertContainsRe(content, '(?m)^log$')
 
2138
        self.assertContainsRe(content, 'this test will fail')
 
2139
 
 
2140
    def test_error_has_log(self):
 
2141
        content, result = self.run_subunit_stream('test_error')
 
2142
        self.assertContainsRe(content, '(?m)^log$')
 
2143
        self.assertContainsRe(content, 'this test errored')
 
2144
 
 
2145
    def test_skip_has_no_log(self):
 
2146
        content, result = self.run_subunit_stream('test_skip')
 
2147
        self.assertNotContainsRe(content, '(?m)^log$')
 
2148
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2149
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2150
        skips = result.skip_reasons['reason']
 
2151
        self.assertEqual(1, len(skips))
 
2152
        test = skips[0]
 
2153
        # RemotedTestCase doesn't preserve the "details"
 
2154
        ## self.assertFalse('log' in test.getDetails())
 
2155
 
 
2156
    def test_missing_feature_has_no_log(self):
 
2157
        content, result = self.run_subunit_stream('test_missing_feature')
 
2158
        self.assertNotContainsRe(content, '(?m)^log$')
 
2159
        self.assertNotContainsRe(content, 'missing the feature')
 
2160
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2161
        skips = result.skip_reasons['_MissingFeature\n']
 
2162
        self.assertEqual(1, len(skips))
 
2163
        test = skips[0]
 
2164
        # RemotedTestCase doesn't preserve the "details"
 
2165
        ## self.assertFalse('log' in test.getDetails())
 
2166
 
 
2167
    def test_xfail_has_no_log(self):
 
2168
        content, result = self.run_subunit_stream('test_xfail')
 
2169
        self.assertNotContainsRe(content, '(?m)^log$')
 
2170
        self.assertNotContainsRe(content, 'test with expected failure')
 
2171
        self.assertEqual(1, len(result.expectedFailures))
 
2172
        result_content = result.expectedFailures[0][1]
 
2173
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2174
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2175
 
 
2176
    def test_unexpected_success_has_log(self):
 
2177
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2178
        self.assertContainsRe(content, '(?m)^log$')
 
2179
        self.assertContainsRe(content, 'test with unexpected success')
 
2180
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2181
                           ' as a plain success',
 
2182
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2183
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2184
        test = result.unexpectedSuccesses[0]
 
2185
        # RemotedTestCase doesn't preserve the "details"
 
2186
        ## self.assertTrue('log' in test.getDetails())
 
2187
 
 
2188
    def test_success_has_no_log(self):
 
2189
        content, result = self.run_subunit_stream('test_success')
 
2190
        self.assertEqual(1, result.testsRun)
 
2191
        self.assertNotContainsRe(content, '(?m)^log$')
 
2192
        self.assertNotContainsRe(content, 'this test succeeds')
 
2193
 
 
2194
 
1975
2195
class TestRunBzr(tests.TestCase):
1976
2196
 
1977
2197
    out = ''
2100
2320
        # stdout and stderr of the invoked run_bzr
2101
2321
        current_factory = bzrlib.ui.ui_factory
2102
2322
        self.run_bzr(['foo'])
2103
 
        self.failIf(current_factory is self.factory)
 
2323
        self.assertFalse(current_factory is self.factory)
2104
2324
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2105
2325
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2106
2326
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2288
2508
        self.assertEqual([], command[2:])
2289
2509
 
2290
2510
    def test_set_env(self):
2291
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2511
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2292
2512
        # set in the child
2293
2513
        def check_environment():
2294
2514
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2300
2520
 
2301
2521
    def test_run_bzr_subprocess_env_del(self):
2302
2522
        """run_bzr_subprocess can remove environment variables too."""
2303
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2523
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2304
2524
        def check_environment():
2305
2525
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2306
2526
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2312
2532
        del os.environ['EXISTANT_ENV_VAR']
2313
2533
 
2314
2534
    def test_env_del_missing(self):
2315
 
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
 
2535
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2316
2536
        def check_environment():
2317
2537
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2318
2538
        self.check_popen_state = check_environment
2593
2813
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2594
2814
 
2595
2815
 
2596
 
class TestCheckInventoryShape(tests.TestCaseWithTransport):
 
2816
class TestCheckTreeShape(tests.TestCaseWithTransport):
2597
2817
 
2598
 
    def test_check_inventory_shape(self):
 
2818
    def test_check_tree_shape(self):
2599
2819
        files = ['a', 'b/', 'b/c']
2600
2820
        tree = self.make_branch_and_tree('.')
2601
2821
        self.build_tree(files)
2602
2822
        tree.add(files)
2603
2823
        tree.lock_read()
2604
2824
        try:
2605
 
            self.check_inventory_shape(tree.inventory, files)
 
2825
            self.check_tree_shape(tree, files)
2606
2826
        finally:
2607
2827
            tree.unlock()
2608
2828
 
2940
3160
        tpr.register('bar', 'bBB.aAA.rRR')
2941
3161
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
2942
3162
        self.assertThat(self.get_log(),
2943
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3163
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3164
                           doctest.ELLIPSIS))
2944
3165
 
2945
3166
    def test_get_unknown_prefix(self):
2946
3167
        tpr = self._get_registry()
2966
3187
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2967
3188
 
2968
3189
 
 
3190
class TestThreadLeakDetection(tests.TestCase):
 
3191
    """Ensure when tests leak threads we detect and report it"""
 
3192
 
 
3193
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3194
        def __init__(self):
 
3195
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3196
            self.leaks = []
 
3197
        def _report_thread_leak(self, test, leaks, alive):
 
3198
            self.leaks.append((test, leaks))
 
3199
 
 
3200
    def test_testcase_without_addCleanups(self):
 
3201
        """Check old TestCase instances don't break with leak detection"""
 
3202
        class Test(unittest.TestCase):
 
3203
            def runTest(self):
 
3204
                pass
 
3205
        result = self.LeakRecordingResult()
 
3206
        test = Test()
 
3207
        result.startTestRun()
 
3208
        test.run(result)
 
3209
        result.stopTestRun()
 
3210
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3211
        self.assertEqual(result.leaks, [])
 
3212
        
 
3213
    def test_thread_leak(self):
 
3214
        """Ensure a thread that outlives the running of a test is reported
 
3215
 
 
3216
        Uses a thread that blocks on an event, and is started by the inner
 
3217
        test case. As the thread outlives the inner case's run, it should be
 
3218
        detected as a leak, but the event is then set so that the thread can
 
3219
        be safely joined in cleanup so it's not leaked for real.
 
3220
        """
 
3221
        event = threading.Event()
 
3222
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3223
        class Test(tests.TestCase):
 
3224
            def test_leak(self):
 
3225
                thread.start()
 
3226
        result = self.LeakRecordingResult()
 
3227
        test = Test("test_leak")
 
3228
        self.addCleanup(thread.join)
 
3229
        self.addCleanup(event.set)
 
3230
        result.startTestRun()
 
3231
        test.run(result)
 
3232
        result.stopTestRun()
 
3233
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3234
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3235
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3236
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3237
 
 
3238
    def test_multiple_leaks(self):
 
3239
        """Check multiple leaks are blamed on the test cases at fault
 
3240
 
 
3241
        Same concept as the previous test, but has one inner test method that
 
3242
        leaks two threads, and one that doesn't leak at all.
 
3243
        """
 
3244
        event = threading.Event()
 
3245
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3246
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3247
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3248
        class Test(tests.TestCase):
 
3249
            def test_first_leak(self):
 
3250
                thread_b.start()
 
3251
            def test_second_no_leak(self):
 
3252
                pass
 
3253
            def test_third_leak(self):
 
3254
                thread_c.start()
 
3255
                thread_a.start()
 
3256
        result = self.LeakRecordingResult()
 
3257
        first_test = Test("test_first_leak")
 
3258
        third_test = Test("test_third_leak")
 
3259
        self.addCleanup(thread_a.join)
 
3260
        self.addCleanup(thread_b.join)
 
3261
        self.addCleanup(thread_c.join)
 
3262
        self.addCleanup(event.set)
 
3263
        result.startTestRun()
 
3264
        unittest.TestSuite(
 
3265
            [first_test, Test("test_second_no_leak"), third_test]
 
3266
            ).run(result)
 
3267
        result.stopTestRun()
 
3268
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3269
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3270
        self.assertEqual(result.leaks, [
 
3271
            (first_test, set([thread_b])),
 
3272
            (third_test, set([thread_a, thread_c]))])
 
3273
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3274
 
 
3275
 
 
3276
class TestPostMortemDebugging(tests.TestCase):
 
3277
    """Check post mortem debugging works when tests fail or error"""
 
3278
 
 
3279
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3280
        def __init__(self):
 
3281
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3282
            self.postcode = None
 
3283
        def _post_mortem(self, tb=None):
 
3284
            """Record the code object at the end of the current traceback"""
 
3285
            tb = tb or sys.exc_info()[2]
 
3286
            if tb is not None:
 
3287
                next = tb.tb_next
 
3288
                while next is not None:
 
3289
                    tb = next
 
3290
                    next = next.tb_next
 
3291
                self.postcode = tb.tb_frame.f_code
 
3292
        def report_error(self, test, err):
 
3293
            pass
 
3294
        def report_failure(self, test, err):
 
3295
            pass
 
3296
 
 
3297
    def test_location_unittest_error(self):
 
3298
        """Needs right post mortem traceback with erroring unittest case"""
 
3299
        class Test(unittest.TestCase):
 
3300
            def runTest(self):
 
3301
                raise RuntimeError
 
3302
        result = self.TracebackRecordingResult()
 
3303
        Test().run(result)
 
3304
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3305
 
 
3306
    def test_location_unittest_failure(self):
 
3307
        """Needs right post mortem traceback with failing unittest case"""
 
3308
        class Test(unittest.TestCase):
 
3309
            def runTest(self):
 
3310
                raise self.failureException
 
3311
        result = self.TracebackRecordingResult()
 
3312
        Test().run(result)
 
3313
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3314
 
 
3315
    def test_location_bt_error(self):
 
3316
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3317
        class Test(tests.TestCase):
 
3318
            def test_error(self):
 
3319
                raise RuntimeError
 
3320
        result = self.TracebackRecordingResult()
 
3321
        Test("test_error").run(result)
 
3322
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3323
 
 
3324
    def test_location_bt_failure(self):
 
3325
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3326
        class Test(tests.TestCase):
 
3327
            def test_failure(self):
 
3328
                raise self.failureException
 
3329
        result = self.TracebackRecordingResult()
 
3330
        Test("test_failure").run(result)
 
3331
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3332
 
 
3333
    def test_env_var_triggers_post_mortem(self):
 
3334
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3335
        import pdb
 
3336
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3337
        post_mortem_calls = []
 
3338
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3339
        self.overrideEnv('BZR_TEST_PDB', None)
 
3340
        result._post_mortem(1)
 
3341
        self.overrideEnv('BZR_TEST_PDB', 'on')
 
3342
        result._post_mortem(2)
 
3343
        self.assertEqual([2], post_mortem_calls)
 
3344
 
 
3345
 
2969
3346
class TestRunSuite(tests.TestCase):
2970
3347
 
2971
3348
    def test_runner_class(self):
2982
3359
                                                self.verbosity)
2983
3360
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2984
3361
        self.assertLength(1, calls)
 
3362
 
 
3363
 
 
3364
class TestEnvironHandling(tests.TestCase):
 
3365
 
 
3366
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
 
3367
        self.assertFalse('MYVAR' in os.environ)
 
3368
        self.overrideEnv('MYVAR', '42')
 
3369
        # We use an embedded test to make sure we fix the _captureVar bug
 
3370
        class Test(tests.TestCase):
 
3371
            def test_me(self):
 
3372
                # The first call save the 42 value
 
3373
                self.overrideEnv('MYVAR', None)
 
3374
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3375
                # Make sure we can call it twice
 
3376
                self.overrideEnv('MYVAR', None)
 
3377
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3378
        output = StringIO()
 
3379
        result = tests.TextTestResult(output, 0, 1)
 
3380
        Test('test_me').run(result)
 
3381
        if not result.wasStrictlySuccessful():
 
3382
            self.fail(output.getvalue())
 
3383
        # We get our value back
 
3384
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3385
 
 
3386
 
 
3387
class TestIsolatedEnv(tests.TestCase):
 
3388
    """Test isolating tests from os.environ.
 
3389
 
 
3390
    Since we use tests that are already isolated from os.environ a bit of care
 
3391
    should be taken when designing the tests to avoid bootstrap side-effects.
 
3392
    The tests start an already clean os.environ which allow doing valid
 
3393
    assertions about which variables are present or not and design tests around
 
3394
    these assertions.
 
3395
    """
 
3396
 
 
3397
    class ScratchMonkey(tests.TestCase):
 
3398
 
 
3399
        def test_me(self):
 
3400
            pass
 
3401
 
 
3402
    def test_basics(self):
 
3403
        # Make sure we know the definition of BZR_HOME: not part of os.environ
 
3404
        # for tests.TestCase.
 
3405
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
 
3406
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
 
3407
        # Being part of isolated_environ, BZR_HOME should not appear here
 
3408
        self.assertFalse('BZR_HOME' in os.environ)
 
3409
        # Make sure we know the definition of LINES: part of os.environ for
 
3410
        # tests.TestCase
 
3411
        self.assertTrue('LINES' in tests.isolated_environ)
 
3412
        self.assertEquals('25', tests.isolated_environ['LINES'])
 
3413
        self.assertEquals('25', os.environ['LINES'])
 
3414
 
 
3415
    def test_injecting_unknown_variable(self):
 
3416
        # BZR_HOME is known to be absent from os.environ
 
3417
        test = self.ScratchMonkey('test_me')
 
3418
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
 
3419
        self.assertEquals('foo', os.environ['BZR_HOME'])
 
3420
        tests.restore_os_environ(test)
 
3421
        self.assertFalse('BZR_HOME' in os.environ)
 
3422
 
 
3423
    def test_injecting_known_variable(self):
 
3424
        test = self.ScratchMonkey('test_me')
 
3425
        # LINES is known to be present in os.environ
 
3426
        tests.override_os_environ(test, {'LINES': '42'})
 
3427
        self.assertEquals('42', os.environ['LINES'])
 
3428
        tests.restore_os_environ(test)
 
3429
        self.assertEquals('25', os.environ['LINES'])
 
3430
 
 
3431
    def test_deleting_variable(self):
 
3432
        test = self.ScratchMonkey('test_me')
 
3433
        # LINES is known to be present in os.environ
 
3434
        tests.override_os_environ(test, {'LINES': None})
 
3435
        self.assertTrue('LINES' not in os.environ)
 
3436
        tests.restore_os_environ(test)
 
3437
        self.assertEquals('25', os.environ['LINES'])
 
3438
 
 
3439
 
 
3440
class TestDocTestSuiteIsolation(tests.TestCase):
 
3441
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
 
3442
 
 
3443
    Since tests.TestCase alreay provides an isolation from os.environ, we use
 
3444
    the clean environment as a base for testing. To precisely capture the
 
3445
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
 
3446
    compare against.
 
3447
 
 
3448
    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
 
3449
    not `os.environ` so each test overrides it to suit its needs.
 
3450
 
 
3451
    """
 
3452
 
 
3453
    def get_doctest_suite_for_string(self, klass, string):
 
3454
        class Finder(doctest.DocTestFinder):
 
3455
 
 
3456
            def find(*args, **kwargs):
 
3457
                test = doctest.DocTestParser().get_doctest(
 
3458
                    string, {}, 'foo', 'foo.py', 0)
 
3459
                return [test]
 
3460
 
 
3461
        suite = klass(test_finder=Finder())
 
3462
        return suite
 
3463
 
 
3464
    def run_doctest_suite_for_string(self, klass, string):
 
3465
        suite = self.get_doctest_suite_for_string(klass, string)
 
3466
        output = StringIO()
 
3467
        result = tests.TextTestResult(output, 0, 1)
 
3468
        suite.run(result)
 
3469
        return result, output
 
3470
 
 
3471
    def assertDocTestStringSucceds(self, klass, string):
 
3472
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3473
        if not result.wasStrictlySuccessful():
 
3474
            self.fail(output.getvalue())
 
3475
 
 
3476
    def assertDocTestStringFails(self, klass, string):
 
3477
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3478
        if result.wasStrictlySuccessful():
 
3479
            self.fail(output.getvalue())
 
3480
 
 
3481
    def test_injected_variable(self):
 
3482
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
 
3483
        test = """
 
3484
            >>> import os
 
3485
            >>> os.environ['LINES']
 
3486
            '42'
 
3487
            """
 
3488
        # doctest.DocTestSuite fails as it sees '25'
 
3489
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3490
        # tests.DocTestSuite sees '42'
 
3491
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3492
 
 
3493
    def test_deleted_variable(self):
 
3494
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
 
3495
        test = """
 
3496
            >>> import os
 
3497
            >>> os.environ.get('LINES')
 
3498
            """
 
3499
        # doctest.DocTestSuite fails as it sees '25'
 
3500
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3501
        # tests.DocTestSuite sees None
 
3502
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)