~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2010-04-30 11:03:59 UTC
  • mto: This revision was merged to the branch mainline in revision 5197.
  • Revision ID: jelmer@samba.org-20100430110359-ow3e3grh7sxy93pa
Remove more unused imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
import doctest
 
20
from doctest import ELLIPSIS
21
21
import os
22
22
import signal
23
23
import sys
24
 
import threading
25
24
import time
26
25
import unittest
27
26
import warnings
28
27
 
29
 
from testtools import (
30
 
    ExtendedToOriginalDecorator,
31
 
    MultiTestResult,
32
 
    )
33
 
from testtools.content import Content
 
28
from testtools import MultiTestResult
34
29
from testtools.content_type import ContentType
35
30
from testtools.matchers import (
36
31
    DocTestMatches,
42
37
from bzrlib import (
43
38
    branchbuilder,
44
39
    bzrdir,
 
40
    debug,
45
41
    errors,
46
42
    lockdir,
47
43
    memorytree,
48
44
    osutils,
 
45
    progress,
49
46
    remote,
50
47
    repository,
51
48
    symbol_versioning,
52
49
    tests,
53
50
    transport,
54
51
    workingtree,
55
 
    workingtree_3,
56
 
    workingtree_4,
57
52
    )
58
53
from bzrlib.repofmt import (
59
54
    groupcompress_repo,
 
55
    pack_repo,
 
56
    weaverepo,
60
57
    )
61
58
from bzrlib.symbol_versioning import (
62
59
    deprecated_function,
65
62
    )
66
63
from bzrlib.tests import (
67
64
    features,
 
65
    stub_sftp,
68
66
    test_lsprof,
69
67
    test_server,
 
68
    test_sftp_transport,
70
69
    TestUtil,
71
70
    )
72
 
from bzrlib.trace import note, mutter
 
71
from bzrlib.trace import note
73
72
from bzrlib.transport import memory
 
73
from bzrlib.version import _get_bzr_source_tree
74
74
 
75
75
 
76
76
def _test_ids(test_suite):
78
78
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
79
79
 
80
80
 
 
81
class SelftestTests(tests.TestCase):
 
82
 
 
83
    def test_import_tests(self):
 
84
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
 
85
        self.assertEqual(mod.SelftestTests, SelftestTests)
 
86
 
 
87
    def test_import_test_failure(self):
 
88
        self.assertRaises(ImportError,
 
89
                          TestUtil._load_module_by_name,
 
90
                          'bzrlib.no-name-yet')
 
91
 
 
92
 
81
93
class MetaTestLog(tests.TestCase):
82
94
 
83
95
    def test_logging(self):
89
101
            "text", "plain", {"charset": "utf8"})))
90
102
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
91
103
        self.assertThat(self.get_log(),
92
 
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
104
            DocTestMatches(u"...a test message\n", ELLIPSIS))
93
105
 
94
106
 
95
107
class TestUnicodeFilename(tests.TestCase):
108
120
 
109
121
        filename = u'hell\u00d8'
110
122
        self.build_tree_contents([(filename, 'contents of hello')])
111
 
        self.assertPathExists(filename)
112
 
 
113
 
 
114
 
class TestClassesAvailable(tests.TestCase):
115
 
    """As a convenience we expose Test* classes from bzrlib.tests"""
116
 
 
117
 
    def test_test_case(self):
118
 
        from bzrlib.tests import TestCase
119
 
 
120
 
    def test_test_loader(self):
121
 
        from bzrlib.tests import TestLoader
122
 
 
123
 
    def test_test_suite(self):
124
 
        from bzrlib.tests import TestSuite
 
123
        self.failUnlessExists(filename)
125
124
 
126
125
 
127
126
class TestTransportScenarios(tests.TestCase):
210
209
    def test_scenarios(self):
211
210
        # check that constructor parameters are passed through to the adapted
212
211
        # test.
213
 
        from bzrlib.tests.per_controldir import make_scenarios
 
212
        from bzrlib.tests.per_bzrdir import make_scenarios
214
213
        vfs_factory = "v"
215
214
        server1 = "a"
216
215
        server2 = "b"
314
313
        from bzrlib.tests.per_interrepository import make_scenarios
315
314
        server1 = "a"
316
315
        server2 = "b"
317
 
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
 
316
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
318
317
        scenarios = make_scenarios(server1, server2, formats)
319
318
        self.assertEqual([
320
319
            ('C0,str,str',
321
320
             {'repository_format': 'C1',
322
321
              'repository_format_to': 'C2',
323
322
              'transport_readonly_server': 'b',
324
 
              'transport_server': 'a',
325
 
              'extra_setup': 'C3'}),
 
323
              'transport_server': 'a'}),
326
324
            ('D0,str,str',
327
325
             {'repository_format': 'D1',
328
326
              'repository_format_to': 'D2',
329
327
              'transport_readonly_server': 'b',
330
 
              'transport_server': 'a',
331
 
              'extra_setup': 'D3'})],
 
328
              'transport_server': 'a'})],
332
329
            scenarios)
333
330
 
334
331
 
340
337
        from bzrlib.tests.per_workingtree import make_scenarios
341
338
        server1 = "a"
342
339
        server2 = "b"
343
 
        formats = [workingtree_4.WorkingTreeFormat4(),
344
 
                   workingtree_3.WorkingTreeFormat3(),]
 
340
        formats = [workingtree.WorkingTreeFormat2(),
 
341
                   workingtree.WorkingTreeFormat3(),]
345
342
        scenarios = make_scenarios(server1, server2, formats)
346
343
        self.assertEqual([
347
 
            ('WorkingTreeFormat4',
 
344
            ('WorkingTreeFormat2',
348
345
             {'bzrdir_format': formats[0]._matchingbzrdir,
349
346
              'transport_readonly_server': 'b',
350
347
              'transport_server': 'a',
377
374
            )
378
375
        server1 = "a"
379
376
        server2 = "b"
380
 
        formats = [workingtree_4.WorkingTreeFormat4(),
381
 
                   workingtree_3.WorkingTreeFormat3(),]
 
377
        formats = [workingtree.WorkingTreeFormat2(),
 
378
                   workingtree.WorkingTreeFormat3(),]
382
379
        scenarios = make_scenarios(server1, server2, formats)
383
380
        self.assertEqual(7, len(scenarios))
384
 
        default_wt_format = workingtree.format_registry.get_default()
385
 
        wt4_format = workingtree_4.WorkingTreeFormat4()
386
 
        wt5_format = workingtree_4.WorkingTreeFormat5()
 
381
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
 
382
        wt4_format = workingtree.WorkingTreeFormat4()
 
383
        wt5_format = workingtree.WorkingTreeFormat5()
387
384
        expected_scenarios = [
388
 
            ('WorkingTreeFormat4',
 
385
            ('WorkingTreeFormat2',
389
386
             {'bzrdir_format': formats[0]._matchingbzrdir,
390
387
              'transport_readonly_server': 'b',
391
388
              'transport_server': 'a',
451
448
        # ones to add.
452
449
        from bzrlib.tests.per_tree import (
453
450
            return_parameter,
 
451
            revision_tree_from_workingtree
454
452
            )
455
453
        from bzrlib.tests.per_intertree import (
456
454
            make_scenarios,
457
455
            )
458
 
        from bzrlib.workingtree_3 import WorkingTreeFormat3
459
 
        from bzrlib.workingtree_4 import WorkingTreeFormat4
 
456
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
460
457
        input_test = TestInterTreeScenarios(
461
458
            "test_scenarios")
462
459
        server1 = "a"
463
460
        server2 = "b"
464
 
        format1 = WorkingTreeFormat4()
 
461
        format1 = WorkingTreeFormat2()
465
462
        format2 = WorkingTreeFormat3()
466
463
        formats = [("1", str, format1, format2, "converter1"),
467
464
            ("2", int, format2, format1, "converter2")]
513
510
        self.assertRaises(AssertionError, self.assertEqualStat,
514
511
            os.lstat("foo"), os.lstat("longname"))
515
512
 
516
 
    def test_failUnlessExists(self):
517
 
        """Deprecated failUnlessExists and failIfExists"""
518
 
        self.applyDeprecated(
519
 
            deprecated_in((2, 4)),
520
 
            self.failUnlessExists, '.')
521
 
        self.build_tree(['foo/', 'foo/bar'])
522
 
        self.applyDeprecated(
523
 
            deprecated_in((2, 4)),
524
 
            self.failUnlessExists, 'foo/bar')
525
 
        self.applyDeprecated(
526
 
            deprecated_in((2, 4)),
527
 
            self.failIfExists, 'foo/foo')
528
 
 
529
 
    def test_assertPathExists(self):
530
 
        self.assertPathExists('.')
531
 
        self.build_tree(['foo/', 'foo/bar'])
532
 
        self.assertPathExists('foo/bar')
533
 
        self.assertPathDoesNotExist('foo/foo')
534
 
 
535
513
 
536
514
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
537
515
 
571
549
        tree = self.make_branch_and_memory_tree('dir')
572
550
        # Guard against regression into MemoryTransport leaking
573
551
        # files to disk instead of keeping them in memory.
574
 
        self.assertFalse(osutils.lexists('dir'))
 
552
        self.failIf(osutils.lexists('dir'))
575
553
        self.assertIsInstance(tree, memorytree.MemoryTree)
576
554
 
577
555
    def test_make_branch_and_memory_tree_with_format(self):
578
556
        """make_branch_and_memory_tree should accept a format option."""
579
557
        format = bzrdir.BzrDirMetaFormat1()
580
 
        format.repository_format = repository.format_registry.get_default()
 
558
        format.repository_format = weaverepo.RepositoryFormat7()
581
559
        tree = self.make_branch_and_memory_tree('dir', format=format)
582
560
        # Guard against regression into MemoryTransport leaking
583
561
        # files to disk instead of keeping them in memory.
584
 
        self.assertFalse(osutils.lexists('dir'))
 
562
        self.failIf(osutils.lexists('dir'))
585
563
        self.assertIsInstance(tree, memorytree.MemoryTree)
586
564
        self.assertEqual(format.repository_format.__class__,
587
565
            tree.branch.repository._format.__class__)
591
569
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
592
570
        # Guard against regression into MemoryTransport leaking
593
571
        # files to disk instead of keeping them in memory.
594
 
        self.assertFalse(osutils.lexists('dir'))
 
572
        self.failIf(osutils.lexists('dir'))
595
573
 
596
574
    def test_make_branch_builder_with_format(self):
597
575
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
598
576
        # that the format objects are used.
599
577
        format = bzrdir.BzrDirMetaFormat1()
600
 
        repo_format = repository.format_registry.get_default()
 
578
        repo_format = weaverepo.RepositoryFormat7()
601
579
        format.repository_format = repo_format
602
580
        builder = self.make_branch_builder('dir', format=format)
603
581
        the_branch = builder.get_branch()
604
582
        # Guard against regression into MemoryTransport leaking
605
583
        # files to disk instead of keeping them in memory.
606
 
        self.assertFalse(osutils.lexists('dir'))
 
584
        self.failIf(osutils.lexists('dir'))
607
585
        self.assertEqual(format.repository_format.__class__,
608
586
                         the_branch.repository._format.__class__)
609
587
        self.assertEqual(repo_format.get_format_string(),
615
593
        the_branch = builder.get_branch()
616
594
        # Guard against regression into MemoryTransport leaking
617
595
        # files to disk instead of keeping them in memory.
618
 
        self.assertFalse(osutils.lexists('dir'))
 
596
        self.failIf(osutils.lexists('dir'))
619
597
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
620
598
        self.assertEqual(dir_format.repository_format.__class__,
621
599
                         the_branch.repository._format.__class__)
632
610
                l.attempt_lock()
633
611
        test = TestDanglingLock('test_function')
634
612
        result = test.run()
635
 
        total_failures = result.errors + result.failures
636
613
        if self._lock_check_thorough:
637
 
            self.assertEqual(1, len(total_failures))
 
614
            self.assertEqual(1, len(result.errors))
638
615
        else:
639
616
            # When _lock_check_thorough is disabled, then we don't trigger a
640
617
            # failure
641
 
            self.assertEqual(0, len(total_failures))
 
618
            self.assertEqual(0, len(result.errors))
642
619
 
643
620
 
644
621
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
645
622
    """Tests for the convenience functions TestCaseWithTransport introduces."""
646
623
 
647
624
    def test_get_readonly_url_none(self):
 
625
        from bzrlib.transport import get_transport
648
626
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
649
627
        self.vfs_transport_factory = memory.MemoryServer
650
628
        self.transport_readonly_server = None
652
630
        # for the server
653
631
        url = self.get_readonly_url()
654
632
        url2 = self.get_readonly_url('foo/bar')
655
 
        t = transport.get_transport(url)
656
 
        t2 = transport.get_transport(url2)
657
 
        self.assertIsInstance(t, ReadonlyTransportDecorator)
658
 
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
 
633
        t = get_transport(url)
 
634
        t2 = get_transport(url2)
 
635
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
 
636
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
659
637
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
660
638
 
661
639
    def test_get_readonly_url_http(self):
662
640
        from bzrlib.tests.http_server import HttpServer
 
641
        from bzrlib.transport import get_transport
663
642
        from bzrlib.transport.http import HttpTransportBase
664
643
        self.transport_server = test_server.LocalURLServer
665
644
        self.transport_readonly_server = HttpServer
667
646
        url = self.get_readonly_url()
668
647
        url2 = self.get_readonly_url('foo/bar')
669
648
        # the transport returned may be any HttpTransportBase subclass
670
 
        t = transport.get_transport(url)
671
 
        t2 = transport.get_transport(url2)
672
 
        self.assertIsInstance(t, HttpTransportBase)
673
 
        self.assertIsInstance(t2, HttpTransportBase)
 
649
        t = get_transport(url)
 
650
        t2 = get_transport(url2)
 
651
        self.failUnless(isinstance(t, HttpTransportBase))
 
652
        self.failUnless(isinstance(t2, HttpTransportBase))
674
653
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
675
654
 
676
655
    def test_is_directory(self):
684
663
    def test_make_branch_builder(self):
685
664
        builder = self.make_branch_builder('dir')
686
665
        rev_id = builder.build_commit()
687
 
        self.assertPathExists('dir')
 
666
        self.failUnlessExists('dir')
688
667
        a_dir = bzrdir.BzrDir.open('dir')
689
668
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
690
669
        a_branch = a_dir.open_branch()
706
685
        self.assertIsInstance(result_bzrdir.transport,
707
686
                              memory.MemoryTransport)
708
687
        # should not be on disk, should only be in memory
709
 
        self.assertPathDoesNotExist('subdir')
 
688
        self.failIfExists('subdir')
710
689
 
711
690
 
712
691
class TestChrootedTest(tests.ChrootedTestCase):
713
692
 
714
693
    def test_root_is_root(self):
715
 
        t = transport.get_transport(self.get_readonly_url())
 
694
        from bzrlib.transport import get_transport
 
695
        t = get_transport(self.get_readonly_url())
716
696
        url = t.base
717
697
        self.assertEqual(url, t.clone('..').base)
718
698
 
771
751
        self.check_timing(ShortDelayTestCase('test_short_delay'),
772
752
                          r"^ +[0-9]+ms$")
773
753
 
 
754
    def _patch_get_bzr_source_tree(self):
 
755
        # Reading from the actual source tree breaks isolation, but we don't
 
756
        # want to assume that that's *all* that would happen.
 
757
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
 
758
 
 
759
    def test_assigned_benchmark_file_stores_date(self):
 
760
        self._patch_get_bzr_source_tree()
 
761
        output = StringIO()
 
762
        result = bzrlib.tests.TextTestResult(self._log_file,
 
763
                                        descriptions=0,
 
764
                                        verbosity=1,
 
765
                                        bench_history=output
 
766
                                        )
 
767
        output_string = output.getvalue()
 
768
        # if you are wondering about the regexp please read the comment in
 
769
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
 
770
        # XXX: what comment?  -- Andrew Bennetts
 
771
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
772
 
 
773
    def test_benchhistory_records_test_times(self):
 
774
        self._patch_get_bzr_source_tree()
 
775
        result_stream = StringIO()
 
776
        result = bzrlib.tests.TextTestResult(
 
777
            self._log_file,
 
778
            descriptions=0,
 
779
            verbosity=1,
 
780
            bench_history=result_stream
 
781
            )
 
782
 
 
783
        # we want to profile a call and check that its test duration is recorded
 
784
        # make a new test instance that when run will generate a benchmark
 
785
        example_test_case = TestTestResult("_time_hello_world_encoding")
 
786
        # execute the test, which should succeed and record times
 
787
        example_test_case.run(result)
 
788
        lines = result_stream.getvalue().splitlines()
 
789
        self.assertEqual(2, len(lines))
 
790
        self.assertContainsRe(lines[1],
 
791
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
 
792
            "._time_hello_world_encoding")
 
793
 
774
794
    def _time_hello_world_encoding(self):
775
795
        """Profile two sleep calls
776
796
 
784
804
        self.requireFeature(test_lsprof.LSProfFeature)
785
805
        result_stream = StringIO()
786
806
        result = bzrlib.tests.VerboseTestResult(
787
 
            result_stream,
 
807
            unittest._WritelnDecorator(result_stream),
788
808
            descriptions=0,
789
809
            verbosity=2,
790
810
            )
816
836
        self.assertContainsRe(output,
817
837
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
818
838
 
819
 
    def test_uses_time_from_testtools(self):
820
 
        """Test case timings in verbose results should use testtools times"""
821
 
        import datetime
822
 
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
823
 
            def startTest(self, test):
824
 
                self.time(datetime.datetime.utcfromtimestamp(1.145))
825
 
                super(TimeAddedVerboseTestResult, self).startTest(test)
826
 
            def addSuccess(self, test):
827
 
                self.time(datetime.datetime.utcfromtimestamp(51.147))
828
 
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
829
 
            def report_tests_starting(self): pass
830
 
        sio = StringIO()
831
 
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
832
 
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
833
 
 
834
839
    def test_known_failure(self):
835
840
        """A KnownFailure being raised should trigger several result actions."""
836
841
        class InstrumentedTestResult(tests.ExtendedTestResult):
837
842
            def stopTestRun(self): pass
838
 
            def report_tests_starting(self): pass
 
843
            def startTests(self): pass
 
844
            def report_test_start(self, test): pass
839
845
            def report_known_failure(self, test, err=None, details=None):
840
846
                self._call = test, 'known failure'
841
847
        result = InstrumentedTestResult(None, None, None, None)
859
865
        # verbose test output formatting
860
866
        result_stream = StringIO()
861
867
        result = bzrlib.tests.VerboseTestResult(
862
 
            result_stream,
 
868
            unittest._WritelnDecorator(result_stream),
863
869
            descriptions=0,
864
870
            verbosity=2,
865
871
            )
875
881
        output = result_stream.getvalue()[prefix:]
876
882
        lines = output.splitlines()
877
883
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
878
 
        if sys.version_info > (2, 7):
879
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
880
 
                self.assertNotEqual, lines[1], '    ')
881
884
        self.assertEqual(lines[1], '    foo')
882
885
        self.assertEqual(2, len(lines))
883
886
 
891
894
        """Test the behaviour of invoking addNotSupported."""
892
895
        class InstrumentedTestResult(tests.ExtendedTestResult):
893
896
            def stopTestRun(self): pass
894
 
            def report_tests_starting(self): pass
 
897
            def startTests(self): pass
 
898
            def report_test_start(self, test): pass
895
899
            def report_unsupported(self, test, feature):
896
900
                self._call = test, feature
897
901
        result = InstrumentedTestResult(None, None, None, None)
916
920
        # verbose test output formatting
917
921
        result_stream = StringIO()
918
922
        result = bzrlib.tests.VerboseTestResult(
919
 
            result_stream,
 
923
            unittest._WritelnDecorator(result_stream),
920
924
            descriptions=0,
921
925
            verbosity=2,
922
926
            )
936
940
        """An UnavailableFeature being raised should invoke addNotSupported."""
937
941
        class InstrumentedTestResult(tests.ExtendedTestResult):
938
942
            def stopTestRun(self): pass
939
 
            def report_tests_starting(self): pass
 
943
            def startTests(self): pass
 
944
            def report_test_start(self, test): pass
940
945
            def addNotSupported(self, test, feature):
941
946
                self._call = test, feature
942
947
        result = InstrumentedTestResult(None, None, None, None)
984
989
        class InstrumentedTestResult(tests.ExtendedTestResult):
985
990
            calls = 0
986
991
            def startTests(self): self.calls += 1
 
992
            def report_test_start(self, test): pass
987
993
        result = InstrumentedTestResult(None, None, None, None)
988
994
        def test_function():
989
995
            pass
991
997
        test.run(result)
992
998
        self.assertEquals(1, result.calls)
993
999
 
994
 
    def test_startTests_only_once(self):
995
 
        """With multiple tests startTests should still only be called once"""
996
 
        class InstrumentedTestResult(tests.ExtendedTestResult):
997
 
            calls = 0
998
 
            def startTests(self): self.calls += 1
999
 
        result = InstrumentedTestResult(None, None, None, None)
1000
 
        suite = unittest.TestSuite([
1001
 
            unittest.FunctionTestCase(lambda: None),
1002
 
            unittest.FunctionTestCase(lambda: None)])
1003
 
        suite.run(result)
1004
 
        self.assertEquals(1, result.calls)
1005
 
        self.assertEquals(2, result.count)
1006
 
 
1007
1000
 
1008
1001
class TestUnicodeFilenameFeature(tests.TestCase):
1009
1002
 
1030
1023
        because of our use of global state.
1031
1024
        """
1032
1025
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1026
        old_leak = tests.TestCase._first_thread_leaker_id
1033
1027
        try:
1034
1028
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1029
            tests.TestCase._first_thread_leaker_id = None
1035
1030
            return testrunner.run(test)
1036
1031
        finally:
1037
1032
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1033
            tests.TestCase._first_thread_leaker_id = old_leak
1038
1034
 
1039
1035
    def test_known_failure_failed_run(self):
1040
1036
        # run a test that generates a known failure which should be printed in
1086
1082
    def test_result_decorator(self):
1087
1083
        # decorate results
1088
1084
        calls = []
1089
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1085
        class LoggingDecorator(tests.ForwardingResult):
1090
1086
            def startTest(self, test):
1091
 
                ExtendedToOriginalDecorator.startTest(self, test)
 
1087
                tests.ForwardingResult.startTest(self, test)
1092
1088
                calls.append('start')
1093
1089
        test = unittest.FunctionTestCase(lambda:None)
1094
1090
        stream = StringIO()
1195
1191
            ],
1196
1192
            lines[-3:])
1197
1193
 
1198
 
    def test_verbose_test_count(self):
1199
 
        """A verbose test run reports the right test count at the start"""
1200
 
        suite = TestUtil.TestSuite([
1201
 
            unittest.FunctionTestCase(lambda:None),
1202
 
            unittest.FunctionTestCase(lambda:None)])
1203
 
        self.assertEqual(suite.countTestCases(), 2)
1204
 
        stream = StringIO()
1205
 
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
1206
 
        # Need to use the CountingDecorator as that's what sets num_tests
1207
 
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1208
 
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1194
    def _patch_get_bzr_source_tree(self):
 
1195
        # Reading from the actual source tree breaks isolation, but we don't
 
1196
        # want to assume that that's *all* that would happen.
 
1197
        self._get_source_tree_calls = []
 
1198
        def new_get():
 
1199
            self._get_source_tree_calls.append("called")
 
1200
            return None
 
1201
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
 
1202
 
 
1203
    def test_bench_history(self):
 
1204
        # tests that running the benchmark passes bench_history into
 
1205
        # the test result object. We can tell that happens if
 
1206
        # _get_bzr_source_tree is called.
 
1207
        self._patch_get_bzr_source_tree()
 
1208
        test = TestRunner('dummy_test')
 
1209
        output = StringIO()
 
1210
        runner = tests.TextTestRunner(stream=self._log_file,
 
1211
                                      bench_history=output)
 
1212
        result = self.run_test_runner(runner, test)
 
1213
        output_string = output.getvalue()
 
1214
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
1215
        self.assertLength(1, self._get_source_tree_calls)
1209
1216
 
1210
1217
    def test_startTestRun(self):
1211
1218
        """run should call result.startTestRun()"""
1212
1219
        calls = []
1213
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1220
        class LoggingDecorator(tests.ForwardingResult):
1214
1221
            def startTestRun(self):
1215
 
                ExtendedToOriginalDecorator.startTestRun(self)
 
1222
                tests.ForwardingResult.startTestRun(self)
1216
1223
                calls.append('startTestRun')
1217
1224
        test = unittest.FunctionTestCase(lambda:None)
1218
1225
        stream = StringIO()
1224
1231
    def test_stopTestRun(self):
1225
1232
        """run should call result.stopTestRun()"""
1226
1233
        calls = []
1227
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1234
        class LoggingDecorator(tests.ForwardingResult):
1228
1235
            def stopTestRun(self):
1229
 
                ExtendedToOriginalDecorator.stopTestRun(self)
 
1236
                tests.ForwardingResult.stopTestRun(self)
1230
1237
                calls.append('stopTestRun')
1231
1238
        test = unittest.FunctionTestCase(lambda:None)
1232
1239
        stream = StringIO()
1235
1242
        result = self.run_test_runner(runner, test)
1236
1243
        self.assertLength(1, calls)
1237
1244
 
1238
 
    def test_unicode_test_output_on_ascii_stream(self):
1239
 
        """Showing results should always succeed even on an ascii console"""
1240
 
        class FailureWithUnicode(tests.TestCase):
1241
 
            def test_log_unicode(self):
1242
 
                self.log(u"\u2606")
1243
 
                self.fail("Now print that log!")
1244
 
        out = StringIO()
1245
 
        self.overrideAttr(osutils, "get_terminal_encoding",
1246
 
            lambda trace=False: "ascii")
1247
 
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1248
 
            FailureWithUnicode("test_log_unicode"))
1249
 
        self.assertContainsRe(out.getvalue(),
1250
 
            "Text attachment: log\n"
1251
 
            "-+\n"
1252
 
            "\d+\.\d+  \\\\u2606\n"
1253
 
            "-+\n")
1254
 
 
1255
1245
 
1256
1246
class SampleTestCase(tests.TestCase):
1257
1247
 
1432
1422
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1433
1423
        output_stream = StringIO()
1434
1424
        result = bzrlib.tests.VerboseTestResult(
1435
 
            output_stream,
 
1425
            unittest._WritelnDecorator(output_stream),
1436
1426
            descriptions=0,
1437
1427
            verbosity=2)
1438
1428
        sample_test.run(result)
1445
1435
        # Note this test won't fail with hooks that the core library doesn't
1446
1436
        # use - but it will trigger with a plugin that adds hooks, so it's still a
1447
1437
        # useful warning in that case.
1448
 
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
1449
 
        self.assertEqual(
1450
 
            bzrlib.smart.server.SmartServerHooks(),
 
1438
        self.assertEqual(bzrlib.branch.BranchHooks(),
 
1439
            bzrlib.branch.Branch.hooks)
 
1440
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
1451
1441
            bzrlib.smart.server.SmartTCPServer.hooks)
1452
 
        self.assertEqual(
1453
 
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
 
1442
        self.assertEqual(bzrlib.commands.CommandHooks(),
 
1443
            bzrlib.commands.Command.hooks)
1454
1444
 
1455
1445
    def test__gather_lsprof_in_benchmarks(self):
1456
1446
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1666
1656
        self.assertEqual('original', obj.test_attr)
1667
1657
 
1668
1658
 
1669
 
class _MissingFeature(tests.Feature):
1670
 
    def _probe(self):
1671
 
        return False
1672
 
missing_feature = _MissingFeature()
1673
 
 
1674
 
 
1675
 
def _get_test(name):
1676
 
    """Get an instance of a specific example test.
1677
 
 
1678
 
    We protect this in a function so that they don't auto-run in the test
1679
 
    suite.
1680
 
    """
1681
 
 
1682
 
    class ExampleTests(tests.TestCase):
1683
 
 
1684
 
        def test_fail(self):
1685
 
            mutter('this was a failing test')
1686
 
            self.fail('this test will fail')
1687
 
 
1688
 
        def test_error(self):
1689
 
            mutter('this test errored')
1690
 
            raise RuntimeError('gotcha')
1691
 
 
1692
 
        def test_missing_feature(self):
1693
 
            mutter('missing the feature')
1694
 
            self.requireFeature(missing_feature)
1695
 
 
1696
 
        def test_skip(self):
1697
 
            mutter('this test will be skipped')
1698
 
            raise tests.TestSkipped('reason')
1699
 
 
1700
 
        def test_success(self):
1701
 
            mutter('this test succeeds')
1702
 
 
1703
 
        def test_xfail(self):
1704
 
            mutter('test with expected failure')
1705
 
            self.knownFailure('this_fails')
1706
 
 
1707
 
        def test_unexpected_success(self):
1708
 
            mutter('test with unexpected success')
1709
 
            self.expectFailure('should_fail', lambda: None)
1710
 
 
1711
 
    return ExampleTests(name)
1712
 
 
1713
 
 
1714
 
class TestTestCaseLogDetails(tests.TestCase):
1715
 
 
1716
 
    def _run_test(self, test_name):
1717
 
        test = _get_test(test_name)
1718
 
        result = testtools.TestResult()
1719
 
        test.run(result)
1720
 
        return result
1721
 
 
1722
 
    def test_fail_has_log(self):
1723
 
        result = self._run_test('test_fail')
1724
 
        self.assertEqual(1, len(result.failures))
1725
 
        result_content = result.failures[0][1]
1726
 
        self.assertContainsRe(result_content, 'Text attachment: log')
1727
 
        self.assertContainsRe(result_content, 'this was a failing test')
1728
 
 
1729
 
    def test_error_has_log(self):
1730
 
        result = self._run_test('test_error')
1731
 
        self.assertEqual(1, len(result.errors))
1732
 
        result_content = result.errors[0][1]
1733
 
        self.assertContainsRe(result_content, 'Text attachment: log')
1734
 
        self.assertContainsRe(result_content, 'this test errored')
1735
 
 
1736
 
    def test_skip_has_no_log(self):
1737
 
        result = self._run_test('test_skip')
1738
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
1739
 
        skips = result.skip_reasons['reason']
1740
 
        self.assertEqual(1, len(skips))
1741
 
        test = skips[0]
1742
 
        self.assertFalse('log' in test.getDetails())
1743
 
 
1744
 
    def test_missing_feature_has_no_log(self):
1745
 
        # testtools doesn't know about addNotSupported, so it just gets
1746
 
        # considered as a skip
1747
 
        result = self._run_test('test_missing_feature')
1748
 
        self.assertEqual([missing_feature], result.skip_reasons.keys())
1749
 
        skips = result.skip_reasons[missing_feature]
1750
 
        self.assertEqual(1, len(skips))
1751
 
        test = skips[0]
1752
 
        self.assertFalse('log' in test.getDetails())
1753
 
 
1754
 
    def test_xfail_has_no_log(self):
1755
 
        result = self._run_test('test_xfail')
1756
 
        self.assertEqual(1, len(result.expectedFailures))
1757
 
        result_content = result.expectedFailures[0][1]
1758
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
1759
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
1760
 
 
1761
 
    def test_unexpected_success_has_log(self):
1762
 
        result = self._run_test('test_unexpected_success')
1763
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
1764
 
        # Inconsistency, unexpectedSuccesses is a list of tests,
1765
 
        # expectedFailures is a list of reasons?
1766
 
        test = result.unexpectedSuccesses[0]
1767
 
        details = test.getDetails()
1768
 
        self.assertTrue('log' in details)
1769
 
 
1770
 
 
1771
 
class TestTestCloning(tests.TestCase):
1772
 
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
1773
 
 
1774
 
    def test_cloned_testcase_does_not_share_details(self):
1775
 
        """A TestCase cloned with clone_test does not share mutable attributes
1776
 
        such as details or cleanups.
1777
 
        """
1778
 
        class Test(tests.TestCase):
1779
 
            def test_foo(self):
1780
 
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1781
 
        orig_test = Test('test_foo')
1782
 
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1783
 
        orig_test.run(unittest.TestResult())
1784
 
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1785
 
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
1786
 
 
1787
 
    def test_double_apply_scenario_preserves_first_scenario(self):
1788
 
        """Applying two levels of scenarios to a test preserves the attributes
1789
 
        added by both scenarios.
1790
 
        """
1791
 
        class Test(tests.TestCase):
1792
 
            def test_foo(self):
1793
 
                pass
1794
 
        test = Test('test_foo')
1795
 
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1796
 
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1797
 
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1798
 
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1799
 
        all_tests = list(tests.iter_suite_tests(suite))
1800
 
        self.assertLength(4, all_tests)
1801
 
        all_xys = sorted((t.x, t.y) for t in all_tests)
1802
 
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1803
 
 
1804
 
 
1805
1659
# NB: Don't delete this; it's not actually from 0.11!
1806
1660
@deprecated_function(deprecated_in((0, 11, 0)))
1807
1661
def sample_deprecated_function():
1934
1788
    def test_make_branch_and_tree_with_format(self):
1935
1789
        # we should be able to supply a format to make_branch_and_tree
1936
1790
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
 
1791
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1937
1792
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1938
1793
                              bzrlib.bzrdir.BzrDirMetaFormat1)
 
1794
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
 
1795
                              bzrlib.bzrdir.BzrDirFormat6)
1939
1796
 
1940
1797
    def test_make_branch_and_memory_tree(self):
1941
1798
        # we should be able to get a new branch and a mutable tree from
1958
1815
                tree.branch.repository.bzrdir.root_transport)
1959
1816
 
1960
1817
 
1961
 
class SelfTestHelper(object):
 
1818
class SelfTestHelper:
1962
1819
 
1963
1820
    def run_selftest(self, **kwargs):
1964
1821
        """Run selftest returning its output."""
2024
1881
            def __call__(test, result):
2025
1882
                test.run(result)
2026
1883
            def run(test, result):
2027
 
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
 
1884
                self.assertIsInstance(result, tests.ForwardingResult)
2028
1885
                calls.append("called")
2029
1886
            def countTestCases(self):
2030
1887
                return 1
2090
1947
 
2091
1948
    def test_transport_sftp(self):
2092
1949
        self.requireFeature(features.paramiko)
2093
 
        from bzrlib.tests import stub_sftp
2094
1950
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
2095
1951
 
2096
1952
    def test_transport_memory(self):
2115
1971
            load_list='missing file name', list_only=True)
2116
1972
 
2117
1973
 
2118
 
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2119
 
 
2120
 
    _test_needs_features = [features.subunit]
2121
 
 
2122
 
    def run_subunit_stream(self, test_name):
2123
 
        from subunit import ProtocolTestCase
2124
 
        def factory():
2125
 
            return TestUtil.TestSuite([_get_test(test_name)])
2126
 
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2127
 
            test_suite_factory=factory)
2128
 
        test = ProtocolTestCase(stream)
2129
 
        result = testtools.TestResult()
2130
 
        test.run(result)
2131
 
        content = stream.getvalue()
2132
 
        return content, result
2133
 
 
2134
 
    def test_fail_has_log(self):
2135
 
        content, result = self.run_subunit_stream('test_fail')
2136
 
        self.assertEqual(1, len(result.failures))
2137
 
        self.assertContainsRe(content, '(?m)^log$')
2138
 
        self.assertContainsRe(content, 'this test will fail')
2139
 
 
2140
 
    def test_error_has_log(self):
2141
 
        content, result = self.run_subunit_stream('test_error')
2142
 
        self.assertContainsRe(content, '(?m)^log$')
2143
 
        self.assertContainsRe(content, 'this test errored')
2144
 
 
2145
 
    def test_skip_has_no_log(self):
2146
 
        content, result = self.run_subunit_stream('test_skip')
2147
 
        self.assertNotContainsRe(content, '(?m)^log$')
2148
 
        self.assertNotContainsRe(content, 'this test will be skipped')
2149
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
2150
 
        skips = result.skip_reasons['reason']
2151
 
        self.assertEqual(1, len(skips))
2152
 
        test = skips[0]
2153
 
        # RemotedTestCase doesn't preserve the "details"
2154
 
        ## self.assertFalse('log' in test.getDetails())
2155
 
 
2156
 
    def test_missing_feature_has_no_log(self):
2157
 
        content, result = self.run_subunit_stream('test_missing_feature')
2158
 
        self.assertNotContainsRe(content, '(?m)^log$')
2159
 
        self.assertNotContainsRe(content, 'missing the feature')
2160
 
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2161
 
        skips = result.skip_reasons['_MissingFeature\n']
2162
 
        self.assertEqual(1, len(skips))
2163
 
        test = skips[0]
2164
 
        # RemotedTestCase doesn't preserve the "details"
2165
 
        ## self.assertFalse('log' in test.getDetails())
2166
 
 
2167
 
    def test_xfail_has_no_log(self):
2168
 
        content, result = self.run_subunit_stream('test_xfail')
2169
 
        self.assertNotContainsRe(content, '(?m)^log$')
2170
 
        self.assertNotContainsRe(content, 'test with expected failure')
2171
 
        self.assertEqual(1, len(result.expectedFailures))
2172
 
        result_content = result.expectedFailures[0][1]
2173
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
2174
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
2175
 
 
2176
 
    def test_unexpected_success_has_log(self):
2177
 
        content, result = self.run_subunit_stream('test_unexpected_success')
2178
 
        self.assertContainsRe(content, '(?m)^log$')
2179
 
        self.assertContainsRe(content, 'test with unexpected success')
2180
 
        self.expectFailure('subunit treats "unexpectedSuccess"'
2181
 
                           ' as a plain success',
2182
 
            self.assertEqual, 1, len(result.unexpectedSuccesses))
2183
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
2184
 
        test = result.unexpectedSuccesses[0]
2185
 
        # RemotedTestCase doesn't preserve the "details"
2186
 
        ## self.assertTrue('log' in test.getDetails())
2187
 
 
2188
 
    def test_success_has_no_log(self):
2189
 
        content, result = self.run_subunit_stream('test_success')
2190
 
        self.assertEqual(1, result.testsRun)
2191
 
        self.assertNotContainsRe(content, '(?m)^log$')
2192
 
        self.assertNotContainsRe(content, 'this test succeeds')
2193
 
 
2194
 
 
2195
1974
class TestRunBzr(tests.TestCase):
2196
1975
 
2197
1976
    out = ''
2320
2099
        # stdout and stderr of the invoked run_bzr
2321
2100
        current_factory = bzrlib.ui.ui_factory
2322
2101
        self.run_bzr(['foo'])
2323
 
        self.assertFalse(current_factory is self.factory)
 
2102
        self.failIf(current_factory is self.factory)
2324
2103
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2325
2104
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2326
2105
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2508
2287
        self.assertEqual([], command[2:])
2509
2288
 
2510
2289
    def test_set_env(self):
2511
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2290
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2512
2291
        # set in the child
2513
2292
        def check_environment():
2514
2293
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2520
2299
 
2521
2300
    def test_run_bzr_subprocess_env_del(self):
2522
2301
        """run_bzr_subprocess can remove environment variables too."""
2523
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2302
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2524
2303
        def check_environment():
2525
2304
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2526
2305
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2532
2311
        del os.environ['EXISTANT_ENV_VAR']
2533
2312
 
2534
2313
    def test_env_del_missing(self):
2535
 
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
 
2314
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2536
2315
        def check_environment():
2537
2316
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2538
2317
        self.check_popen_state = check_environment
2560
2339
            os.chdir = orig_chdir
2561
2340
        self.assertEqual(['foo', 'current'], chdirs)
2562
2341
 
2563
 
    def test_get_bzr_path_with_cwd_bzrlib(self):
2564
 
        self.get_source_path = lambda: ""
2565
 
        self.overrideAttr(os.path, "isfile", lambda path: True)
2566
 
        self.assertEqual(self.get_bzr_path(), "bzr")
2567
 
 
2568
2342
 
2569
2343
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2570
2344
    """Tests that really need to do things with an external bzr."""
2813
2587
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2814
2588
 
2815
2589
 
2816
 
class TestCheckTreeShape(tests.TestCaseWithTransport):
 
2590
class TestCheckInventoryShape(tests.TestCaseWithTransport):
2817
2591
 
2818
 
    def test_check_tree_shape(self):
 
2592
    def test_check_inventory_shape(self):
2819
2593
        files = ['a', 'b/', 'b/c']
2820
2594
        tree = self.make_branch_and_tree('.')
2821
2595
        self.build_tree(files)
2822
2596
        tree.add(files)
2823
2597
        tree.lock_read()
2824
2598
        try:
2825
 
            self.check_tree_shape(tree, files)
 
2599
            self.check_inventory_shape(tree.inventory, files)
2826
2600
        finally:
2827
2601
            tree.unlock()
2828
2602
 
2999
2773
        # Test that a plausible list of modules to doctest is returned
3000
2774
        # by _test_suite_modules_to_doctest.
3001
2775
        test_list = tests._test_suite_modules_to_doctest()
3002
 
        if __doc__ is None:
3003
 
            # When docstrings are stripped, there are no modules to doctest
3004
 
            self.assertEqual([], test_list)
3005
 
            return
3006
2776
        self.assertSubset([
3007
2777
            'bzrlib.timestamp',
3008
2778
            ],
3025
2795
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
3026
2796
        def doctests():
3027
2797
            calls.append("modules_to_doctest")
3028
 
            if __doc__ is None:
3029
 
                return []
3030
2798
            return ['bzrlib.timestamp']
3031
2799
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
3032
2800
        expected_test_list = [
3035
2803
            ('bzrlib.tests.per_transport.TransportTests'
3036
2804
             '.test_abspath(LocalTransport,LocalURLServer)'),
3037
2805
            'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
 
2806
            # modules_to_doctest
 
2807
            'bzrlib.timestamp.format_highres_date',
3038
2808
            # plugins can't be tested that way since selftest may be run with
3039
2809
            # --no-plugins
3040
2810
            ]
3041
 
        if __doc__ is not None:
3042
 
            expected_test_list.extend([
3043
 
                # modules_to_doctest
3044
 
                'bzrlib.timestamp.format_highres_date',
3045
 
                ])
3046
2811
        suite = tests.test_suite()
3047
2812
        self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
3048
2813
            set(calls))
3160
2925
        tpr.register('bar', 'bBB.aAA.rRR')
3161
2926
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3162
2927
        self.assertThat(self.get_log(),
3163
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3164
 
                           doctest.ELLIPSIS))
 
2928
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
3165
2929
 
3166
2930
    def test_get_unknown_prefix(self):
3167
2931
        tpr = self._get_registry()
3187
2951
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3188
2952
 
3189
2953
 
3190
 
class TestThreadLeakDetection(tests.TestCase):
3191
 
    """Ensure when tests leak threads we detect and report it"""
3192
 
 
3193
 
    class LeakRecordingResult(tests.ExtendedTestResult):
3194
 
        def __init__(self):
3195
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3196
 
            self.leaks = []
3197
 
        def _report_thread_leak(self, test, leaks, alive):
3198
 
            self.leaks.append((test, leaks))
3199
 
 
3200
 
    def test_testcase_without_addCleanups(self):
3201
 
        """Check old TestCase instances don't break with leak detection"""
3202
 
        class Test(unittest.TestCase):
3203
 
            def runTest(self):
3204
 
                pass
3205
 
        result = self.LeakRecordingResult()
3206
 
        test = Test()
3207
 
        result.startTestRun()
3208
 
        test.run(result)
3209
 
        result.stopTestRun()
3210
 
        self.assertEqual(result._tests_leaking_threads_count, 0)
3211
 
        self.assertEqual(result.leaks, [])
3212
 
        
3213
 
    def test_thread_leak(self):
3214
 
        """Ensure a thread that outlives the running of a test is reported
3215
 
 
3216
 
        Uses a thread that blocks on an event, and is started by the inner
3217
 
        test case. As the thread outlives the inner case's run, it should be
3218
 
        detected as a leak, but the event is then set so that the thread can
3219
 
        be safely joined in cleanup so it's not leaked for real.
3220
 
        """
3221
 
        event = threading.Event()
3222
 
        thread = threading.Thread(name="Leaker", target=event.wait)
3223
 
        class Test(tests.TestCase):
3224
 
            def test_leak(self):
3225
 
                thread.start()
3226
 
        result = self.LeakRecordingResult()
3227
 
        test = Test("test_leak")
3228
 
        self.addCleanup(thread.join)
3229
 
        self.addCleanup(event.set)
3230
 
        result.startTestRun()
3231
 
        test.run(result)
3232
 
        result.stopTestRun()
3233
 
        self.assertEqual(result._tests_leaking_threads_count, 1)
3234
 
        self.assertEqual(result._first_thread_leaker_id, test.id())
3235
 
        self.assertEqual(result.leaks, [(test, set([thread]))])
3236
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3237
 
 
3238
 
    def test_multiple_leaks(self):
3239
 
        """Check multiple leaks are blamed on the test cases at fault
3240
 
 
3241
 
        Same concept as the previous test, but has one inner test method that
3242
 
        leaks two threads, and one that doesn't leak at all.
3243
 
        """
3244
 
        event = threading.Event()
3245
 
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
3246
 
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
3247
 
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
3248
 
        class Test(tests.TestCase):
3249
 
            def test_first_leak(self):
3250
 
                thread_b.start()
3251
 
            def test_second_no_leak(self):
3252
 
                pass
3253
 
            def test_third_leak(self):
3254
 
                thread_c.start()
3255
 
                thread_a.start()
3256
 
        result = self.LeakRecordingResult()
3257
 
        first_test = Test("test_first_leak")
3258
 
        third_test = Test("test_third_leak")
3259
 
        self.addCleanup(thread_a.join)
3260
 
        self.addCleanup(thread_b.join)
3261
 
        self.addCleanup(thread_c.join)
3262
 
        self.addCleanup(event.set)
3263
 
        result.startTestRun()
3264
 
        unittest.TestSuite(
3265
 
            [first_test, Test("test_second_no_leak"), third_test]
3266
 
            ).run(result)
3267
 
        result.stopTestRun()
3268
 
        self.assertEqual(result._tests_leaking_threads_count, 2)
3269
 
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
3270
 
        self.assertEqual(result.leaks, [
3271
 
            (first_test, set([thread_b])),
3272
 
            (third_test, set([thread_a, thread_c]))])
3273
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3274
 
 
3275
 
 
3276
 
class TestPostMortemDebugging(tests.TestCase):
3277
 
    """Check post mortem debugging works when tests fail or error"""
3278
 
 
3279
 
    class TracebackRecordingResult(tests.ExtendedTestResult):
3280
 
        def __init__(self):
3281
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3282
 
            self.postcode = None
3283
 
        def _post_mortem(self, tb=None):
3284
 
            """Record the code object at the end of the current traceback"""
3285
 
            tb = tb or sys.exc_info()[2]
3286
 
            if tb is not None:
3287
 
                next = tb.tb_next
3288
 
                while next is not None:
3289
 
                    tb = next
3290
 
                    next = next.tb_next
3291
 
                self.postcode = tb.tb_frame.f_code
3292
 
        def report_error(self, test, err):
3293
 
            pass
3294
 
        def report_failure(self, test, err):
3295
 
            pass
3296
 
 
3297
 
    def test_location_unittest_error(self):
3298
 
        """Needs right post mortem traceback with erroring unittest case"""
3299
 
        class Test(unittest.TestCase):
3300
 
            def runTest(self):
3301
 
                raise RuntimeError
3302
 
        result = self.TracebackRecordingResult()
3303
 
        Test().run(result)
3304
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3305
 
 
3306
 
    def test_location_unittest_failure(self):
3307
 
        """Needs right post mortem traceback with failing unittest case"""
3308
 
        class Test(unittest.TestCase):
3309
 
            def runTest(self):
3310
 
                raise self.failureException
3311
 
        result = self.TracebackRecordingResult()
3312
 
        Test().run(result)
3313
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3314
 
 
3315
 
    def test_location_bt_error(self):
3316
 
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
3317
 
        class Test(tests.TestCase):
3318
 
            def test_error(self):
3319
 
                raise RuntimeError
3320
 
        result = self.TracebackRecordingResult()
3321
 
        Test("test_error").run(result)
3322
 
        self.assertEqual(result.postcode, Test.test_error.func_code)
3323
 
 
3324
 
    def test_location_bt_failure(self):
3325
 
        """Needs right post mortem traceback with failing bzrlib.tests case"""
3326
 
        class Test(tests.TestCase):
3327
 
            def test_failure(self):
3328
 
                raise self.failureException
3329
 
        result = self.TracebackRecordingResult()
3330
 
        Test("test_failure").run(result)
3331
 
        self.assertEqual(result.postcode, Test.test_failure.func_code)
3332
 
 
3333
 
    def test_env_var_triggers_post_mortem(self):
3334
 
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3335
 
        import pdb
3336
 
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
3337
 
        post_mortem_calls = []
3338
 
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3339
 
        self.overrideEnv('BZR_TEST_PDB', None)
3340
 
        result._post_mortem(1)
3341
 
        self.overrideEnv('BZR_TEST_PDB', 'on')
3342
 
        result._post_mortem(2)
3343
 
        self.assertEqual([2], post_mortem_calls)
3344
 
 
3345
 
 
3346
2954
class TestRunSuite(tests.TestCase):
3347
2955
 
3348
2956
    def test_runner_class(self):
3359
2967
                                                self.verbosity)
3360
2968
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3361
2969
        self.assertLength(1, calls)
3362
 
 
3363
 
 
3364
 
class TestEnvironHandling(tests.TestCase):
3365
 
 
3366
 
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
3367
 
        self.assertFalse('MYVAR' in os.environ)
3368
 
        self.overrideEnv('MYVAR', '42')
3369
 
        # We use an embedded test to make sure we fix the _captureVar bug
3370
 
        class Test(tests.TestCase):
3371
 
            def test_me(self):
3372
 
                # The first call saves the 42 value
3373
 
                self.overrideEnv('MYVAR', None)
3374
 
                self.assertEquals(None, os.environ.get('MYVAR'))
3375
 
                # Make sure we can call it twice
3376
 
                self.overrideEnv('MYVAR', None)
3377
 
                self.assertEquals(None, os.environ.get('MYVAR'))
3378
 
        output = StringIO()
3379
 
        result = tests.TextTestResult(output, 0, 1)
3380
 
        Test('test_me').run(result)
3381
 
        if not result.wasStrictlySuccessful():
3382
 
            self.fail(output.getvalue())
3383
 
        # We get our value back
3384
 
        self.assertEquals('42', os.environ.get('MYVAR'))
3385
 
 
3386
 
 
3387
 
class TestIsolatedEnv(tests.TestCase):
3388
 
    """Test isolating tests from os.environ.
3389
 
 
3390
 
    Since we use tests that are already isolated from os.environ a bit of care
3391
 
    should be taken when designing the tests to avoid bootstrap side-effects.
3392
 
    The tests start with an already clean os.environ, which allows doing valid
3393
 
    assertions about which variables are present or not and design tests around
3394
 
    these assertions.
3395
 
    """
3396
 
 
3397
 
    class ScratchMonkey(tests.TestCase):
3398
 
 
3399
 
        def test_me(self):
3400
 
            pass
3401
 
 
3402
 
    def test_basics(self):
3403
 
        # Make sure we know the definition of BZR_HOME: not part of os.environ
3404
 
        # for tests.TestCase.
3405
 
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
3406
 
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3407
 
        # Being part of isolated_environ, BZR_HOME should not appear here
3408
 
        self.assertFalse('BZR_HOME' in os.environ)
3409
 
        # Make sure we know the definition of LINES: part of os.environ for
3410
 
        # tests.TestCase
3411
 
        self.assertTrue('LINES' in tests.isolated_environ)
3412
 
        self.assertEquals('25', tests.isolated_environ['LINES'])
3413
 
        self.assertEquals('25', os.environ['LINES'])
3414
 
 
3415
 
    def test_injecting_unknown_variable(self):
3416
 
        # BZR_HOME is known to be absent from os.environ
3417
 
        test = self.ScratchMonkey('test_me')
3418
 
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3419
 
        self.assertEquals('foo', os.environ['BZR_HOME'])
3420
 
        tests.restore_os_environ(test)
3421
 
        self.assertFalse('BZR_HOME' in os.environ)
3422
 
 
3423
 
    def test_injecting_known_variable(self):
3424
 
        test = self.ScratchMonkey('test_me')
3425
 
        # LINES is known to be present in os.environ
3426
 
        tests.override_os_environ(test, {'LINES': '42'})
3427
 
        self.assertEquals('42', os.environ['LINES'])
3428
 
        tests.restore_os_environ(test)
3429
 
        self.assertEquals('25', os.environ['LINES'])
3430
 
 
3431
 
    def test_deleting_variable(self):
3432
 
        test = self.ScratchMonkey('test_me')
3433
 
        # LINES is known to be present in os.environ
3434
 
        tests.override_os_environ(test, {'LINES': None})
3435
 
        self.assertTrue('LINES' not in os.environ)
3436
 
        tests.restore_os_environ(test)
3437
 
        self.assertEquals('25', os.environ['LINES'])
3438
 
 
3439
 
 
3440
 
class TestDocTestSuiteIsolation(tests.TestCase):
3441
 
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3442
 
 
3443
 
    Since tests.TestCase already provides isolation from os.environ, we use
3444
 
    the clean environment as a base for testing. To precisely capture the
3445
 
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3446
 
    compare against.
3447
 
 
3448
 
    We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
3449
 
    not `os.environ` so each test overrides it to suit its needs.
3450
 
 
3451
 
    """
3452
 
 
3453
 
    def get_doctest_suite_for_string(self, klass, string):
3454
 
        class Finder(doctest.DocTestFinder):
3455
 
 
3456
 
            def find(*args, **kwargs):
3457
 
                test = doctest.DocTestParser().get_doctest(
3458
 
                    string, {}, 'foo', 'foo.py', 0)
3459
 
                return [test]
3460
 
 
3461
 
        suite = klass(test_finder=Finder())
3462
 
        return suite
3463
 
 
3464
 
    def run_doctest_suite_for_string(self, klass, string):
3465
 
        suite = self.get_doctest_suite_for_string(klass, string)
3466
 
        output = StringIO()
3467
 
        result = tests.TextTestResult(output, 0, 1)
3468
 
        suite.run(result)
3469
 
        return result, output
3470
 
 
3471
 
    def assertDocTestStringSucceds(self, klass, string):
3472
 
        result, output = self.run_doctest_suite_for_string(klass, string)
3473
 
        if not result.wasStrictlySuccessful():
3474
 
            self.fail(output.getvalue())
3475
 
 
3476
 
    def assertDocTestStringFails(self, klass, string):
3477
 
        result, output = self.run_doctest_suite_for_string(klass, string)
3478
 
        if result.wasStrictlySuccessful():
3479
 
            self.fail(output.getvalue())
3480
 
 
3481
 
    def test_injected_variable(self):
3482
 
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3483
 
        test = """
3484
 
            >>> import os
3485
 
            >>> os.environ['LINES']
3486
 
            '42'
3487
 
            """
3488
 
        # doctest.DocTestSuite fails as it sees '25'
3489
 
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3490
 
        # tests.DocTestSuite sees '42'
3491
 
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3492
 
 
3493
 
    def test_deleted_variable(self):
3494
 
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3495
 
        test = """
3496
 
            >>> import os
3497
 
            >>> os.environ.get('LINES')
3498
 
            """
3499
 
        # doctest.DocTestSuite fails as it sees '25'
3500
 
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3501
 
        # tests.DocTestSuite sees None
3502
 
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)