~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Andrew Bennetts
  • Date: 2011-05-05 06:11:43 UTC
  • mto: This revision was merged to the branch mainline in revision 5831.
  • Revision ID: andrew.bennetts@canonical.com-20110505061143-54119u25xtrfd28m
Share _group_cache when using GroupCompressVersionedFiles.without_fallbacks, and move _find_parents_keys_of_revisions call to before we clear repo.revisions' cache.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import doctest
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
37
42
from bzrlib import (
38
43
    branchbuilder,
39
44
    bzrdir,
40
 
    debug,
41
45
    errors,
42
46
    lockdir,
43
47
    memorytree,
44
48
    osutils,
45
 
    progress,
46
49
    remote,
47
50
    repository,
48
51
    symbol_versioning,
49
52
    tests,
50
53
    transport,
51
54
    workingtree,
 
55
    workingtree_4,
52
56
    )
53
57
from bzrlib.repofmt import (
54
58
    groupcompress_repo,
55
 
    pack_repo,
56
 
    weaverepo,
57
59
    )
58
60
from bzrlib.symbol_versioning import (
59
61
    deprecated_function,
64
66
    features,
65
67
    test_lsprof,
66
68
    test_server,
67
 
    test_sftp_transport,
68
69
    TestUtil,
69
70
    )
70
 
from bzrlib.trace import note
 
71
from bzrlib.trace import note, mutter
71
72
from bzrlib.transport import memory
72
 
from bzrlib.version import _get_bzr_source_tree
73
73
 
74
74
 
75
75
def _test_ids(test_suite):
77
77
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
78
 
79
79
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
80
class MetaTestLog(tests.TestCase):
93
81
 
94
82
    def test_logging(self):
100
88
            "text", "plain", {"charset": "utf8"})))
101
89
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
102
90
        self.assertThat(self.get_log(),
103
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
91
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
104
92
 
105
93
 
106
94
class TestUnicodeFilename(tests.TestCase):
119
107
 
120
108
        filename = u'hell\u00d8'
121
109
        self.build_tree_contents([(filename, 'contents of hello')])
122
 
        self.failUnlessExists(filename)
 
110
        self.assertPathExists(filename)
 
111
 
 
112
 
 
113
class TestClassesAvailable(tests.TestCase):
 
114
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
115
 
 
116
    def test_test_case(self):
 
117
        from bzrlib.tests import TestCase
 
118
 
 
119
    def test_test_loader(self):
 
120
        from bzrlib.tests import TestLoader
 
121
 
 
122
    def test_test_suite(self):
 
123
        from bzrlib.tests import TestSuite
123
124
 
124
125
 
125
126
class TestTransportScenarios(tests.TestCase):
208
209
    def test_scenarios(self):
209
210
        # check that constructor parameters are passed through to the adapted
210
211
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
212
        from bzrlib.tests.per_controldir import make_scenarios
212
213
        vfs_factory = "v"
213
214
        server1 = "a"
214
215
        server2 = "b"
312
313
        from bzrlib.tests.per_interrepository import make_scenarios
313
314
        server1 = "a"
314
315
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
316
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
317
        scenarios = make_scenarios(server1, server2, formats)
317
318
        self.assertEqual([
318
319
            ('C0,str,str',
319
320
             {'repository_format': 'C1',
320
321
              'repository_format_to': 'C2',
321
322
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
323
              'transport_server': 'a',
 
324
              'extra_setup': 'C3'}),
323
325
            ('D0,str,str',
324
326
             {'repository_format': 'D1',
325
327
              'repository_format_to': 'D2',
326
328
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
329
              'transport_server': 'a',
 
330
              'extra_setup': 'D3'})],
328
331
            scenarios)
329
332
 
330
333
 
336
339
        from bzrlib.tests.per_workingtree import make_scenarios
337
340
        server1 = "a"
338
341
        server2 = "b"
339
 
        formats = [workingtree.WorkingTreeFormat2(),
 
342
        formats = [workingtree_4.WorkingTreeFormat4(),
340
343
                   workingtree.WorkingTreeFormat3(),]
341
344
        scenarios = make_scenarios(server1, server2, formats)
342
345
        self.assertEqual([
343
 
            ('WorkingTreeFormat2',
 
346
            ('WorkingTreeFormat4',
344
347
             {'bzrdir_format': formats[0]._matchingbzrdir,
345
348
              'transport_readonly_server': 'b',
346
349
              'transport_server': 'a',
373
376
            )
374
377
        server1 = "a"
375
378
        server2 = "b"
376
 
        formats = [workingtree.WorkingTreeFormat2(),
 
379
        formats = [workingtree_4.WorkingTreeFormat4(),
377
380
                   workingtree.WorkingTreeFormat3(),]
378
381
        scenarios = make_scenarios(server1, server2, formats)
379
382
        self.assertEqual(7, len(scenarios))
380
 
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
381
 
        wt4_format = workingtree.WorkingTreeFormat4()
382
 
        wt5_format = workingtree.WorkingTreeFormat5()
 
383
        default_wt_format = workingtree.format_registry.get_default()
 
384
        wt4_format = workingtree_4.WorkingTreeFormat4()
 
385
        wt5_format = workingtree_4.WorkingTreeFormat5()
383
386
        expected_scenarios = [
384
 
            ('WorkingTreeFormat2',
 
387
            ('WorkingTreeFormat4',
385
388
             {'bzrdir_format': formats[0]._matchingbzrdir,
386
389
              'transport_readonly_server': 'b',
387
390
              'transport_server': 'a',
447
450
        # ones to add.
448
451
        from bzrlib.tests.per_tree import (
449
452
            return_parameter,
450
 
            revision_tree_from_workingtree
451
453
            )
452
454
        from bzrlib.tests.per_intertree import (
453
455
            make_scenarios,
454
456
            )
455
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
457
        from bzrlib.workingtree import WorkingTreeFormat3
 
458
        from bzrlib.workingtree_4 import WorkingTreeFormat4
456
459
        input_test = TestInterTreeScenarios(
457
460
            "test_scenarios")
458
461
        server1 = "a"
459
462
        server2 = "b"
460
 
        format1 = WorkingTreeFormat2()
 
463
        format1 = WorkingTreeFormat4()
461
464
        format2 = WorkingTreeFormat3()
462
465
        formats = [("1", str, format1, format2, "converter1"),
463
466
            ("2", int, format2, format1, "converter2")]
509
512
        self.assertRaises(AssertionError, self.assertEqualStat,
510
513
            os.lstat("foo"), os.lstat("longname"))
511
514
 
 
515
    def test_failUnlessExists(self):
 
516
        """Deprecated failUnlessExists and failIfExists"""
 
517
        self.applyDeprecated(
 
518
            deprecated_in((2, 4)),
 
519
            self.failUnlessExists, '.')
 
520
        self.build_tree(['foo/', 'foo/bar'])
 
521
        self.applyDeprecated(
 
522
            deprecated_in((2, 4)),
 
523
            self.failUnlessExists, 'foo/bar')
 
524
        self.applyDeprecated(
 
525
            deprecated_in((2, 4)),
 
526
            self.failIfExists, 'foo/foo')
 
527
 
 
528
    def test_assertPathExists(self):
 
529
        self.assertPathExists('.')
 
530
        self.build_tree(['foo/', 'foo/bar'])
 
531
        self.assertPathExists('foo/bar')
 
532
        self.assertPathDoesNotExist('foo/foo')
 
533
 
512
534
 
513
535
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
514
536
 
548
570
        tree = self.make_branch_and_memory_tree('dir')
549
571
        # Guard against regression into MemoryTransport leaking
550
572
        # files to disk instead of keeping them in memory.
551
 
        self.failIf(osutils.lexists('dir'))
 
573
        self.assertFalse(osutils.lexists('dir'))
552
574
        self.assertIsInstance(tree, memorytree.MemoryTree)
553
575
 
554
576
    def test_make_branch_and_memory_tree_with_format(self):
555
577
        """make_branch_and_memory_tree should accept a format option."""
556
578
        format = bzrdir.BzrDirMetaFormat1()
557
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
579
        format.repository_format = repository.format_registry.get_default()
558
580
        tree = self.make_branch_and_memory_tree('dir', format=format)
559
581
        # Guard against regression into MemoryTransport leaking
560
582
        # files to disk instead of keeping them in memory.
561
 
        self.failIf(osutils.lexists('dir'))
 
583
        self.assertFalse(osutils.lexists('dir'))
562
584
        self.assertIsInstance(tree, memorytree.MemoryTree)
563
585
        self.assertEqual(format.repository_format.__class__,
564
586
            tree.branch.repository._format.__class__)
568
590
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
569
591
        # Guard against regression into MemoryTransport leaking
570
592
        # files to disk instead of keeping them in memory.
571
 
        self.failIf(osutils.lexists('dir'))
 
593
        self.assertFalse(osutils.lexists('dir'))
572
594
 
573
595
    def test_make_branch_builder_with_format(self):
574
596
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
575
597
        # that the format objects are used.
576
598
        format = bzrdir.BzrDirMetaFormat1()
577
 
        repo_format = weaverepo.RepositoryFormat7()
 
599
        repo_format = repository.format_registry.get_default()
578
600
        format.repository_format = repo_format
579
601
        builder = self.make_branch_builder('dir', format=format)
580
602
        the_branch = builder.get_branch()
581
603
        # Guard against regression into MemoryTransport leaking
582
604
        # files to disk instead of keeping them in memory.
583
 
        self.failIf(osutils.lexists('dir'))
 
605
        self.assertFalse(osutils.lexists('dir'))
584
606
        self.assertEqual(format.repository_format.__class__,
585
607
                         the_branch.repository._format.__class__)
586
608
        self.assertEqual(repo_format.get_format_string(),
592
614
        the_branch = builder.get_branch()
593
615
        # Guard against regression into MemoryTransport leaking
594
616
        # files to disk instead of keeping them in memory.
595
 
        self.failIf(osutils.lexists('dir'))
 
617
        self.assertFalse(osutils.lexists('dir'))
596
618
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
597
619
        self.assertEqual(dir_format.repository_format.__class__,
598
620
                         the_branch.repository._format.__class__)
609
631
                l.attempt_lock()
610
632
        test = TestDanglingLock('test_function')
611
633
        result = test.run()
 
634
        total_failures = result.errors + result.failures
612
635
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
636
            self.assertEqual(1, len(total_failures))
614
637
        else:
615
638
            # When _lock_check_thorough is disabled, then we don't trigger a
616
639
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
640
            self.assertEqual(0, len(total_failures))
618
641
 
619
642
 
620
643
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
644
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
645
 
623
646
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
647
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
648
        self.vfs_transport_factory = memory.MemoryServer
627
649
        self.transport_readonly_server = None
629
651
        # for the server
630
652
        url = self.get_readonly_url()
631
653
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
634
 
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
 
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
 
654
        t = transport.get_transport(url)
 
655
        t2 = transport.get_transport(url2)
 
656
        self.assertIsInstance(t, ReadonlyTransportDecorator)
 
657
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
658
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
659
 
638
660
    def test_get_readonly_url_http(self):
639
661
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
662
        from bzrlib.transport.http import HttpTransportBase
642
663
        self.transport_server = test_server.LocalURLServer
643
664
        self.transport_readonly_server = HttpServer
645
666
        url = self.get_readonly_url()
646
667
        url2 = self.get_readonly_url('foo/bar')
647
668
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
650
 
        self.failUnless(isinstance(t, HttpTransportBase))
651
 
        self.failUnless(isinstance(t2, HttpTransportBase))
 
669
        t = transport.get_transport(url)
 
670
        t2 = transport.get_transport(url2)
 
671
        self.assertIsInstance(t, HttpTransportBase)
 
672
        self.assertIsInstance(t2, HttpTransportBase)
652
673
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
653
674
 
654
675
    def test_is_directory(self):
662
683
    def test_make_branch_builder(self):
663
684
        builder = self.make_branch_builder('dir')
664
685
        rev_id = builder.build_commit()
665
 
        self.failUnlessExists('dir')
 
686
        self.assertPathExists('dir')
666
687
        a_dir = bzrdir.BzrDir.open('dir')
667
688
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
668
689
        a_branch = a_dir.open_branch()
684
705
        self.assertIsInstance(result_bzrdir.transport,
685
706
                              memory.MemoryTransport)
686
707
        # should not be on disk, should only be in memory
687
 
        self.failIfExists('subdir')
 
708
        self.assertPathDoesNotExist('subdir')
688
709
 
689
710
 
690
711
class TestChrootedTest(tests.ChrootedTestCase):
691
712
 
692
713
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
714
        t = transport.get_transport(self.get_readonly_url())
695
715
        url = t.base
696
716
        self.assertEqual(url, t.clone('..').base)
697
717
 
750
770
        self.check_timing(ShortDelayTestCase('test_short_delay'),
751
771
                          r"^ +[0-9]+ms$")
752
772
 
753
 
    def _patch_get_bzr_source_tree(self):
754
 
        # Reading from the actual source tree breaks isolation, but we don't
755
 
        # want to assume that thats *all* that would happen.
756
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
757
 
 
758
 
    def test_assigned_benchmark_file_stores_date(self):
759
 
        self._patch_get_bzr_source_tree()
760
 
        output = StringIO()
761
 
        result = bzrlib.tests.TextTestResult(self._log_file,
762
 
                                        descriptions=0,
763
 
                                        verbosity=1,
764
 
                                        bench_history=output
765
 
                                        )
766
 
        output_string = output.getvalue()
767
 
        # if you are wondering about the regexp please read the comment in
768
 
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
769
 
        # XXX: what comment?  -- Andrew Bennetts
770
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
771
 
 
772
 
    def test_benchhistory_records_test_times(self):
773
 
        self._patch_get_bzr_source_tree()
774
 
        result_stream = StringIO()
775
 
        result = bzrlib.tests.TextTestResult(
776
 
            self._log_file,
777
 
            descriptions=0,
778
 
            verbosity=1,
779
 
            bench_history=result_stream
780
 
            )
781
 
 
782
 
        # we want profile a call and check that its test duration is recorded
783
 
        # make a new test instance that when run will generate a benchmark
784
 
        example_test_case = TestTestResult("_time_hello_world_encoding")
785
 
        # execute the test, which should succeed and record times
786
 
        example_test_case.run(result)
787
 
        lines = result_stream.getvalue().splitlines()
788
 
        self.assertEqual(2, len(lines))
789
 
        self.assertContainsRe(lines[1],
790
 
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
791
 
            "._time_hello_world_encoding")
792
 
 
793
773
    def _time_hello_world_encoding(self):
794
774
        """Profile two sleep calls
795
775
 
803
783
        self.requireFeature(test_lsprof.LSProfFeature)
804
784
        result_stream = StringIO()
805
785
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
786
            result_stream,
807
787
            descriptions=0,
808
788
            verbosity=2,
809
789
            )
835
815
        self.assertContainsRe(output,
836
816
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
837
817
 
 
818
    def test_uses_time_from_testtools(self):
 
819
        """Test case timings in verbose results should use testtools times"""
 
820
        import datetime
 
821
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
822
            def startTest(self, test):
 
823
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
824
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
825
            def addSuccess(self, test):
 
826
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
827
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
828
            def report_tests_starting(self): pass
 
829
        sio = StringIO()
 
830
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
831
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
832
 
838
833
    def test_known_failure(self):
839
834
        """A KnownFailure being raised should trigger several result actions."""
840
835
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
836
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
837
            def report_tests_starting(self): pass
844
838
            def report_known_failure(self, test, err=None, details=None):
845
839
                self._call = test, 'known failure'
846
840
        result = InstrumentedTestResult(None, None, None, None)
864
858
        # verbose test output formatting
865
859
        result_stream = StringIO()
866
860
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
861
            result_stream,
868
862
            descriptions=0,
869
863
            verbosity=2,
870
864
            )
880
874
        output = result_stream.getvalue()[prefix:]
881
875
        lines = output.splitlines()
882
876
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
877
        if sys.version_info > (2, 7):
 
878
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
879
                self.assertNotEqual, lines[1], '    ')
883
880
        self.assertEqual(lines[1], '    foo')
884
881
        self.assertEqual(2, len(lines))
885
882
 
893
890
        """Test the behaviour of invoking addNotSupported."""
894
891
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
892
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
893
            def report_tests_starting(self): pass
898
894
            def report_unsupported(self, test, feature):
899
895
                self._call = test, feature
900
896
        result = InstrumentedTestResult(None, None, None, None)
919
915
        # verbose test output formatting
920
916
        result_stream = StringIO()
921
917
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
918
            result_stream,
923
919
            descriptions=0,
924
920
            verbosity=2,
925
921
            )
939
935
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
936
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
937
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
938
            def report_tests_starting(self): pass
944
939
            def addNotSupported(self, test, feature):
945
940
                self._call = test, feature
946
941
        result = InstrumentedTestResult(None, None, None, None)
988
983
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
984
            calls = 0
990
985
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
986
        result = InstrumentedTestResult(None, None, None, None)
993
987
        def test_function():
994
988
            pass
996
990
        test.run(result)
997
991
        self.assertEquals(1, result.calls)
998
992
 
 
993
    def test_startTests_only_once(self):
 
994
        """With multiple tests startTests should still only be called once"""
 
995
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
996
            calls = 0
 
997
            def startTests(self): self.calls += 1
 
998
        result = InstrumentedTestResult(None, None, None, None)
 
999
        suite = unittest.TestSuite([
 
1000
            unittest.FunctionTestCase(lambda: None),
 
1001
            unittest.FunctionTestCase(lambda: None)])
 
1002
        suite.run(result)
 
1003
        self.assertEquals(1, result.calls)
 
1004
        self.assertEquals(2, result.count)
 
1005
 
999
1006
 
1000
1007
class TestUnicodeFilenameFeature(tests.TestCase):
1001
1008
 
1022
1029
        because of our use of global state.
1023
1030
        """
1024
1031
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1032
        try:
1027
1033
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1034
            return testrunner.run(test)
1030
1035
        finally:
1031
1036
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1037
 
1034
1038
    def test_known_failure_failed_run(self):
1035
1039
        # run a test that generates a known failure which should be printed in
1081
1085
    def test_result_decorator(self):
1082
1086
        # decorate results
1083
1087
        calls = []
1084
 
        class LoggingDecorator(tests.ForwardingResult):
 
1088
        class LoggingDecorator(ExtendedToOriginalDecorator):
1085
1089
            def startTest(self, test):
1086
 
                tests.ForwardingResult.startTest(self, test)
 
1090
                ExtendedToOriginalDecorator.startTest(self, test)
1087
1091
                calls.append('start')
1088
1092
        test = unittest.FunctionTestCase(lambda:None)
1089
1093
        stream = StringIO()
1190
1194
            ],
1191
1195
            lines[-3:])
1192
1196
 
1193
 
    def _patch_get_bzr_source_tree(self):
1194
 
        # Reading from the actual source tree breaks isolation, but we don't
1195
 
        # want to assume that thats *all* that would happen.
1196
 
        self._get_source_tree_calls = []
1197
 
        def new_get():
1198
 
            self._get_source_tree_calls.append("called")
1199
 
            return None
1200
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
1201
 
 
1202
 
    def test_bench_history(self):
1203
 
        # tests that the running the benchmark passes bench_history into
1204
 
        # the test result object. We can tell that happens if
1205
 
        # _get_bzr_source_tree is called.
1206
 
        self._patch_get_bzr_source_tree()
1207
 
        test = TestRunner('dummy_test')
1208
 
        output = StringIO()
1209
 
        runner = tests.TextTestRunner(stream=self._log_file,
1210
 
                                      bench_history=output)
1211
 
        result = self.run_test_runner(runner, test)
1212
 
        output_string = output.getvalue()
1213
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
 
        self.assertLength(1, self._get_source_tree_calls)
 
1197
    def test_verbose_test_count(self):
 
1198
        """A verbose test run reports the right test count at the start"""
 
1199
        suite = TestUtil.TestSuite([
 
1200
            unittest.FunctionTestCase(lambda:None),
 
1201
            unittest.FunctionTestCase(lambda:None)])
 
1202
        self.assertEqual(suite.countTestCases(), 2)
 
1203
        stream = StringIO()
 
1204
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1205
        # Need to use the CountingDecorator as that's what sets num_tests
 
1206
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1207
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1215
1208
 
1216
1209
    def test_startTestRun(self):
1217
1210
        """run should call result.startTestRun()"""
1218
1211
        calls = []
1219
 
        class LoggingDecorator(tests.ForwardingResult):
 
1212
        class LoggingDecorator(ExtendedToOriginalDecorator):
1220
1213
            def startTestRun(self):
1221
 
                tests.ForwardingResult.startTestRun(self)
 
1214
                ExtendedToOriginalDecorator.startTestRun(self)
1222
1215
                calls.append('startTestRun')
1223
1216
        test = unittest.FunctionTestCase(lambda:None)
1224
1217
        stream = StringIO()
1230
1223
    def test_stopTestRun(self):
1231
1224
        """run should call result.stopTestRun()"""
1232
1225
        calls = []
1233
 
        class LoggingDecorator(tests.ForwardingResult):
 
1226
        class LoggingDecorator(ExtendedToOriginalDecorator):
1234
1227
            def stopTestRun(self):
1235
 
                tests.ForwardingResult.stopTestRun(self)
 
1228
                ExtendedToOriginalDecorator.stopTestRun(self)
1236
1229
                calls.append('stopTestRun')
1237
1230
        test = unittest.FunctionTestCase(lambda:None)
1238
1231
        stream = StringIO()
1241
1234
        result = self.run_test_runner(runner, test)
1242
1235
        self.assertLength(1, calls)
1243
1236
 
 
1237
    def test_unicode_test_output_on_ascii_stream(self):
 
1238
        """Showing results should always succeed even on an ascii console"""
 
1239
        class FailureWithUnicode(tests.TestCase):
 
1240
            def test_log_unicode(self):
 
1241
                self.log(u"\u2606")
 
1242
                self.fail("Now print that log!")
 
1243
        out = StringIO()
 
1244
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1245
            lambda trace=False: "ascii")
 
1246
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1247
            FailureWithUnicode("test_log_unicode"))
 
1248
        self.assertContainsRe(out.getvalue(),
 
1249
            "Text attachment: log\n"
 
1250
            "-+\n"
 
1251
            "\d+\.\d+  \\\\u2606\n"
 
1252
            "-+\n")
 
1253
 
1244
1254
 
1245
1255
class SampleTestCase(tests.TestCase):
1246
1256
 
1421
1431
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1432
        output_stream = StringIO()
1423
1433
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1434
            output_stream,
1425
1435
            descriptions=0,
1426
1436
            verbosity=2)
1427
1437
        sample_test.run(result)
1434
1444
        # Note this test won't fail with hooks that the core library doesn't
1435
1445
        # use - but it trigger with a plugin that adds hooks, so its still a
1436
1446
        # useful warning in that case.
1437
 
        self.assertEqual(bzrlib.branch.BranchHooks(),
1438
 
            bzrlib.branch.Branch.hooks)
1439
 
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1447
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1448
        self.assertEqual(
 
1449
            bzrlib.smart.server.SmartServerHooks(),
1440
1450
            bzrlib.smart.server.SmartTCPServer.hooks)
1441
 
        self.assertEqual(bzrlib.commands.CommandHooks(),
1442
 
            bzrlib.commands.Command.hooks)
 
1451
        self.assertEqual(
 
1452
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1443
1453
 
1444
1454
    def test__gather_lsprof_in_benchmarks(self):
1445
1455
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1655
1665
        self.assertEqual('original', obj.test_attr)
1656
1666
 
1657
1667
 
 
1668
class _MissingFeature(tests.Feature):
 
1669
    def _probe(self):
 
1670
        return False
 
1671
missing_feature = _MissingFeature()
 
1672
 
 
1673
 
 
1674
def _get_test(name):
 
1675
    """Get an instance of a specific example test.
 
1676
 
 
1677
    We protect this in a function so that they don't auto-run in the test
 
1678
    suite.
 
1679
    """
 
1680
 
 
1681
    class ExampleTests(tests.TestCase):
 
1682
 
 
1683
        def test_fail(self):
 
1684
            mutter('this was a failing test')
 
1685
            self.fail('this test will fail')
 
1686
 
 
1687
        def test_error(self):
 
1688
            mutter('this test errored')
 
1689
            raise RuntimeError('gotcha')
 
1690
 
 
1691
        def test_missing_feature(self):
 
1692
            mutter('missing the feature')
 
1693
            self.requireFeature(missing_feature)
 
1694
 
 
1695
        def test_skip(self):
 
1696
            mutter('this test will be skipped')
 
1697
            raise tests.TestSkipped('reason')
 
1698
 
 
1699
        def test_success(self):
 
1700
            mutter('this test succeeds')
 
1701
 
 
1702
        def test_xfail(self):
 
1703
            mutter('test with expected failure')
 
1704
            self.knownFailure('this_fails')
 
1705
 
 
1706
        def test_unexpected_success(self):
 
1707
            mutter('test with unexpected success')
 
1708
            self.expectFailure('should_fail', lambda: None)
 
1709
 
 
1710
    return ExampleTests(name)
 
1711
 
 
1712
 
 
1713
class TestTestCaseLogDetails(tests.TestCase):
 
1714
 
 
1715
    def _run_test(self, test_name):
 
1716
        test = _get_test(test_name)
 
1717
        result = testtools.TestResult()
 
1718
        test.run(result)
 
1719
        return result
 
1720
 
 
1721
    def test_fail_has_log(self):
 
1722
        result = self._run_test('test_fail')
 
1723
        self.assertEqual(1, len(result.failures))
 
1724
        result_content = result.failures[0][1]
 
1725
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1726
        self.assertContainsRe(result_content, 'this was a failing test')
 
1727
 
 
1728
    def test_error_has_log(self):
 
1729
        result = self._run_test('test_error')
 
1730
        self.assertEqual(1, len(result.errors))
 
1731
        result_content = result.errors[0][1]
 
1732
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1733
        self.assertContainsRe(result_content, 'this test errored')
 
1734
 
 
1735
    def test_skip_has_no_log(self):
 
1736
        result = self._run_test('test_skip')
 
1737
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1738
        skips = result.skip_reasons['reason']
 
1739
        self.assertEqual(1, len(skips))
 
1740
        test = skips[0]
 
1741
        self.assertFalse('log' in test.getDetails())
 
1742
 
 
1743
    def test_missing_feature_has_no_log(self):
 
1744
        # testtools doesn't know about addNotSupported, so it just gets
 
1745
        # considered as a skip
 
1746
        result = self._run_test('test_missing_feature')
 
1747
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1748
        skips = result.skip_reasons[missing_feature]
 
1749
        self.assertEqual(1, len(skips))
 
1750
        test = skips[0]
 
1751
        self.assertFalse('log' in test.getDetails())
 
1752
 
 
1753
    def test_xfail_has_no_log(self):
 
1754
        result = self._run_test('test_xfail')
 
1755
        self.assertEqual(1, len(result.expectedFailures))
 
1756
        result_content = result.expectedFailures[0][1]
 
1757
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1758
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1759
 
 
1760
    def test_unexpected_success_has_log(self):
 
1761
        result = self._run_test('test_unexpected_success')
 
1762
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1763
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1764
        # expectedFailures is a list of reasons?
 
1765
        test = result.unexpectedSuccesses[0]
 
1766
        details = test.getDetails()
 
1767
        self.assertTrue('log' in details)
 
1768
 
 
1769
 
 
1770
class TestTestCloning(tests.TestCase):
 
1771
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1772
 
 
1773
    def test_cloned_testcase_does_not_share_details(self):
 
1774
        """A TestCase cloned with clone_test does not share mutable attributes
 
1775
        such as details or cleanups.
 
1776
        """
 
1777
        class Test(tests.TestCase):
 
1778
            def test_foo(self):
 
1779
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1780
        orig_test = Test('test_foo')
 
1781
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1782
        orig_test.run(unittest.TestResult())
 
1783
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1784
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1785
 
 
1786
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1787
        """Applying two levels of scenarios to a test preserves the attributes
 
1788
        added by both scenarios.
 
1789
        """
 
1790
        class Test(tests.TestCase):
 
1791
            def test_foo(self):
 
1792
                pass
 
1793
        test = Test('test_foo')
 
1794
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1795
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1796
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1797
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1798
        all_tests = list(tests.iter_suite_tests(suite))
 
1799
        self.assertLength(4, all_tests)
 
1800
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1801
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1802
 
 
1803
 
1658
1804
# NB: Don't delete this; it's not actually from 0.11!
1659
1805
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1806
def sample_deprecated_function():
1787
1933
    def test_make_branch_and_tree_with_format(self):
1788
1934
        # we should be able to supply a format to make_branch_and_tree
1789
1935
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1790
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1791
1936
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1792
1937
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1793
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1794
 
                              bzrlib.bzrdir.BzrDirFormat6)
1795
1938
 
1796
1939
    def test_make_branch_and_memory_tree(self):
1797
1940
        # we should be able to get a new branch and a mutable tree from
1814
1957
                tree.branch.repository.bzrdir.root_transport)
1815
1958
 
1816
1959
 
1817
 
class SelfTestHelper:
 
1960
class SelfTestHelper(object):
1818
1961
 
1819
1962
    def run_selftest(self, **kwargs):
1820
1963
        """Run selftest returning its output."""
1880
2023
            def __call__(test, result):
1881
2024
                test.run(result)
1882
2025
            def run(test, result):
1883
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2026
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1884
2027
                calls.append("called")
1885
2028
            def countTestCases(self):
1886
2029
                return 1
1971
2114
            load_list='missing file name', list_only=True)
1972
2115
 
1973
2116
 
 
2117
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2118
 
 
2119
    _test_needs_features = [features.subunit]
 
2120
 
 
2121
    def run_subunit_stream(self, test_name):
 
2122
        from subunit import ProtocolTestCase
 
2123
        def factory():
 
2124
            return TestUtil.TestSuite([_get_test(test_name)])
 
2125
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2126
            test_suite_factory=factory)
 
2127
        test = ProtocolTestCase(stream)
 
2128
        result = testtools.TestResult()
 
2129
        test.run(result)
 
2130
        content = stream.getvalue()
 
2131
        return content, result
 
2132
 
 
2133
    def test_fail_has_log(self):
 
2134
        content, result = self.run_subunit_stream('test_fail')
 
2135
        self.assertEqual(1, len(result.failures))
 
2136
        self.assertContainsRe(content, '(?m)^log$')
 
2137
        self.assertContainsRe(content, 'this test will fail')
 
2138
 
 
2139
    def test_error_has_log(self):
 
2140
        content, result = self.run_subunit_stream('test_error')
 
2141
        self.assertContainsRe(content, '(?m)^log$')
 
2142
        self.assertContainsRe(content, 'this test errored')
 
2143
 
 
2144
    def test_skip_has_no_log(self):
 
2145
        content, result = self.run_subunit_stream('test_skip')
 
2146
        self.assertNotContainsRe(content, '(?m)^log$')
 
2147
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2148
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2149
        skips = result.skip_reasons['reason']
 
2150
        self.assertEqual(1, len(skips))
 
2151
        test = skips[0]
 
2152
        # RemotedTestCase doesn't preserve the "details"
 
2153
        ## self.assertFalse('log' in test.getDetails())
 
2154
 
 
2155
    def test_missing_feature_has_no_log(self):
 
2156
        content, result = self.run_subunit_stream('test_missing_feature')
 
2157
        self.assertNotContainsRe(content, '(?m)^log$')
 
2158
        self.assertNotContainsRe(content, 'missing the feature')
 
2159
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2160
        skips = result.skip_reasons['_MissingFeature\n']
 
2161
        self.assertEqual(1, len(skips))
 
2162
        test = skips[0]
 
2163
        # RemotedTestCase doesn't preserve the "details"
 
2164
        ## self.assertFalse('log' in test.getDetails())
 
2165
 
 
2166
    def test_xfail_has_no_log(self):
 
2167
        content, result = self.run_subunit_stream('test_xfail')
 
2168
        self.assertNotContainsRe(content, '(?m)^log$')
 
2169
        self.assertNotContainsRe(content, 'test with expected failure')
 
2170
        self.assertEqual(1, len(result.expectedFailures))
 
2171
        result_content = result.expectedFailures[0][1]
 
2172
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2173
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2174
 
 
2175
    def test_unexpected_success_has_log(self):
 
2176
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2177
        self.assertContainsRe(content, '(?m)^log$')
 
2178
        self.assertContainsRe(content, 'test with unexpected success')
 
2179
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2180
                           ' as a plain success',
 
2181
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2182
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2183
        test = result.unexpectedSuccesses[0]
 
2184
        # RemotedTestCase doesn't preserve the "details"
 
2185
        ## self.assertTrue('log' in test.getDetails())
 
2186
 
 
2187
    def test_success_has_no_log(self):
 
2188
        content, result = self.run_subunit_stream('test_success')
 
2189
        self.assertEqual(1, result.testsRun)
 
2190
        self.assertNotContainsRe(content, '(?m)^log$')
 
2191
        self.assertNotContainsRe(content, 'this test succeeds')
 
2192
 
 
2193
 
1974
2194
class TestRunBzr(tests.TestCase):
1975
2195
 
1976
2196
    out = ''
2099
2319
        # stdout and stderr of the invoked run_bzr
2100
2320
        current_factory = bzrlib.ui.ui_factory
2101
2321
        self.run_bzr(['foo'])
2102
 
        self.failIf(current_factory is self.factory)
 
2322
        self.assertFalse(current_factory is self.factory)
2103
2323
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2104
2324
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2105
2325
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2287
2507
        self.assertEqual([], command[2:])
2288
2508
 
2289
2509
    def test_set_env(self):
2290
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2510
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2291
2511
        # set in the child
2292
2512
        def check_environment():
2293
2513
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2299
2519
 
2300
2520
    def test_run_bzr_subprocess_env_del(self):
2301
2521
        """run_bzr_subprocess can remove environment variables too."""
2302
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2522
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2303
2523
        def check_environment():
2304
2524
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2305
2525
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2311
2531
        del os.environ['EXISTANT_ENV_VAR']
2312
2532
 
2313
2533
    def test_env_del_missing(self):
2314
 
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
 
2534
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2315
2535
        def check_environment():
2316
2536
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2317
2537
        self.check_popen_state = check_environment
2339
2559
            os.chdir = orig_chdir
2340
2560
        self.assertEqual(['foo', 'current'], chdirs)
2341
2561
 
 
2562
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2563
        self.get_source_path = lambda: ""
 
2564
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2565
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2566
 
2342
2567
 
2343
2568
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2569
    """Tests that really need to do things with an external bzr."""
2587
2812
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2588
2813
 
2589
2814
 
2590
 
class TestCheckInventoryShape(tests.TestCaseWithTransport):
 
2815
class TestCheckTreeShape(tests.TestCaseWithTransport):
2591
2816
 
2592
 
    def test_check_inventory_shape(self):
 
2817
    def test_check_tree_shape(self):
2593
2818
        files = ['a', 'b/', 'b/c']
2594
2819
        tree = self.make_branch_and_tree('.')
2595
2820
        self.build_tree(files)
2596
2821
        tree.add(files)
2597
2822
        tree.lock_read()
2598
2823
        try:
2599
 
            self.check_inventory_shape(tree.inventory, files)
 
2824
            self.check_tree_shape(tree, files)
2600
2825
        finally:
2601
2826
            tree.unlock()
2602
2827
 
2934
3159
        tpr.register('bar', 'bBB.aAA.rRR')
2935
3160
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
2936
3161
        self.assertThat(self.get_log(),
2937
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3162
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3163
                           doctest.ELLIPSIS))
2938
3164
 
2939
3165
    def test_get_unknown_prefix(self):
2940
3166
        tpr = self._get_registry()
2960
3186
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3187
 
2962
3188
 
 
3189
class TestThreadLeakDetection(tests.TestCase):
 
3190
    """Ensure when tests leak threads we detect and report it"""
 
3191
 
 
3192
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3193
        def __init__(self):
 
3194
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3195
            self.leaks = []
 
3196
        def _report_thread_leak(self, test, leaks, alive):
 
3197
            self.leaks.append((test, leaks))
 
3198
 
 
3199
    def test_testcase_without_addCleanups(self):
 
3200
        """Check old TestCase instances don't break with leak detection"""
 
3201
        class Test(unittest.TestCase):
 
3202
            def runTest(self):
 
3203
                pass
 
3204
        result = self.LeakRecordingResult()
 
3205
        test = Test()
 
3206
        result.startTestRun()
 
3207
        test.run(result)
 
3208
        result.stopTestRun()
 
3209
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3210
        self.assertEqual(result.leaks, [])
 
3211
        
 
3212
    def test_thread_leak(self):
 
3213
        """Ensure a thread that outlives the running of a test is reported
 
3214
 
 
3215
        Uses a thread that blocks on an event, and is started by the inner
 
3216
        test case. As the thread outlives the inner case's run, it should be
 
3217
        detected as a leak, but the event is then set so that the thread can
 
3218
        be safely joined in cleanup so it's not leaked for real.
 
3219
        """
 
3220
        event = threading.Event()
 
3221
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3222
        class Test(tests.TestCase):
 
3223
            def test_leak(self):
 
3224
                thread.start()
 
3225
        result = self.LeakRecordingResult()
 
3226
        test = Test("test_leak")
 
3227
        self.addCleanup(thread.join)
 
3228
        self.addCleanup(event.set)
 
3229
        result.startTestRun()
 
3230
        test.run(result)
 
3231
        result.stopTestRun()
 
3232
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3233
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3234
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3235
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3236
 
 
3237
    def test_multiple_leaks(self):
 
3238
        """Check multiple leaks are blamed on the test cases at fault
 
3239
 
 
3240
        Same concept as the previous test, but has one inner test method that
 
3241
        leaks two threads, and one that doesn't leak at all.
 
3242
        """
 
3243
        event = threading.Event()
 
3244
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3245
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3246
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3247
        class Test(tests.TestCase):
 
3248
            def test_first_leak(self):
 
3249
                thread_b.start()
 
3250
            def test_second_no_leak(self):
 
3251
                pass
 
3252
            def test_third_leak(self):
 
3253
                thread_c.start()
 
3254
                thread_a.start()
 
3255
        result = self.LeakRecordingResult()
 
3256
        first_test = Test("test_first_leak")
 
3257
        third_test = Test("test_third_leak")
 
3258
        self.addCleanup(thread_a.join)
 
3259
        self.addCleanup(thread_b.join)
 
3260
        self.addCleanup(thread_c.join)
 
3261
        self.addCleanup(event.set)
 
3262
        result.startTestRun()
 
3263
        unittest.TestSuite(
 
3264
            [first_test, Test("test_second_no_leak"), third_test]
 
3265
            ).run(result)
 
3266
        result.stopTestRun()
 
3267
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3268
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3269
        self.assertEqual(result.leaks, [
 
3270
            (first_test, set([thread_b])),
 
3271
            (third_test, set([thread_a, thread_c]))])
 
3272
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3273
 
 
3274
 
 
3275
class TestPostMortemDebugging(tests.TestCase):
 
3276
    """Check post mortem debugging works when tests fail or error"""
 
3277
 
 
3278
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3279
        def __init__(self):
 
3280
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3281
            self.postcode = None
 
3282
        def _post_mortem(self, tb=None):
 
3283
            """Record the code object at the end of the current traceback"""
 
3284
            tb = tb or sys.exc_info()[2]
 
3285
            if tb is not None:
 
3286
                next = tb.tb_next
 
3287
                while next is not None:
 
3288
                    tb = next
 
3289
                    next = next.tb_next
 
3290
                self.postcode = tb.tb_frame.f_code
 
3291
        def report_error(self, test, err):
 
3292
            pass
 
3293
        def report_failure(self, test, err):
 
3294
            pass
 
3295
 
 
3296
    def test_location_unittest_error(self):
 
3297
        """Needs right post mortem traceback with erroring unittest case"""
 
3298
        class Test(unittest.TestCase):
 
3299
            def runTest(self):
 
3300
                raise RuntimeError
 
3301
        result = self.TracebackRecordingResult()
 
3302
        Test().run(result)
 
3303
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3304
 
 
3305
    def test_location_unittest_failure(self):
 
3306
        """Needs right post mortem traceback with failing unittest case"""
 
3307
        class Test(unittest.TestCase):
 
3308
            def runTest(self):
 
3309
                raise self.failureException
 
3310
        result = self.TracebackRecordingResult()
 
3311
        Test().run(result)
 
3312
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3313
 
 
3314
    def test_location_bt_error(self):
 
3315
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3316
        class Test(tests.TestCase):
 
3317
            def test_error(self):
 
3318
                raise RuntimeError
 
3319
        result = self.TracebackRecordingResult()
 
3320
        Test("test_error").run(result)
 
3321
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3322
 
 
3323
    def test_location_bt_failure(self):
 
3324
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3325
        class Test(tests.TestCase):
 
3326
            def test_failure(self):
 
3327
                raise self.failureException
 
3328
        result = self.TracebackRecordingResult()
 
3329
        Test("test_failure").run(result)
 
3330
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3331
 
 
3332
    def test_env_var_triggers_post_mortem(self):
 
3333
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3334
        import pdb
 
3335
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3336
        post_mortem_calls = []
 
3337
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3338
        self.overrideEnv('BZR_TEST_PDB', None)
 
3339
        result._post_mortem(1)
 
3340
        self.overrideEnv('BZR_TEST_PDB', 'on')
 
3341
        result._post_mortem(2)
 
3342
        self.assertEqual([2], post_mortem_calls)
 
3343
 
 
3344
 
2963
3345
class TestRunSuite(tests.TestCase):
2964
3346
 
2965
3347
    def test_runner_class(self):
2976
3358
                                                self.verbosity)
2977
3359
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3360
        self.assertLength(1, calls)
 
3361
 
 
3362
 
 
3363
class TestEnvironHandling(tests.TestCase):
 
3364
 
 
3365
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
 
3366
        self.assertFalse('MYVAR' in os.environ)
 
3367
        self.overrideEnv('MYVAR', '42')
 
3368
        # We use an embedded test to make sure we fix the _captureVar bug
 
3369
        class Test(tests.TestCase):
 
3370
            def test_me(self):
 
3371
                # The first call save the 42 value
 
3372
                self.overrideEnv('MYVAR', None)
 
3373
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3374
                # Make sure we can call it twice
 
3375
                self.overrideEnv('MYVAR', None)
 
3376
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3377
        output = StringIO()
 
3378
        result = tests.TextTestResult(output, 0, 1)
 
3379
        Test('test_me').run(result)
 
3380
        if not result.wasStrictlySuccessful():
 
3381
            self.fail(output.getvalue())
 
3382
        # We get our value back
 
3383
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3384
 
 
3385
 
 
3386
class TestIsolatedEnv(tests.TestCase):
 
3387
    """Test isolating tests from os.environ.
 
3388
 
 
3389
    Since we use tests that are already isolated from os.environ a bit of care
 
3390
    should be taken when designing the tests to avoid bootstrap side-effects.
 
3391
    The tests start an already clean os.environ which allow doing valid
 
3392
    assertions about which variables are present or not and design tests around
 
3393
    these assertions.
 
3394
    """
 
3395
 
 
3396
    class ScratchMonkey(tests.TestCase):
 
3397
 
 
3398
        def test_me(self):
 
3399
            pass
 
3400
 
 
3401
    def test_basics(self):
 
3402
        # Make sure we know the definition of BZR_HOME: not part of os.environ
 
3403
        # for tests.TestCase.
 
3404
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
 
3405
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
 
3406
        # Being part of isolated_environ, BZR_HOME should not appear here
 
3407
        self.assertFalse('BZR_HOME' in os.environ)
 
3408
        # Make sure we know the definition of LINES: part of os.environ for
 
3409
        # tests.TestCase
 
3410
        self.assertTrue('LINES' in tests.isolated_environ)
 
3411
        self.assertEquals('25', tests.isolated_environ['LINES'])
 
3412
        self.assertEquals('25', os.environ['LINES'])
 
3413
 
 
3414
    def test_injecting_unknown_variable(self):
 
3415
        # BZR_HOME is known to be absent from os.environ
 
3416
        test = self.ScratchMonkey('test_me')
 
3417
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
 
3418
        self.assertEquals('foo', os.environ['BZR_HOME'])
 
3419
        tests.restore_os_environ(test)
 
3420
        self.assertFalse('BZR_HOME' in os.environ)
 
3421
 
 
3422
    def test_injecting_known_variable(self):
 
3423
        test = self.ScratchMonkey('test_me')
 
3424
        # LINES is known to be present in os.environ
 
3425
        tests.override_os_environ(test, {'LINES': '42'})
 
3426
        self.assertEquals('42', os.environ['LINES'])
 
3427
        tests.restore_os_environ(test)
 
3428
        self.assertEquals('25', os.environ['LINES'])
 
3429
 
 
3430
    def test_deleting_variable(self):
 
3431
        test = self.ScratchMonkey('test_me')
 
3432
        # LINES is known to be present in os.environ
 
3433
        tests.override_os_environ(test, {'LINES': None})
 
3434
        self.assertTrue('LINES' not in os.environ)
 
3435
        tests.restore_os_environ(test)
 
3436
        self.assertEquals('25', os.environ['LINES'])
 
3437
 
 
3438
 
 
3439
class TestDocTestSuiteIsolation(tests.TestCase):
 
3440
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
 
3441
 
 
3442
    Since tests.TestCase alreay provides an isolation from os.environ, we use
 
3443
    the clean environment as a base for testing. To precisely capture the
 
3444
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
 
3445
    compare against.
 
3446
 
 
3447
    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
 
3448
    not `os.environ` so each test overrides it to suit its needs.
 
3449
 
 
3450
    """
 
3451
 
 
3452
    def get_doctest_suite_for_string(self, klass, string):
 
3453
        class Finder(doctest.DocTestFinder):
 
3454
 
 
3455
            def find(*args, **kwargs):
 
3456
                test = doctest.DocTestParser().get_doctest(
 
3457
                    string, {}, 'foo', 'foo.py', 0)
 
3458
                return [test]
 
3459
 
 
3460
        suite = klass(test_finder=Finder())
 
3461
        return suite
 
3462
 
 
3463
    def run_doctest_suite_for_string(self, klass, string):
 
3464
        suite = self.get_doctest_suite_for_string(klass, string)
 
3465
        output = StringIO()
 
3466
        result = tests.TextTestResult(output, 0, 1)
 
3467
        suite.run(result)
 
3468
        return result, output
 
3469
 
 
3470
    def assertDocTestStringSucceds(self, klass, string):
 
3471
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3472
        if not result.wasStrictlySuccessful():
 
3473
            self.fail(output.getvalue())
 
3474
 
 
3475
    def assertDocTestStringFails(self, klass, string):
 
3476
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3477
        if result.wasStrictlySuccessful():
 
3478
            self.fail(output.getvalue())
 
3479
 
 
3480
    def test_injected_variable(self):
 
3481
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
 
3482
        test = """
 
3483
            >>> import os
 
3484
            >>> os.environ['LINES']
 
3485
            '42'
 
3486
            """
 
3487
        # doctest.DocTestSuite fails as it sees '25'
 
3488
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3489
        # tests.DocTestSuite sees '42'
 
3490
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3491
 
 
3492
    def test_deleted_variable(self):
 
3493
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
 
3494
        test = """
 
3495
            >>> import os
 
3496
            >>> os.environ.get('LINES')
 
3497
            """
 
3498
        # doctest.DocTestSuite fails as it sees '25'
 
3499
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3500
        # tests.DocTestSuite sees None
 
3501
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)