~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

Merge bzr.dev, update to use new hooks.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import gc
 
21
import doctest
21
22
import os
22
23
import signal
23
24
import sys
26
27
import unittest
27
28
import warnings
28
29
 
29
 
from testtools import MultiTestResult
 
30
from testtools import (
 
31
    ExtendedToOriginalDecorator,
 
32
    MultiTestResult,
 
33
    )
30
34
from testtools.content import Content
31
35
from testtools.content_type import ContentType
32
36
from testtools.matchers import (
33
37
    DocTestMatches,
34
38
    Equals,
35
39
    )
36
 
import testtools.tests.helpers
 
40
import testtools.testresult.doubles
37
41
 
38
42
import bzrlib
39
43
from bzrlib import (
40
44
    branchbuilder,
41
45
    bzrdir,
42
 
    debug,
43
46
    errors,
 
47
    hooks,
44
48
    lockdir,
45
49
    memorytree,
46
50
    osutils,
50
54
    tests,
51
55
    transport,
52
56
    workingtree,
 
57
    workingtree_3,
 
58
    workingtree_4,
53
59
    )
54
60
from bzrlib.repofmt import (
55
61
    groupcompress_repo,
56
 
    pack_repo,
57
 
    weaverepo,
58
62
    )
59
63
from bzrlib.symbol_versioning import (
60
64
    deprecated_function,
65
69
    features,
66
70
    test_lsprof,
67
71
    test_server,
68
 
    test_sftp_transport,
69
72
    TestUtil,
70
73
    )
71
74
from bzrlib.trace import note, mutter
72
75
from bzrlib.transport import memory
73
 
from bzrlib.version import _get_bzr_source_tree
74
76
 
75
77
 
76
78
def _test_ids(test_suite):
89
91
            "text", "plain", {"charset": "utf8"})))
90
92
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
91
93
        self.assertThat(self.get_log(),
92
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
93
 
 
94
 
 
95
 
class TestUnicodeFilename(tests.TestCase):
96
 
 
97
 
    def test_probe_passes(self):
98
 
        """UnicodeFilename._probe passes."""
99
 
        # We can't test much more than that because the behaviour depends
100
 
        # on the platform.
101
 
        tests.UnicodeFilename._probe()
 
94
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
102
95
 
103
96
 
104
97
class TestTreeShape(tests.TestCaseInTempDir):
105
98
 
106
99
    def test_unicode_paths(self):
107
 
        self.requireFeature(tests.UnicodeFilename)
 
100
        self.requireFeature(features.UnicodeFilenameFeature)
108
101
 
109
102
        filename = u'hell\u00d8'
110
103
        self.build_tree_contents([(filename, 'contents of hello')])
111
 
        self.failUnlessExists(filename)
 
104
        self.assertPathExists(filename)
112
105
 
113
106
 
114
107
class TestClassesAvailable(tests.TestCase):
340
333
        from bzrlib.tests.per_workingtree import make_scenarios
341
334
        server1 = "a"
342
335
        server2 = "b"
343
 
        formats = [workingtree.WorkingTreeFormat2(),
344
 
                   workingtree.WorkingTreeFormat3(),]
 
336
        formats = [workingtree_4.WorkingTreeFormat4(),
 
337
                   workingtree_3.WorkingTreeFormat3(),]
345
338
        scenarios = make_scenarios(server1, server2, formats)
346
339
        self.assertEqual([
347
 
            ('WorkingTreeFormat2',
 
340
            ('WorkingTreeFormat4',
348
341
             {'bzrdir_format': formats[0]._matchingbzrdir,
349
342
              'transport_readonly_server': 'b',
350
343
              'transport_server': 'a',
377
370
            )
378
371
        server1 = "a"
379
372
        server2 = "b"
380
 
        formats = [workingtree.WorkingTreeFormat2(),
381
 
                   workingtree.WorkingTreeFormat3(),]
 
373
        formats = [workingtree_4.WorkingTreeFormat4(),
 
374
                   workingtree_3.WorkingTreeFormat3(),]
382
375
        scenarios = make_scenarios(server1, server2, formats)
383
376
        self.assertEqual(7, len(scenarios))
384
 
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
385
 
        wt4_format = workingtree.WorkingTreeFormat4()
386
 
        wt5_format = workingtree.WorkingTreeFormat5()
 
377
        default_wt_format = workingtree.format_registry.get_default()
 
378
        wt4_format = workingtree_4.WorkingTreeFormat4()
 
379
        wt5_format = workingtree_4.WorkingTreeFormat5()
387
380
        expected_scenarios = [
388
 
            ('WorkingTreeFormat2',
 
381
            ('WorkingTreeFormat4',
389
382
             {'bzrdir_format': formats[0]._matchingbzrdir,
390
383
              'transport_readonly_server': 'b',
391
384
              'transport_server': 'a',
451
444
        # ones to add.
452
445
        from bzrlib.tests.per_tree import (
453
446
            return_parameter,
454
 
            revision_tree_from_workingtree
455
447
            )
456
448
        from bzrlib.tests.per_intertree import (
457
449
            make_scenarios,
458
450
            )
459
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
451
        from bzrlib.workingtree_3 import WorkingTreeFormat3
 
452
        from bzrlib.workingtree_4 import WorkingTreeFormat4
460
453
        input_test = TestInterTreeScenarios(
461
454
            "test_scenarios")
462
455
        server1 = "a"
463
456
        server2 = "b"
464
 
        format1 = WorkingTreeFormat2()
 
457
        format1 = WorkingTreeFormat4()
465
458
        format2 = WorkingTreeFormat3()
466
459
        formats = [("1", str, format1, format2, "converter1"),
467
460
            ("2", int, format2, format1, "converter2")]
513
506
        self.assertRaises(AssertionError, self.assertEqualStat,
514
507
            os.lstat("foo"), os.lstat("longname"))
515
508
 
 
509
    def test_failUnlessExists(self):
 
510
        """Deprecated failUnlessExists and failIfExists"""
 
511
        self.applyDeprecated(
 
512
            deprecated_in((2, 4)),
 
513
            self.failUnlessExists, '.')
 
514
        self.build_tree(['foo/', 'foo/bar'])
 
515
        self.applyDeprecated(
 
516
            deprecated_in((2, 4)),
 
517
            self.failUnlessExists, 'foo/bar')
 
518
        self.applyDeprecated(
 
519
            deprecated_in((2, 4)),
 
520
            self.failIfExists, 'foo/foo')
 
521
 
 
522
    def test_assertPathExists(self):
 
523
        self.assertPathExists('.')
 
524
        self.build_tree(['foo/', 'foo/bar'])
 
525
        self.assertPathExists('foo/bar')
 
526
        self.assertPathDoesNotExist('foo/foo')
 
527
 
516
528
 
517
529
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
518
530
 
552
564
        tree = self.make_branch_and_memory_tree('dir')
553
565
        # Guard against regression into MemoryTransport leaking
554
566
        # files to disk instead of keeping them in memory.
555
 
        self.failIf(osutils.lexists('dir'))
 
567
        self.assertFalse(osutils.lexists('dir'))
556
568
        self.assertIsInstance(tree, memorytree.MemoryTree)
557
569
 
558
570
    def test_make_branch_and_memory_tree_with_format(self):
559
571
        """make_branch_and_memory_tree should accept a format option."""
560
572
        format = bzrdir.BzrDirMetaFormat1()
561
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
573
        format.repository_format = repository.format_registry.get_default()
562
574
        tree = self.make_branch_and_memory_tree('dir', format=format)
563
575
        # Guard against regression into MemoryTransport leaking
564
576
        # files to disk instead of keeping them in memory.
565
 
        self.failIf(osutils.lexists('dir'))
 
577
        self.assertFalse(osutils.lexists('dir'))
566
578
        self.assertIsInstance(tree, memorytree.MemoryTree)
567
579
        self.assertEqual(format.repository_format.__class__,
568
580
            tree.branch.repository._format.__class__)
572
584
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
573
585
        # Guard against regression into MemoryTransport leaking
574
586
        # files to disk instead of keeping them in memory.
575
 
        self.failIf(osutils.lexists('dir'))
 
587
        self.assertFalse(osutils.lexists('dir'))
576
588
 
577
589
    def test_make_branch_builder_with_format(self):
578
590
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
579
591
        # that the format objects are used.
580
592
        format = bzrdir.BzrDirMetaFormat1()
581
 
        repo_format = weaverepo.RepositoryFormat7()
 
593
        repo_format = repository.format_registry.get_default()
582
594
        format.repository_format = repo_format
583
595
        builder = self.make_branch_builder('dir', format=format)
584
596
        the_branch = builder.get_branch()
585
597
        # Guard against regression into MemoryTransport leaking
586
598
        # files to disk instead of keeping them in memory.
587
 
        self.failIf(osutils.lexists('dir'))
 
599
        self.assertFalse(osutils.lexists('dir'))
588
600
        self.assertEqual(format.repository_format.__class__,
589
601
                         the_branch.repository._format.__class__)
590
602
        self.assertEqual(repo_format.get_format_string(),
596
608
        the_branch = builder.get_branch()
597
609
        # Guard against regression into MemoryTransport leaking
598
610
        # files to disk instead of keeping them in memory.
599
 
        self.failIf(osutils.lexists('dir'))
 
611
        self.assertFalse(osutils.lexists('dir'))
600
612
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
601
613
        self.assertEqual(dir_format.repository_format.__class__,
602
614
                         the_branch.repository._format.__class__)
607
619
    def test_dangling_locks_cause_failures(self):
608
620
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
609
621
            def test_function(self):
610
 
                t = self.get_transport('.')
 
622
                t = self.get_transport_from_path('.')
611
623
                l = lockdir.LockDir(t, 'lock')
612
624
                l.create()
613
625
                l.attempt_lock()
633
645
        # for the server
634
646
        url = self.get_readonly_url()
635
647
        url2 = self.get_readonly_url('foo/bar')
636
 
        t = transport.get_transport(url)
637
 
        t2 = transport.get_transport(url2)
638
 
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
639
 
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
 
648
        t = transport.get_transport_from_url(url)
 
649
        t2 = transport.get_transport_from_url(url2)
 
650
        self.assertIsInstance(t, ReadonlyTransportDecorator)
 
651
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
640
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
641
653
 
642
654
    def test_get_readonly_url_http(self):
648
660
        url = self.get_readonly_url()
649
661
        url2 = self.get_readonly_url('foo/bar')
650
662
        # the transport returned may be any HttpTransportBase subclass
651
 
        t = transport.get_transport(url)
652
 
        t2 = transport.get_transport(url2)
653
 
        self.failUnless(isinstance(t, HttpTransportBase))
654
 
        self.failUnless(isinstance(t2, HttpTransportBase))
 
663
        t = transport.get_transport_from_url(url)
 
664
        t2 = transport.get_transport_from_url(url2)
 
665
        self.assertIsInstance(t, HttpTransportBase)
 
666
        self.assertIsInstance(t2, HttpTransportBase)
655
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
656
668
 
657
669
    def test_is_directory(self):
665
677
    def test_make_branch_builder(self):
666
678
        builder = self.make_branch_builder('dir')
667
679
        rev_id = builder.build_commit()
668
 
        self.failUnlessExists('dir')
 
680
        self.assertPathExists('dir')
669
681
        a_dir = bzrdir.BzrDir.open('dir')
670
682
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
671
683
        a_branch = a_dir.open_branch()
687
699
        self.assertIsInstance(result_bzrdir.transport,
688
700
                              memory.MemoryTransport)
689
701
        # should not be on disk, should only be in memory
690
 
        self.failIfExists('subdir')
 
702
        self.assertPathDoesNotExist('subdir')
691
703
 
692
704
 
693
705
class TestChrootedTest(tests.ChrootedTestCase):
694
706
 
695
707
    def test_root_is_root(self):
696
 
        t = transport.get_transport(self.get_readonly_url())
 
708
        t = transport.get_transport_from_url(self.get_readonly_url())
697
709
        url = t.base
698
710
        self.assertEqual(url, t.clone('..').base)
699
711
 
701
713
class TestProfileResult(tests.TestCase):
702
714
 
703
715
    def test_profiles_tests(self):
704
 
        self.requireFeature(test_lsprof.LSProfFeature)
705
 
        terminal = testtools.tests.helpers.ExtendedTestResult()
 
716
        self.requireFeature(features.lsprof_feature)
 
717
        terminal = testtools.testresult.doubles.ExtendedTestResult()
706
718
        result = tests.ProfileResult(terminal)
707
719
        class Sample(tests.TestCase):
708
720
            def a(self):
725
737
                descriptions=0,
726
738
                verbosity=1,
727
739
                )
728
 
        capture = testtools.tests.helpers.ExtendedTestResult()
 
740
        capture = testtools.testresult.doubles.ExtendedTestResult()
729
741
        test_case.run(MultiTestResult(result, capture))
730
742
        run_case = capture._events[0][1]
731
743
        timed_string = result._testTimeString(run_case)
752
764
        self.check_timing(ShortDelayTestCase('test_short_delay'),
753
765
                          r"^ +[0-9]+ms$")
754
766
 
755
 
    def _patch_get_bzr_source_tree(self):
756
 
        # Reading from the actual source tree breaks isolation, but we don't
757
 
        # want to assume that thats *all* that would happen.
758
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
759
 
 
760
 
    def test_assigned_benchmark_file_stores_date(self):
761
 
        self._patch_get_bzr_source_tree()
762
 
        output = StringIO()
763
 
        result = bzrlib.tests.TextTestResult(self._log_file,
764
 
                                        descriptions=0,
765
 
                                        verbosity=1,
766
 
                                        bench_history=output
767
 
                                        )
768
 
        output_string = output.getvalue()
769
 
        # if you are wondering about the regexp please read the comment in
770
 
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
771
 
        # XXX: what comment?  -- Andrew Bennetts
772
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
773
 
 
774
 
    def test_benchhistory_records_test_times(self):
775
 
        self._patch_get_bzr_source_tree()
776
 
        result_stream = StringIO()
777
 
        result = bzrlib.tests.TextTestResult(
778
 
            self._log_file,
779
 
            descriptions=0,
780
 
            verbosity=1,
781
 
            bench_history=result_stream
782
 
            )
783
 
 
784
 
        # we want profile a call and check that its test duration is recorded
785
 
        # make a new test instance that when run will generate a benchmark
786
 
        example_test_case = TestTestResult("_time_hello_world_encoding")
787
 
        # execute the test, which should succeed and record times
788
 
        example_test_case.run(result)
789
 
        lines = result_stream.getvalue().splitlines()
790
 
        self.assertEqual(2, len(lines))
791
 
        self.assertContainsRe(lines[1],
792
 
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
793
 
            "._time_hello_world_encoding")
794
 
 
795
767
    def _time_hello_world_encoding(self):
796
768
        """Profile two sleep calls
797
769
 
802
774
 
803
775
    def test_lsprofiling(self):
804
776
        """Verbose test result prints lsprof statistics from test cases."""
805
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
777
        self.requireFeature(features.lsprof_feature)
806
778
        result_stream = StringIO()
807
779
        result = bzrlib.tests.VerboseTestResult(
808
780
            result_stream,
837
809
        self.assertContainsRe(output,
838
810
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
839
811
 
 
812
    def test_uses_time_from_testtools(self):
 
813
        """Test case timings in verbose results should use testtools times"""
 
814
        import datetime
 
815
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
816
            def startTest(self, test):
 
817
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
818
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
819
            def addSuccess(self, test):
 
820
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
821
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
822
            def report_tests_starting(self): pass
 
823
        sio = StringIO()
 
824
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
825
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
826
 
840
827
    def test_known_failure(self):
841
 
        """A KnownFailure being raised should trigger several result actions."""
 
828
        """Using knownFailure should trigger several result actions."""
842
829
        class InstrumentedTestResult(tests.ExtendedTestResult):
843
830
            def stopTestRun(self): pass
844
831
            def report_tests_starting(self): pass
847
834
        result = InstrumentedTestResult(None, None, None, None)
848
835
        class Test(tests.TestCase):
849
836
            def test_function(self):
850
 
                raise tests.KnownFailure('failed!')
 
837
                self.knownFailure('failed!')
851
838
        test = Test("test_function")
852
839
        test.run(result)
853
840
        # it should invoke 'report_known_failure'.
869
856
            descriptions=0,
870
857
            verbosity=2,
871
858
            )
872
 
        test = self.get_passing_test()
873
 
        result.startTest(test)
874
 
        prefix = len(result_stream.getvalue())
875
 
        # the err parameter has the shape:
876
 
        # (class, exception object, traceback)
877
 
        # KnownFailures dont get their tracebacks shown though, so we
878
 
        # can skip that.
879
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
880
 
        result.report_known_failure(test, err)
881
 
        output = result_stream.getvalue()[prefix:]
882
 
        lines = output.splitlines()
883
 
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
884
 
        if sys.version_info > (2, 7):
885
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
886
 
                self.assertNotEqual, lines[1], '    ')
887
 
        self.assertEqual(lines[1], '    foo')
888
 
        self.assertEqual(2, len(lines))
 
859
        _get_test("test_xfail").run(result)
 
860
        self.assertContainsRe(result_stream.getvalue(),
 
861
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
862
            "\\s*(?:Text attachment: )?reason"
 
863
            "(?:\n-+\n|: {{{)"
 
864
            "this_fails"
 
865
            "(?:\n-+\n|}}}\n)")
889
866
 
890
867
    def get_passing_test(self):
891
868
        """Return a test object that can't be run usefully."""
902
879
                self._call = test, feature
903
880
        result = InstrumentedTestResult(None, None, None, None)
904
881
        test = SampleTestCase('_test_pass')
905
 
        feature = tests.Feature()
 
882
        feature = features.Feature()
906
883
        result.startTest(test)
907
884
        result.addNotSupported(test, feature)
908
885
        # it should invoke 'report_unsupported'.
927
904
            verbosity=2,
928
905
            )
929
906
        test = self.get_passing_test()
930
 
        feature = tests.Feature()
 
907
        feature = features.Feature()
931
908
        result.startTest(test)
932
909
        prefix = len(result_stream.getvalue())
933
910
        result.report_unsupported(test, feature)
946
923
            def addNotSupported(self, test, feature):
947
924
                self._call = test, feature
948
925
        result = InstrumentedTestResult(None, None, None, None)
949
 
        feature = tests.Feature()
 
926
        feature = features.Feature()
950
927
        class Test(tests.TestCase):
951
928
            def test_function(self):
952
929
                raise tests.UnavailableFeature(feature)
971
948
    def test_strict_with_known_failure(self):
972
949
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
973
950
                                             verbosity=1)
974
 
        test = self.get_passing_test()
975
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
976
 
        result.addExpectedFailure(test, err)
 
951
        test = _get_test("test_xfail")
 
952
        test.run(result)
977
953
        self.assertFalse(result.wasStrictlySuccessful())
978
954
        self.assertEqual(None, result._extractBenchmarkTime(test))
979
955
 
1011
987
        self.assertEquals(2, result.count)
1012
988
 
1013
989
 
1014
 
class TestUnicodeFilenameFeature(tests.TestCase):
1015
 
 
1016
 
    def test_probe_passes(self):
1017
 
        """UnicodeFilenameFeature._probe passes."""
1018
 
        # We can't test much more than that because the behaviour depends
1019
 
        # on the platform.
1020
 
        tests.UnicodeFilenameFeature._probe()
1021
 
 
1022
 
 
1023
990
class TestRunner(tests.TestCase):
1024
991
 
1025
992
    def dummy_test(self):
1051
1018
        test = unittest.TestSuite()
1052
1019
        test.addTest(Test("known_failure_test"))
1053
1020
        def failing_test():
1054
 
            self.fail('foo')
 
1021
            raise AssertionError('foo')
1055
1022
        test.addTest(unittest.FunctionTestCase(failing_test))
1056
1023
        stream = StringIO()
1057
1024
        runner = tests.TextTestRunner(stream=stream)
1065
1032
            '^----------------------------------------------------------------------\n'
1066
1033
            'Traceback \\(most recent call last\\):\n'
1067
1034
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1068
 
            '    self.fail\\(\'foo\'\\)\n'
 
1035
            '    raise AssertionError\\(\'foo\'\\)\n'
1069
1036
            '.*'
1070
1037
            '^----------------------------------------------------------------------\n'
1071
1038
            '.*'
1077
1044
        # the final output.
1078
1045
        class Test(tests.TestCase):
1079
1046
            def known_failure_test(self):
1080
 
                self.expectFailure('failed', self.assertTrue, False)
 
1047
                self.knownFailure("Never works...")
1081
1048
        test = Test("known_failure_test")
1082
1049
        stream = StringIO()
1083
1050
        runner = tests.TextTestRunner(stream=stream)
1089
1056
            '\n'
1090
1057
            'OK \\(known_failures=1\\)\n')
1091
1058
 
 
1059
    def test_unexpected_success_bad(self):
 
1060
        class Test(tests.TestCase):
 
1061
            def test_truth(self):
 
1062
                self.expectFailure("No absolute truth", self.assertTrue, True)
 
1063
        runner = tests.TextTestRunner(stream=StringIO())
 
1064
        result = self.run_test_runner(runner, Test("test_truth"))
 
1065
        self.assertContainsRe(runner.stream.getvalue(),
 
1066
            "=+\n"
 
1067
            "FAIL: \\S+\.test_truth\n"
 
1068
            "-+\n"
 
1069
            "(?:.*\n)*"
 
1070
            "\\s*(?:Text attachment: )?reason"
 
1071
            "(?:\n-+\n|: {{{)"
 
1072
            "No absolute truth"
 
1073
            "(?:\n-+\n|}}}\n)"
 
1074
            "(?:.*\n)*"
 
1075
            "-+\n"
 
1076
            "Ran 1 test in .*\n"
 
1077
            "\n"
 
1078
            "FAILED \\(failures=1\\)\n\\Z")
 
1079
 
1092
1080
    def test_result_decorator(self):
1093
1081
        # decorate results
1094
1082
        calls = []
1095
 
        class LoggingDecorator(tests.ForwardingResult):
 
1083
        class LoggingDecorator(ExtendedToOriginalDecorator):
1096
1084
            def startTest(self, test):
1097
 
                tests.ForwardingResult.startTest(self, test)
 
1085
                ExtendedToOriginalDecorator.startTest(self, test)
1098
1086
                calls.append('start')
1099
1087
        test = unittest.FunctionTestCase(lambda:None)
1100
1088
        stream = StringIO()
1178
1166
 
1179
1167
    def test_unsupported_features_listed(self):
1180
1168
        """When unsupported features are encountered they are detailed."""
1181
 
        class Feature1(tests.Feature):
 
1169
        class Feature1(features.Feature):
1182
1170
            def _probe(self): return False
1183
 
        class Feature2(tests.Feature):
 
1171
        class Feature2(features.Feature):
1184
1172
            def _probe(self): return False
1185
1173
        # create sample tests
1186
1174
        test1 = SampleTestCase('_test_pass')
1201
1189
            ],
1202
1190
            lines[-3:])
1203
1191
 
1204
 
    def _patch_get_bzr_source_tree(self):
1205
 
        # Reading from the actual source tree breaks isolation, but we don't
1206
 
        # want to assume that thats *all* that would happen.
1207
 
        self._get_source_tree_calls = []
1208
 
        def new_get():
1209
 
            self._get_source_tree_calls.append("called")
1210
 
            return None
1211
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
1212
 
 
1213
 
    def test_bench_history(self):
1214
 
        # tests that the running the benchmark passes bench_history into
1215
 
        # the test result object. We can tell that happens if
1216
 
        # _get_bzr_source_tree is called.
1217
 
        self._patch_get_bzr_source_tree()
1218
 
        test = TestRunner('dummy_test')
1219
 
        output = StringIO()
1220
 
        runner = tests.TextTestRunner(stream=self._log_file,
1221
 
                                      bench_history=output)
1222
 
        result = self.run_test_runner(runner, test)
1223
 
        output_string = output.getvalue()
1224
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
1225
 
        self.assertLength(1, self._get_source_tree_calls)
1226
 
 
1227
1192
    def test_verbose_test_count(self):
1228
1193
        """A verbose test run reports the right test count at the start"""
1229
1194
        suite = TestUtil.TestSuite([
1239
1204
    def test_startTestRun(self):
1240
1205
        """run should call result.startTestRun()"""
1241
1206
        calls = []
1242
 
        class LoggingDecorator(tests.ForwardingResult):
 
1207
        class LoggingDecorator(ExtendedToOriginalDecorator):
1243
1208
            def startTestRun(self):
1244
 
                tests.ForwardingResult.startTestRun(self)
 
1209
                ExtendedToOriginalDecorator.startTestRun(self)
1245
1210
                calls.append('startTestRun')
1246
1211
        test = unittest.FunctionTestCase(lambda:None)
1247
1212
        stream = StringIO()
1253
1218
    def test_stopTestRun(self):
1254
1219
        """run should call result.stopTestRun()"""
1255
1220
        calls = []
1256
 
        class LoggingDecorator(tests.ForwardingResult):
 
1221
        class LoggingDecorator(ExtendedToOriginalDecorator):
1257
1222
            def stopTestRun(self):
1258
 
                tests.ForwardingResult.stopTestRun(self)
 
1223
                ExtendedToOriginalDecorator.stopTestRun(self)
1259
1224
                calls.append('stopTestRun')
1260
1225
        test = unittest.FunctionTestCase(lambda:None)
1261
1226
        stream = StringIO()
1264
1229
        result = self.run_test_runner(runner, test)
1265
1230
        self.assertLength(1, calls)
1266
1231
 
 
1232
    def test_unicode_test_output_on_ascii_stream(self):
 
1233
        """Showing results should always succeed even on an ascii console"""
 
1234
        class FailureWithUnicode(tests.TestCase):
 
1235
            def test_log_unicode(self):
 
1236
                self.log(u"\u2606")
 
1237
                self.fail("Now print that log!")
 
1238
        out = StringIO()
 
1239
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1240
            lambda trace=False: "ascii")
 
1241
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1242
            FailureWithUnicode("test_log_unicode"))
 
1243
        self.assertContainsRe(out.getvalue(),
 
1244
            "(?:Text attachment: )?log"
 
1245
            "(?:\n-+\n|: {{{)"
 
1246
            "\d+\.\d+  \\\\u2606"
 
1247
            "(?:\n-+\n|}}}\n)")
 
1248
 
1267
1249
 
1268
1250
class SampleTestCase(tests.TestCase):
1269
1251
 
1457
1439
        # Note this test won't fail with hooks that the core library doesn't
1458
1440
        # use - but it trigger with a plugin that adds hooks, so its still a
1459
1441
        # useful warning in that case.
1460
 
        self.assertEqual(bzrlib.branch.BranchHooks(),
1461
 
            bzrlib.branch.Branch.hooks)
1462
 
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1442
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1443
        self.assertEqual(
 
1444
            bzrlib.smart.server.SmartServerHooks(),
1463
1445
            bzrlib.smart.server.SmartTCPServer.hooks)
1464
 
        self.assertEqual(bzrlib.commands.CommandHooks(),
1465
 
            bzrlib.commands.Command.hooks)
 
1446
        self.assertEqual(
 
1447
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1466
1448
 
1467
1449
    def test__gather_lsprof_in_benchmarks(self):
1468
1450
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1469
1451
 
1470
1452
        Each self.time() call is individually and separately profiled.
1471
1453
        """
1472
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
1454
        self.requireFeature(features.lsprof_feature)
1473
1455
        # overrides the class member with an instance member so no cleanup
1474
1456
        # needed.
1475
1457
        self._gather_lsprof_in_benchmarks = True
1494
1476
        transport_server = memory.MemoryServer()
1495
1477
        transport_server.start_server()
1496
1478
        self.addCleanup(transport_server.stop_server)
1497
 
        t = transport.get_transport(transport_server.get_url())
 
1479
        t = transport.get_transport_from_url(transport_server.get_url())
1498
1480
        bzrdir.BzrDir.create(t.base)
1499
1481
        self.assertRaises(errors.BzrError,
1500
1482
            bzrdir.BzrDir.open_from_transport, t)
1505
1487
 
1506
1488
    def test_requireFeature_available(self):
1507
1489
        """self.requireFeature(available) is a no-op."""
1508
 
        class Available(tests.Feature):
 
1490
        class Available(features.Feature):
1509
1491
            def _probe(self):return True
1510
1492
        feature = Available()
1511
1493
        self.requireFeature(feature)
1512
1494
 
1513
1495
    def test_requireFeature_unavailable(self):
1514
1496
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1515
 
        class Unavailable(tests.Feature):
 
1497
        class Unavailable(features.Feature):
1516
1498
            def _probe(self):return False
1517
1499
        feature = Unavailable()
1518
1500
        self.assertRaises(tests.UnavailableFeature,
1677
1659
        test.run(unittest.TestResult())
1678
1660
        self.assertEqual('original', obj.test_attr)
1679
1661
 
1680
 
 
1681
 
class _MissingFeature(tests.Feature):
 
1662
    def test_recordCalls(self):
 
1663
        from bzrlib.tests import test_selftest
 
1664
        calls = self.recordCalls(
 
1665
            test_selftest, '_add_numbers')
 
1666
        self.assertEqual(test_selftest._add_numbers(2, 10),
 
1667
            12)
 
1668
        self.assertEquals(calls, [((2, 10), {})])
 
1669
 
 
1670
 
 
1671
def _add_numbers(a, b):
 
1672
    return a + b
 
1673
 
 
1674
 
 
1675
class _MissingFeature(features.Feature):
1682
1676
    def _probe(self):
1683
1677
        return False
1684
1678
missing_feature = _MissingFeature()
1735
1729
        result = self._run_test('test_fail')
1736
1730
        self.assertEqual(1, len(result.failures))
1737
1731
        result_content = result.failures[0][1]
1738
 
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1732
        self.assertContainsRe(result_content,
 
1733
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1739
1734
        self.assertContainsRe(result_content, 'this was a failing test')
1740
1735
 
1741
1736
    def test_error_has_log(self):
1742
1737
        result = self._run_test('test_error')
1743
1738
        self.assertEqual(1, len(result.errors))
1744
1739
        result_content = result.errors[0][1]
1745
 
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1740
        self.assertContainsRe(result_content,
 
1741
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1746
1742
        self.assertContainsRe(result_content, 'this test errored')
1747
1743
 
1748
1744
    def test_skip_has_no_log(self):
1767
1763
        result = self._run_test('test_xfail')
1768
1764
        self.assertEqual(1, len(result.expectedFailures))
1769
1765
        result_content = result.expectedFailures[0][1]
1770
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1766
        self.assertNotContainsRe(result_content,
 
1767
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1771
1768
        self.assertNotContainsRe(result_content, 'test with expected failure')
1772
1769
 
1773
1770
    def test_unexpected_success_has_log(self):
1946
1943
    def test_make_branch_and_tree_with_format(self):
1947
1944
        # we should be able to supply a format to make_branch_and_tree
1948
1945
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1949
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1950
1946
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1951
1947
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1952
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1953
 
                              bzrlib.bzrdir.BzrDirFormat6)
1954
1948
 
1955
1949
    def test_make_branch_and_memory_tree(self):
1956
1950
        # we should be able to get a new branch and a mutable tree from
2033
2027
        self.assertLength(2, output.readlines())
2034
2028
 
2035
2029
    def test_lsprof_tests(self):
2036
 
        self.requireFeature(test_lsprof.LSProfFeature)
2037
 
        calls = []
 
2030
        self.requireFeature(features.lsprof_feature)
 
2031
        results = []
2038
2032
        class Test(object):
2039
2033
            def __call__(test, result):
2040
2034
                test.run(result)
2041
2035
            def run(test, result):
2042
 
                self.assertIsInstance(result, tests.ForwardingResult)
2043
 
                calls.append("called")
 
2036
                results.append(result)
2044
2037
            def countTestCases(self):
2045
2038
                return 1
2046
2039
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2047
 
        self.assertLength(1, calls)
 
2040
        self.assertLength(1, results)
 
2041
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
2048
2042
 
2049
2043
    def test_random(self):
2050
2044
        # test randomising by listing a number of tests.
2185
2179
        self.assertNotContainsRe(content, 'test with expected failure')
2186
2180
        self.assertEqual(1, len(result.expectedFailures))
2187
2181
        result_content = result.expectedFailures[0][1]
2188
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2182
        self.assertNotContainsRe(result_content,
 
2183
            '(?m)^(?:Text attachment: )?log(?:$|: )')
2189
2184
        self.assertNotContainsRe(result_content, 'test with expected failure')
2190
2185
 
2191
2186
    def test_unexpected_success_has_log(self):
2192
2187
        content, result = self.run_subunit_stream('test_unexpected_success')
2193
2188
        self.assertContainsRe(content, '(?m)^log$')
2194
2189
        self.assertContainsRe(content, 'test with unexpected success')
2195
 
        self.expectFailure('subunit treats "unexpectedSuccess"'
2196
 
                           ' as a plain success',
2197
 
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2190
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
 
2191
        #                success, if a min version check is added remove this
 
2192
        from subunit import TestProtocolClient as _Client
 
2193
        if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
 
2194
            self.expectFailure('subunit treats "unexpectedSuccess"'
 
2195
                               ' as a plain success',
 
2196
                self.assertEqual, 1, len(result.unexpectedSuccesses))
2198
2197
        self.assertEqual(1, len(result.unexpectedSuccesses))
2199
2198
        test = result.unexpectedSuccesses[0]
2200
2199
        # RemotedTestCase doesn't preserve the "details"
2335
2334
        # stdout and stderr of the invoked run_bzr
2336
2335
        current_factory = bzrlib.ui.ui_factory
2337
2336
        self.run_bzr(['foo'])
2338
 
        self.failIf(current_factory is self.factory)
 
2337
        self.assertFalse(current_factory is self.factory)
2339
2338
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2340
2339
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2341
2340
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2498
2497
 
2499
2498
 
2500
2499
class TestStartBzrSubProcess(tests.TestCase):
 
2500
    """Stub test start_bzr_subprocess."""
2501
2501
 
2502
 
    def check_popen_state(self):
2503
 
        """Replace to make assertions when popen is called."""
 
2502
    def _subprocess_log_cleanup(self):
 
2503
        """Inhibits the base version as we don't produce a log file."""
2504
2504
 
2505
2505
    def _popen(self, *args, **kwargs):
2506
 
        """Record the command that is run, so that we can ensure it is correct"""
 
2506
        """Override the base version to record the command that is run.
 
2507
 
 
2508
        From there we can ensure it is correct without spawning a real process.
 
2509
        """
2507
2510
        self.check_popen_state()
2508
2511
        self._popen_args = args
2509
2512
        self._popen_kwargs = kwargs
2510
2513
        raise _DontSpawnProcess()
2511
2514
 
 
2515
    def check_popen_state(self):
 
2516
        """Replace to make assertions when popen is called."""
 
2517
 
2512
2518
    def test_run_bzr_subprocess_no_plugins(self):
2513
2519
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2514
2520
        command = self._popen_args[0]
2518
2524
 
2519
2525
    def test_allow_plugins(self):
2520
2526
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2521
 
            allow_plugins=True)
 
2527
                          allow_plugins=True)
2522
2528
        command = self._popen_args[0]
2523
2529
        self.assertEqual([], command[2:])
2524
2530
 
2525
2531
    def test_set_env(self):
2526
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2532
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2527
2533
        # set in the child
2528
2534
        def check_environment():
2529
2535
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2530
2536
        self.check_popen_state = check_environment
2531
2537
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2532
 
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2538
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
2533
2539
        # not set in theparent
2534
2540
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2535
2541
 
2536
2542
    def test_run_bzr_subprocess_env_del(self):
2537
2543
        """run_bzr_subprocess can remove environment variables too."""
2538
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2544
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2539
2545
        def check_environment():
2540
2546
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2541
2547
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2542
2548
        self.check_popen_state = check_environment
2543
2549
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2544
 
            env_changes={'EXISTANT_ENV_VAR':None})
 
2550
                          env_changes={'EXISTANT_ENV_VAR':None})
2545
2551
        # Still set in parent
2546
2552
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2547
2553
        del os.environ['EXISTANT_ENV_VAR']
2548
2554
 
2549
2555
    def test_env_del_missing(self):
2550
 
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
 
2556
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2551
2557
        def check_environment():
2552
2558
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2553
2559
        self.check_popen_state = check_environment
2554
2560
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2555
 
            env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2561
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
2556
2562
 
2557
2563
    def test_working_dir(self):
2558
2564
        """Test that we can specify the working dir for the child"""
2561
2567
        chdirs = []
2562
2568
        def chdir(path):
2563
2569
            chdirs.append(path)
2564
 
        os.chdir = chdir
2565
 
        try:
2566
 
            def getcwd():
2567
 
                return 'current'
2568
 
            osutils.getcwd = getcwd
2569
 
            try:
2570
 
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2571
 
                    working_dir='foo')
2572
 
            finally:
2573
 
                osutils.getcwd = orig_getcwd
2574
 
        finally:
2575
 
            os.chdir = orig_chdir
 
2570
        self.overrideAttr(os, 'chdir', chdir)
 
2571
        def getcwd():
 
2572
            return 'current'
 
2573
        self.overrideAttr(osutils, 'getcwd', getcwd)
 
2574
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2575
                          working_dir='foo')
2576
2576
        self.assertEqual(['foo', 'current'], chdirs)
2577
2577
 
2578
2578
    def test_get_bzr_path_with_cwd_bzrlib(self):
2598
2598
        self.assertEqual('bzr: interrupted\n', result[1])
2599
2599
 
2600
2600
 
2601
 
class TestFeature(tests.TestCase):
2602
 
 
2603
 
    def test_caching(self):
2604
 
        """Feature._probe is called by the feature at most once."""
2605
 
        class InstrumentedFeature(tests.Feature):
2606
 
            def __init__(self):
2607
 
                super(InstrumentedFeature, self).__init__()
2608
 
                self.calls = []
2609
 
            def _probe(self):
2610
 
                self.calls.append('_probe')
2611
 
                return False
2612
 
        feature = InstrumentedFeature()
2613
 
        feature.available()
2614
 
        self.assertEqual(['_probe'], feature.calls)
2615
 
        feature.available()
2616
 
        self.assertEqual(['_probe'], feature.calls)
2617
 
 
2618
 
    def test_named_str(self):
2619
 
        """Feature.__str__ should thunk to feature_name()."""
2620
 
        class NamedFeature(tests.Feature):
2621
 
            def feature_name(self):
2622
 
                return 'symlinks'
2623
 
        feature = NamedFeature()
2624
 
        self.assertEqual('symlinks', str(feature))
2625
 
 
2626
 
    def test_default_str(self):
2627
 
        """Feature.__str__ should default to __class__.__name__."""
2628
 
        class NamedFeature(tests.Feature):
2629
 
            pass
2630
 
        feature = NamedFeature()
2631
 
        self.assertEqual('NamedFeature', str(feature))
2632
 
 
2633
 
 
2634
 
class TestUnavailableFeature(tests.TestCase):
2635
 
 
2636
 
    def test_access_feature(self):
2637
 
        feature = tests.Feature()
2638
 
        exception = tests.UnavailableFeature(feature)
2639
 
        self.assertIs(feature, exception.args[0])
2640
 
 
2641
 
 
2642
 
simple_thunk_feature = tests._CompatabilityThunkFeature(
2643
 
    deprecated_in((2, 1, 0)),
2644
 
    'bzrlib.tests.test_selftest',
2645
 
    'simple_thunk_feature','UnicodeFilename',
2646
 
    replacement_module='bzrlib.tests'
2647
 
    )
2648
 
 
2649
 
class Test_CompatibilityFeature(tests.TestCase):
2650
 
 
2651
 
    def test_does_thunk(self):
2652
 
        res = self.callDeprecated(
2653
 
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2654
 
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2655
 
            simple_thunk_feature.available)
2656
 
        self.assertEqual(tests.UnicodeFilename.available(), res)
2657
 
 
2658
 
 
2659
 
class TestModuleAvailableFeature(tests.TestCase):
2660
 
 
2661
 
    def test_available_module(self):
2662
 
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
2663
 
        self.assertEqual('bzrlib.tests', feature.module_name)
2664
 
        self.assertEqual('bzrlib.tests', str(feature))
2665
 
        self.assertTrue(feature.available())
2666
 
        self.assertIs(tests, feature.module)
2667
 
 
2668
 
    def test_unavailable_module(self):
2669
 
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2670
 
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2671
 
        self.assertFalse(feature.available())
2672
 
        self.assertIs(None, feature.module)
2673
 
 
2674
 
 
2675
2601
class TestSelftestFiltering(tests.TestCase):
2676
2602
 
2677
2603
    def setUp(self):
2828
2754
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2829
2755
 
2830
2756
 
2831
 
class TestCheckInventoryShape(tests.TestCaseWithTransport):
 
2757
class TestCheckTreeShape(tests.TestCaseWithTransport):
2832
2758
 
2833
 
    def test_check_inventory_shape(self):
 
2759
    def test_check_tree_shape(self):
2834
2760
        files = ['a', 'b/', 'b/c']
2835
2761
        tree = self.make_branch_and_tree('.')
2836
2762
        self.build_tree(files)
2837
2763
        tree.add(files)
2838
2764
        tree.lock_read()
2839
2765
        try:
2840
 
            self.check_inventory_shape(tree.inventory, files)
 
2766
            self.check_tree_shape(tree, files)
2841
2767
        finally:
2842
2768
            tree.unlock()
2843
2769
 
3175
3101
        tpr.register('bar', 'bBB.aAA.rRR')
3176
3102
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3177
3103
        self.assertThat(self.get_log(),
3178
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3104
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3105
                           doctest.ELLIPSIS))
3179
3106
 
3180
3107
    def test_get_unknown_prefix(self):
3181
3108
        tpr = self._get_registry()
3216
3143
        class Test(unittest.TestCase):
3217
3144
            def runTest(self):
3218
3145
                pass
3219
 
            addCleanup = None # for when on Python 2.7 with native addCleanup
3220
3146
        result = self.LeakRecordingResult()
3221
3147
        test = Test()
3222
 
        self.assertIs(getattr(test, "addCleanup", None), None)
3223
3148
        result.startTestRun()
3224
3149
        test.run(result)
3225
3150
        result.stopTestRun()
3289
3214
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3290
3215
 
3291
3216
 
 
3217
class TestPostMortemDebugging(tests.TestCase):
 
3218
    """Check post mortem debugging works when tests fail or error"""
 
3219
 
 
3220
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3221
        def __init__(self):
 
3222
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3223
            self.postcode = None
 
3224
        def _post_mortem(self, tb=None):
 
3225
            """Record the code object at the end of the current traceback"""
 
3226
            tb = tb or sys.exc_info()[2]
 
3227
            if tb is not None:
 
3228
                next = tb.tb_next
 
3229
                while next is not None:
 
3230
                    tb = next
 
3231
                    next = next.tb_next
 
3232
                self.postcode = tb.tb_frame.f_code
 
3233
        def report_error(self, test, err):
 
3234
            pass
 
3235
        def report_failure(self, test, err):
 
3236
            pass
 
3237
 
 
3238
    def test_location_unittest_error(self):
 
3239
        """Needs right post mortem traceback with erroring unittest case"""
 
3240
        class Test(unittest.TestCase):
 
3241
            def runTest(self):
 
3242
                raise RuntimeError
 
3243
        result = self.TracebackRecordingResult()
 
3244
        Test().run(result)
 
3245
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3246
 
 
3247
    def test_location_unittest_failure(self):
 
3248
        """Needs right post mortem traceback with failing unittest case"""
 
3249
        class Test(unittest.TestCase):
 
3250
            def runTest(self):
 
3251
                raise self.failureException
 
3252
        result = self.TracebackRecordingResult()
 
3253
        Test().run(result)
 
3254
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3255
 
 
3256
    def test_location_bt_error(self):
 
3257
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3258
        class Test(tests.TestCase):
 
3259
            def test_error(self):
 
3260
                raise RuntimeError
 
3261
        result = self.TracebackRecordingResult()
 
3262
        Test("test_error").run(result)
 
3263
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3264
 
 
3265
    def test_location_bt_failure(self):
 
3266
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3267
        class Test(tests.TestCase):
 
3268
            def test_failure(self):
 
3269
                raise self.failureException
 
3270
        result = self.TracebackRecordingResult()
 
3271
        Test("test_failure").run(result)
 
3272
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3273
 
 
3274
    def test_env_var_triggers_post_mortem(self):
 
3275
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3276
        import pdb
 
3277
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3278
        post_mortem_calls = []
 
3279
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3280
        self.overrideEnv('BZR_TEST_PDB', None)
 
3281
        result._post_mortem(1)
 
3282
        self.overrideEnv('BZR_TEST_PDB', 'on')
 
3283
        result._post_mortem(2)
 
3284
        self.assertEqual([2], post_mortem_calls)
 
3285
 
 
3286
 
3292
3287
class TestRunSuite(tests.TestCase):
3293
3288
 
3294
3289
    def test_runner_class(self):
3305
3300
                                                self.verbosity)
3306
3301
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3307
3302
        self.assertLength(1, calls)
 
3303
 
 
3304
 
 
3305
class _Selftest(object):
 
3306
    """Mixin for tests needing full selftest output"""
 
3307
 
 
3308
    def _inject_stream_into_subunit(self, stream):
 
3309
        """To be overridden by subclasses that run tests out of process"""
 
3310
 
 
3311
    def _run_selftest(self, **kwargs):
 
3312
        sio = StringIO()
 
3313
        self._inject_stream_into_subunit(sio)
 
3314
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
 
3315
        return sio.getvalue()
 
3316
 
 
3317
 
 
3318
class _ForkedSelftest(_Selftest):
 
3319
    """Mixin for tests needing full selftest output with forked children"""
 
3320
 
 
3321
    _test_needs_features = [features.subunit]
 
3322
 
 
3323
    def _inject_stream_into_subunit(self, stream):
 
3324
        """Monkey-patch subunit so the extra output goes to stream not stdout
 
3325
 
 
3326
        Some APIs need rewriting so this kind of bogus hackery can be replaced
 
3327
        by passing the stream param from run_tests down into ProtocolTestCase.
 
3328
        """
 
3329
        from subunit import ProtocolTestCase
 
3330
        _original_init = ProtocolTestCase.__init__
 
3331
        def _init_with_passthrough(self, *args, **kwargs):
 
3332
            _original_init(self, *args, **kwargs)
 
3333
            self._passthrough = stream
 
3334
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
 
3335
 
 
3336
    def _run_selftest(self, **kwargs):
 
3337
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
 
3338
        if getattr(os, "fork", None) is None:
 
3339
            raise tests.TestNotApplicable("Platform doesn't support forking")
 
3340
        # Make sure the fork code is actually invoked by claiming two cores
 
3341
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
 
3342
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
 
3343
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3344
 
 
3345
 
 
3346
class TestParallelFork(_ForkedSelftest, tests.TestCase):
 
3347
    """Check operation of --parallel=fork selftest option"""
 
3348
 
 
3349
    def test_error_in_child_during_fork(self):
 
3350
        """Error in a forked child during test setup should get reported"""
 
3351
        class Test(tests.TestCase):
 
3352
            def testMethod(self):
 
3353
                pass
 
3354
        # We don't care what, just break something that a child will run
 
3355
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
 
3356
        out = self._run_selftest(test_suite_factory=Test)
 
3357
        # Lines from the tracebacks of the two child processes may be mixed
 
3358
        # together due to the way subunit parses and forwards the streams,
 
3359
        # so permit extra lines between each part of the error output.
 
3360
        self.assertContainsRe(out,
 
3361
            "Traceback.*:\n"
 
3362
            "(?:.*\n)*"
 
3363
            ".+ in fork_for_tests\n"
 
3364
            "(?:.*\n)*"
 
3365
            "\s*workaround_zealous_crypto_random\(\)\n"
 
3366
            "(?:.*\n)*"
 
3367
            "TypeError:")
 
3368
 
 
3369
 
 
3370
class TestUncollectedWarnings(_Selftest, tests.TestCase):
 
3371
    """Check a test case still alive after being run emits a warning"""
 
3372
 
 
3373
    class Test(tests.TestCase):
 
3374
        def test_pass(self):
 
3375
            pass
 
3376
        def test_self_ref(self):
 
3377
            self.also_self = self.test_self_ref
 
3378
        def test_skip(self):
 
3379
            self.skip("Don't need")
 
3380
 
 
3381
    def _get_suite(self):
 
3382
        return TestUtil.TestSuite([
 
3383
            self.Test("test_pass"),
 
3384
            self.Test("test_self_ref"),
 
3385
            self.Test("test_skip"),
 
3386
            ])
 
3387
 
 
3388
    def _run_selftest_with_suite(self, **kwargs):
 
3389
        old_flags = tests.selftest_debug_flags
 
3390
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
 
3391
        gc_on = gc.isenabled()
 
3392
        if gc_on:
 
3393
            gc.disable()
 
3394
        try:
 
3395
            output = self._run_selftest(test_suite_factory=self._get_suite,
 
3396
                **kwargs)
 
3397
        finally:
 
3398
            if gc_on:
 
3399
                gc.enable()
 
3400
            tests.selftest_debug_flags = old_flags
 
3401
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
 
3402
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
 
3403
        return output
 
3404
 
 
3405
    def test_testsuite(self):
 
3406
        self._run_selftest_with_suite()
 
3407
 
 
3408
    def test_pattern(self):
 
3409
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
 
3410
        self.assertNotContainsRe(out, "test_skip")
 
3411
 
 
3412
    def test_exclude_pattern(self):
 
3413
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
 
3414
        self.assertNotContainsRe(out, "test_skip")
 
3415
 
 
3416
    def test_random_seed(self):
 
3417
        self._run_selftest_with_suite(random_seed="now")
 
3418
 
 
3419
    def test_matching_tests_first(self):
 
3420
        self._run_selftest_with_suite(matching_tests_first=True,
 
3421
            pattern="test_self_ref$")
 
3422
 
 
3423
    def test_starting_with_and_exclude(self):
 
3424
        out = self._run_selftest_with_suite(starting_with=["bt."],
 
3425
            exclude_pattern="test_skip$")
 
3426
        self.assertNotContainsRe(out, "test_skip")
 
3427
 
 
3428
    def test_additonal_decorator(self):
 
3429
        out = self._run_selftest_with_suite(
 
3430
            suite_decorators=[tests.TestDecorator])
 
3431
 
 
3432
 
 
3433
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
 
3434
    """Check warnings from tests staying alive are emitted with subunit"""
 
3435
 
 
3436
    _test_needs_features = [features.subunit]
 
3437
 
 
3438
    def _run_selftest_with_suite(self, **kwargs):
 
3439
        return TestUncollectedWarnings._run_selftest_with_suite(self,
 
3440
            runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3441
 
 
3442
 
 
3443
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
 
3444
    """Check warnings from tests staying alive are emitted when forking"""
 
3445
 
 
3446
 
 
3447
class TestEnvironHandling(tests.TestCase):
 
3448
 
 
3449
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
 
3450
        self.assertFalse('MYVAR' in os.environ)
 
3451
        self.overrideEnv('MYVAR', '42')
 
3452
        # We use an embedded test to make sure we fix the _captureVar bug
 
3453
        class Test(tests.TestCase):
 
3454
            def test_me(self):
 
3455
                # The first call save the 42 value
 
3456
                self.overrideEnv('MYVAR', None)
 
3457
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3458
                # Make sure we can call it twice
 
3459
                self.overrideEnv('MYVAR', None)
 
3460
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3461
        output = StringIO()
 
3462
        result = tests.TextTestResult(output, 0, 1)
 
3463
        Test('test_me').run(result)
 
3464
        if not result.wasStrictlySuccessful():
 
3465
            self.fail(output.getvalue())
 
3466
        # We get our value back
 
3467
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3468
 
 
3469
 
 
3470
class TestIsolatedEnv(tests.TestCase):
 
3471
    """Test isolating tests from os.environ.
 
3472
 
 
3473
    Since we use tests that are already isolated from os.environ a bit of care
 
3474
    should be taken when designing the tests to avoid bootstrap side-effects.
 
3475
    The tests start an already clean os.environ which allow doing valid
 
3476
    assertions about which variables are present or not and design tests around
 
3477
    these assertions.
 
3478
    """
 
3479
 
 
3480
    class ScratchMonkey(tests.TestCase):
 
3481
 
 
3482
        def test_me(self):
 
3483
            pass
 
3484
 
 
3485
    def test_basics(self):
 
3486
        # Make sure we know the definition of BZR_HOME: not part of os.environ
 
3487
        # for tests.TestCase.
 
3488
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
 
3489
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
 
3490
        # Being part of isolated_environ, BZR_HOME should not appear here
 
3491
        self.assertFalse('BZR_HOME' in os.environ)
 
3492
        # Make sure we know the definition of LINES: part of os.environ for
 
3493
        # tests.TestCase
 
3494
        self.assertTrue('LINES' in tests.isolated_environ)
 
3495
        self.assertEquals('25', tests.isolated_environ['LINES'])
 
3496
        self.assertEquals('25', os.environ['LINES'])
 
3497
 
 
3498
    def test_injecting_unknown_variable(self):
 
3499
        # BZR_HOME is known to be absent from os.environ
 
3500
        test = self.ScratchMonkey('test_me')
 
3501
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
 
3502
        self.assertEquals('foo', os.environ['BZR_HOME'])
 
3503
        tests.restore_os_environ(test)
 
3504
        self.assertFalse('BZR_HOME' in os.environ)
 
3505
 
 
3506
    def test_injecting_known_variable(self):
 
3507
        test = self.ScratchMonkey('test_me')
 
3508
        # LINES is known to be present in os.environ
 
3509
        tests.override_os_environ(test, {'LINES': '42'})
 
3510
        self.assertEquals('42', os.environ['LINES'])
 
3511
        tests.restore_os_environ(test)
 
3512
        self.assertEquals('25', os.environ['LINES'])
 
3513
 
 
3514
    def test_deleting_variable(self):
 
3515
        test = self.ScratchMonkey('test_me')
 
3516
        # LINES is known to be present in os.environ
 
3517
        tests.override_os_environ(test, {'LINES': None})
 
3518
        self.assertTrue('LINES' not in os.environ)
 
3519
        tests.restore_os_environ(test)
 
3520
        self.assertEquals('25', os.environ['LINES'])
 
3521
 
 
3522
 
 
3523
class TestDocTestSuiteIsolation(tests.TestCase):
 
3524
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
 
3525
 
 
3526
    Since tests.TestCase alreay provides an isolation from os.environ, we use
 
3527
    the clean environment as a base for testing. To precisely capture the
 
3528
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
 
3529
    compare against.
 
3530
 
 
3531
    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
 
3532
    not `os.environ` so each test overrides it to suit its needs.
 
3533
 
 
3534
    """
 
3535
 
 
3536
    def get_doctest_suite_for_string(self, klass, string):
 
3537
        class Finder(doctest.DocTestFinder):
 
3538
 
 
3539
            def find(*args, **kwargs):
 
3540
                test = doctest.DocTestParser().get_doctest(
 
3541
                    string, {}, 'foo', 'foo.py', 0)
 
3542
                return [test]
 
3543
 
 
3544
        suite = klass(test_finder=Finder())
 
3545
        return suite
 
3546
 
 
3547
    def run_doctest_suite_for_string(self, klass, string):
 
3548
        suite = self.get_doctest_suite_for_string(klass, string)
 
3549
        output = StringIO()
 
3550
        result = tests.TextTestResult(output, 0, 1)
 
3551
        suite.run(result)
 
3552
        return result, output
 
3553
 
 
3554
    def assertDocTestStringSucceds(self, klass, string):
 
3555
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3556
        if not result.wasStrictlySuccessful():
 
3557
            self.fail(output.getvalue())
 
3558
 
 
3559
    def assertDocTestStringFails(self, klass, string):
 
3560
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3561
        if result.wasStrictlySuccessful():
 
3562
            self.fail(output.getvalue())
 
3563
 
 
3564
    def test_injected_variable(self):
 
3565
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
 
3566
        test = """
 
3567
            >>> import os
 
3568
            >>> os.environ['LINES']
 
3569
            '42'
 
3570
            """
 
3571
        # doctest.DocTestSuite fails as it sees '25'
 
3572
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3573
        # tests.DocTestSuite sees '42'
 
3574
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3575
 
 
3576
    def test_deleted_variable(self):
 
3577
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
 
3578
        test = """
 
3579
            >>> import os
 
3580
            >>> os.environ.get('LINES')
 
3581
            """
 
3582
        # doctest.DocTestSuite fails as it sees '25'
 
3583
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3584
        # tests.DocTestSuite sees None
 
3585
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3586
 
 
3587
 
 
3588
class TestSelftestExcludePatterns(tests.TestCase):
 
3589
 
 
3590
    def setUp(self):
 
3591
        super(TestSelftestExcludePatterns, self).setUp()
 
3592
        self.overrideAttr(tests, 'test_suite', self.suite_factory)
 
3593
 
 
3594
    def suite_factory(self, keep_only=None, starting_with=None):
 
3595
        """A test suite factory with only a few tests."""
 
3596
        class Test(tests.TestCase):
 
3597
            def id(self):
 
3598
                # We don't need the full class path
 
3599
                return self._testMethodName
 
3600
            def a(self):
 
3601
                pass
 
3602
            def b(self):
 
3603
                pass
 
3604
            def c(self):
 
3605
                pass
 
3606
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
 
3607
 
 
3608
    def assertTestList(self, expected, *selftest_args):
 
3609
        # We rely on setUp installing the right test suite factory so we can
 
3610
        # test at the command level without loading the whole test suite
 
3611
        out, err = self.run_bzr(('selftest', '--list') + selftest_args)
 
3612
        actual = out.splitlines()
 
3613
        self.assertEquals(expected, actual)
 
3614
 
 
3615
    def test_full_list(self):
 
3616
        self.assertTestList(['a', 'b', 'c'])
 
3617
 
 
3618
    def test_single_exclude(self):
 
3619
        self.assertTestList(['b', 'c'], '-x', 'a')
 
3620
 
 
3621
    def test_mutiple_excludes(self):
 
3622
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
 
3623
 
 
3624
 
 
3625
class TestCounterHooks(tests.TestCase, SelfTestHelper):
 
3626
 
 
3627
    _test_needs_features = [features.subunit]
 
3628
 
 
3629
    def setUp(self):
 
3630
        super(TestCounterHooks, self).setUp()
 
3631
        class Test(tests.TestCase):
 
3632
 
 
3633
            def setUp(self):
 
3634
                super(Test, self).setUp()
 
3635
                self.hooks = hooks.Hooks()
 
3636
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
 
3637
                self.install_counter_hook(self.hooks, 'myhook')
 
3638
 
 
3639
            def no_hook(self):
 
3640
                pass
 
3641
 
 
3642
            def run_hook_once(self):
 
3643
                for hook in self.hooks['myhook']:
 
3644
                    hook(self)
 
3645
 
 
3646
        self.test_class = Test
 
3647
 
 
3648
    def assertHookCalls(self, expected_calls, test_name):
 
3649
        test = self.test_class(test_name)
 
3650
        result = unittest.TestResult()
 
3651
        test.run(result)
 
3652
        self.assertTrue(hasattr(test, '_counters'))
 
3653
        self.assertTrue(test._counters.has_key('myhook'))
 
3654
        self.assertEquals(expected_calls, test._counters['myhook'])
 
3655
 
 
3656
    def test_no_hook(self):
 
3657
        self.assertHookCalls(0, 'no_hook')
 
3658
 
 
3659
    def test_run_hook_once(self):
 
3660
        tt = features.testtools
 
3661
        if tt.module.__version__ < (0, 9, 8):
 
3662
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
 
3663
        self.assertHookCalls(1, 'run_hook_once')