~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: John Arbash Meinel
  • Date: 2010-05-11 10:45:26 UTC
  • mto: This revision was merged to the branch mainline in revision 5225.
  • Revision ID: john@arbash-meinel.com-20100511104526-zxnstcxta22hzw2n
Implement a compiled extension for parsing the text key out of a CHKInventory value.

Related to bug #562666. This seems to shave 5-10% off the time spent doing a complete
branch of bzr.dev/launchpad/etc.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
import doctest
 
20
from doctest import ELLIPSIS
21
21
import os
22
22
import signal
23
23
import sys
24
 
import threading
25
24
import time
26
25
import unittest
27
26
import warnings
28
27
 
29
 
from testtools import (
30
 
    ExtendedToOriginalDecorator,
31
 
    MultiTestResult,
32
 
    )
33
 
from testtools.content import Content
 
28
from testtools import MultiTestResult
34
29
from testtools.content_type import ContentType
35
30
from testtools.matchers import (
36
31
    DocTestMatches,
42
37
from bzrlib import (
43
38
    branchbuilder,
44
39
    bzrdir,
 
40
    debug,
45
41
    errors,
46
42
    lockdir,
47
43
    memorytree,
48
44
    osutils,
 
45
    progress,
49
46
    remote,
50
47
    repository,
51
48
    symbol_versioning,
52
49
    tests,
53
50
    transport,
54
51
    workingtree,
55
 
    workingtree_3,
56
 
    workingtree_4,
57
52
    )
58
53
from bzrlib.repofmt import (
59
54
    groupcompress_repo,
 
55
    pack_repo,
 
56
    weaverepo,
60
57
    )
61
58
from bzrlib.symbol_versioning import (
62
59
    deprecated_function,
67
64
    features,
68
65
    test_lsprof,
69
66
    test_server,
 
67
    test_sftp_transport,
70
68
    TestUtil,
71
69
    )
72
 
from bzrlib.trace import note, mutter
 
70
from bzrlib.trace import note
73
71
from bzrlib.transport import memory
 
72
from bzrlib.version import _get_bzr_source_tree
74
73
 
75
74
 
76
75
def _test_ids(test_suite):
78
77
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
79
78
 
80
79
 
 
80
class SelftestTests(tests.TestCase):
 
81
 
 
82
    def test_import_tests(self):
 
83
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
 
84
        self.assertEqual(mod.SelftestTests, SelftestTests)
 
85
 
 
86
    def test_import_test_failure(self):
 
87
        self.assertRaises(ImportError,
 
88
                          TestUtil._load_module_by_name,
 
89
                          'bzrlib.no-name-yet')
 
90
 
 
91
 
81
92
class MetaTestLog(tests.TestCase):
82
93
 
83
94
    def test_logging(self):
89
100
            "text", "plain", {"charset": "utf8"})))
90
101
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
91
102
        self.assertThat(self.get_log(),
92
 
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
103
            DocTestMatches(u"...a test message\n", ELLIPSIS))
93
104
 
94
105
 
95
106
class TestUnicodeFilename(tests.TestCase):
108
119
 
109
120
        filename = u'hell\u00d8'
110
121
        self.build_tree_contents([(filename, 'contents of hello')])
111
 
        self.assertPathExists(filename)
112
 
 
113
 
 
114
 
class TestClassesAvailable(tests.TestCase):
115
 
    """As a convenience we expose Test* classes from bzrlib.tests"""
116
 
 
117
 
    def test_test_case(self):
118
 
        from bzrlib.tests import TestCase
119
 
 
120
 
    def test_test_loader(self):
121
 
        from bzrlib.tests import TestLoader
122
 
 
123
 
    def test_test_suite(self):
124
 
        from bzrlib.tests import TestSuite
 
122
        self.failUnlessExists(filename)
125
123
 
126
124
 
127
125
class TestTransportScenarios(tests.TestCase):
210
208
    def test_scenarios(self):
211
209
        # check that constructor parameters are passed through to the adapted
212
210
        # test.
213
 
        from bzrlib.tests.per_controldir import make_scenarios
 
211
        from bzrlib.tests.per_bzrdir import make_scenarios
214
212
        vfs_factory = "v"
215
213
        server1 = "a"
216
214
        server2 = "b"
314
312
        from bzrlib.tests.per_interrepository import make_scenarios
315
313
        server1 = "a"
316
314
        server2 = "b"
317
 
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
 
315
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
318
316
        scenarios = make_scenarios(server1, server2, formats)
319
317
        self.assertEqual([
320
318
            ('C0,str,str',
321
319
             {'repository_format': 'C1',
322
320
              'repository_format_to': 'C2',
323
321
              'transport_readonly_server': 'b',
324
 
              'transport_server': 'a',
325
 
              'extra_setup': 'C3'}),
 
322
              'transport_server': 'a'}),
326
323
            ('D0,str,str',
327
324
             {'repository_format': 'D1',
328
325
              'repository_format_to': 'D2',
329
326
              'transport_readonly_server': 'b',
330
 
              'transport_server': 'a',
331
 
              'extra_setup': 'D3'})],
 
327
              'transport_server': 'a'})],
332
328
            scenarios)
333
329
 
334
330
 
340
336
        from bzrlib.tests.per_workingtree import make_scenarios
341
337
        server1 = "a"
342
338
        server2 = "b"
343
 
        formats = [workingtree_4.WorkingTreeFormat4(),
344
 
                   workingtree_3.WorkingTreeFormat3(),]
 
339
        formats = [workingtree.WorkingTreeFormat2(),
 
340
                   workingtree.WorkingTreeFormat3(),]
345
341
        scenarios = make_scenarios(server1, server2, formats)
346
342
        self.assertEqual([
347
 
            ('WorkingTreeFormat4',
 
343
            ('WorkingTreeFormat2',
348
344
             {'bzrdir_format': formats[0]._matchingbzrdir,
349
345
              'transport_readonly_server': 'b',
350
346
              'transport_server': 'a',
377
373
            )
378
374
        server1 = "a"
379
375
        server2 = "b"
380
 
        formats = [workingtree_4.WorkingTreeFormat4(),
381
 
                   workingtree_3.WorkingTreeFormat3(),]
 
376
        formats = [workingtree.WorkingTreeFormat2(),
 
377
                   workingtree.WorkingTreeFormat3(),]
382
378
        scenarios = make_scenarios(server1, server2, formats)
383
379
        self.assertEqual(7, len(scenarios))
384
 
        default_wt_format = workingtree.format_registry.get_default()
385
 
        wt4_format = workingtree_4.WorkingTreeFormat4()
386
 
        wt5_format = workingtree_4.WorkingTreeFormat5()
 
380
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
 
381
        wt4_format = workingtree.WorkingTreeFormat4()
 
382
        wt5_format = workingtree.WorkingTreeFormat5()
387
383
        expected_scenarios = [
388
 
            ('WorkingTreeFormat4',
 
384
            ('WorkingTreeFormat2',
389
385
             {'bzrdir_format': formats[0]._matchingbzrdir,
390
386
              'transport_readonly_server': 'b',
391
387
              'transport_server': 'a',
451
447
        # ones to add.
452
448
        from bzrlib.tests.per_tree import (
453
449
            return_parameter,
 
450
            revision_tree_from_workingtree
454
451
            )
455
452
        from bzrlib.tests.per_intertree import (
456
453
            make_scenarios,
457
454
            )
458
 
        from bzrlib.workingtree_3 import WorkingTreeFormat3
459
 
        from bzrlib.workingtree_4 import WorkingTreeFormat4
 
455
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
460
456
        input_test = TestInterTreeScenarios(
461
457
            "test_scenarios")
462
458
        server1 = "a"
463
459
        server2 = "b"
464
 
        format1 = WorkingTreeFormat4()
 
460
        format1 = WorkingTreeFormat2()
465
461
        format2 = WorkingTreeFormat3()
466
462
        formats = [("1", str, format1, format2, "converter1"),
467
463
            ("2", int, format2, format1, "converter2")]
513
509
        self.assertRaises(AssertionError, self.assertEqualStat,
514
510
            os.lstat("foo"), os.lstat("longname"))
515
511
 
516
 
    def test_failUnlessExists(self):
517
 
        """Deprecated failUnlessExists and failIfExists"""
518
 
        self.applyDeprecated(
519
 
            deprecated_in((2, 4)),
520
 
            self.failUnlessExists, '.')
521
 
        self.build_tree(['foo/', 'foo/bar'])
522
 
        self.applyDeprecated(
523
 
            deprecated_in((2, 4)),
524
 
            self.failUnlessExists, 'foo/bar')
525
 
        self.applyDeprecated(
526
 
            deprecated_in((2, 4)),
527
 
            self.failIfExists, 'foo/foo')
528
 
 
529
 
    def test_assertPathExists(self):
530
 
        self.assertPathExists('.')
531
 
        self.build_tree(['foo/', 'foo/bar'])
532
 
        self.assertPathExists('foo/bar')
533
 
        self.assertPathDoesNotExist('foo/foo')
534
 
 
535
512
 
536
513
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
537
514
 
571
548
        tree = self.make_branch_and_memory_tree('dir')
572
549
        # Guard against regression into MemoryTransport leaking
573
550
        # files to disk instead of keeping them in memory.
574
 
        self.assertFalse(osutils.lexists('dir'))
 
551
        self.failIf(osutils.lexists('dir'))
575
552
        self.assertIsInstance(tree, memorytree.MemoryTree)
576
553
 
577
554
    def test_make_branch_and_memory_tree_with_format(self):
578
555
        """make_branch_and_memory_tree should accept a format option."""
579
556
        format = bzrdir.BzrDirMetaFormat1()
580
 
        format.repository_format = repository.format_registry.get_default()
 
557
        format.repository_format = weaverepo.RepositoryFormat7()
581
558
        tree = self.make_branch_and_memory_tree('dir', format=format)
582
559
        # Guard against regression into MemoryTransport leaking
583
560
        # files to disk instead of keeping them in memory.
584
 
        self.assertFalse(osutils.lexists('dir'))
 
561
        self.failIf(osutils.lexists('dir'))
585
562
        self.assertIsInstance(tree, memorytree.MemoryTree)
586
563
        self.assertEqual(format.repository_format.__class__,
587
564
            tree.branch.repository._format.__class__)
591
568
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
592
569
        # Guard against regression into MemoryTransport leaking
593
570
        # files to disk instead of keeping them in memory.
594
 
        self.assertFalse(osutils.lexists('dir'))
 
571
        self.failIf(osutils.lexists('dir'))
595
572
 
596
573
    def test_make_branch_builder_with_format(self):
597
574
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
598
575
        # that the format objects are used.
599
576
        format = bzrdir.BzrDirMetaFormat1()
600
 
        repo_format = repository.format_registry.get_default()
 
577
        repo_format = weaverepo.RepositoryFormat7()
601
578
        format.repository_format = repo_format
602
579
        builder = self.make_branch_builder('dir', format=format)
603
580
        the_branch = builder.get_branch()
604
581
        # Guard against regression into MemoryTransport leaking
605
582
        # files to disk instead of keeping them in memory.
606
 
        self.assertFalse(osutils.lexists('dir'))
 
583
        self.failIf(osutils.lexists('dir'))
607
584
        self.assertEqual(format.repository_format.__class__,
608
585
                         the_branch.repository._format.__class__)
609
586
        self.assertEqual(repo_format.get_format_string(),
615
592
        the_branch = builder.get_branch()
616
593
        # Guard against regression into MemoryTransport leaking
617
594
        # files to disk instead of keeping them in memory.
618
 
        self.assertFalse(osutils.lexists('dir'))
 
595
        self.failIf(osutils.lexists('dir'))
619
596
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
620
597
        self.assertEqual(dir_format.repository_format.__class__,
621
598
                         the_branch.repository._format.__class__)
632
609
                l.attempt_lock()
633
610
        test = TestDanglingLock('test_function')
634
611
        result = test.run()
635
 
        total_failures = result.errors + result.failures
636
612
        if self._lock_check_thorough:
637
 
            self.assertEqual(1, len(total_failures))
 
613
            self.assertEqual(1, len(result.errors))
638
614
        else:
639
615
            # When _lock_check_thorough is disabled, then we don't trigger a
640
616
            # failure
641
 
            self.assertEqual(0, len(total_failures))
 
617
            self.assertEqual(0, len(result.errors))
642
618
 
643
619
 
644
620
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
645
621
    """Tests for the convenience functions TestCaseWithTransport introduces."""
646
622
 
647
623
    def test_get_readonly_url_none(self):
 
624
        from bzrlib.transport import get_transport
648
625
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
649
626
        self.vfs_transport_factory = memory.MemoryServer
650
627
        self.transport_readonly_server = None
652
629
        # for the server
653
630
        url = self.get_readonly_url()
654
631
        url2 = self.get_readonly_url('foo/bar')
655
 
        t = transport.get_transport(url)
656
 
        t2 = transport.get_transport(url2)
657
 
        self.assertIsInstance(t, ReadonlyTransportDecorator)
658
 
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
 
632
        t = get_transport(url)
 
633
        t2 = get_transport(url2)
 
634
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
 
635
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
659
636
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
660
637
 
661
638
    def test_get_readonly_url_http(self):
662
639
        from bzrlib.tests.http_server import HttpServer
 
640
        from bzrlib.transport import get_transport
663
641
        from bzrlib.transport.http import HttpTransportBase
664
642
        self.transport_server = test_server.LocalURLServer
665
643
        self.transport_readonly_server = HttpServer
667
645
        url = self.get_readonly_url()
668
646
        url2 = self.get_readonly_url('foo/bar')
669
647
        # the transport returned may be any HttpTransportBase subclass
670
 
        t = transport.get_transport(url)
671
 
        t2 = transport.get_transport(url2)
672
 
        self.assertIsInstance(t, HttpTransportBase)
673
 
        self.assertIsInstance(t2, HttpTransportBase)
 
648
        t = get_transport(url)
 
649
        t2 = get_transport(url2)
 
650
        self.failUnless(isinstance(t, HttpTransportBase))
 
651
        self.failUnless(isinstance(t2, HttpTransportBase))
674
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
675
653
 
676
654
    def test_is_directory(self):
684
662
    def test_make_branch_builder(self):
685
663
        builder = self.make_branch_builder('dir')
686
664
        rev_id = builder.build_commit()
687
 
        self.assertPathExists('dir')
 
665
        self.failUnlessExists('dir')
688
666
        a_dir = bzrdir.BzrDir.open('dir')
689
667
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
690
668
        a_branch = a_dir.open_branch()
706
684
        self.assertIsInstance(result_bzrdir.transport,
707
685
                              memory.MemoryTransport)
708
686
        # should not be on disk, should only be in memory
709
 
        self.assertPathDoesNotExist('subdir')
 
687
        self.failIfExists('subdir')
710
688
 
711
689
 
712
690
class TestChrootedTest(tests.ChrootedTestCase):
713
691
 
714
692
    def test_root_is_root(self):
715
 
        t = transport.get_transport(self.get_readonly_url())
 
693
        from bzrlib.transport import get_transport
 
694
        t = get_transport(self.get_readonly_url())
716
695
        url = t.base
717
696
        self.assertEqual(url, t.clone('..').base)
718
697
 
771
750
        self.check_timing(ShortDelayTestCase('test_short_delay'),
772
751
                          r"^ +[0-9]+ms$")
773
752
 
 
753
    def _patch_get_bzr_source_tree(self):
 
754
        # Reading from the actual source tree breaks isolation, but we don't
 
755
        # want to assume that that's *all* that would happen.
 
756
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
 
757
 
 
758
    def test_assigned_benchmark_file_stores_date(self):
 
759
        self._patch_get_bzr_source_tree()
 
760
        output = StringIO()
 
761
        result = bzrlib.tests.TextTestResult(self._log_file,
 
762
                                        descriptions=0,
 
763
                                        verbosity=1,
 
764
                                        bench_history=output
 
765
                                        )
 
766
        output_string = output.getvalue()
 
767
        # if you are wondering about the regexp please read the comment in
 
768
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
 
769
        # XXX: what comment?  -- Andrew Bennetts
 
770
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
771
 
 
772
    def test_benchhistory_records_test_times(self):
 
773
        self._patch_get_bzr_source_tree()
 
774
        result_stream = StringIO()
 
775
        result = bzrlib.tests.TextTestResult(
 
776
            self._log_file,
 
777
            descriptions=0,
 
778
            verbosity=1,
 
779
            bench_history=result_stream
 
780
            )
 
781
 
 
782
        # we want to profile a call and check that its test duration is recorded
 
783
        # make a new test instance that when run will generate a benchmark
 
784
        example_test_case = TestTestResult("_time_hello_world_encoding")
 
785
        # execute the test, which should succeed and record times
 
786
        example_test_case.run(result)
 
787
        lines = result_stream.getvalue().splitlines()
 
788
        self.assertEqual(2, len(lines))
 
789
        self.assertContainsRe(lines[1],
 
790
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
 
791
            "._time_hello_world_encoding")
 
792
 
774
793
    def _time_hello_world_encoding(self):
775
794
        """Profile two sleep calls
776
795
 
784
803
        self.requireFeature(test_lsprof.LSProfFeature)
785
804
        result_stream = StringIO()
786
805
        result = bzrlib.tests.VerboseTestResult(
787
 
            result_stream,
 
806
            unittest._WritelnDecorator(result_stream),
788
807
            descriptions=0,
789
808
            verbosity=2,
790
809
            )
816
835
        self.assertContainsRe(output,
817
836
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
818
837
 
819
 
    def test_uses_time_from_testtools(self):
820
 
        """Test case timings in verbose results should use testtools times"""
821
 
        import datetime
822
 
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
823
 
            def startTest(self, test):
824
 
                self.time(datetime.datetime.utcfromtimestamp(1.145))
825
 
                super(TimeAddedVerboseTestResult, self).startTest(test)
826
 
            def addSuccess(self, test):
827
 
                self.time(datetime.datetime.utcfromtimestamp(51.147))
828
 
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
829
 
            def report_tests_starting(self): pass
830
 
        sio = StringIO()
831
 
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
832
 
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
833
 
 
834
838
    def test_known_failure(self):
835
839
        """A KnownFailure being raised should trigger several result actions."""
836
840
        class InstrumentedTestResult(tests.ExtendedTestResult):
837
841
            def stopTestRun(self): pass
838
 
            def report_tests_starting(self): pass
 
842
            def startTests(self): pass
 
843
            def report_test_start(self, test): pass
839
844
            def report_known_failure(self, test, err=None, details=None):
840
845
                self._call = test, 'known failure'
841
846
        result = InstrumentedTestResult(None, None, None, None)
859
864
        # verbose test output formatting
860
865
        result_stream = StringIO()
861
866
        result = bzrlib.tests.VerboseTestResult(
862
 
            result_stream,
 
867
            unittest._WritelnDecorator(result_stream),
863
868
            descriptions=0,
864
869
            verbosity=2,
865
870
            )
875
880
        output = result_stream.getvalue()[prefix:]
876
881
        lines = output.splitlines()
877
882
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
878
 
        if sys.version_info > (2, 7):
879
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
880
 
                self.assertNotEqual, lines[1], '    ')
881
883
        self.assertEqual(lines[1], '    foo')
882
884
        self.assertEqual(2, len(lines))
883
885
 
891
893
        """Test the behaviour of invoking addNotSupported."""
892
894
        class InstrumentedTestResult(tests.ExtendedTestResult):
893
895
            def stopTestRun(self): pass
894
 
            def report_tests_starting(self): pass
 
896
            def startTests(self): pass
 
897
            def report_test_start(self, test): pass
895
898
            def report_unsupported(self, test, feature):
896
899
                self._call = test, feature
897
900
        result = InstrumentedTestResult(None, None, None, None)
916
919
        # verbose test output formatting
917
920
        result_stream = StringIO()
918
921
        result = bzrlib.tests.VerboseTestResult(
919
 
            result_stream,
 
922
            unittest._WritelnDecorator(result_stream),
920
923
            descriptions=0,
921
924
            verbosity=2,
922
925
            )
936
939
        """An UnavailableFeature being raised should invoke addNotSupported."""
937
940
        class InstrumentedTestResult(tests.ExtendedTestResult):
938
941
            def stopTestRun(self): pass
939
 
            def report_tests_starting(self): pass
 
942
            def startTests(self): pass
 
943
            def report_test_start(self, test): pass
940
944
            def addNotSupported(self, test, feature):
941
945
                self._call = test, feature
942
946
        result = InstrumentedTestResult(None, None, None, None)
984
988
        class InstrumentedTestResult(tests.ExtendedTestResult):
985
989
            calls = 0
986
990
            def startTests(self): self.calls += 1
 
991
            def report_test_start(self, test): pass
987
992
        result = InstrumentedTestResult(None, None, None, None)
988
993
        def test_function():
989
994
            pass
991
996
        test.run(result)
992
997
        self.assertEquals(1, result.calls)
993
998
 
994
 
    def test_startTests_only_once(self):
995
 
        """With multiple tests startTests should still only be called once"""
996
 
        class InstrumentedTestResult(tests.ExtendedTestResult):
997
 
            calls = 0
998
 
            def startTests(self): self.calls += 1
999
 
        result = InstrumentedTestResult(None, None, None, None)
1000
 
        suite = unittest.TestSuite([
1001
 
            unittest.FunctionTestCase(lambda: None),
1002
 
            unittest.FunctionTestCase(lambda: None)])
1003
 
        suite.run(result)
1004
 
        self.assertEquals(1, result.calls)
1005
 
        self.assertEquals(2, result.count)
1006
 
 
1007
999
 
1008
1000
class TestUnicodeFilenameFeature(tests.TestCase):
1009
1001
 
1030
1022
        because of our use of global state.
1031
1023
        """
1032
1024
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1025
        old_leak = tests.TestCase._first_thread_leaker_id
1033
1026
        try:
1034
1027
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1028
            tests.TestCase._first_thread_leaker_id = None
1035
1029
            return testrunner.run(test)
1036
1030
        finally:
1037
1031
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1032
            tests.TestCase._first_thread_leaker_id = old_leak
1038
1033
 
1039
1034
    def test_known_failure_failed_run(self):
1040
1035
        # run a test that generates a known failure which should be printed in
1086
1081
    def test_result_decorator(self):
1087
1082
        # decorate results
1088
1083
        calls = []
1089
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1084
        class LoggingDecorator(tests.ForwardingResult):
1090
1085
            def startTest(self, test):
1091
 
                ExtendedToOriginalDecorator.startTest(self, test)
 
1086
                tests.ForwardingResult.startTest(self, test)
1092
1087
                calls.append('start')
1093
1088
        test = unittest.FunctionTestCase(lambda:None)
1094
1089
        stream = StringIO()
1195
1190
            ],
1196
1191
            lines[-3:])
1197
1192
 
1198
 
    def test_verbose_test_count(self):
1199
 
        """A verbose test run reports the right test count at the start"""
1200
 
        suite = TestUtil.TestSuite([
1201
 
            unittest.FunctionTestCase(lambda:None),
1202
 
            unittest.FunctionTestCase(lambda:None)])
1203
 
        self.assertEqual(suite.countTestCases(), 2)
1204
 
        stream = StringIO()
1205
 
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
1206
 
        # Need to use the CountingDecorator as that's what sets num_tests
1207
 
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1208
 
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1193
    def _patch_get_bzr_source_tree(self):
 
1194
        # Reading from the actual source tree breaks isolation, but we don't
 
1195
        # want to assume that that's *all* that would happen.
 
1196
        self._get_source_tree_calls = []
 
1197
        def new_get():
 
1198
            self._get_source_tree_calls.append("called")
 
1199
            return None
 
1200
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
 
1201
 
 
1202
    def test_bench_history(self):
 
1203
        # tests that running the benchmark passes bench_history into
 
1204
        # the test result object. We can tell that happens if
 
1205
        # _get_bzr_source_tree is called.
 
1206
        self._patch_get_bzr_source_tree()
 
1207
        test = TestRunner('dummy_test')
 
1208
        output = StringIO()
 
1209
        runner = tests.TextTestRunner(stream=self._log_file,
 
1210
                                      bench_history=output)
 
1211
        result = self.run_test_runner(runner, test)
 
1212
        output_string = output.getvalue()
 
1213
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
1214
        self.assertLength(1, self._get_source_tree_calls)
1209
1215
 
1210
1216
    def test_startTestRun(self):
1211
1217
        """run should call result.startTestRun()"""
1212
1218
        calls = []
1213
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1219
        class LoggingDecorator(tests.ForwardingResult):
1214
1220
            def startTestRun(self):
1215
 
                ExtendedToOriginalDecorator.startTestRun(self)
 
1221
                tests.ForwardingResult.startTestRun(self)
1216
1222
                calls.append('startTestRun')
1217
1223
        test = unittest.FunctionTestCase(lambda:None)
1218
1224
        stream = StringIO()
1224
1230
    def test_stopTestRun(self):
1225
1231
        """run should call result.stopTestRun()"""
1226
1232
        calls = []
1227
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1233
        class LoggingDecorator(tests.ForwardingResult):
1228
1234
            def stopTestRun(self):
1229
 
                ExtendedToOriginalDecorator.stopTestRun(self)
 
1235
                tests.ForwardingResult.stopTestRun(self)
1230
1236
                calls.append('stopTestRun')
1231
1237
        test = unittest.FunctionTestCase(lambda:None)
1232
1238
        stream = StringIO()
1235
1241
        result = self.run_test_runner(runner, test)
1236
1242
        self.assertLength(1, calls)
1237
1243
 
1238
 
    def test_unicode_test_output_on_ascii_stream(self):
1239
 
        """Showing results should always succeed even on an ascii console"""
1240
 
        class FailureWithUnicode(tests.TestCase):
1241
 
            def test_log_unicode(self):
1242
 
                self.log(u"\u2606")
1243
 
                self.fail("Now print that log!")
1244
 
        out = StringIO()
1245
 
        self.overrideAttr(osutils, "get_terminal_encoding",
1246
 
            lambda trace=False: "ascii")
1247
 
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1248
 
            FailureWithUnicode("test_log_unicode"))
1249
 
        self.assertContainsRe(out.getvalue(),
1250
 
            "Text attachment: log\n"
1251
 
            "-+\n"
1252
 
            "\d+\.\d+  \\\\u2606\n"
1253
 
            "-+\n")
1254
 
 
1255
1244
 
1256
1245
class SampleTestCase(tests.TestCase):
1257
1246
 
1432
1421
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1433
1422
        output_stream = StringIO()
1434
1423
        result = bzrlib.tests.VerboseTestResult(
1435
 
            output_stream,
 
1424
            unittest._WritelnDecorator(output_stream),
1436
1425
            descriptions=0,
1437
1426
            verbosity=2)
1438
1427
        sample_test.run(result)
1445
1434
        # Note this test won't fail with hooks that the core library doesn't
1446
1435
        # use - but it will trigger with a plugin that adds hooks, so it's still a
1447
1436
        # useful warning in that case.
1448
 
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
1449
 
        self.assertEqual(
1450
 
            bzrlib.smart.server.SmartServerHooks(),
 
1437
        self.assertEqual(bzrlib.branch.BranchHooks(),
 
1438
            bzrlib.branch.Branch.hooks)
 
1439
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
1451
1440
            bzrlib.smart.server.SmartTCPServer.hooks)
1452
 
        self.assertEqual(
1453
 
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
 
1441
        self.assertEqual(bzrlib.commands.CommandHooks(),
 
1442
            bzrlib.commands.Command.hooks)
1454
1443
 
1455
1444
    def test__gather_lsprof_in_benchmarks(self):
1456
1445
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1666
1655
        self.assertEqual('original', obj.test_attr)
1667
1656
 
1668
1657
 
1669
 
class _MissingFeature(tests.Feature):
1670
 
    def _probe(self):
1671
 
        return False
1672
 
missing_feature = _MissingFeature()
1673
 
 
1674
 
 
1675
 
def _get_test(name):
1676
 
    """Get an instance of a specific example test.
1677
 
 
1678
 
    We protect this in a function so that they don't auto-run in the test
1679
 
    suite.
1680
 
    """
1681
 
 
1682
 
    class ExampleTests(tests.TestCase):
1683
 
 
1684
 
        def test_fail(self):
1685
 
            mutter('this was a failing test')
1686
 
            self.fail('this test will fail')
1687
 
 
1688
 
        def test_error(self):
1689
 
            mutter('this test errored')
1690
 
            raise RuntimeError('gotcha')
1691
 
 
1692
 
        def test_missing_feature(self):
1693
 
            mutter('missing the feature')
1694
 
            self.requireFeature(missing_feature)
1695
 
 
1696
 
        def test_skip(self):
1697
 
            mutter('this test will be skipped')
1698
 
            raise tests.TestSkipped('reason')
1699
 
 
1700
 
        def test_success(self):
1701
 
            mutter('this test succeeds')
1702
 
 
1703
 
        def test_xfail(self):
1704
 
            mutter('test with expected failure')
1705
 
            self.knownFailure('this_fails')
1706
 
 
1707
 
        def test_unexpected_success(self):
1708
 
            mutter('test with unexpected success')
1709
 
            self.expectFailure('should_fail', lambda: None)
1710
 
 
1711
 
    return ExampleTests(name)
1712
 
 
1713
 
 
1714
 
class TestTestCaseLogDetails(tests.TestCase):
1715
 
 
1716
 
    def _run_test(self, test_name):
1717
 
        test = _get_test(test_name)
1718
 
        result = testtools.TestResult()
1719
 
        test.run(result)
1720
 
        return result
1721
 
 
1722
 
    def test_fail_has_log(self):
1723
 
        result = self._run_test('test_fail')
1724
 
        self.assertEqual(1, len(result.failures))
1725
 
        result_content = result.failures[0][1]
1726
 
        self.assertContainsRe(result_content, 'Text attachment: log')
1727
 
        self.assertContainsRe(result_content, 'this was a failing test')
1728
 
 
1729
 
    def test_error_has_log(self):
1730
 
        result = self._run_test('test_error')
1731
 
        self.assertEqual(1, len(result.errors))
1732
 
        result_content = result.errors[0][1]
1733
 
        self.assertContainsRe(result_content, 'Text attachment: log')
1734
 
        self.assertContainsRe(result_content, 'this test errored')
1735
 
 
1736
 
    def test_skip_has_no_log(self):
1737
 
        result = self._run_test('test_skip')
1738
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
1739
 
        skips = result.skip_reasons['reason']
1740
 
        self.assertEqual(1, len(skips))
1741
 
        test = skips[0]
1742
 
        self.assertFalse('log' in test.getDetails())
1743
 
 
1744
 
    def test_missing_feature_has_no_log(self):
1745
 
        # testtools doesn't know about addNotSupported, so it just gets
1746
 
        # considered as a skip
1747
 
        result = self._run_test('test_missing_feature')
1748
 
        self.assertEqual([missing_feature], result.skip_reasons.keys())
1749
 
        skips = result.skip_reasons[missing_feature]
1750
 
        self.assertEqual(1, len(skips))
1751
 
        test = skips[0]
1752
 
        self.assertFalse('log' in test.getDetails())
1753
 
 
1754
 
    def test_xfail_has_no_log(self):
1755
 
        result = self._run_test('test_xfail')
1756
 
        self.assertEqual(1, len(result.expectedFailures))
1757
 
        result_content = result.expectedFailures[0][1]
1758
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
1759
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
1760
 
 
1761
 
    def test_unexpected_success_has_log(self):
1762
 
        result = self._run_test('test_unexpected_success')
1763
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
1764
 
        # Inconsistency, unexpectedSuccesses is a list of tests,
1765
 
        # expectedFailures is a list of reasons?
1766
 
        test = result.unexpectedSuccesses[0]
1767
 
        details = test.getDetails()
1768
 
        self.assertTrue('log' in details)
1769
 
 
1770
 
 
1771
 
class TestTestCloning(tests.TestCase):
1772
 
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
1773
 
 
1774
 
    def test_cloned_testcase_does_not_share_details(self):
1775
 
        """A TestCase cloned with clone_test does not share mutable attributes
1776
 
        such as details or cleanups.
1777
 
        """
1778
 
        class Test(tests.TestCase):
1779
 
            def test_foo(self):
1780
 
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1781
 
        orig_test = Test('test_foo')
1782
 
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1783
 
        orig_test.run(unittest.TestResult())
1784
 
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1785
 
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
1786
 
 
1787
 
    def test_double_apply_scenario_preserves_first_scenario(self):
1788
 
        """Applying two levels of scenarios to a test preserves the attributes
1789
 
        added by both scenarios.
1790
 
        """
1791
 
        class Test(tests.TestCase):
1792
 
            def test_foo(self):
1793
 
                pass
1794
 
        test = Test('test_foo')
1795
 
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1796
 
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1797
 
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1798
 
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1799
 
        all_tests = list(tests.iter_suite_tests(suite))
1800
 
        self.assertLength(4, all_tests)
1801
 
        all_xys = sorted((t.x, t.y) for t in all_tests)
1802
 
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1803
 
 
1804
 
 
1805
1658
# NB: Don't delete this; it's not actually from 0.11!
1806
1659
@deprecated_function(deprecated_in((0, 11, 0)))
1807
1660
def sample_deprecated_function():
1934
1787
    def test_make_branch_and_tree_with_format(self):
1935
1788
        # we should be able to supply a format to make_branch_and_tree
1936
1789
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
 
1790
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1937
1791
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1938
1792
                              bzrlib.bzrdir.BzrDirMetaFormat1)
 
1793
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
 
1794
                              bzrlib.bzrdir.BzrDirFormat6)
1939
1795
 
1940
1796
    def test_make_branch_and_memory_tree(self):
1941
1797
        # we should be able to get a new branch and a mutable tree from
1958
1814
                tree.branch.repository.bzrdir.root_transport)
1959
1815
 
1960
1816
 
1961
 
class SelfTestHelper(object):
 
1817
class SelfTestHelper:
1962
1818
 
1963
1819
    def run_selftest(self, **kwargs):
1964
1820
        """Run selftest returning its output."""
2024
1880
            def __call__(test, result):
2025
1881
                test.run(result)
2026
1882
            def run(test, result):
2027
 
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
 
1883
                self.assertIsInstance(result, tests.ForwardingResult)
2028
1884
                calls.append("called")
2029
1885
            def countTestCases(self):
2030
1886
                return 1
2115
1971
            load_list='missing file name', list_only=True)
2116
1972
 
2117
1973
 
2118
 
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2119
 
 
2120
 
    _test_needs_features = [features.subunit]
2121
 
 
2122
 
    def run_subunit_stream(self, test_name):
2123
 
        from subunit import ProtocolTestCase
2124
 
        def factory():
2125
 
            return TestUtil.TestSuite([_get_test(test_name)])
2126
 
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2127
 
            test_suite_factory=factory)
2128
 
        test = ProtocolTestCase(stream)
2129
 
        result = testtools.TestResult()
2130
 
        test.run(result)
2131
 
        content = stream.getvalue()
2132
 
        return content, result
2133
 
 
2134
 
    def test_fail_has_log(self):
2135
 
        content, result = self.run_subunit_stream('test_fail')
2136
 
        self.assertEqual(1, len(result.failures))
2137
 
        self.assertContainsRe(content, '(?m)^log$')
2138
 
        self.assertContainsRe(content, 'this test will fail')
2139
 
 
2140
 
    def test_error_has_log(self):
2141
 
        content, result = self.run_subunit_stream('test_error')
2142
 
        self.assertContainsRe(content, '(?m)^log$')
2143
 
        self.assertContainsRe(content, 'this test errored')
2144
 
 
2145
 
    def test_skip_has_no_log(self):
2146
 
        content, result = self.run_subunit_stream('test_skip')
2147
 
        self.assertNotContainsRe(content, '(?m)^log$')
2148
 
        self.assertNotContainsRe(content, 'this test will be skipped')
2149
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
2150
 
        skips = result.skip_reasons['reason']
2151
 
        self.assertEqual(1, len(skips))
2152
 
        test = skips[0]
2153
 
        # RemotedTestCase doesn't preserve the "details"
2154
 
        ## self.assertFalse('log' in test.getDetails())
2155
 
 
2156
 
    def test_missing_feature_has_no_log(self):
2157
 
        content, result = self.run_subunit_stream('test_missing_feature')
2158
 
        self.assertNotContainsRe(content, '(?m)^log$')
2159
 
        self.assertNotContainsRe(content, 'missing the feature')
2160
 
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2161
 
        skips = result.skip_reasons['_MissingFeature\n']
2162
 
        self.assertEqual(1, len(skips))
2163
 
        test = skips[0]
2164
 
        # RemotedTestCase doesn't preserve the "details"
2165
 
        ## self.assertFalse('log' in test.getDetails())
2166
 
 
2167
 
    def test_xfail_has_no_log(self):
2168
 
        content, result = self.run_subunit_stream('test_xfail')
2169
 
        self.assertNotContainsRe(content, '(?m)^log$')
2170
 
        self.assertNotContainsRe(content, 'test with expected failure')
2171
 
        self.assertEqual(1, len(result.expectedFailures))
2172
 
        result_content = result.expectedFailures[0][1]
2173
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
2174
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
2175
 
 
2176
 
    def test_unexpected_success_has_log(self):
2177
 
        content, result = self.run_subunit_stream('test_unexpected_success')
2178
 
        self.assertContainsRe(content, '(?m)^log$')
2179
 
        self.assertContainsRe(content, 'test with unexpected success')
2180
 
        self.expectFailure('subunit treats "unexpectedSuccess"'
2181
 
                           ' as a plain success',
2182
 
            self.assertEqual, 1, len(result.unexpectedSuccesses))
2183
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
2184
 
        test = result.unexpectedSuccesses[0]
2185
 
        # RemotedTestCase doesn't preserve the "details"
2186
 
        ## self.assertTrue('log' in test.getDetails())
2187
 
 
2188
 
    def test_success_has_no_log(self):
2189
 
        content, result = self.run_subunit_stream('test_success')
2190
 
        self.assertEqual(1, result.testsRun)
2191
 
        self.assertNotContainsRe(content, '(?m)^log$')
2192
 
        self.assertNotContainsRe(content, 'this test succeeds')
2193
 
 
2194
 
 
2195
1974
class TestRunBzr(tests.TestCase):
2196
1975
 
2197
1976
    out = ''
2320
2099
        # stdout and stderr of the invoked run_bzr
2321
2100
        current_factory = bzrlib.ui.ui_factory
2322
2101
        self.run_bzr(['foo'])
2323
 
        self.assertFalse(current_factory is self.factory)
 
2102
        self.failIf(current_factory is self.factory)
2324
2103
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2325
2104
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2326
2105
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2508
2287
        self.assertEqual([], command[2:])
2509
2288
 
2510
2289
    def test_set_env(self):
2511
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2290
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2512
2291
        # set in the child
2513
2292
        def check_environment():
2514
2293
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2520
2299
 
2521
2300
    def test_run_bzr_subprocess_env_del(self):
2522
2301
        """run_bzr_subprocess can remove environment variables too."""
2523
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2302
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2524
2303
        def check_environment():
2525
2304
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2526
2305
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2532
2311
        del os.environ['EXISTANT_ENV_VAR']
2533
2312
 
2534
2313
    def test_env_del_missing(self):
2535
 
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
 
2314
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2536
2315
        def check_environment():
2537
2316
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2538
2317
        self.check_popen_state = check_environment
2560
2339
            os.chdir = orig_chdir
2561
2340
        self.assertEqual(['foo', 'current'], chdirs)
2562
2341
 
2563
 
    def test_get_bzr_path_with_cwd_bzrlib(self):
2564
 
        self.get_source_path = lambda: ""
2565
 
        self.overrideAttr(os.path, "isfile", lambda path: True)
2566
 
        self.assertEqual(self.get_bzr_path(), "bzr")
2567
 
 
2568
2342
 
2569
2343
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2570
2344
    """Tests that really need to do things with an external bzr."""
2813
2587
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2814
2588
 
2815
2589
 
2816
 
class TestCheckTreeShape(tests.TestCaseWithTransport):
 
2590
class TestCheckInventoryShape(tests.TestCaseWithTransport):
2817
2591
 
2818
 
    def test_check_tree_shape(self):
 
2592
    def test_check_inventory_shape(self):
2819
2593
        files = ['a', 'b/', 'b/c']
2820
2594
        tree = self.make_branch_and_tree('.')
2821
2595
        self.build_tree(files)
2822
2596
        tree.add(files)
2823
2597
        tree.lock_read()
2824
2598
        try:
2825
 
            self.check_tree_shape(tree, files)
 
2599
            self.check_inventory_shape(tree.inventory, files)
2826
2600
        finally:
2827
2601
            tree.unlock()
2828
2602
 
3160
2934
        tpr.register('bar', 'bBB.aAA.rRR')
3161
2935
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3162
2936
        self.assertThat(self.get_log(),
3163
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3164
 
                           doctest.ELLIPSIS))
 
2937
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
3165
2938
 
3166
2939
    def test_get_unknown_prefix(self):
3167
2940
        tpr = self._get_registry()
3187
2960
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3188
2961
 
3189
2962
 
3190
 
class TestThreadLeakDetection(tests.TestCase):
3191
 
    """Ensure when tests leak threads we detect and report it"""
3192
 
 
3193
 
    class LeakRecordingResult(tests.ExtendedTestResult):
3194
 
        def __init__(self):
3195
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3196
 
            self.leaks = []
3197
 
        def _report_thread_leak(self, test, leaks, alive):
3198
 
            self.leaks.append((test, leaks))
3199
 
 
3200
 
    def test_testcase_without_addCleanups(self):
3201
 
        """Check old TestCase instances don't break with leak detection"""
3202
 
        class Test(unittest.TestCase):
3203
 
            def runTest(self):
3204
 
                pass
3205
 
        result = self.LeakRecordingResult()
3206
 
        test = Test()
3207
 
        result.startTestRun()
3208
 
        test.run(result)
3209
 
        result.stopTestRun()
3210
 
        self.assertEqual(result._tests_leaking_threads_count, 0)
3211
 
        self.assertEqual(result.leaks, [])
3212
 
        
3213
 
    def test_thread_leak(self):
3214
 
        """Ensure a thread that outlives the running of a test is reported
3215
 
 
3216
 
        Uses a thread that blocks on an event, and is started by the inner
3217
 
        test case. As the thread outlives the inner case's run, it should be
3218
 
        detected as a leak, but the event is then set so that the thread can
3219
 
        be safely joined in cleanup so it's not leaked for real.
3220
 
        """
3221
 
        event = threading.Event()
3222
 
        thread = threading.Thread(name="Leaker", target=event.wait)
3223
 
        class Test(tests.TestCase):
3224
 
            def test_leak(self):
3225
 
                thread.start()
3226
 
        result = self.LeakRecordingResult()
3227
 
        test = Test("test_leak")
3228
 
        self.addCleanup(thread.join)
3229
 
        self.addCleanup(event.set)
3230
 
        result.startTestRun()
3231
 
        test.run(result)
3232
 
        result.stopTestRun()
3233
 
        self.assertEqual(result._tests_leaking_threads_count, 1)
3234
 
        self.assertEqual(result._first_thread_leaker_id, test.id())
3235
 
        self.assertEqual(result.leaks, [(test, set([thread]))])
3236
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3237
 
 
3238
 
    def test_multiple_leaks(self):
3239
 
        """Check multiple leaks are blamed on the test cases at fault
3240
 
 
3241
 
        Same concept as the previous test, but has one inner test method that
3242
 
        leaks two threads, and one that doesn't leak at all.
3243
 
        """
3244
 
        event = threading.Event()
3245
 
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
3246
 
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
3247
 
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
3248
 
        class Test(tests.TestCase):
3249
 
            def test_first_leak(self):
3250
 
                thread_b.start()
3251
 
            def test_second_no_leak(self):
3252
 
                pass
3253
 
            def test_third_leak(self):
3254
 
                thread_c.start()
3255
 
                thread_a.start()
3256
 
        result = self.LeakRecordingResult()
3257
 
        first_test = Test("test_first_leak")
3258
 
        third_test = Test("test_third_leak")
3259
 
        self.addCleanup(thread_a.join)
3260
 
        self.addCleanup(thread_b.join)
3261
 
        self.addCleanup(thread_c.join)
3262
 
        self.addCleanup(event.set)
3263
 
        result.startTestRun()
3264
 
        unittest.TestSuite(
3265
 
            [first_test, Test("test_second_no_leak"), third_test]
3266
 
            ).run(result)
3267
 
        result.stopTestRun()
3268
 
        self.assertEqual(result._tests_leaking_threads_count, 2)
3269
 
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
3270
 
        self.assertEqual(result.leaks, [
3271
 
            (first_test, set([thread_b])),
3272
 
            (third_test, set([thread_a, thread_c]))])
3273
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3274
 
 
3275
 
 
3276
 
class TestPostMortemDebugging(tests.TestCase):
3277
 
    """Check post mortem debugging works when tests fail or error"""
3278
 
 
3279
 
    class TracebackRecordingResult(tests.ExtendedTestResult):
3280
 
        def __init__(self):
3281
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3282
 
            self.postcode = None
3283
 
        def _post_mortem(self, tb=None):
3284
 
            """Record the code object at the end of the current traceback"""
3285
 
            tb = tb or sys.exc_info()[2]
3286
 
            if tb is not None:
3287
 
                next = tb.tb_next
3288
 
                while next is not None:
3289
 
                    tb = next
3290
 
                    next = next.tb_next
3291
 
                self.postcode = tb.tb_frame.f_code
3292
 
        def report_error(self, test, err):
3293
 
            pass
3294
 
        def report_failure(self, test, err):
3295
 
            pass
3296
 
 
3297
 
    def test_location_unittest_error(self):
3298
 
        """Needs right post mortem traceback with erroring unittest case"""
3299
 
        class Test(unittest.TestCase):
3300
 
            def runTest(self):
3301
 
                raise RuntimeError
3302
 
        result = self.TracebackRecordingResult()
3303
 
        Test().run(result)
3304
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3305
 
 
3306
 
    def test_location_unittest_failure(self):
3307
 
        """Needs right post mortem traceback with failing unittest case"""
3308
 
        class Test(unittest.TestCase):
3309
 
            def runTest(self):
3310
 
                raise self.failureException
3311
 
        result = self.TracebackRecordingResult()
3312
 
        Test().run(result)
3313
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3314
 
 
3315
 
    def test_location_bt_error(self):
3316
 
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
3317
 
        class Test(tests.TestCase):
3318
 
            def test_error(self):
3319
 
                raise RuntimeError
3320
 
        result = self.TracebackRecordingResult()
3321
 
        Test("test_error").run(result)
3322
 
        self.assertEqual(result.postcode, Test.test_error.func_code)
3323
 
 
3324
 
    def test_location_bt_failure(self):
3325
 
        """Needs right post mortem traceback with failing bzrlib.tests case"""
3326
 
        class Test(tests.TestCase):
3327
 
            def test_failure(self):
3328
 
                raise self.failureException
3329
 
        result = self.TracebackRecordingResult()
3330
 
        Test("test_failure").run(result)
3331
 
        self.assertEqual(result.postcode, Test.test_failure.func_code)
3332
 
 
3333
 
    def test_env_var_triggers_post_mortem(self):
3334
 
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3335
 
        import pdb
3336
 
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
3337
 
        post_mortem_calls = []
3338
 
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3339
 
        self.overrideEnv('BZR_TEST_PDB', None)
3340
 
        result._post_mortem(1)
3341
 
        self.overrideEnv('BZR_TEST_PDB', 'on')
3342
 
        result._post_mortem(2)
3343
 
        self.assertEqual([2], post_mortem_calls)
3344
 
 
3345
 
 
3346
2963
class TestRunSuite(tests.TestCase):
3347
2964
 
3348
2965
    def test_runner_class(self):
3359
2976
                                                self.verbosity)
3360
2977
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3361
2978
        self.assertLength(1, calls)
3362
 
 
3363
 
 
3364
 
class TestEnvironHandling(tests.TestCase):
3365
 
 
3366
 
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
3367
 
        self.assertFalse('MYVAR' in os.environ)
3368
 
        self.overrideEnv('MYVAR', '42')
3369
 
        # We use an embedded test to make sure we fix the _captureVar bug
3370
 
        class Test(tests.TestCase):
3371
 
            def test_me(self):
3372
 
                # The first call saves the 42 value
3373
 
                self.overrideEnv('MYVAR', None)
3374
 
                self.assertEquals(None, os.environ.get('MYVAR'))
3375
 
                # Make sure we can call it twice
3376
 
                self.overrideEnv('MYVAR', None)
3377
 
                self.assertEquals(None, os.environ.get('MYVAR'))
3378
 
        output = StringIO()
3379
 
        result = tests.TextTestResult(output, 0, 1)
3380
 
        Test('test_me').run(result)
3381
 
        if not result.wasStrictlySuccessful():
3382
 
            self.fail(output.getvalue())
3383
 
        # We get our value back
3384
 
        self.assertEquals('42', os.environ.get('MYVAR'))
3385
 
 
3386
 
 
3387
 
class TestIsolatedEnv(tests.TestCase):
3388
 
    """Test isolating tests from os.environ.
3389
 
 
3390
 
    Since we use tests that are already isolated from os.environ a bit of care
3391
 
    should be taken when designing the tests to avoid bootstrap side-effects.
3392
 
    The tests start with an already clean os.environ, which allows making valid
3393
 
    assertions about which variables are present or not and design tests around
3394
 
    these assertions.
3395
 
    """
3396
 
 
3397
 
    class ScratchMonkey(tests.TestCase):
3398
 
 
3399
 
        def test_me(self):
3400
 
            pass
3401
 
 
3402
 
    def test_basics(self):
3403
 
        # Make sure we know the definition of BZR_HOME: not part of os.environ
3404
 
        # for tests.TestCase.
3405
 
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
3406
 
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3407
 
        # Being part of isolated_environ, BZR_HOME should not appear here
3408
 
        self.assertFalse('BZR_HOME' in os.environ)
3409
 
        # Make sure we know the definition of LINES: part of os.environ for
3410
 
        # tests.TestCase
3411
 
        self.assertTrue('LINES' in tests.isolated_environ)
3412
 
        self.assertEquals('25', tests.isolated_environ['LINES'])
3413
 
        self.assertEquals('25', os.environ['LINES'])
3414
 
 
3415
 
    def test_injecting_unknown_variable(self):
3416
 
        # BZR_HOME is known to be absent from os.environ
3417
 
        test = self.ScratchMonkey('test_me')
3418
 
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3419
 
        self.assertEquals('foo', os.environ['BZR_HOME'])
3420
 
        tests.restore_os_environ(test)
3421
 
        self.assertFalse('BZR_HOME' in os.environ)
3422
 
 
3423
 
    def test_injecting_known_variable(self):
3424
 
        test = self.ScratchMonkey('test_me')
3425
 
        # LINES is known to be present in os.environ
3426
 
        tests.override_os_environ(test, {'LINES': '42'})
3427
 
        self.assertEquals('42', os.environ['LINES'])
3428
 
        tests.restore_os_environ(test)
3429
 
        self.assertEquals('25', os.environ['LINES'])
3430
 
 
3431
 
    def test_deleting_variable(self):
3432
 
        test = self.ScratchMonkey('test_me')
3433
 
        # LINES is known to be present in os.environ
3434
 
        tests.override_os_environ(test, {'LINES': None})
3435
 
        self.assertTrue('LINES' not in os.environ)
3436
 
        tests.restore_os_environ(test)
3437
 
        self.assertEquals('25', os.environ['LINES'])
3438
 
 
3439
 
 
3440
 
class TestDocTestSuiteIsolation(tests.TestCase):
3441
 
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3442
 
 
3443
 
    Since tests.TestCase already provides isolation from os.environ, we use
3444
 
    the clean environment as a base for testing. To precisely capture the
3445
 
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3446
 
    compare against.
3447
 
 
3448
 
    We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
3449
 
    not `os.environ` so each test overrides it to suit its needs.
3450
 
 
3451
 
    """
3452
 
 
3453
 
    def get_doctest_suite_for_string(self, klass, string):
3454
 
        class Finder(doctest.DocTestFinder):
3455
 
 
3456
 
            def find(*args, **kwargs):
3457
 
                test = doctest.DocTestParser().get_doctest(
3458
 
                    string, {}, 'foo', 'foo.py', 0)
3459
 
                return [test]
3460
 
 
3461
 
        suite = klass(test_finder=Finder())
3462
 
        return suite
3463
 
 
3464
 
    def run_doctest_suite_for_string(self, klass, string):
3465
 
        suite = self.get_doctest_suite_for_string(klass, string)
3466
 
        output = StringIO()
3467
 
        result = tests.TextTestResult(output, 0, 1)
3468
 
        suite.run(result)
3469
 
        return result, output
3470
 
 
3471
 
    def assertDocTestStringSucceds(self, klass, string):
3472
 
        result, output = self.run_doctest_suite_for_string(klass, string)
3473
 
        if not result.wasStrictlySuccessful():
3474
 
            self.fail(output.getvalue())
3475
 
 
3476
 
    def assertDocTestStringFails(self, klass, string):
3477
 
        result, output = self.run_doctest_suite_for_string(klass, string)
3478
 
        if result.wasStrictlySuccessful():
3479
 
            self.fail(output.getvalue())
3480
 
 
3481
 
    def test_injected_variable(self):
3482
 
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3483
 
        test = """
3484
 
            >>> import os
3485
 
            >>> os.environ['LINES']
3486
 
            '42'
3487
 
            """
3488
 
        # doctest.DocTestSuite fails as it sees '25'
3489
 
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3490
 
        # tests.DocTestSuite sees '42'
3491
 
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3492
 
 
3493
 
    def test_deleted_variable(self):
3494
 
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3495
 
        test = """
3496
 
            >>> import os
3497
 
            >>> os.environ.get('LINES')
3498
 
            """
3499
 
        # doctest.DocTestSuite fails as it sees '25'
3500
 
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3501
 
        # tests.DocTestSuite sees None
3502
 
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)