~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2011-04-19 10:42:59 UTC
  • mto: This revision was merged to the branch mainline in revision 5806.
  • Revision ID: jelmer@samba.org-20110419104259-g9exlcp1f5jdu3ci
Move Inventory._get_mutable_inventory -> mutable_inventory_from_tree.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import doctest
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
37
42
from bzrlib import (
38
43
    branchbuilder,
39
44
    bzrdir,
40
 
    debug,
41
45
    errors,
42
46
    lockdir,
43
47
    memorytree,
44
48
    osutils,
45
 
    progress,
46
49
    remote,
47
50
    repository,
48
51
    symbol_versioning,
52
55
    )
53
56
from bzrlib.repofmt import (
54
57
    groupcompress_repo,
55
 
    pack_repo,
56
 
    weaverepo,
57
58
    )
58
59
from bzrlib.symbol_versioning import (
59
60
    deprecated_function,
64
65
    features,
65
66
    test_lsprof,
66
67
    test_server,
67
 
    test_sftp_transport,
68
68
    TestUtil,
69
69
    )
70
 
from bzrlib.trace import note
 
70
from bzrlib.trace import note, mutter
71
71
from bzrlib.transport import memory
72
 
from bzrlib.version import _get_bzr_source_tree
73
72
 
74
73
 
75
74
def _test_ids(test_suite):
77
76
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
77
 
79
78
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
79
class MetaTestLog(tests.TestCase):
93
80
 
94
81
    def test_logging(self):
100
87
            "text", "plain", {"charset": "utf8"})))
101
88
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
102
89
        self.assertThat(self.get_log(),
103
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
90
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
104
91
 
105
92
 
106
93
class TestUnicodeFilename(tests.TestCase):
119
106
 
120
107
        filename = u'hell\u00d8'
121
108
        self.build_tree_contents([(filename, 'contents of hello')])
122
 
        self.failUnlessExists(filename)
 
109
        self.assertPathExists(filename)
 
110
 
 
111
 
 
112
class TestClassesAvailable(tests.TestCase):
 
113
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
114
 
 
115
    def test_test_case(self):
 
116
        from bzrlib.tests import TestCase
 
117
 
 
118
    def test_test_loader(self):
 
119
        from bzrlib.tests import TestLoader
 
120
 
 
121
    def test_test_suite(self):
 
122
        from bzrlib.tests import TestSuite
123
123
 
124
124
 
125
125
class TestTransportScenarios(tests.TestCase):
208
208
    def test_scenarios(self):
209
209
        # check that constructor parameters are passed through to the adapted
210
210
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
211
        from bzrlib.tests.per_controldir import make_scenarios
212
212
        vfs_factory = "v"
213
213
        server1 = "a"
214
214
        server2 = "b"
312
312
        from bzrlib.tests.per_interrepository import make_scenarios
313
313
        server1 = "a"
314
314
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
315
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
316
        scenarios = make_scenarios(server1, server2, formats)
317
317
        self.assertEqual([
318
318
            ('C0,str,str',
319
319
             {'repository_format': 'C1',
320
320
              'repository_format_to': 'C2',
321
321
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
322
              'transport_server': 'a',
 
323
              'extra_setup': 'C3'}),
323
324
            ('D0,str,str',
324
325
             {'repository_format': 'D1',
325
326
              'repository_format_to': 'D2',
326
327
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
328
              'transport_server': 'a',
 
329
              'extra_setup': 'D3'})],
328
330
            scenarios)
329
331
 
330
332
 
336
338
        from bzrlib.tests.per_workingtree import make_scenarios
337
339
        server1 = "a"
338
340
        server2 = "b"
339
 
        formats = [workingtree.WorkingTreeFormat2(),
 
341
        formats = [workingtree.WorkingTreeFormat4(),
340
342
                   workingtree.WorkingTreeFormat3(),]
341
343
        scenarios = make_scenarios(server1, server2, formats)
342
344
        self.assertEqual([
343
 
            ('WorkingTreeFormat2',
 
345
            ('WorkingTreeFormat4',
344
346
             {'bzrdir_format': formats[0]._matchingbzrdir,
345
347
              'transport_readonly_server': 'b',
346
348
              'transport_server': 'a',
373
375
            )
374
376
        server1 = "a"
375
377
        server2 = "b"
376
 
        formats = [workingtree.WorkingTreeFormat2(),
 
378
        formats = [workingtree.WorkingTreeFormat4(),
377
379
                   workingtree.WorkingTreeFormat3(),]
378
380
        scenarios = make_scenarios(server1, server2, formats)
379
381
        self.assertEqual(7, len(scenarios))
380
 
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
 
382
        default_wt_format = workingtree.format_registry.get_default()
381
383
        wt4_format = workingtree.WorkingTreeFormat4()
382
384
        wt5_format = workingtree.WorkingTreeFormat5()
383
385
        expected_scenarios = [
384
 
            ('WorkingTreeFormat2',
 
386
            ('WorkingTreeFormat4',
385
387
             {'bzrdir_format': formats[0]._matchingbzrdir,
386
388
              'transport_readonly_server': 'b',
387
389
              'transport_server': 'a',
447
449
        # ones to add.
448
450
        from bzrlib.tests.per_tree import (
449
451
            return_parameter,
450
 
            revision_tree_from_workingtree
451
452
            )
452
453
        from bzrlib.tests.per_intertree import (
453
454
            make_scenarios,
454
455
            )
455
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
456
        from bzrlib.workingtree import WorkingTreeFormat3, WorkingTreeFormat4
456
457
        input_test = TestInterTreeScenarios(
457
458
            "test_scenarios")
458
459
        server1 = "a"
459
460
        server2 = "b"
460
 
        format1 = WorkingTreeFormat2()
 
461
        format1 = WorkingTreeFormat4()
461
462
        format2 = WorkingTreeFormat3()
462
463
        formats = [("1", str, format1, format2, "converter1"),
463
464
            ("2", int, format2, format1, "converter2")]
509
510
        self.assertRaises(AssertionError, self.assertEqualStat,
510
511
            os.lstat("foo"), os.lstat("longname"))
511
512
 
 
513
    def test_failUnlessExists(self):
 
514
        """Deprecated failUnlessExists and failIfExists"""
 
515
        self.applyDeprecated(
 
516
            deprecated_in((2, 4)),
 
517
            self.failUnlessExists, '.')
 
518
        self.build_tree(['foo/', 'foo/bar'])
 
519
        self.applyDeprecated(
 
520
            deprecated_in((2, 4)),
 
521
            self.failUnlessExists, 'foo/bar')
 
522
        self.applyDeprecated(
 
523
            deprecated_in((2, 4)),
 
524
            self.failIfExists, 'foo/foo')
 
525
 
 
526
    def test_assertPathExists(self):
 
527
        self.assertPathExists('.')
 
528
        self.build_tree(['foo/', 'foo/bar'])
 
529
        self.assertPathExists('foo/bar')
 
530
        self.assertPathDoesNotExist('foo/foo')
 
531
 
512
532
 
513
533
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
514
534
 
548
568
        tree = self.make_branch_and_memory_tree('dir')
549
569
        # Guard against regression into MemoryTransport leaking
550
570
        # files to disk instead of keeping them in memory.
551
 
        self.failIf(osutils.lexists('dir'))
 
571
        self.assertFalse(osutils.lexists('dir'))
552
572
        self.assertIsInstance(tree, memorytree.MemoryTree)
553
573
 
554
574
    def test_make_branch_and_memory_tree_with_format(self):
555
575
        """make_branch_and_memory_tree should accept a format option."""
556
576
        format = bzrdir.BzrDirMetaFormat1()
557
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
577
        format.repository_format = repository.format_registry.get_default()
558
578
        tree = self.make_branch_and_memory_tree('dir', format=format)
559
579
        # Guard against regression into MemoryTransport leaking
560
580
        # files to disk instead of keeping them in memory.
561
 
        self.failIf(osutils.lexists('dir'))
 
581
        self.assertFalse(osutils.lexists('dir'))
562
582
        self.assertIsInstance(tree, memorytree.MemoryTree)
563
583
        self.assertEqual(format.repository_format.__class__,
564
584
            tree.branch.repository._format.__class__)
568
588
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
569
589
        # Guard against regression into MemoryTransport leaking
570
590
        # files to disk instead of keeping them in memory.
571
 
        self.failIf(osutils.lexists('dir'))
 
591
        self.assertFalse(osutils.lexists('dir'))
572
592
 
573
593
    def test_make_branch_builder_with_format(self):
574
594
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
575
595
        # that the format objects are used.
576
596
        format = bzrdir.BzrDirMetaFormat1()
577
 
        repo_format = weaverepo.RepositoryFormat7()
 
597
        repo_format = repository.format_registry.get_default()
578
598
        format.repository_format = repo_format
579
599
        builder = self.make_branch_builder('dir', format=format)
580
600
        the_branch = builder.get_branch()
581
601
        # Guard against regression into MemoryTransport leaking
582
602
        # files to disk instead of keeping them in memory.
583
 
        self.failIf(osutils.lexists('dir'))
 
603
        self.assertFalse(osutils.lexists('dir'))
584
604
        self.assertEqual(format.repository_format.__class__,
585
605
                         the_branch.repository._format.__class__)
586
606
        self.assertEqual(repo_format.get_format_string(),
592
612
        the_branch = builder.get_branch()
593
613
        # Guard against regression into MemoryTransport leaking
594
614
        # files to disk instead of keeping them in memory.
595
 
        self.failIf(osutils.lexists('dir'))
 
615
        self.assertFalse(osutils.lexists('dir'))
596
616
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
597
617
        self.assertEqual(dir_format.repository_format.__class__,
598
618
                         the_branch.repository._format.__class__)
609
629
                l.attempt_lock()
610
630
        test = TestDanglingLock('test_function')
611
631
        result = test.run()
 
632
        total_failures = result.errors + result.failures
612
633
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
634
            self.assertEqual(1, len(total_failures))
614
635
        else:
615
636
            # When _lock_check_thorough is disabled, then we don't trigger a
616
637
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
638
            self.assertEqual(0, len(total_failures))
618
639
 
619
640
 
620
641
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
642
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
643
 
623
644
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
645
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
646
        self.vfs_transport_factory = memory.MemoryServer
627
647
        self.transport_readonly_server = None
629
649
        # for the server
630
650
        url = self.get_readonly_url()
631
651
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
634
 
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
 
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
 
652
        t = transport.get_transport(url)
 
653
        t2 = transport.get_transport(url2)
 
654
        self.assertIsInstance(t, ReadonlyTransportDecorator)
 
655
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
656
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
657
 
638
658
    def test_get_readonly_url_http(self):
639
659
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
660
        from bzrlib.transport.http import HttpTransportBase
642
661
        self.transport_server = test_server.LocalURLServer
643
662
        self.transport_readonly_server = HttpServer
645
664
        url = self.get_readonly_url()
646
665
        url2 = self.get_readonly_url('foo/bar')
647
666
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
650
 
        self.failUnless(isinstance(t, HttpTransportBase))
651
 
        self.failUnless(isinstance(t2, HttpTransportBase))
 
667
        t = transport.get_transport(url)
 
668
        t2 = transport.get_transport(url2)
 
669
        self.assertIsInstance(t, HttpTransportBase)
 
670
        self.assertIsInstance(t2, HttpTransportBase)
652
671
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
653
672
 
654
673
    def test_is_directory(self):
662
681
    def test_make_branch_builder(self):
663
682
        builder = self.make_branch_builder('dir')
664
683
        rev_id = builder.build_commit()
665
 
        self.failUnlessExists('dir')
 
684
        self.assertPathExists('dir')
666
685
        a_dir = bzrdir.BzrDir.open('dir')
667
686
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
668
687
        a_branch = a_dir.open_branch()
684
703
        self.assertIsInstance(result_bzrdir.transport,
685
704
                              memory.MemoryTransport)
686
705
        # should not be on disk, should only be in memory
687
 
        self.failIfExists('subdir')
 
706
        self.assertPathDoesNotExist('subdir')
688
707
 
689
708
 
690
709
class TestChrootedTest(tests.ChrootedTestCase):
691
710
 
692
711
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
712
        t = transport.get_transport(self.get_readonly_url())
695
713
        url = t.base
696
714
        self.assertEqual(url, t.clone('..').base)
697
715
 
750
768
        self.check_timing(ShortDelayTestCase('test_short_delay'),
751
769
                          r"^ +[0-9]+ms$")
752
770
 
753
 
    def _patch_get_bzr_source_tree(self):
754
 
        # Reading from the actual source tree breaks isolation, but we don't
755
 
        # want to assume that thats *all* that would happen.
756
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
757
 
 
758
 
    def test_assigned_benchmark_file_stores_date(self):
759
 
        self._patch_get_bzr_source_tree()
760
 
        output = StringIO()
761
 
        result = bzrlib.tests.TextTestResult(self._log_file,
762
 
                                        descriptions=0,
763
 
                                        verbosity=1,
764
 
                                        bench_history=output
765
 
                                        )
766
 
        output_string = output.getvalue()
767
 
        # if you are wondering about the regexp please read the comment in
768
 
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
769
 
        # XXX: what comment?  -- Andrew Bennetts
770
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
771
 
 
772
 
    def test_benchhistory_records_test_times(self):
773
 
        self._patch_get_bzr_source_tree()
774
 
        result_stream = StringIO()
775
 
        result = bzrlib.tests.TextTestResult(
776
 
            self._log_file,
777
 
            descriptions=0,
778
 
            verbosity=1,
779
 
            bench_history=result_stream
780
 
            )
781
 
 
782
 
        # we want profile a call and check that its test duration is recorded
783
 
        # make a new test instance that when run will generate a benchmark
784
 
        example_test_case = TestTestResult("_time_hello_world_encoding")
785
 
        # execute the test, which should succeed and record times
786
 
        example_test_case.run(result)
787
 
        lines = result_stream.getvalue().splitlines()
788
 
        self.assertEqual(2, len(lines))
789
 
        self.assertContainsRe(lines[1],
790
 
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
791
 
            "._time_hello_world_encoding")
792
 
 
793
771
    def _time_hello_world_encoding(self):
794
772
        """Profile two sleep calls
795
773
 
803
781
        self.requireFeature(test_lsprof.LSProfFeature)
804
782
        result_stream = StringIO()
805
783
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
784
            result_stream,
807
785
            descriptions=0,
808
786
            verbosity=2,
809
787
            )
835
813
        self.assertContainsRe(output,
836
814
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
837
815
 
 
816
    def test_uses_time_from_testtools(self):
 
817
        """Test case timings in verbose results should use testtools times"""
 
818
        import datetime
 
819
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
820
            def startTest(self, test):
 
821
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
822
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
823
            def addSuccess(self, test):
 
824
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
825
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
826
            def report_tests_starting(self): pass
 
827
        sio = StringIO()
 
828
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
829
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
830
 
838
831
    def test_known_failure(self):
839
832
        """A KnownFailure being raised should trigger several result actions."""
840
833
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
834
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
835
            def report_tests_starting(self): pass
844
836
            def report_known_failure(self, test, err=None, details=None):
845
837
                self._call = test, 'known failure'
846
838
        result = InstrumentedTestResult(None, None, None, None)
864
856
        # verbose test output formatting
865
857
        result_stream = StringIO()
866
858
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
859
            result_stream,
868
860
            descriptions=0,
869
861
            verbosity=2,
870
862
            )
880
872
        output = result_stream.getvalue()[prefix:]
881
873
        lines = output.splitlines()
882
874
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
875
        if sys.version_info > (2, 7):
 
876
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
877
                self.assertNotEqual, lines[1], '    ')
883
878
        self.assertEqual(lines[1], '    foo')
884
879
        self.assertEqual(2, len(lines))
885
880
 
893
888
        """Test the behaviour of invoking addNotSupported."""
894
889
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
890
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
891
            def report_tests_starting(self): pass
898
892
            def report_unsupported(self, test, feature):
899
893
                self._call = test, feature
900
894
        result = InstrumentedTestResult(None, None, None, None)
919
913
        # verbose test output formatting
920
914
        result_stream = StringIO()
921
915
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
916
            result_stream,
923
917
            descriptions=0,
924
918
            verbosity=2,
925
919
            )
939
933
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
934
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
935
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
936
            def report_tests_starting(self): pass
944
937
            def addNotSupported(self, test, feature):
945
938
                self._call = test, feature
946
939
        result = InstrumentedTestResult(None, None, None, None)
988
981
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
982
            calls = 0
990
983
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
984
        result = InstrumentedTestResult(None, None, None, None)
993
985
        def test_function():
994
986
            pass
996
988
        test.run(result)
997
989
        self.assertEquals(1, result.calls)
998
990
 
 
991
    def test_startTests_only_once(self):
 
992
        """With multiple tests startTests should still only be called once"""
 
993
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
994
            calls = 0
 
995
            def startTests(self): self.calls += 1
 
996
        result = InstrumentedTestResult(None, None, None, None)
 
997
        suite = unittest.TestSuite([
 
998
            unittest.FunctionTestCase(lambda: None),
 
999
            unittest.FunctionTestCase(lambda: None)])
 
1000
        suite.run(result)
 
1001
        self.assertEquals(1, result.calls)
 
1002
        self.assertEquals(2, result.count)
 
1003
 
999
1004
 
1000
1005
class TestUnicodeFilenameFeature(tests.TestCase):
1001
1006
 
1022
1027
        because of our use of global state.
1023
1028
        """
1024
1029
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1030
        try:
1027
1031
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1032
            return testrunner.run(test)
1030
1033
        finally:
1031
1034
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1035
 
1034
1036
    def test_known_failure_failed_run(self):
1035
1037
        # run a test that generates a known failure which should be printed in
1081
1083
    def test_result_decorator(self):
1082
1084
        # decorate results
1083
1085
        calls = []
1084
 
        class LoggingDecorator(tests.ForwardingResult):
 
1086
        class LoggingDecorator(ExtendedToOriginalDecorator):
1085
1087
            def startTest(self, test):
1086
 
                tests.ForwardingResult.startTest(self, test)
 
1088
                ExtendedToOriginalDecorator.startTest(self, test)
1087
1089
                calls.append('start')
1088
1090
        test = unittest.FunctionTestCase(lambda:None)
1089
1091
        stream = StringIO()
1190
1192
            ],
1191
1193
            lines[-3:])
1192
1194
 
1193
 
    def _patch_get_bzr_source_tree(self):
1194
 
        # Reading from the actual source tree breaks isolation, but we don't
1195
 
        # want to assume that thats *all* that would happen.
1196
 
        self._get_source_tree_calls = []
1197
 
        def new_get():
1198
 
            self._get_source_tree_calls.append("called")
1199
 
            return None
1200
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
1201
 
 
1202
 
    def test_bench_history(self):
1203
 
        # tests that the running the benchmark passes bench_history into
1204
 
        # the test result object. We can tell that happens if
1205
 
        # _get_bzr_source_tree is called.
1206
 
        self._patch_get_bzr_source_tree()
1207
 
        test = TestRunner('dummy_test')
1208
 
        output = StringIO()
1209
 
        runner = tests.TextTestRunner(stream=self._log_file,
1210
 
                                      bench_history=output)
1211
 
        result = self.run_test_runner(runner, test)
1212
 
        output_string = output.getvalue()
1213
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
 
        self.assertLength(1, self._get_source_tree_calls)
 
1195
    def test_verbose_test_count(self):
 
1196
        """A verbose test run reports the right test count at the start"""
 
1197
        suite = TestUtil.TestSuite([
 
1198
            unittest.FunctionTestCase(lambda:None),
 
1199
            unittest.FunctionTestCase(lambda:None)])
 
1200
        self.assertEqual(suite.countTestCases(), 2)
 
1201
        stream = StringIO()
 
1202
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1203
        # Need to use the CountingDecorator as that's what sets num_tests
 
1204
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1205
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1215
1206
 
1216
1207
    def test_startTestRun(self):
1217
1208
        """run should call result.startTestRun()"""
1218
1209
        calls = []
1219
 
        class LoggingDecorator(tests.ForwardingResult):
 
1210
        class LoggingDecorator(ExtendedToOriginalDecorator):
1220
1211
            def startTestRun(self):
1221
 
                tests.ForwardingResult.startTestRun(self)
 
1212
                ExtendedToOriginalDecorator.startTestRun(self)
1222
1213
                calls.append('startTestRun')
1223
1214
        test = unittest.FunctionTestCase(lambda:None)
1224
1215
        stream = StringIO()
1230
1221
    def test_stopTestRun(self):
1231
1222
        """run should call result.stopTestRun()"""
1232
1223
        calls = []
1233
 
        class LoggingDecorator(tests.ForwardingResult):
 
1224
        class LoggingDecorator(ExtendedToOriginalDecorator):
1234
1225
            def stopTestRun(self):
1235
 
                tests.ForwardingResult.stopTestRun(self)
 
1226
                ExtendedToOriginalDecorator.stopTestRun(self)
1236
1227
                calls.append('stopTestRun')
1237
1228
        test = unittest.FunctionTestCase(lambda:None)
1238
1229
        stream = StringIO()
1241
1232
        result = self.run_test_runner(runner, test)
1242
1233
        self.assertLength(1, calls)
1243
1234
 
 
1235
    def test_unicode_test_output_on_ascii_stream(self):
 
1236
        """Showing results should always succeed even on an ascii console"""
 
1237
        class FailureWithUnicode(tests.TestCase):
 
1238
            def test_log_unicode(self):
 
1239
                self.log(u"\u2606")
 
1240
                self.fail("Now print that log!")
 
1241
        out = StringIO()
 
1242
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1243
            lambda trace=False: "ascii")
 
1244
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1245
            FailureWithUnicode("test_log_unicode"))
 
1246
        self.assertContainsRe(out.getvalue(),
 
1247
            "Text attachment: log\n"
 
1248
            "-+\n"
 
1249
            "\d+\.\d+  \\\\u2606\n"
 
1250
            "-+\n")
 
1251
 
1244
1252
 
1245
1253
class SampleTestCase(tests.TestCase):
1246
1254
 
1421
1429
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1430
        output_stream = StringIO()
1423
1431
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1432
            output_stream,
1425
1433
            descriptions=0,
1426
1434
            verbosity=2)
1427
1435
        sample_test.run(result)
1434
1442
        # Note this test won't fail with hooks that the core library doesn't
1435
1443
        # use - but it trigger with a plugin that adds hooks, so its still a
1436
1444
        # useful warning in that case.
1437
 
        self.assertEqual(bzrlib.branch.BranchHooks(),
1438
 
            bzrlib.branch.Branch.hooks)
1439
 
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1445
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1446
        self.assertEqual(
 
1447
            bzrlib.smart.server.SmartServerHooks(),
1440
1448
            bzrlib.smart.server.SmartTCPServer.hooks)
1441
 
        self.assertEqual(bzrlib.commands.CommandHooks(),
1442
 
            bzrlib.commands.Command.hooks)
 
1449
        self.assertEqual(
 
1450
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1443
1451
 
1444
1452
    def test__gather_lsprof_in_benchmarks(self):
1445
1453
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1655
1663
        self.assertEqual('original', obj.test_attr)
1656
1664
 
1657
1665
 
 
1666
class _MissingFeature(tests.Feature):
 
1667
    def _probe(self):
 
1668
        return False
 
1669
missing_feature = _MissingFeature()
 
1670
 
 
1671
 
 
1672
def _get_test(name):
    """Return an instance of one example test, selected by method name.

    The example class is defined inside this function so that its tests
    don't auto-run as part of the test suite.

    :param name: Name of the ExampleTests method to instantiate.
    """

    class ExampleTests(tests.TestCase):
        # Every method writes one line with mutter() and then finishes with
        # a different outcome.

        def test_fail(self):
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_missing_feature(self):
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_skip(self):
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_success(self):
            mutter('this test succeeds')

        def test_xfail(self):
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            mutter('test with unexpected success')
            self.expectFailure('should_fail', lambda: None)

    example = ExampleTests(name)
    return example
 
1709
 
 
1710
 
 
1711
class TestTestCaseLogDetails(tests.TestCase):
    """Check which example-test outcomes carry a 'log' attachment.

    Each test runs one example from _get_test() against a plain
    testtools.TestResult and inspects what was recorded.
    """

    def _run_test(self, test_name):
        # Run the named example test and hand back the populated result.
        outcome = testtools.TestResult()
        _get_test(test_name).run(outcome)
        return outcome

    def _check_single_skip_has_no_log(self, result, reason):
        # Exactly one skip must be recorded, filed under `reason`, and the
        # skipped test must not carry a 'log' detail.
        self.assertEqual([reason], result.skip_reasons.keys())
        skipped = result.skip_reasons[reason]
        self.assertEqual(1, len(skipped))
        self.assertFalse('log' in skipped[0].getDetails())

    def test_fail_has_log(self):
        failures = self._run_test('test_fail').failures
        self.assertEqual(1, len(failures))
        report = failures[0][1]
        for expected in ('Text attachment: log', 'this was a failing test'):
            self.assertContainsRe(report, expected)

    def test_error_has_log(self):
        errors = self._run_test('test_error').errors
        self.assertEqual(1, len(errors))
        report = errors[0][1]
        for expected in ('Text attachment: log', 'this test errored'):
            self.assertContainsRe(report, expected)

    def test_skip_has_no_log(self):
        self._check_single_skip_has_no_log(
            self._run_test('test_skip'), 'reason')

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so it just gets
        # considered as a skip
        self._check_single_skip_has_no_log(
            self._run_test('test_missing_feature'), missing_feature)

    def test_xfail_has_no_log(self):
        expected_failures = self._run_test('test_xfail').expectedFailures
        self.assertEqual(1, len(expected_failures))
        report = expected_failures[0][1]
        for unwanted in ('Text attachment: log',
                         'test with expected failure'):
            self.assertNotContainsRe(report, unwanted)

    def test_unexpected_success_has_log(self):
        surprises = self._run_test('test_unexpected_success').unexpectedSuccesses
        self.assertEqual(1, len(surprises))
        # Inconsistency, unexpectedSuccesses is a list of tests,
        # expectedFailures is a list of reasons?
        details = surprises[0].getDetails()
        self.assertTrue('log' in details)
 
1766
 
 
1767
 
 
1768
class TestTestCloning(tests.TestCase):
    """Tests that test cloning of TestCases (as used by multiply_tests)."""

    def test_cloned_testcase_does_not_share_details(self):
        """A TestCase cloned with clone_test does not share mutable attributes
        such as details or cleanups.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                # Content expects a ContentType object and a callable that
                # returns an iterable of byte strings, not bare strings.
                self.addDetail('foo', Content(
                    ContentType('text', 'plain'), lambda: ['foo']))
        orig_test = Test('test_foo')
        # Clone before running, so only the original gains the detail.
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
        orig_test.run(unittest.TestResult())
        # iter_bytes() yields chunks; join them instead of relying on the
        # callable's raw return value being handed back unchanged.
        self.assertEqual(
            'foo', ''.join(orig_test.getDetails()['foo'].iter_bytes()))
        self.assertEqual(None, cloned_test.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Applying two levels of scenarios to a test preserves the attributes
        added by both scenarios.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                pass
        test = Test('test_foo')
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
        # Multiply by x first, then multiply the resulting suite by y.
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
        all_tests = list(tests.iter_suite_tests(suite))
        self.assertLength(4, all_tests)
        # Every (x, y) combination must be present exactly once.
        all_xys = sorted((t.x, t.y) for t in all_tests)
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1800
 
 
1801
 
1658
1802
# NB: Don't delete this; it's not actually from 0.11!
1659
1803
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1804
def sample_deprecated_function():
1787
1931
    def test_make_branch_and_tree_with_format(self):
1788
1932
        # we should be able to supply a format to make_branch_and_tree
1789
1933
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1790
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1791
1934
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1792
1935
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1793
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1794
 
                              bzrlib.bzrdir.BzrDirFormat6)
1795
1936
 
1796
1937
    def test_make_branch_and_memory_tree(self):
1797
1938
        # we should be able to get a new branch and a mutable tree from
1814
1955
                tree.branch.repository.bzrdir.root_transport)
1815
1956
 
1816
1957
 
1817
 
class SelfTestHelper:
 
1958
class SelfTestHelper(object):
1818
1959
 
1819
1960
    def run_selftest(self, **kwargs):
1820
1961
        """Run selftest returning its output."""
1880
2021
            def __call__(test, result):
1881
2022
                test.run(result)
1882
2023
            def run(test, result):
1883
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2024
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1884
2025
                calls.append("called")
1885
2026
            def countTestCases(self):
1886
2027
                return 1
1971
2112
            load_list='missing file name', list_only=True)
1972
2113
 
1973
2114
 
 
2115
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check which outcomes put the test log into a subunit stream.

    Each test runs one example from _get_test() with the subunit runner and
    inspects both the stream text and the result rebuilt from that stream.
    """

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run the named example test with tests.SubUnitBzrRunner.

        :param test_name: Name of a method of the class built by _get_test().
        :return: A (content, result) tuple: the value of the stream returned
            by run_selftest, and a testtools.TestResult filled in by running
            a ProtocolTestCase over that stream.
        """
        from subunit import ProtocolTestCase
        def factory():
            return TestUtil.TestSuite([_get_test(test_name)])
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=factory)
        test = ProtocolTestCase(stream)
        result = testtools.TestResult()
        test.run(result)
        content = stream.getvalue()
        return content, result

    def test_fail_has_log(self):
        content, result = self.run_subunit_stream('test_fail')
        self.assertEqual(1, len(result.failures))
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test will fail')
        # The failure message above is present with or without the log, so
        # also look for the line the example test wrote with mutter().
        self.assertContainsRe(content, 'this was a failing test')

    def test_error_has_log(self):
        content, result = self.run_subunit_stream('test_error')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test errored')

    def test_skip_has_no_log(self):
        content, result = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test will be skipped')
        self.assertEqual(['reason'], result.skip_reasons.keys())
        skips = result.skip_reasons['reason']
        self.assertEqual(1, len(skips))
        # RemotedTestCase doesn't preserve the "details", so the skipped
        # test's getDetails() is not inspected here.

    def test_missing_feature_has_no_log(self):
        content, result = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'missing the feature')
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
        skips = result.skip_reasons['_MissingFeature\n']
        self.assertEqual(1, len(skips))
        # RemotedTestCase doesn't preserve the "details", so the skipped
        # test's getDetails() is not inspected here.

    def test_xfail_has_no_log(self):
        content, result = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'test with expected failure')
        self.assertEqual(1, len(result.expectedFailures))
        result_content = result.expectedFailures[0][1]
        self.assertNotContainsRe(result_content, 'Text attachment: log')
        self.assertNotContainsRe(result_content, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        content, result = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'test with unexpected success')
        self.expectFailure('subunit treats "unexpectedSuccess"'
                           ' as a plain success',
            self.assertEqual, 1, len(result.unexpectedSuccesses))
        # NOTE(review): presumably expectFailure never returns normally, so
        # the check below is not reached today; it is kept as the assertion
        # to rely on once the expectFailure above can be dropped. Confirm
        # against expectFailure's implementation.
        self.assertEqual(1, len(result.unexpectedSuccesses))
        # RemotedTestCase doesn't preserve the "details", so the test's
        # getDetails() is not inspected here.

    def test_success_has_no_log(self):
        content, result = self.run_subunit_stream('test_success')
        self.assertEqual(1, result.testsRun)
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test succeeds')
 
2190
 
 
2191
 
1974
2192
class TestRunBzr(tests.TestCase):
1975
2193
 
1976
2194
    out = ''
2099
2317
        # stdout and stderr of the invoked run_bzr
2100
2318
        current_factory = bzrlib.ui.ui_factory
2101
2319
        self.run_bzr(['foo'])
2102
 
        self.failIf(current_factory is self.factory)
 
2320
        self.assertFalse(current_factory is self.factory)
2103
2321
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2104
2322
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2105
2323
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2287
2505
        self.assertEqual([], command[2:])
2288
2506
 
2289
2507
    def test_set_env(self):
2290
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2508
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2291
2509
        # set in the child
2292
2510
        def check_environment():
2293
2511
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2299
2517
 
2300
2518
    def test_run_bzr_subprocess_env_del(self):
2301
2519
        """run_bzr_subprocess can remove environment variables too."""
2302
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2520
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2303
2521
        def check_environment():
2304
2522
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2305
2523
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2311
2529
        del os.environ['EXISTANT_ENV_VAR']
2312
2530
 
2313
2531
    def test_env_del_missing(self):
2314
 
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
 
2532
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2315
2533
        def check_environment():
2316
2534
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2317
2535
        self.check_popen_state = check_environment
2339
2557
            os.chdir = orig_chdir
2340
2558
        self.assertEqual(['foo', 'current'], chdirs)
2341
2559
 
 
2560
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2561
        self.get_source_path = lambda: ""
 
2562
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2563
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2564
 
2342
2565
 
2343
2566
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2567
    """Tests that really need to do things with an external bzr."""
2934
3157
        tpr.register('bar', 'bBB.aAA.rRR')
2935
3158
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
2936
3159
        self.assertThat(self.get_log(),
2937
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3160
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3161
                           doctest.ELLIPSIS))
2938
3162
 
2939
3163
    def test_get_unknown_prefix(self):
2940
3164
        tpr = self._get_registry()
2960
3184
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3185
 
2962
3186
 
 
3187
class TestThreadLeakDetection(tests.TestCase):
    """Ensure when tests leak threads we detect and report it"""

    class LeakRecordingResult(tests.ExtendedTestResult):
        # Result object that remembers each reported leak in self.leaks as
        # a (test, leaks) pair.

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            record = (test, leaks)
            self.leaks.append(record)

    def test_testcase_without_addCleanups(self):
        """Check old TestCase instances don't break with leak detection"""
        class Test(unittest.TestCase):
            def runTest(self):
                pass
        recorder = self.LeakRecordingResult()
        plain_case = Test()
        recorder.startTestRun()
        plain_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 0)
        self.assertEqual(recorder.leaks, [])

    def test_thread_leak(self):
        """Ensure a thread that outlives the running of a test is reported

        Uses a thread that blocks on an event, and is started by the inner
        test case. As the thread outlives the inner case's run, it should be
        detected as a leak, but the event is then set so that the thread can
        be safely joined in cleanup so it's not leaked for real.
        """
        release = threading.Event()
        leaker = threading.Thread(name="Leaker", target=release.wait)
        class Test(tests.TestCase):
            def test_leak(self):
                leaker.start()
        recorder = self.LeakRecordingResult()
        leaky_test = Test("test_leak")
        # The join is registered before the set: the event has to be set
        # first or the join would block.
        self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        leaky_test.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 1)
        self.assertEqual(recorder._first_thread_leaker_id, leaky_test.id())
        expected_leaks = [(leaky_test, set([leaker]))]
        self.assertEqual(recorder.leaks, expected_leaks)
        report = recorder.stream.getvalue()
        self.assertContainsString(report, "leaking threads")

    def test_multiple_leaks(self):
        """Check multiple leaks are blamed on the test cases at fault

        Same concept as the previous test, but has one inner test method that
        leaks two threads, and one that doesn't leak at all.
        """
        release = threading.Event()
        thread_a, thread_b, thread_c = [
            threading.Thread(name="Leaker" + suffix, target=release.wait)
            for suffix in "ABC"]
        class Test(tests.TestCase):
            def test_first_leak(self):
                thread_b.start()
            def test_second_no_leak(self):
                pass
            def test_third_leak(self):
                thread_c.start()
                thread_a.start()
        recorder = self.LeakRecordingResult()
        first_test = Test("test_first_leak")
        third_test = Test("test_third_leak")
        # As above, the joins are registered before the event is set.
        for leaker in (thread_a, thread_b, thread_c):
            self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        suite = unittest.TestSuite(
            [first_test, Test("test_second_no_leak"), third_test])
        suite.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 2)
        self.assertEqual(recorder._first_thread_leaker_id, first_test.id())
        expected_leaks = [
            (first_test, set([thread_b])),
            (third_test, set([thread_a, thread_c]))]
        self.assertEqual(recorder.leaks, expected_leaks)
        report = recorder.stream.getvalue()
        self.assertContainsString(report, "leaking threads")
 
3271
 
 
3272
 
 
3273
class TestPostMortemDebugging(tests.TestCase):
    """Check post mortem debugging works when tests fail or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        # Result object that, instead of starting a debugger, stores in
        # self.postcode the code object of the last frame of the traceback.

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Record the code object at the end of the current traceback"""
            if not tb:
                tb = sys.exc_info()[2]
            if tb is None:
                return
            # Follow the chain down to the final traceback entry.
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            pass

        def report_failure(self, test, err):
            pass

    def test_location_unittest_error(self):
        """Needs right post mortem traceback with erroring unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        expected_code = Test.runTest.func_code
        self.assertEqual(recorder.postcode, expected_code)

    def test_location_unittest_failure(self):
        """Needs right post mortem traceback with failing unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        expected_code = Test.runTest.func_code
        self.assertEqual(recorder.postcode, expected_code)

    def test_location_bt_error(self):
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        recorder = self.TracebackRecordingResult()
        Test("test_error").run(recorder)
        expected_code = Test.test_error.func_code
        self.assertEqual(recorder.postcode, expected_code)

    def test_location_bt_failure(self):
        """Needs right post mortem traceback with failing bzrlib.tests case"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        recorder = self.TracebackRecordingResult()
        Test("test_failure").run(recorder)
        expected_code = Test.test_failure.func_code
        self.assertEqual(recorder.postcode, expected_code)

    def test_env_var_triggers_post_mortem(self):
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
        import pdb
        plain_result = tests.ExtendedTestResult(StringIO(), 0, 1)
        # Replace pdb.post_mortem with a recorder of its argument.
        seen_args = []
        self.overrideAttr(pdb, "post_mortem", seen_args.append)
        # Variable unset: the call with 1 must not reach pdb.
        self.overrideEnv('BZR_TEST_PDB', None)
        plain_result._post_mortem(1)
        # Variable set: the call with 2 must reach pdb.
        self.overrideEnv('BZR_TEST_PDB', 'on')
        plain_result._post_mortem(2)
        self.assertEqual([2], seen_args)
 
3341
 
 
3342
 
2963
3343
class TestRunSuite(tests.TestCase):
2964
3344
 
2965
3345
    def test_runner_class(self):
2976
3356
                                                self.verbosity)
2977
3357
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3358
        self.assertLength(1, calls)
 
3359
 
 
3360
 
 
3361
class TestEnvironHandling(tests.TestCase):
    """Check overrideEnv restores the outer value after nested overrides."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')
        # We use an embedded test to make sure we fix the _captureVar bug
        class Test(tests.TestCase):
            def test_me(self):
                # The first pass saves the 42 value; the second pass makes
                # sure overrideEnv can be called twice.
                for _attempt in range(2):
                    self.overrideEnv('MYVAR', None)
                    self.assertEquals(None, os.environ.get('MYVAR'))
        report = StringIO()
        inner_result = tests.TextTestResult(report, 0, 1)
        Test('test_me').run(inner_result)
        if not inner_result.wasStrictlySuccessful():
            self.fail(report.getvalue())
        # We get our value back
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3382
 
 
3383
 
 
3384
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    Since we use tests that are already isolated from os.environ a bit of care
    should be taken when designing the tests to avoid bootstrap side-effects.
    The tests start with an already clean os.environ, which allows making
    valid assertions about which variables are present or not and designing
    tests around these assertions.
    """

    class ScratchMonkey(tests.TestCase):
        # Throw-away test case handed to override_os_environ and
        # restore_os_environ.

        def test_me(self):
            pass

    def test_basics(self):
        isolated = tests.isolated_environ
        # Make sure we know the definition of BZR_HOME: not part of os.environ
        # for tests.TestCase.
        self.assertTrue('BZR_HOME' in isolated)
        self.assertEquals(None, isolated['BZR_HOME'])
        # Being part of isolated_environ, BZR_HOME should not appear here
        self.assertFalse('BZR_HOME' in os.environ)
        # Make sure we know the definition of LINES: part of os.environ for
        # tests.TestCase
        self.assertTrue('LINES' in isolated)
        self.assertEquals('25', isolated['LINES'])
        self.assertEquals('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BZR_HOME is known to be absent from os.environ
        monkey = self.ScratchMonkey('test_me')
        tests.override_os_environ(monkey, {'BZR_HOME': 'foo'})
        self.assertEquals('foo', os.environ['BZR_HOME'])
        tests.restore_os_environ(monkey)
        self.assertFalse('BZR_HOME' in os.environ)

    def test_injecting_known_variable(self):
        monkey = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(monkey, {'LINES': '42'})
        self.assertEquals('42', os.environ['LINES'])
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])

    def test_deleting_variable(self):
        monkey = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(monkey, {'LINES': None})
        self.assertTrue('LINES' not in os.environ)
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])
 
3435
 
 
3436
 
 
3437
class TestDocTestSuiteIsolation(tests.TestCase):
 
3438
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
 
3439
 
 
3440
    Since tests.TestCase already provides an isolation from os.environ, we use
 
3441
    the clean environment as a base for testing. To precisely capture the
 
3442
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
 
3443
    compare against.
 
3444
 
 
3445
    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
 
3446
    not `os.environ` so each test overrides it to suit its needs.
 
3447
 
 
3448
    """
 
3449
 
 
3450
    def get_doctest_suite_for_string(self, klass, string):
 
3451
        class Finder(doctest.DocTestFinder):
 
3452
 
 
3453
            def find(*args, **kwargs):
 
3454
                test = doctest.DocTestParser().get_doctest(
 
3455
                    string, {}, 'foo', 'foo.py', 0)
 
3456
                return [test]
 
3457
 
 
3458
        suite = klass(test_finder=Finder())
 
3459
        return suite
 
3460
 
 
3461
    def run_doctest_suite_for_string(self, klass, string):
 
3462
        suite = self.get_doctest_suite_for_string(klass, string)
 
3463
        output = StringIO()
 
3464
        result = tests.TextTestResult(output, 0, 1)
 
3465
        suite.run(result)
 
3466
        return result, output
 
3467
 
 
3468
    def assertDocTestStringSucceds(self, klass, string):
 
3469
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3470
        if not result.wasStrictlySuccessful():
 
3471
            self.fail(output.getvalue())
 
3472
 
 
3473
    def assertDocTestStringFails(self, klass, string):
 
3474
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3475
        if result.wasStrictlySuccessful():
 
3476
            self.fail(output.getvalue())
 
3477
 
 
3478
    def test_injected_variable(self):
 
3479
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
 
3480
        test = """
 
3481
            >>> import os
 
3482
            >>> os.environ['LINES']
 
3483
            '42'
 
3484
            """
 
3485
        # doctest.DocTestSuite fails as it sees '25'
 
3486
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3487
        # tests.DocTestSuite sees '42'
 
3488
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3489
 
 
3490
    def test_deleted_variable(self):
 
3491
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
 
3492
        test = """
 
3493
            >>> import os
 
3494
            >>> os.environ.get('LINES')
 
3495
            """
 
3496
        # doctest.DocTestSuite fails as it sees '25'
 
3497
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3498
        # tests.DocTestSuite sees None
 
3499
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)