~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Andrew Bennetts
  • Date: 2010-10-13 00:26:41 UTC
  • mto: This revision was merged to the branch mainline in revision 5498.
  • Revision ID: andrew.bennetts@canonical.com-20101013002641-9tlh9k89mlj1666m
Keep docs-plain working.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
import gc
21
 
import doctest
 
20
from doctest import ELLIPSIS
22
21
import os
23
22
import signal
24
23
import sys
27
26
import unittest
28
27
import warnings
29
28
 
30
 
from testtools import (
31
 
    ExtendedToOriginalDecorator,
32
 
    MultiTestResult,
33
 
    )
 
29
from testtools import MultiTestResult
34
30
from testtools.content import Content
35
31
from testtools.content_type import ContentType
36
32
from testtools.matchers import (
37
33
    DocTestMatches,
38
34
    Equals,
39
35
    )
40
 
import testtools.testresult.doubles
 
36
import testtools.tests.helpers
41
37
 
42
38
import bzrlib
43
39
from bzrlib import (
44
40
    branchbuilder,
45
41
    bzrdir,
 
42
    debug,
46
43
    errors,
47
 
    hooks,
48
44
    lockdir,
49
45
    memorytree,
50
46
    osutils,
54
50
    tests,
55
51
    transport,
56
52
    workingtree,
57
 
    workingtree_3,
58
 
    workingtree_4,
59
53
    )
60
54
from bzrlib.repofmt import (
61
55
    groupcompress_repo,
 
56
    pack_repo,
 
57
    weaverepo,
62
58
    )
63
59
from bzrlib.symbol_versioning import (
64
60
    deprecated_function,
69
65
    features,
70
66
    test_lsprof,
71
67
    test_server,
 
68
    test_sftp_transport,
72
69
    TestUtil,
73
70
    )
74
71
from bzrlib.trace import note, mutter
75
72
from bzrlib.transport import memory
 
73
from bzrlib.version import _get_bzr_source_tree
76
74
 
77
75
 
78
76
def _test_ids(test_suite):
80
78
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
81
79
 
82
80
 
 
81
class SelftestTests(tests.TestCase):
 
82
 
 
83
    def test_import_tests(self):
 
84
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
 
85
        self.assertEqual(mod.SelftestTests, SelftestTests)
 
86
 
 
87
    def test_import_test_failure(self):
 
88
        self.assertRaises(ImportError,
 
89
                          TestUtil._load_module_by_name,
 
90
                          'bzrlib.no-name-yet')
 
91
 
 
92
 
83
93
class MetaTestLog(tests.TestCase):
84
94
 
85
95
    def test_logging(self):
91
101
            "text", "plain", {"charset": "utf8"})))
92
102
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
93
103
        self.assertThat(self.get_log(),
94
 
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
104
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
105
 
 
106
 
 
107
class TestUnicodeFilename(tests.TestCase):
 
108
 
 
109
    def test_probe_passes(self):
 
110
        """UnicodeFilename._probe passes."""
 
111
        # We can't test much more than that because the behaviour depends
 
112
        # on the platform.
 
113
        tests.UnicodeFilename._probe()
95
114
 
96
115
 
97
116
class TestTreeShape(tests.TestCaseInTempDir):
98
117
 
99
118
    def test_unicode_paths(self):
100
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
119
        self.requireFeature(tests.UnicodeFilename)
101
120
 
102
121
        filename = u'hell\u00d8'
103
122
        self.build_tree_contents([(filename, 'contents of hello')])
104
 
        self.assertPathExists(filename)
 
123
        self.failUnlessExists(filename)
105
124
 
106
125
 
107
126
class TestClassesAvailable(tests.TestCase):
333
352
        from bzrlib.tests.per_workingtree import make_scenarios
334
353
        server1 = "a"
335
354
        server2 = "b"
336
 
        formats = [workingtree_4.WorkingTreeFormat4(),
337
 
                   workingtree_3.WorkingTreeFormat3(),]
 
355
        formats = [workingtree.WorkingTreeFormat2(),
 
356
                   workingtree.WorkingTreeFormat3(),]
338
357
        scenarios = make_scenarios(server1, server2, formats)
339
358
        self.assertEqual([
340
 
            ('WorkingTreeFormat4',
 
359
            ('WorkingTreeFormat2',
341
360
             {'bzrdir_format': formats[0]._matchingbzrdir,
342
361
              'transport_readonly_server': 'b',
343
362
              'transport_server': 'a',
370
389
            )
371
390
        server1 = "a"
372
391
        server2 = "b"
373
 
        formats = [workingtree_4.WorkingTreeFormat4(),
374
 
                   workingtree_3.WorkingTreeFormat3(),]
 
392
        formats = [workingtree.WorkingTreeFormat2(),
 
393
                   workingtree.WorkingTreeFormat3(),]
375
394
        scenarios = make_scenarios(server1, server2, formats)
376
395
        self.assertEqual(7, len(scenarios))
377
 
        default_wt_format = workingtree.format_registry.get_default()
378
 
        wt4_format = workingtree_4.WorkingTreeFormat4()
379
 
        wt5_format = workingtree_4.WorkingTreeFormat5()
 
396
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
 
397
        wt4_format = workingtree.WorkingTreeFormat4()
 
398
        wt5_format = workingtree.WorkingTreeFormat5()
380
399
        expected_scenarios = [
381
 
            ('WorkingTreeFormat4',
 
400
            ('WorkingTreeFormat2',
382
401
             {'bzrdir_format': formats[0]._matchingbzrdir,
383
402
              'transport_readonly_server': 'b',
384
403
              'transport_server': 'a',
444
463
        # ones to add.
445
464
        from bzrlib.tests.per_tree import (
446
465
            return_parameter,
 
466
            revision_tree_from_workingtree
447
467
            )
448
468
        from bzrlib.tests.per_intertree import (
449
469
            make_scenarios,
450
470
            )
451
 
        from bzrlib.workingtree_3 import WorkingTreeFormat3
452
 
        from bzrlib.workingtree_4 import WorkingTreeFormat4
 
471
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
453
472
        input_test = TestInterTreeScenarios(
454
473
            "test_scenarios")
455
474
        server1 = "a"
456
475
        server2 = "b"
457
 
        format1 = WorkingTreeFormat4()
 
476
        format1 = WorkingTreeFormat2()
458
477
        format2 = WorkingTreeFormat3()
459
478
        formats = [("1", str, format1, format2, "converter1"),
460
479
            ("2", int, format2, format1, "converter2")]
506
525
        self.assertRaises(AssertionError, self.assertEqualStat,
507
526
            os.lstat("foo"), os.lstat("longname"))
508
527
 
509
 
    def test_failUnlessExists(self):
510
 
        """Deprecated failUnlessExists and failIfExists"""
511
 
        self.applyDeprecated(
512
 
            deprecated_in((2, 4)),
513
 
            self.failUnlessExists, '.')
514
 
        self.build_tree(['foo/', 'foo/bar'])
515
 
        self.applyDeprecated(
516
 
            deprecated_in((2, 4)),
517
 
            self.failUnlessExists, 'foo/bar')
518
 
        self.applyDeprecated(
519
 
            deprecated_in((2, 4)),
520
 
            self.failIfExists, 'foo/foo')
521
 
 
522
 
    def test_assertPathExists(self):
523
 
        self.assertPathExists('.')
524
 
        self.build_tree(['foo/', 'foo/bar'])
525
 
        self.assertPathExists('foo/bar')
526
 
        self.assertPathDoesNotExist('foo/foo')
527
 
 
528
528
 
529
529
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
530
530
 
564
564
        tree = self.make_branch_and_memory_tree('dir')
565
565
        # Guard against regression into MemoryTransport leaking
566
566
        # files to disk instead of keeping them in memory.
567
 
        self.assertFalse(osutils.lexists('dir'))
 
567
        self.failIf(osutils.lexists('dir'))
568
568
        self.assertIsInstance(tree, memorytree.MemoryTree)
569
569
 
570
570
    def test_make_branch_and_memory_tree_with_format(self):
571
571
        """make_branch_and_memory_tree should accept a format option."""
572
572
        format = bzrdir.BzrDirMetaFormat1()
573
 
        format.repository_format = repository.format_registry.get_default()
 
573
        format.repository_format = weaverepo.RepositoryFormat7()
574
574
        tree = self.make_branch_and_memory_tree('dir', format=format)
575
575
        # Guard against regression into MemoryTransport leaking
576
576
        # files to disk instead of keeping them in memory.
577
 
        self.assertFalse(osutils.lexists('dir'))
 
577
        self.failIf(osutils.lexists('dir'))
578
578
        self.assertIsInstance(tree, memorytree.MemoryTree)
579
579
        self.assertEqual(format.repository_format.__class__,
580
580
            tree.branch.repository._format.__class__)
584
584
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
585
585
        # Guard against regression into MemoryTransport leaking
586
586
        # files to disk instead of keeping them in memory.
587
 
        self.assertFalse(osutils.lexists('dir'))
 
587
        self.failIf(osutils.lexists('dir'))
588
588
 
589
589
    def test_make_branch_builder_with_format(self):
590
590
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
591
591
        # that the format objects are used.
592
592
        format = bzrdir.BzrDirMetaFormat1()
593
 
        repo_format = repository.format_registry.get_default()
 
593
        repo_format = weaverepo.RepositoryFormat7()
594
594
        format.repository_format = repo_format
595
595
        builder = self.make_branch_builder('dir', format=format)
596
596
        the_branch = builder.get_branch()
597
597
        # Guard against regression into MemoryTransport leaking
598
598
        # files to disk instead of keeping them in memory.
599
 
        self.assertFalse(osutils.lexists('dir'))
 
599
        self.failIf(osutils.lexists('dir'))
600
600
        self.assertEqual(format.repository_format.__class__,
601
601
                         the_branch.repository._format.__class__)
602
602
        self.assertEqual(repo_format.get_format_string(),
608
608
        the_branch = builder.get_branch()
609
609
        # Guard against regression into MemoryTransport leaking
610
610
        # files to disk instead of keeping them in memory.
611
 
        self.assertFalse(osutils.lexists('dir'))
 
611
        self.failIf(osutils.lexists('dir'))
612
612
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
613
613
        self.assertEqual(dir_format.repository_format.__class__,
614
614
                         the_branch.repository._format.__class__)
619
619
    def test_dangling_locks_cause_failures(self):
620
620
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
621
621
            def test_function(self):
622
 
                t = self.get_transport_from_path('.')
 
622
                t = self.get_transport('.')
623
623
                l = lockdir.LockDir(t, 'lock')
624
624
                l.create()
625
625
                l.attempt_lock()
645
645
        # for the server
646
646
        url = self.get_readonly_url()
647
647
        url2 = self.get_readonly_url('foo/bar')
648
 
        t = transport.get_transport_from_url(url)
649
 
        t2 = transport.get_transport_from_url(url2)
650
 
        self.assertIsInstance(t, ReadonlyTransportDecorator)
651
 
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
 
648
        t = transport.get_transport(url)
 
649
        t2 = transport.get_transport(url2)
 
650
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
 
651
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
652
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
653
653
 
654
654
    def test_get_readonly_url_http(self):
660
660
        url = self.get_readonly_url()
661
661
        url2 = self.get_readonly_url('foo/bar')
662
662
        # the transport returned may be any HttpTransportBase subclass
663
 
        t = transport.get_transport_from_url(url)
664
 
        t2 = transport.get_transport_from_url(url2)
665
 
        self.assertIsInstance(t, HttpTransportBase)
666
 
        self.assertIsInstance(t2, HttpTransportBase)
 
663
        t = transport.get_transport(url)
 
664
        t2 = transport.get_transport(url2)
 
665
        self.failUnless(isinstance(t, HttpTransportBase))
 
666
        self.failUnless(isinstance(t2, HttpTransportBase))
667
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
668
668
 
669
669
    def test_is_directory(self):
677
677
    def test_make_branch_builder(self):
678
678
        builder = self.make_branch_builder('dir')
679
679
        rev_id = builder.build_commit()
680
 
        self.assertPathExists('dir')
 
680
        self.failUnlessExists('dir')
681
681
        a_dir = bzrdir.BzrDir.open('dir')
682
682
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
683
683
        a_branch = a_dir.open_branch()
699
699
        self.assertIsInstance(result_bzrdir.transport,
700
700
                              memory.MemoryTransport)
701
701
        # should not be on disk, should only be in memory
702
 
        self.assertPathDoesNotExist('subdir')
 
702
        self.failIfExists('subdir')
703
703
 
704
704
 
705
705
class TestChrootedTest(tests.ChrootedTestCase):
706
706
 
707
707
    def test_root_is_root(self):
708
 
        t = transport.get_transport_from_url(self.get_readonly_url())
 
708
        t = transport.get_transport(self.get_readonly_url())
709
709
        url = t.base
710
710
        self.assertEqual(url, t.clone('..').base)
711
711
 
713
713
class TestProfileResult(tests.TestCase):
714
714
 
715
715
    def test_profiles_tests(self):
716
 
        self.requireFeature(features.lsprof_feature)
717
 
        terminal = testtools.testresult.doubles.ExtendedTestResult()
 
716
        self.requireFeature(test_lsprof.LSProfFeature)
 
717
        terminal = testtools.tests.helpers.ExtendedTestResult()
718
718
        result = tests.ProfileResult(terminal)
719
719
        class Sample(tests.TestCase):
720
720
            def a(self):
737
737
                descriptions=0,
738
738
                verbosity=1,
739
739
                )
740
 
        capture = testtools.testresult.doubles.ExtendedTestResult()
 
740
        capture = testtools.tests.helpers.ExtendedTestResult()
741
741
        test_case.run(MultiTestResult(result, capture))
742
742
        run_case = capture._events[0][1]
743
743
        timed_string = result._testTimeString(run_case)
764
764
        self.check_timing(ShortDelayTestCase('test_short_delay'),
765
765
                          r"^ +[0-9]+ms$")
766
766
 
 
767
    def _patch_get_bzr_source_tree(self):
 
768
        # Reading from the actual source tree breaks isolation, but we don't
 
769
        # want to assume that that's *all* that would happen.
 
770
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
 
771
 
 
772
    def test_assigned_benchmark_file_stores_date(self):
 
773
        self._patch_get_bzr_source_tree()
 
774
        output = StringIO()
 
775
        result = bzrlib.tests.TextTestResult(self._log_file,
 
776
                                        descriptions=0,
 
777
                                        verbosity=1,
 
778
                                        bench_history=output
 
779
                                        )
 
780
        output_string = output.getvalue()
 
781
        # if you are wondering about the regexp please read the comment in
 
782
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
 
783
        # XXX: what comment?  -- Andrew Bennetts
 
784
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
785
 
 
786
    def test_benchhistory_records_test_times(self):
 
787
        self._patch_get_bzr_source_tree()
 
788
        result_stream = StringIO()
 
789
        result = bzrlib.tests.TextTestResult(
 
790
            self._log_file,
 
791
            descriptions=0,
 
792
            verbosity=1,
 
793
            bench_history=result_stream
 
794
            )
 
795
 
 
796
        # we want to profile a call and check that its test duration is recorded
 
797
        # make a new test instance that when run will generate a benchmark
 
798
        example_test_case = TestTestResult("_time_hello_world_encoding")
 
799
        # execute the test, which should succeed and record times
 
800
        example_test_case.run(result)
 
801
        lines = result_stream.getvalue().splitlines()
 
802
        self.assertEqual(2, len(lines))
 
803
        self.assertContainsRe(lines[1],
 
804
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
 
805
            "._time_hello_world_encoding")
 
806
 
767
807
    def _time_hello_world_encoding(self):
768
808
        """Profile two sleep calls
769
809
 
774
814
 
775
815
    def test_lsprofiling(self):
776
816
        """Verbose test result prints lsprof statistics from test cases."""
777
 
        self.requireFeature(features.lsprof_feature)
 
817
        self.requireFeature(test_lsprof.LSProfFeature)
778
818
        result_stream = StringIO()
779
819
        result = bzrlib.tests.VerboseTestResult(
780
820
            result_stream,
809
849
        self.assertContainsRe(output,
810
850
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
811
851
 
812
 
    def test_uses_time_from_testtools(self):
813
 
        """Test case timings in verbose results should use testtools times"""
814
 
        import datetime
815
 
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
816
 
            def startTest(self, test):
817
 
                self.time(datetime.datetime.utcfromtimestamp(1.145))
818
 
                super(TimeAddedVerboseTestResult, self).startTest(test)
819
 
            def addSuccess(self, test):
820
 
                self.time(datetime.datetime.utcfromtimestamp(51.147))
821
 
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
822
 
            def report_tests_starting(self): pass
823
 
        sio = StringIO()
824
 
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
825
 
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
826
 
 
827
852
    def test_known_failure(self):
828
 
        """Using knownFailure should trigger several result actions."""
 
853
        """A KnownFailure being raised should trigger several result actions."""
829
854
        class InstrumentedTestResult(tests.ExtendedTestResult):
830
855
            def stopTestRun(self): pass
831
856
            def report_tests_starting(self): pass
834
859
        result = InstrumentedTestResult(None, None, None, None)
835
860
        class Test(tests.TestCase):
836
861
            def test_function(self):
837
 
                self.knownFailure('failed!')
 
862
                raise tests.KnownFailure('failed!')
838
863
        test = Test("test_function")
839
864
        test.run(result)
840
865
        # it should invoke 'report_known_failure'.
856
881
            descriptions=0,
857
882
            verbosity=2,
858
883
            )
859
 
        _get_test("test_xfail").run(result)
860
 
        self.assertContainsRe(result_stream.getvalue(),
861
 
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
862
 
            "\\s*(?:Text attachment: )?reason"
863
 
            "(?:\n-+\n|: {{{)"
864
 
            "this_fails"
865
 
            "(?:\n-+\n|}}}\n)")
 
884
        test = self.get_passing_test()
 
885
        result.startTest(test)
 
886
        prefix = len(result_stream.getvalue())
 
887
        # the err parameter has the shape:
 
888
        # (class, exception object, traceback)
 
889
        # KnownFailures don't get their tracebacks shown though, so we
 
890
        # can skip that.
 
891
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
892
        result.report_known_failure(test, err)
 
893
        output = result_stream.getvalue()[prefix:]
 
894
        lines = output.splitlines()
 
895
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
896
        if sys.version_info > (2, 7):
 
897
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
898
                self.assertNotEqual, lines[1], '    ')
 
899
        self.assertEqual(lines[1], '    foo')
 
900
        self.assertEqual(2, len(lines))
866
901
 
867
902
    def get_passing_test(self):
868
903
        """Return a test object that can't be run usefully."""
879
914
                self._call = test, feature
880
915
        result = InstrumentedTestResult(None, None, None, None)
881
916
        test = SampleTestCase('_test_pass')
882
 
        feature = features.Feature()
 
917
        feature = tests.Feature()
883
918
        result.startTest(test)
884
919
        result.addNotSupported(test, feature)
885
920
        # it should invoke 'report_unsupported'.
904
939
            verbosity=2,
905
940
            )
906
941
        test = self.get_passing_test()
907
 
        feature = features.Feature()
 
942
        feature = tests.Feature()
908
943
        result.startTest(test)
909
944
        prefix = len(result_stream.getvalue())
910
945
        result.report_unsupported(test, feature)
923
958
            def addNotSupported(self, test, feature):
924
959
                self._call = test, feature
925
960
        result = InstrumentedTestResult(None, None, None, None)
926
 
        feature = features.Feature()
 
961
        feature = tests.Feature()
927
962
        class Test(tests.TestCase):
928
963
            def test_function(self):
929
964
                raise tests.UnavailableFeature(feature)
948
983
    def test_strict_with_known_failure(self):
949
984
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
950
985
                                             verbosity=1)
951
 
        test = _get_test("test_xfail")
952
 
        test.run(result)
 
986
        test = self.get_passing_test()
 
987
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
988
        result.addExpectedFailure(test, err)
953
989
        self.assertFalse(result.wasStrictlySuccessful())
954
990
        self.assertEqual(None, result._extractBenchmarkTime(test))
955
991
 
987
1023
        self.assertEquals(2, result.count)
988
1024
 
989
1025
 
 
1026
class TestUnicodeFilenameFeature(tests.TestCase):
 
1027
 
 
1028
    def test_probe_passes(self):
 
1029
        """UnicodeFilenameFeature._probe passes."""
 
1030
        # We can't test much more than that because the behaviour depends
 
1031
        # on the platform.
 
1032
        tests.UnicodeFilenameFeature._probe()
 
1033
 
 
1034
 
990
1035
class TestRunner(tests.TestCase):
991
1036
 
992
1037
    def dummy_test(self):
1018
1063
        test = unittest.TestSuite()
1019
1064
        test.addTest(Test("known_failure_test"))
1020
1065
        def failing_test():
1021
 
            raise AssertionError('foo')
 
1066
            self.fail('foo')
1022
1067
        test.addTest(unittest.FunctionTestCase(failing_test))
1023
1068
        stream = StringIO()
1024
1069
        runner = tests.TextTestRunner(stream=stream)
1032
1077
            '^----------------------------------------------------------------------\n'
1033
1078
            'Traceback \\(most recent call last\\):\n'
1034
1079
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1035
 
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1080
            '    self.fail\\(\'foo\'\\)\n'
1036
1081
            '.*'
1037
1082
            '^----------------------------------------------------------------------\n'
1038
1083
            '.*'
1044
1089
        # the final output.
1045
1090
        class Test(tests.TestCase):
1046
1091
            def known_failure_test(self):
1047
 
                self.knownFailure("Never works...")
 
1092
                self.expectFailure('failed', self.assertTrue, False)
1048
1093
        test = Test("known_failure_test")
1049
1094
        stream = StringIO()
1050
1095
        runner = tests.TextTestRunner(stream=stream)
1056
1101
            '\n'
1057
1102
            'OK \\(known_failures=1\\)\n')
1058
1103
 
1059
 
    def test_unexpected_success_bad(self):
1060
 
        class Test(tests.TestCase):
1061
 
            def test_truth(self):
1062
 
                self.expectFailure("No absolute truth", self.assertTrue, True)
1063
 
        runner = tests.TextTestRunner(stream=StringIO())
1064
 
        result = self.run_test_runner(runner, Test("test_truth"))
1065
 
        self.assertContainsRe(runner.stream.getvalue(),
1066
 
            "=+\n"
1067
 
            "FAIL: \\S+\.test_truth\n"
1068
 
            "-+\n"
1069
 
            "(?:.*\n)*"
1070
 
            "\\s*(?:Text attachment: )?reason"
1071
 
            "(?:\n-+\n|: {{{)"
1072
 
            "No absolute truth"
1073
 
            "(?:\n-+\n|}}}\n)"
1074
 
            "(?:.*\n)*"
1075
 
            "-+\n"
1076
 
            "Ran 1 test in .*\n"
1077
 
            "\n"
1078
 
            "FAILED \\(failures=1\\)\n\\Z")
1079
 
 
1080
1104
    def test_result_decorator(self):
1081
1105
        # decorate results
1082
1106
        calls = []
1083
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1107
        class LoggingDecorator(tests.ForwardingResult):
1084
1108
            def startTest(self, test):
1085
 
                ExtendedToOriginalDecorator.startTest(self, test)
 
1109
                tests.ForwardingResult.startTest(self, test)
1086
1110
                calls.append('start')
1087
1111
        test = unittest.FunctionTestCase(lambda:None)
1088
1112
        stream = StringIO()
1166
1190
 
1167
1191
    def test_unsupported_features_listed(self):
1168
1192
        """When unsupported features are encountered they are detailed."""
1169
 
        class Feature1(features.Feature):
 
1193
        class Feature1(tests.Feature):
1170
1194
            def _probe(self): return False
1171
 
        class Feature2(features.Feature):
 
1195
        class Feature2(tests.Feature):
1172
1196
            def _probe(self): return False
1173
1197
        # create sample tests
1174
1198
        test1 = SampleTestCase('_test_pass')
1189
1213
            ],
1190
1214
            lines[-3:])
1191
1215
 
 
1216
    def _patch_get_bzr_source_tree(self):
 
1217
        # Reading from the actual source tree breaks isolation, but we don't
 
1218
        # want to assume that that's *all* that would happen.
 
1219
        self._get_source_tree_calls = []
 
1220
        def new_get():
 
1221
            self._get_source_tree_calls.append("called")
 
1222
            return None
 
1223
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
 
1224
 
 
1225
    def test_bench_history(self):
 
1226
        # tests that running the benchmark passes bench_history into
 
1227
        # the test result object. We can tell that happens if
 
1228
        # _get_bzr_source_tree is called.
 
1229
        self._patch_get_bzr_source_tree()
 
1230
        test = TestRunner('dummy_test')
 
1231
        output = StringIO()
 
1232
        runner = tests.TextTestRunner(stream=self._log_file,
 
1233
                                      bench_history=output)
 
1234
        result = self.run_test_runner(runner, test)
 
1235
        output_string = output.getvalue()
 
1236
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
1237
        self.assertLength(1, self._get_source_tree_calls)
 
1238
 
1192
1239
    def test_verbose_test_count(self):
1193
1240
        """A verbose test run reports the right test count at the start"""
1194
1241
        suite = TestUtil.TestSuite([
1204
1251
    def test_startTestRun(self):
1205
1252
        """run should call result.startTestRun()"""
1206
1253
        calls = []
1207
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1254
        class LoggingDecorator(tests.ForwardingResult):
1208
1255
            def startTestRun(self):
1209
 
                ExtendedToOriginalDecorator.startTestRun(self)
 
1256
                tests.ForwardingResult.startTestRun(self)
1210
1257
                calls.append('startTestRun')
1211
1258
        test = unittest.FunctionTestCase(lambda:None)
1212
1259
        stream = StringIO()
1218
1265
    def test_stopTestRun(self):
1219
1266
        """run should call result.stopTestRun()"""
1220
1267
        calls = []
1221
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1268
        class LoggingDecorator(tests.ForwardingResult):
1222
1269
            def stopTestRun(self):
1223
 
                ExtendedToOriginalDecorator.stopTestRun(self)
 
1270
                tests.ForwardingResult.stopTestRun(self)
1224
1271
                calls.append('stopTestRun')
1225
1272
        test = unittest.FunctionTestCase(lambda:None)
1226
1273
        stream = StringIO()
1229
1276
        result = self.run_test_runner(runner, test)
1230
1277
        self.assertLength(1, calls)
1231
1278
 
1232
 
    def test_unicode_test_output_on_ascii_stream(self):
1233
 
        """Showing results should always succeed even on an ascii console"""
1234
 
        class FailureWithUnicode(tests.TestCase):
1235
 
            def test_log_unicode(self):
1236
 
                self.log(u"\u2606")
1237
 
                self.fail("Now print that log!")
1238
 
        out = StringIO()
1239
 
        self.overrideAttr(osutils, "get_terminal_encoding",
1240
 
            lambda trace=False: "ascii")
1241
 
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1242
 
            FailureWithUnicode("test_log_unicode"))
1243
 
        self.assertContainsRe(out.getvalue(),
1244
 
            "(?:Text attachment: )?log"
1245
 
            "(?:\n-+\n|: {{{)"
1246
 
            "\d+\.\d+  \\\\u2606"
1247
 
            "(?:\n-+\n|}}}\n)")
1248
 
 
1249
1279
 
1250
1280
class SampleTestCase(tests.TestCase):
1251
1281
 
1439
1469
        # Note this test won't fail with hooks that the core library doesn't
1440
1470
        # use - but it will trigger with a plugin that adds hooks, so it's still a
1441
1471
        # useful warning in that case.
1442
 
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
1443
 
        self.assertEqual(
1444
 
            bzrlib.smart.server.SmartServerHooks(),
 
1472
        self.assertEqual(bzrlib.branch.BranchHooks(),
 
1473
            bzrlib.branch.Branch.hooks)
 
1474
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
1445
1475
            bzrlib.smart.server.SmartTCPServer.hooks)
1446
 
        self.assertEqual(
1447
 
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
 
1476
        self.assertEqual(bzrlib.commands.CommandHooks(),
 
1477
            bzrlib.commands.Command.hooks)
1448
1478
 
1449
1479
    def test__gather_lsprof_in_benchmarks(self):
1450
1480
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1451
1481
 
1452
1482
        Each self.time() call is individually and separately profiled.
1453
1483
        """
1454
 
        self.requireFeature(features.lsprof_feature)
 
1484
        self.requireFeature(test_lsprof.LSProfFeature)
1455
1485
        # overrides the class member with an instance member so no cleanup
1456
1486
        # needed.
1457
1487
        self._gather_lsprof_in_benchmarks = True
1476
1506
        transport_server = memory.MemoryServer()
1477
1507
        transport_server.start_server()
1478
1508
        self.addCleanup(transport_server.stop_server)
1479
 
        t = transport.get_transport_from_url(transport_server.get_url())
 
1509
        t = transport.get_transport(transport_server.get_url())
1480
1510
        bzrdir.BzrDir.create(t.base)
1481
1511
        self.assertRaises(errors.BzrError,
1482
1512
            bzrdir.BzrDir.open_from_transport, t)
1487
1517
 
1488
1518
    def test_requireFeature_available(self):
1489
1519
        """self.requireFeature(available) is a no-op."""
1490
 
        class Available(features.Feature):
 
1520
        class Available(tests.Feature):
1491
1521
            def _probe(self):return True
1492
1522
        feature = Available()
1493
1523
        self.requireFeature(feature)
1494
1524
 
1495
1525
    def test_requireFeature_unavailable(self):
1496
1526
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1497
 
        class Unavailable(features.Feature):
 
1527
        class Unavailable(tests.Feature):
1498
1528
            def _probe(self):return False
1499
1529
        feature = Unavailable()
1500
1530
        self.assertRaises(tests.UnavailableFeature,
1659
1689
        test.run(unittest.TestResult())
1660
1690
        self.assertEqual('original', obj.test_attr)
1661
1691
 
1662
 
    def test_recordCalls(self):
1663
 
        from bzrlib.tests import test_selftest
1664
 
        calls = self.recordCalls(
1665
 
            test_selftest, '_add_numbers')
1666
 
        self.assertEqual(test_selftest._add_numbers(2, 10),
1667
 
            12)
1668
 
        self.assertEquals(calls, [((2, 10), {})])
1669
 
 
1670
 
 
1671
 
def _add_numbers(a, b):
1672
 
    return a + b
1673
 
 
1674
 
 
1675
 
class _MissingFeature(features.Feature):
 
1692
 
 
1693
class _MissingFeature(tests.Feature):
1676
1694
    def _probe(self):
1677
1695
        return False
1678
1696
missing_feature = _MissingFeature()
1729
1747
        result = self._run_test('test_fail')
1730
1748
        self.assertEqual(1, len(result.failures))
1731
1749
        result_content = result.failures[0][1]
1732
 
        self.assertContainsRe(result_content,
1733
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1750
        self.assertContainsRe(result_content, 'Text attachment: log')
1734
1751
        self.assertContainsRe(result_content, 'this was a failing test')
1735
1752
 
1736
1753
    def test_error_has_log(self):
1737
1754
        result = self._run_test('test_error')
1738
1755
        self.assertEqual(1, len(result.errors))
1739
1756
        result_content = result.errors[0][1]
1740
 
        self.assertContainsRe(result_content,
1741
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1757
        self.assertContainsRe(result_content, 'Text attachment: log')
1742
1758
        self.assertContainsRe(result_content, 'this test errored')
1743
1759
 
1744
1760
    def test_skip_has_no_log(self):
1763
1779
        result = self._run_test('test_xfail')
1764
1780
        self.assertEqual(1, len(result.expectedFailures))
1765
1781
        result_content = result.expectedFailures[0][1]
1766
 
        self.assertNotContainsRe(result_content,
1767
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1782
        self.assertNotContainsRe(result_content, 'Text attachment: log')
1768
1783
        self.assertNotContainsRe(result_content, 'test with expected failure')
1769
1784
 
1770
1785
    def test_unexpected_success_has_log(self):
1943
1958
    def test_make_branch_and_tree_with_format(self):
1944
1959
        # we should be able to supply a format to make_branch_and_tree
1945
1960
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
 
1961
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1946
1962
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1947
1963
                              bzrlib.bzrdir.BzrDirMetaFormat1)
 
1964
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
 
1965
                              bzrlib.bzrdir.BzrDirFormat6)
1948
1966
 
1949
1967
    def test_make_branch_and_memory_tree(self):
1950
1968
        # we should be able to get a new branch and a mutable tree from
2027
2045
        self.assertLength(2, output.readlines())
2028
2046
 
2029
2047
    def test_lsprof_tests(self):
2030
 
        self.requireFeature(features.lsprof_feature)
2031
 
        results = []
 
2048
        self.requireFeature(test_lsprof.LSProfFeature)
 
2049
        calls = []
2032
2050
        class Test(object):
2033
2051
            def __call__(test, result):
2034
2052
                test.run(result)
2035
2053
            def run(test, result):
2036
 
                results.append(result)
 
2054
                self.assertIsInstance(result, tests.ForwardingResult)
 
2055
                calls.append("called")
2037
2056
            def countTestCases(self):
2038
2057
                return 1
2039
2058
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2040
 
        self.assertLength(1, results)
2041
 
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
 
2059
        self.assertLength(1, calls)
2042
2060
 
2043
2061
    def test_random(self):
2044
2062
        # test randomising by listing a number of tests.
2179
2197
        self.assertNotContainsRe(content, 'test with expected failure')
2180
2198
        self.assertEqual(1, len(result.expectedFailures))
2181
2199
        result_content = result.expectedFailures[0][1]
2182
 
        self.assertNotContainsRe(result_content,
2183
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
 
2200
        self.assertNotContainsRe(result_content, 'Text attachment: log')
2184
2201
        self.assertNotContainsRe(result_content, 'test with expected failure')
2185
2202
 
2186
2203
    def test_unexpected_success_has_log(self):
2187
2204
        content, result = self.run_subunit_stream('test_unexpected_success')
2188
2205
        self.assertContainsRe(content, '(?m)^log$')
2189
2206
        self.assertContainsRe(content, 'test with unexpected success')
2190
 
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2191
 
        #                success, if a min version check is added remove this
2192
 
        from subunit import TestProtocolClient as _Client
2193
 
        if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
2194
 
            self.expectFailure('subunit treats "unexpectedSuccess"'
2195
 
                               ' as a plain success',
2196
 
                self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2207
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2208
                           ' as a plain success',
 
2209
            self.assertEqual, 1, len(result.unexpectedSuccesses))
2197
2210
        self.assertEqual(1, len(result.unexpectedSuccesses))
2198
2211
        test = result.unexpectedSuccesses[0]
2199
2212
        # RemotedTestCase doesn't preserve the "details"
2334
2347
        # stdout and stderr of the invoked run_bzr
2335
2348
        current_factory = bzrlib.ui.ui_factory
2336
2349
        self.run_bzr(['foo'])
2337
 
        self.assertFalse(current_factory is self.factory)
 
2350
        self.failIf(current_factory is self.factory)
2338
2351
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2339
2352
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2340
2353
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2497
2510
 
2498
2511
 
2499
2512
class TestStartBzrSubProcess(tests.TestCase):
2500
 
    """Stub test start_bzr_subprocess."""
2501
2513
 
2502
 
    def _subprocess_log_cleanup(self):
2503
 
        """Inhibits the base version as we don't produce a log file."""
 
2514
    def check_popen_state(self):
 
2515
        """Replace to make assertions when popen is called."""
2504
2516
 
2505
2517
    def _popen(self, *args, **kwargs):
2506
 
        """Override the base version to record the command that is run.
2507
 
 
2508
 
        From there we can ensure it is correct without spawning a real process.
2509
 
        """
 
2518
        """Record the command that is run, so that we can ensure it is correct"""
2510
2519
        self.check_popen_state()
2511
2520
        self._popen_args = args
2512
2521
        self._popen_kwargs = kwargs
2513
2522
        raise _DontSpawnProcess()
2514
2523
 
2515
 
    def check_popen_state(self):
2516
 
        """Replace to make assertions when popen is called."""
2517
 
 
2518
2524
    def test_run_bzr_subprocess_no_plugins(self):
2519
2525
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2520
2526
        command = self._popen_args[0]
2524
2530
 
2525
2531
    def test_allow_plugins(self):
2526
2532
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2527
 
                          allow_plugins=True)
 
2533
            allow_plugins=True)
2528
2534
        command = self._popen_args[0]
2529
2535
        self.assertEqual([], command[2:])
2530
2536
 
2531
2537
    def test_set_env(self):
2532
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2538
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2533
2539
        # set in the child
2534
2540
        def check_environment():
2535
2541
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2536
2542
        self.check_popen_state = check_environment
2537
2543
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2538
 
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2544
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
2539
2545
        # not set in the parent
2540
2546
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2541
2547
 
2542
2548
    def test_run_bzr_subprocess_env_del(self):
2543
2549
        """run_bzr_subprocess can remove environment variables too."""
2544
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2550
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2545
2551
        def check_environment():
2546
2552
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2547
2553
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2548
2554
        self.check_popen_state = check_environment
2549
2555
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2550
 
                          env_changes={'EXISTANT_ENV_VAR':None})
 
2556
            env_changes={'EXISTANT_ENV_VAR':None})
2551
2557
        # Still set in parent
2552
2558
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2553
2559
        del os.environ['EXISTANT_ENV_VAR']
2554
2560
 
2555
2561
    def test_env_del_missing(self):
2556
 
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
 
2562
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2557
2563
        def check_environment():
2558
2564
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2559
2565
        self.check_popen_state = check_environment
2560
2566
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2561
 
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2567
            env_changes={'NON_EXISTANT_ENV_VAR':None})
2562
2568
 
2563
2569
    def test_working_dir(self):
2564
2570
        """Test that we can specify the working dir for the child"""
2567
2573
        chdirs = []
2568
2574
        def chdir(path):
2569
2575
            chdirs.append(path)
2570
 
        self.overrideAttr(os, 'chdir', chdir)
2571
 
        def getcwd():
2572
 
            return 'current'
2573
 
        self.overrideAttr(osutils, 'getcwd', getcwd)
2574
 
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2575
 
                          working_dir='foo')
 
2576
        os.chdir = chdir
 
2577
        try:
 
2578
            def getcwd():
 
2579
                return 'current'
 
2580
            osutils.getcwd = getcwd
 
2581
            try:
 
2582
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2583
                    working_dir='foo')
 
2584
            finally:
 
2585
                osutils.getcwd = orig_getcwd
 
2586
        finally:
 
2587
            os.chdir = orig_chdir
2576
2588
        self.assertEqual(['foo', 'current'], chdirs)
2577
2589
 
2578
2590
    def test_get_bzr_path_with_cwd_bzrlib(self):
2598
2610
        self.assertEqual('bzr: interrupted\n', result[1])
2599
2611
 
2600
2612
 
 
2613
class TestFeature(tests.TestCase):
 
2614
 
 
2615
    def test_caching(self):
 
2616
        """Feature._probe is called by the feature at most once."""
 
2617
        class InstrumentedFeature(tests.Feature):
 
2618
            def __init__(self):
 
2619
                super(InstrumentedFeature, self).__init__()
 
2620
                self.calls = []
 
2621
            def _probe(self):
 
2622
                self.calls.append('_probe')
 
2623
                return False
 
2624
        feature = InstrumentedFeature()
 
2625
        feature.available()
 
2626
        self.assertEqual(['_probe'], feature.calls)
 
2627
        feature.available()
 
2628
        self.assertEqual(['_probe'], feature.calls)
 
2629
 
 
2630
    def test_named_str(self):
 
2631
        """Feature.__str__ should thunk to feature_name()."""
 
2632
        class NamedFeature(tests.Feature):
 
2633
            def feature_name(self):
 
2634
                return 'symlinks'
 
2635
        feature = NamedFeature()
 
2636
        self.assertEqual('symlinks', str(feature))
 
2637
 
 
2638
    def test_default_str(self):
 
2639
        """Feature.__str__ should default to __class__.__name__."""
 
2640
        class NamedFeature(tests.Feature):
 
2641
            pass
 
2642
        feature = NamedFeature()
 
2643
        self.assertEqual('NamedFeature', str(feature))
 
2644
 
 
2645
 
 
2646
class TestUnavailableFeature(tests.TestCase):
 
2647
 
 
2648
    def test_access_feature(self):
 
2649
        feature = tests.Feature()
 
2650
        exception = tests.UnavailableFeature(feature)
 
2651
        self.assertIs(feature, exception.args[0])
 
2652
 
 
2653
 
 
2654
simple_thunk_feature = tests._CompatabilityThunkFeature(
 
2655
    deprecated_in((2, 1, 0)),
 
2656
    'bzrlib.tests.test_selftest',
 
2657
    'simple_thunk_feature','UnicodeFilename',
 
2658
    replacement_module='bzrlib.tests'
 
2659
    )
 
2660
 
 
2661
class Test_CompatibilityFeature(tests.TestCase):
 
2662
 
 
2663
    def test_does_thunk(self):
 
2664
        res = self.callDeprecated(
 
2665
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
 
2666
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
 
2667
            simple_thunk_feature.available)
 
2668
        self.assertEqual(tests.UnicodeFilename.available(), res)
 
2669
 
 
2670
 
 
2671
class TestModuleAvailableFeature(tests.TestCase):
 
2672
 
 
2673
    def test_available_module(self):
 
2674
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
 
2675
        self.assertEqual('bzrlib.tests', feature.module_name)
 
2676
        self.assertEqual('bzrlib.tests', str(feature))
 
2677
        self.assertTrue(feature.available())
 
2678
        self.assertIs(tests, feature.module)
 
2679
 
 
2680
    def test_unavailable_module(self):
 
2681
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
 
2682
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
 
2683
        self.assertFalse(feature.available())
 
2684
        self.assertIs(None, feature.module)
 
2685
 
 
2686
 
2601
2687
class TestSelftestFiltering(tests.TestCase):
2602
2688
 
2603
2689
    def setUp(self):
2754
2840
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2755
2841
 
2756
2842
 
2757
 
class TestCheckTreeShape(tests.TestCaseWithTransport):
 
2843
class TestCheckInventoryShape(tests.TestCaseWithTransport):
2758
2844
 
2759
 
    def test_check_tree_shape(self):
 
2845
    def test_check_inventory_shape(self):
2760
2846
        files = ['a', 'b/', 'b/c']
2761
2847
        tree = self.make_branch_and_tree('.')
2762
2848
        self.build_tree(files)
2763
2849
        tree.add(files)
2764
2850
        tree.lock_read()
2765
2851
        try:
2766
 
            self.check_tree_shape(tree, files)
 
2852
            self.check_inventory_shape(tree.inventory, files)
2767
2853
        finally:
2768
2854
            tree.unlock()
2769
2855
 
3101
3187
        tpr.register('bar', 'bBB.aAA.rRR')
3102
3188
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3103
3189
        self.assertThat(self.get_log(),
3104
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3105
 
                           doctest.ELLIPSIS))
 
3190
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
3106
3191
 
3107
3192
    def test_get_unknown_prefix(self):
3108
3193
        tpr = self._get_registry()
3143
3228
        class Test(unittest.TestCase):
3144
3229
            def runTest(self):
3145
3230
                pass
 
3231
            addCleanup = None # for when on Python 2.7 with native addCleanup
3146
3232
        result = self.LeakRecordingResult()
3147
3233
        test = Test()
 
3234
        self.assertIs(getattr(test, "addCleanup", None), None)
3148
3235
        result.startTestRun()
3149
3236
        test.run(result)
3150
3237
        result.stopTestRun()
3214
3301
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3215
3302
 
3216
3303
 
3217
 
class TestPostMortemDebugging(tests.TestCase):
3218
 
    """Check post mortem debugging works when tests fail or error"""
3219
 
 
3220
 
    class TracebackRecordingResult(tests.ExtendedTestResult):
3221
 
        def __init__(self):
3222
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3223
 
            self.postcode = None
3224
 
        def _post_mortem(self, tb=None):
3225
 
            """Record the code object at the end of the current traceback"""
3226
 
            tb = tb or sys.exc_info()[2]
3227
 
            if tb is not None:
3228
 
                next = tb.tb_next
3229
 
                while next is not None:
3230
 
                    tb = next
3231
 
                    next = next.tb_next
3232
 
                self.postcode = tb.tb_frame.f_code
3233
 
        def report_error(self, test, err):
3234
 
            pass
3235
 
        def report_failure(self, test, err):
3236
 
            pass
3237
 
 
3238
 
    def test_location_unittest_error(self):
3239
 
        """Needs right post mortem traceback with erroring unittest case"""
3240
 
        class Test(unittest.TestCase):
3241
 
            def runTest(self):
3242
 
                raise RuntimeError
3243
 
        result = self.TracebackRecordingResult()
3244
 
        Test().run(result)
3245
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3246
 
 
3247
 
    def test_location_unittest_failure(self):
3248
 
        """Needs right post mortem traceback with failing unittest case"""
3249
 
        class Test(unittest.TestCase):
3250
 
            def runTest(self):
3251
 
                raise self.failureException
3252
 
        result = self.TracebackRecordingResult()
3253
 
        Test().run(result)
3254
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3255
 
 
3256
 
    def test_location_bt_error(self):
3257
 
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
3258
 
        class Test(tests.TestCase):
3259
 
            def test_error(self):
3260
 
                raise RuntimeError
3261
 
        result = self.TracebackRecordingResult()
3262
 
        Test("test_error").run(result)
3263
 
        self.assertEqual(result.postcode, Test.test_error.func_code)
3264
 
 
3265
 
    def test_location_bt_failure(self):
3266
 
        """Needs right post mortem traceback with failing bzrlib.tests case"""
3267
 
        class Test(tests.TestCase):
3268
 
            def test_failure(self):
3269
 
                raise self.failureException
3270
 
        result = self.TracebackRecordingResult()
3271
 
        Test("test_failure").run(result)
3272
 
        self.assertEqual(result.postcode, Test.test_failure.func_code)
3273
 
 
3274
 
    def test_env_var_triggers_post_mortem(self):
3275
 
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3276
 
        import pdb
3277
 
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
3278
 
        post_mortem_calls = []
3279
 
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3280
 
        self.overrideEnv('BZR_TEST_PDB', None)
3281
 
        result._post_mortem(1)
3282
 
        self.overrideEnv('BZR_TEST_PDB', 'on')
3283
 
        result._post_mortem(2)
3284
 
        self.assertEqual([2], post_mortem_calls)
3285
 
 
3286
 
 
3287
3304
class TestRunSuite(tests.TestCase):
3288
3305
 
3289
3306
    def test_runner_class(self):
3300
3317
                                                self.verbosity)
3301
3318
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3302
3319
        self.assertLength(1, calls)
3303
 
 
3304
 
 
3305
 
class _Selftest(object):
3306
 
    """Mixin for tests needing full selftest output"""
3307
 
 
3308
 
    def _inject_stream_into_subunit(self, stream):
3309
 
        """To be overridden by subclasses that run tests out of process"""
3310
 
 
3311
 
    def _run_selftest(self, **kwargs):
3312
 
        sio = StringIO()
3313
 
        self._inject_stream_into_subunit(sio)
3314
 
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3315
 
        return sio.getvalue()
3316
 
 
3317
 
 
3318
 
class _ForkedSelftest(_Selftest):
3319
 
    """Mixin for tests needing full selftest output with forked children"""
3320
 
 
3321
 
    _test_needs_features = [features.subunit]
3322
 
 
3323
 
    def _inject_stream_into_subunit(self, stream):
3324
 
        """Monkey-patch subunit so the extra output goes to stream not stdout
3325
 
 
3326
 
        Some APIs need rewriting so this kind of bogus hackery can be replaced
3327
 
        by passing the stream param from run_tests down into ProtocolTestCase.
3328
 
        """
3329
 
        from subunit import ProtocolTestCase
3330
 
        _original_init = ProtocolTestCase.__init__
3331
 
        def _init_with_passthrough(self, *args, **kwargs):
3332
 
            _original_init(self, *args, **kwargs)
3333
 
            self._passthrough = stream
3334
 
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3335
 
 
3336
 
    def _run_selftest(self, **kwargs):
3337
 
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3338
 
        if getattr(os, "fork", None) is None:
3339
 
            raise tests.TestNotApplicable("Platform doesn't support forking")
3340
 
        # Make sure the fork code is actually invoked by claiming two cores
3341
 
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3342
 
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3343
 
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3344
 
 
3345
 
 
3346
 
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3347
 
    """Check operation of --parallel=fork selftest option"""
3348
 
 
3349
 
    def test_error_in_child_during_fork(self):
3350
 
        """Error in a forked child during test setup should get reported"""
3351
 
        class Test(tests.TestCase):
3352
 
            def testMethod(self):
3353
 
                pass
3354
 
        # We don't care what, just break something that a child will run
3355
 
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3356
 
        out = self._run_selftest(test_suite_factory=Test)
3357
 
        # Lines from the tracebacks of the two child processes may be mixed
3358
 
        # together due to the way subunit parses and forwards the streams,
3359
 
        # so permit extra lines between each part of the error output.
3360
 
        self.assertContainsRe(out,
3361
 
            "Traceback.*:\n"
3362
 
            "(?:.*\n)*"
3363
 
            ".+ in fork_for_tests\n"
3364
 
            "(?:.*\n)*"
3365
 
            "\s*workaround_zealous_crypto_random\(\)\n"
3366
 
            "(?:.*\n)*"
3367
 
            "TypeError:")
3368
 
 
3369
 
 
3370
 
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3371
 
    """Check a test case still alive after being run emits a warning"""
3372
 
 
3373
 
    class Test(tests.TestCase):
3374
 
        def test_pass(self):
3375
 
            pass
3376
 
        def test_self_ref(self):
3377
 
            self.also_self = self.test_self_ref
3378
 
        def test_skip(self):
3379
 
            self.skip("Don't need")
3380
 
 
3381
 
    def _get_suite(self):
3382
 
        return TestUtil.TestSuite([
3383
 
            self.Test("test_pass"),
3384
 
            self.Test("test_self_ref"),
3385
 
            self.Test("test_skip"),
3386
 
            ])
3387
 
 
3388
 
    def _run_selftest_with_suite(self, **kwargs):
3389
 
        old_flags = tests.selftest_debug_flags
3390
 
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3391
 
        gc_on = gc.isenabled()
3392
 
        if gc_on:
3393
 
            gc.disable()
3394
 
        try:
3395
 
            output = self._run_selftest(test_suite_factory=self._get_suite,
3396
 
                **kwargs)
3397
 
        finally:
3398
 
            if gc_on:
3399
 
                gc.enable()
3400
 
            tests.selftest_debug_flags = old_flags
3401
 
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3402
 
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3403
 
        return output
3404
 
 
3405
 
    def test_testsuite(self):
3406
 
        self._run_selftest_with_suite()
3407
 
 
3408
 
    def test_pattern(self):
3409
 
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3410
 
        self.assertNotContainsRe(out, "test_skip")
3411
 
 
3412
 
    def test_exclude_pattern(self):
3413
 
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3414
 
        self.assertNotContainsRe(out, "test_skip")
3415
 
 
3416
 
    def test_random_seed(self):
3417
 
        self._run_selftest_with_suite(random_seed="now")
3418
 
 
3419
 
    def test_matching_tests_first(self):
3420
 
        self._run_selftest_with_suite(matching_tests_first=True,
3421
 
            pattern="test_self_ref$")
3422
 
 
3423
 
    def test_starting_with_and_exclude(self):
3424
 
        out = self._run_selftest_with_suite(starting_with=["bt."],
3425
 
            exclude_pattern="test_skip$")
3426
 
        self.assertNotContainsRe(out, "test_skip")
3427
 
 
3428
 
    def test_additonal_decorator(self):
3429
 
        out = self._run_selftest_with_suite(
3430
 
            suite_decorators=[tests.TestDecorator])
3431
 
 
3432
 
 
3433
 
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3434
 
    """Check warnings from tests staying alive are emitted with subunit"""
3435
 
 
3436
 
    _test_needs_features = [features.subunit]
3437
 
 
3438
 
    def _run_selftest_with_suite(self, **kwargs):
3439
 
        return TestUncollectedWarnings._run_selftest_with_suite(self,
3440
 
            runner_class=tests.SubUnitBzrRunner, **kwargs)
3441
 
 
3442
 
 
3443
 
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3444
 
    """Check warnings from tests staying alive are emitted when forking"""
3445
 
 
3446
 
 
3447
 
class TestEnvironHandling(tests.TestCase):
3448
 
 
3449
 
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
3450
 
        self.assertFalse('MYVAR' in os.environ)
3451
 
        self.overrideEnv('MYVAR', '42')
3452
 
        # We use an embedded test to make sure we fix the _captureVar bug
3453
 
        class Test(tests.TestCase):
3454
 
            def test_me(self):
3455
 
                # The first call saves the 42 value
3456
 
                self.overrideEnv('MYVAR', None)
3457
 
                self.assertEquals(None, os.environ.get('MYVAR'))
3458
 
                # Make sure we can call it twice
3459
 
                self.overrideEnv('MYVAR', None)
3460
 
                self.assertEquals(None, os.environ.get('MYVAR'))
3461
 
        output = StringIO()
3462
 
        result = tests.TextTestResult(output, 0, 1)
3463
 
        Test('test_me').run(result)
3464
 
        if not result.wasStrictlySuccessful():
3465
 
            self.fail(output.getvalue())
3466
 
        # We get our value back
3467
 
        self.assertEquals('42', os.environ.get('MYVAR'))
3468
 
 
3469
 
 
3470
 
class TestIsolatedEnv(tests.TestCase):
3471
 
    """Test isolating tests from os.environ.
3472
 
 
3473
 
    Since we use tests that are already isolated from os.environ a bit of care
3474
 
    should be taken when designing the tests to avoid bootstrap side-effects.
3475
 
    The tests start with an already clean os.environ, which allows doing valid
3476
 
    assertions about which variables are present or not and design tests around
3477
 
    these assertions.
3478
 
    """
3479
 
 
3480
 
    class ScratchMonkey(tests.TestCase):
3481
 
 
3482
 
        def test_me(self):
3483
 
            pass
3484
 
 
3485
 
    def test_basics(self):
3486
 
        # Make sure we know the definition of BZR_HOME: not part of os.environ
3487
 
        # for tests.TestCase.
3488
 
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
3489
 
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3490
 
        # Being part of isolated_environ, BZR_HOME should not appear here
3491
 
        self.assertFalse('BZR_HOME' in os.environ)
3492
 
        # Make sure we know the definition of LINES: part of os.environ for
3493
 
        # tests.TestCase
3494
 
        self.assertTrue('LINES' in tests.isolated_environ)
3495
 
        self.assertEquals('25', tests.isolated_environ['LINES'])
3496
 
        self.assertEquals('25', os.environ['LINES'])
3497
 
 
3498
 
    def test_injecting_unknown_variable(self):
3499
 
        # BZR_HOME is known to be absent from os.environ
3500
 
        test = self.ScratchMonkey('test_me')
3501
 
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3502
 
        self.assertEquals('foo', os.environ['BZR_HOME'])
3503
 
        tests.restore_os_environ(test)
3504
 
        self.assertFalse('BZR_HOME' in os.environ)
3505
 
 
3506
 
    def test_injecting_known_variable(self):
3507
 
        test = self.ScratchMonkey('test_me')
3508
 
        # LINES is known to be present in os.environ
3509
 
        tests.override_os_environ(test, {'LINES': '42'})
3510
 
        self.assertEquals('42', os.environ['LINES'])
3511
 
        tests.restore_os_environ(test)
3512
 
        self.assertEquals('25', os.environ['LINES'])
3513
 
 
3514
 
    def test_deleting_variable(self):
3515
 
        test = self.ScratchMonkey('test_me')
3516
 
        # LINES is known to be present in os.environ
3517
 
        tests.override_os_environ(test, {'LINES': None})
3518
 
        self.assertTrue('LINES' not in os.environ)
3519
 
        tests.restore_os_environ(test)
3520
 
        self.assertEquals('25', os.environ['LINES'])
3521
 
 
3522
 
 
3523
 
class TestDocTestSuiteIsolation(tests.TestCase):
3524
 
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3525
 
 
3526
 
    Since tests.TestCase already provides isolation from os.environ, we use
3527
 
    the clean environment as a base for testing. To precisely capture the
3528
 
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3529
 
    compare against.
3530
 
 
3531
 
    We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
3532
 
    not `os.environ` so each test overrides it to suit its needs.
3533
 
 
3534
 
    """
3535
 
 
3536
 
    def get_doctest_suite_for_string(self, klass, string):
3537
 
        class Finder(doctest.DocTestFinder):
3538
 
 
3539
 
            def find(*args, **kwargs):
3540
 
                test = doctest.DocTestParser().get_doctest(
3541
 
                    string, {}, 'foo', 'foo.py', 0)
3542
 
                return [test]
3543
 
 
3544
 
        suite = klass(test_finder=Finder())
3545
 
        return suite
3546
 
 
3547
 
    def run_doctest_suite_for_string(self, klass, string):
3548
 
        suite = self.get_doctest_suite_for_string(klass, string)
3549
 
        output = StringIO()
3550
 
        result = tests.TextTestResult(output, 0, 1)
3551
 
        suite.run(result)
3552
 
        return result, output
3553
 
 
3554
 
    def assertDocTestStringSucceds(self, klass, string):
3555
 
        result, output = self.run_doctest_suite_for_string(klass, string)
3556
 
        if not result.wasStrictlySuccessful():
3557
 
            self.fail(output.getvalue())
3558
 
 
3559
 
    def assertDocTestStringFails(self, klass, string):
3560
 
        result, output = self.run_doctest_suite_for_string(klass, string)
3561
 
        if result.wasStrictlySuccessful():
3562
 
            self.fail(output.getvalue())
3563
 
 
3564
 
    def test_injected_variable(self):
3565
 
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3566
 
        test = """
3567
 
            >>> import os
3568
 
            >>> os.environ['LINES']
3569
 
            '42'
3570
 
            """
3571
 
        # doctest.DocTestSuite fails as it sees '25'
3572
 
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3573
 
        # tests.DocTestSuite sees '42'
3574
 
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3575
 
 
3576
 
    def test_deleted_variable(self):
3577
 
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3578
 
        test = """
3579
 
            >>> import os
3580
 
            >>> os.environ.get('LINES')
3581
 
            """
3582
 
        # doctest.DocTestSuite fails as it sees '25'
3583
 
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3584
 
        # tests.DocTestSuite sees None
3585
 
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3586
 
 
3587
 
 
3588
 
class TestSelftestExcludePatterns(tests.TestCase):
    """Check the `bzr selftest --list` output with and without `-x` options."""

    def setUp(self):
        super(TestSelftestExcludePatterns, self).setUp()
        # Have `tests.test_suite` answer with our tiny suite for the duration
        # of each test.
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """Build a suite holding only three tests: 'a', 'b' and 'c'.

        Both parameters are accepted but not used.
        """
        class Test(tests.TestCase):

            def id(self):
                # The bare method name is enough here, we don't need the
                # full class path
                return self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass

        return TestUtil.TestSuite([Test(name) for name in ('a', 'b', 'c')])

    def assertTestList(self, expected, *selftest_args):
        """Assert `bzr selftest --list <selftest_args>` prints `expected`.

        :param expected: The expected output, as a list of lines.
        :param selftest_args: Extra command-line arguments for selftest.
        """
        # We rely on setUp installing the right test suite factory so we can
        # test at the command level without loading the whole test suite
        command = ('selftest', '--list') + selftest_args
        stdout, _stderr = self.run_bzr(command)
        self.assertEquals(expected, stdout.splitlines())

    def test_full_list(self):
        self.assertTestList(['a', 'b', 'c'])

    def test_single_exclude(self):
        self.assertTestList(['b', 'c'], '-x', 'a')

    def test_mutiple_excludes(self):
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3623
 
 
3624
 
 
3625
 
class TestCounterHooks(tests.TestCase, SelfTestHelper):
    """Check the `_counters` left on a test that used install_counter_hook."""

    _test_needs_features = [features.subunit]

    def setUp(self):
        super(TestCounterHooks, self).setUp()

        class Test(tests.TestCase):

            def setUp(self):
                super(Test, self).setUp()
                # A private Hooks registry with a single hook point on which
                # a counter hook is installed.
                self.hooks = hooks.Hooks()
                self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
                self.install_counter_hook(self.hooks, 'myhook')

            def no_hook(self):
                pass

            def run_hook_once(self):
                # Fire every callable registered for 'myhook' exactly once.
                for hook in self.hooks['myhook']:
                    hook(self)

        # Instantiated by name in assertHookCalls.
        self.test_class = Test

    def assertHookCalls(self, expected_calls, test_name):
        """Run `test_name` from the scratch class and check its hook counter.

        :param expected_calls: The value expected in
            `test._counters['myhook']` once the test has run.
        :param test_name: Name of the method of `self.test_class` to run.
        """
        test = self.test_class(test_name)
        result = unittest.TestResult()
        test.run(result)
        self.assertTrue(hasattr(test, '_counters'))
        # `in` rather than the deprecated has_key() (gone in Python 3);
        # presumably `_counters` is a plain dict, for which both are
        # equivalent.
        self.assertTrue('myhook' in test._counters)
        self.assertEquals(expected_calls, test._counters['myhook'])

    def test_no_hook(self):
        self.assertHookCalls(0, 'no_hook')

    def test_run_hook_once(self):
        tt = features.testtools
        if tt.module.__version__ < (0, 9, 8):
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
        self.assertHookCalls(1, 'run_hook_once')