~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2011-12-16 19:18:39 UTC
  • mto: This revision was merged to the branch mainline in revision 6391.
  • Revision ID: jelmer@samba.org-20111216191839-eg681lxqibi1qxu1
Fix remaining tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import gc
 
21
import doctest
21
22
import os
22
23
import signal
23
24
import sys
26
27
import unittest
27
28
import warnings
28
29
 
29
 
from testtools import MultiTestResult
 
30
from testtools import (
 
31
    ExtendedToOriginalDecorator,
 
32
    MultiTestResult,
 
33
    )
30
34
from testtools.content import Content
31
35
from testtools.content_type import ContentType
32
36
from testtools.matchers import (
33
37
    DocTestMatches,
34
38
    Equals,
35
39
    )
36
 
import testtools.tests.helpers
 
40
import testtools.testresult.doubles
37
41
 
38
42
import bzrlib
39
43
from bzrlib import (
40
44
    branchbuilder,
41
45
    bzrdir,
42
 
    debug,
43
46
    errors,
 
47
    hooks,
44
48
    lockdir,
45
49
    memorytree,
46
50
    osutils,
50
54
    tests,
51
55
    transport,
52
56
    workingtree,
 
57
    workingtree_3,
 
58
    workingtree_4,
53
59
    )
54
60
from bzrlib.repofmt import (
55
61
    groupcompress_repo,
56
 
    pack_repo,
57
 
    weaverepo,
58
62
    )
59
63
from bzrlib.symbol_versioning import (
60
64
    deprecated_function,
65
69
    features,
66
70
    test_lsprof,
67
71
    test_server,
68
 
    test_sftp_transport,
69
72
    TestUtil,
70
73
    )
71
74
from bzrlib.trace import note, mutter
72
75
from bzrlib.transport import memory
73
 
from bzrlib.version import _get_bzr_source_tree
74
76
 
75
77
 
76
78
def _test_ids(test_suite):
78
80
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
79
81
 
80
82
 
81
 
class SelftestTests(tests.TestCase):
82
 
 
83
 
    def test_import_tests(self):
84
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
85
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
86
 
 
87
 
    def test_import_test_failure(self):
88
 
        self.assertRaises(ImportError,
89
 
                          TestUtil._load_module_by_name,
90
 
                          'bzrlib.no-name-yet')
91
 
 
92
 
 
93
83
class MetaTestLog(tests.TestCase):
94
84
 
95
85
    def test_logging(self):
101
91
            "text", "plain", {"charset": "utf8"})))
102
92
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
103
93
        self.assertThat(self.get_log(),
104
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
105
 
 
106
 
 
107
 
class TestUnicodeFilename(tests.TestCase):
108
 
 
109
 
    def test_probe_passes(self):
110
 
        """UnicodeFilename._probe passes."""
111
 
        # We can't test much more than that because the behaviour depends
112
 
        # on the platform.
113
 
        tests.UnicodeFilename._probe()
 
94
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
114
95
 
115
96
 
116
97
class TestTreeShape(tests.TestCaseInTempDir):
117
98
 
118
99
    def test_unicode_paths(self):
119
 
        self.requireFeature(tests.UnicodeFilename)
 
100
        self.requireFeature(features.UnicodeFilenameFeature)
120
101
 
121
102
        filename = u'hell\u00d8'
122
103
        self.build_tree_contents([(filename, 'contents of hello')])
123
 
        self.failUnlessExists(filename)
 
104
        self.assertPathExists(filename)
124
105
 
125
106
 
126
107
class TestClassesAvailable(tests.TestCase):
352
333
        from bzrlib.tests.per_workingtree import make_scenarios
353
334
        server1 = "a"
354
335
        server2 = "b"
355
 
        formats = [workingtree.WorkingTreeFormat2(),
356
 
                   workingtree.WorkingTreeFormat3(),]
 
336
        formats = [workingtree_4.WorkingTreeFormat4(),
 
337
                   workingtree_3.WorkingTreeFormat3(),]
357
338
        scenarios = make_scenarios(server1, server2, formats)
358
339
        self.assertEqual([
359
 
            ('WorkingTreeFormat2',
 
340
            ('WorkingTreeFormat4',
360
341
             {'bzrdir_format': formats[0]._matchingbzrdir,
361
342
              'transport_readonly_server': 'b',
362
343
              'transport_server': 'a',
389
370
            )
390
371
        server1 = "a"
391
372
        server2 = "b"
392
 
        formats = [workingtree.WorkingTreeFormat2(),
393
 
                   workingtree.WorkingTreeFormat3(),]
 
373
        formats = [workingtree_4.WorkingTreeFormat4(),
 
374
                   workingtree_3.WorkingTreeFormat3(),]
394
375
        scenarios = make_scenarios(server1, server2, formats)
395
376
        self.assertEqual(7, len(scenarios))
396
 
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
397
 
        wt4_format = workingtree.WorkingTreeFormat4()
398
 
        wt5_format = workingtree.WorkingTreeFormat5()
 
377
        default_wt_format = workingtree.format_registry.get_default()
 
378
        wt4_format = workingtree_4.WorkingTreeFormat4()
 
379
        wt5_format = workingtree_4.WorkingTreeFormat5()
399
380
        expected_scenarios = [
400
 
            ('WorkingTreeFormat2',
 
381
            ('WorkingTreeFormat4',
401
382
             {'bzrdir_format': formats[0]._matchingbzrdir,
402
383
              'transport_readonly_server': 'b',
403
384
              'transport_server': 'a',
463
444
        # ones to add.
464
445
        from bzrlib.tests.per_tree import (
465
446
            return_parameter,
466
 
            revision_tree_from_workingtree
467
447
            )
468
448
        from bzrlib.tests.per_intertree import (
469
449
            make_scenarios,
470
450
            )
471
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
451
        from bzrlib.workingtree_3 import WorkingTreeFormat3
 
452
        from bzrlib.workingtree_4 import WorkingTreeFormat4
472
453
        input_test = TestInterTreeScenarios(
473
454
            "test_scenarios")
474
455
        server1 = "a"
475
456
        server2 = "b"
476
 
        format1 = WorkingTreeFormat2()
 
457
        format1 = WorkingTreeFormat4()
477
458
        format2 = WorkingTreeFormat3()
478
459
        formats = [("1", str, format1, format2, "converter1"),
479
460
            ("2", int, format2, format1, "converter2")]
525
506
        self.assertRaises(AssertionError, self.assertEqualStat,
526
507
            os.lstat("foo"), os.lstat("longname"))
527
508
 
 
509
    def test_failUnlessExists(self):
 
510
        """Deprecated failUnlessExists and failIfExists"""
 
511
        self.applyDeprecated(
 
512
            deprecated_in((2, 4)),
 
513
            self.failUnlessExists, '.')
 
514
        self.build_tree(['foo/', 'foo/bar'])
 
515
        self.applyDeprecated(
 
516
            deprecated_in((2, 4)),
 
517
            self.failUnlessExists, 'foo/bar')
 
518
        self.applyDeprecated(
 
519
            deprecated_in((2, 4)),
 
520
            self.failIfExists, 'foo/foo')
 
521
 
 
522
    def test_assertPathExists(self):
 
523
        self.assertPathExists('.')
 
524
        self.build_tree(['foo/', 'foo/bar'])
 
525
        self.assertPathExists('foo/bar')
 
526
        self.assertPathDoesNotExist('foo/foo')
 
527
 
528
528
 
529
529
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
530
530
 
564
564
        tree = self.make_branch_and_memory_tree('dir')
565
565
        # Guard against regression into MemoryTransport leaking
566
566
        # files to disk instead of keeping them in memory.
567
 
        self.failIf(osutils.lexists('dir'))
 
567
        self.assertFalse(osutils.lexists('dir'))
568
568
        self.assertIsInstance(tree, memorytree.MemoryTree)
569
569
 
570
570
    def test_make_branch_and_memory_tree_with_format(self):
571
571
        """make_branch_and_memory_tree should accept a format option."""
572
572
        format = bzrdir.BzrDirMetaFormat1()
573
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
573
        format.repository_format = repository.format_registry.get_default()
574
574
        tree = self.make_branch_and_memory_tree('dir', format=format)
575
575
        # Guard against regression into MemoryTransport leaking
576
576
        # files to disk instead of keeping them in memory.
577
 
        self.failIf(osutils.lexists('dir'))
 
577
        self.assertFalse(osutils.lexists('dir'))
578
578
        self.assertIsInstance(tree, memorytree.MemoryTree)
579
579
        self.assertEqual(format.repository_format.__class__,
580
580
            tree.branch.repository._format.__class__)
584
584
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
585
585
        # Guard against regression into MemoryTransport leaking
586
586
        # files to disk instead of keeping them in memory.
587
 
        self.failIf(osutils.lexists('dir'))
 
587
        self.assertFalse(osutils.lexists('dir'))
588
588
 
589
589
    def test_make_branch_builder_with_format(self):
590
590
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
591
591
        # that the format objects are used.
592
592
        format = bzrdir.BzrDirMetaFormat1()
593
 
        repo_format = weaverepo.RepositoryFormat7()
 
593
        repo_format = repository.format_registry.get_default()
594
594
        format.repository_format = repo_format
595
595
        builder = self.make_branch_builder('dir', format=format)
596
596
        the_branch = builder.get_branch()
597
597
        # Guard against regression into MemoryTransport leaking
598
598
        # files to disk instead of keeping them in memory.
599
 
        self.failIf(osutils.lexists('dir'))
 
599
        self.assertFalse(osutils.lexists('dir'))
600
600
        self.assertEqual(format.repository_format.__class__,
601
601
                         the_branch.repository._format.__class__)
602
602
        self.assertEqual(repo_format.get_format_string(),
608
608
        the_branch = builder.get_branch()
609
609
        # Guard against regression into MemoryTransport leaking
610
610
        # files to disk instead of keeping them in memory.
611
 
        self.failIf(osutils.lexists('dir'))
 
611
        self.assertFalse(osutils.lexists('dir'))
612
612
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
613
613
        self.assertEqual(dir_format.repository_format.__class__,
614
614
                         the_branch.repository._format.__class__)
619
619
    def test_dangling_locks_cause_failures(self):
620
620
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
621
621
            def test_function(self):
622
 
                t = self.get_transport('.')
 
622
                t = self.get_transport_from_path('.')
623
623
                l = lockdir.LockDir(t, 'lock')
624
624
                l.create()
625
625
                l.attempt_lock()
645
645
        # for the server
646
646
        url = self.get_readonly_url()
647
647
        url2 = self.get_readonly_url('foo/bar')
648
 
        t = transport.get_transport(url)
649
 
        t2 = transport.get_transport(url2)
650
 
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
651
 
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
 
648
        t = transport.get_transport_from_url(url)
 
649
        t2 = transport.get_transport_from_url(url2)
 
650
        self.assertIsInstance(t, ReadonlyTransportDecorator)
 
651
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
652
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
653
653
 
654
654
    def test_get_readonly_url_http(self):
660
660
        url = self.get_readonly_url()
661
661
        url2 = self.get_readonly_url('foo/bar')
662
662
        # the transport returned may be any HttpTransportBase subclass
663
 
        t = transport.get_transport(url)
664
 
        t2 = transport.get_transport(url2)
665
 
        self.failUnless(isinstance(t, HttpTransportBase))
666
 
        self.failUnless(isinstance(t2, HttpTransportBase))
 
663
        t = transport.get_transport_from_url(url)
 
664
        t2 = transport.get_transport_from_url(url2)
 
665
        self.assertIsInstance(t, HttpTransportBase)
 
666
        self.assertIsInstance(t2, HttpTransportBase)
667
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
668
668
 
669
669
    def test_is_directory(self):
677
677
    def test_make_branch_builder(self):
678
678
        builder = self.make_branch_builder('dir')
679
679
        rev_id = builder.build_commit()
680
 
        self.failUnlessExists('dir')
 
680
        self.assertPathExists('dir')
681
681
        a_dir = bzrdir.BzrDir.open('dir')
682
682
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
683
683
        a_branch = a_dir.open_branch()
699
699
        self.assertIsInstance(result_bzrdir.transport,
700
700
                              memory.MemoryTransport)
701
701
        # should not be on disk, should only be in memory
702
 
        self.failIfExists('subdir')
 
702
        self.assertPathDoesNotExist('subdir')
703
703
 
704
704
 
705
705
class TestChrootedTest(tests.ChrootedTestCase):
706
706
 
707
707
    def test_root_is_root(self):
708
 
        t = transport.get_transport(self.get_readonly_url())
 
708
        t = transport.get_transport_from_url(self.get_readonly_url())
709
709
        url = t.base
710
710
        self.assertEqual(url, t.clone('..').base)
711
711
 
713
713
class TestProfileResult(tests.TestCase):
714
714
 
715
715
    def test_profiles_tests(self):
716
 
        self.requireFeature(test_lsprof.LSProfFeature)
717
 
        terminal = testtools.tests.helpers.ExtendedTestResult()
 
716
        self.requireFeature(features.lsprof_feature)
 
717
        terminal = testtools.testresult.doubles.ExtendedTestResult()
718
718
        result = tests.ProfileResult(terminal)
719
719
        class Sample(tests.TestCase):
720
720
            def a(self):
737
737
                descriptions=0,
738
738
                verbosity=1,
739
739
                )
740
 
        capture = testtools.tests.helpers.ExtendedTestResult()
 
740
        capture = testtools.testresult.doubles.ExtendedTestResult()
741
741
        test_case.run(MultiTestResult(result, capture))
742
742
        run_case = capture._events[0][1]
743
743
        timed_string = result._testTimeString(run_case)
764
764
        self.check_timing(ShortDelayTestCase('test_short_delay'),
765
765
                          r"^ +[0-9]+ms$")
766
766
 
767
 
    def _patch_get_bzr_source_tree(self):
768
 
        # Reading from the actual source tree breaks isolation, but we don't
769
 
        # want to assume that thats *all* that would happen.
770
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
771
 
 
772
 
    def test_assigned_benchmark_file_stores_date(self):
773
 
        self._patch_get_bzr_source_tree()
774
 
        output = StringIO()
775
 
        result = bzrlib.tests.TextTestResult(self._log_file,
776
 
                                        descriptions=0,
777
 
                                        verbosity=1,
778
 
                                        bench_history=output
779
 
                                        )
780
 
        output_string = output.getvalue()
781
 
        # if you are wondering about the regexp please read the comment in
782
 
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
783
 
        # XXX: what comment?  -- Andrew Bennetts
784
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
785
 
 
786
 
    def test_benchhistory_records_test_times(self):
787
 
        self._patch_get_bzr_source_tree()
788
 
        result_stream = StringIO()
789
 
        result = bzrlib.tests.TextTestResult(
790
 
            self._log_file,
791
 
            descriptions=0,
792
 
            verbosity=1,
793
 
            bench_history=result_stream
794
 
            )
795
 
 
796
 
        # we want profile a call and check that its test duration is recorded
797
 
        # make a new test instance that when run will generate a benchmark
798
 
        example_test_case = TestTestResult("_time_hello_world_encoding")
799
 
        # execute the test, which should succeed and record times
800
 
        example_test_case.run(result)
801
 
        lines = result_stream.getvalue().splitlines()
802
 
        self.assertEqual(2, len(lines))
803
 
        self.assertContainsRe(lines[1],
804
 
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
805
 
            "._time_hello_world_encoding")
806
 
 
807
767
    def _time_hello_world_encoding(self):
808
768
        """Profile two sleep calls
809
769
 
814
774
 
815
775
    def test_lsprofiling(self):
816
776
        """Verbose test result prints lsprof statistics from test cases."""
817
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
777
        self.requireFeature(features.lsprof_feature)
818
778
        result_stream = StringIO()
819
779
        result = bzrlib.tests.VerboseTestResult(
820
780
            result_stream,
849
809
        self.assertContainsRe(output,
850
810
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
851
811
 
 
812
    def test_uses_time_from_testtools(self):
 
813
        """Test case timings in verbose results should use testtools times"""
 
814
        import datetime
 
815
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
816
            def startTest(self, test):
 
817
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
818
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
819
            def addSuccess(self, test):
 
820
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
821
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
822
            def report_tests_starting(self): pass
 
823
        sio = StringIO()
 
824
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
825
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
826
 
852
827
    def test_known_failure(self):
853
 
        """A KnownFailure being raised should trigger several result actions."""
 
828
        """Using knownFailure should trigger several result actions."""
854
829
        class InstrumentedTestResult(tests.ExtendedTestResult):
855
830
            def stopTestRun(self): pass
856
831
            def report_tests_starting(self): pass
859
834
        result = InstrumentedTestResult(None, None, None, None)
860
835
        class Test(tests.TestCase):
861
836
            def test_function(self):
862
 
                raise tests.KnownFailure('failed!')
 
837
                self.knownFailure('failed!')
863
838
        test = Test("test_function")
864
839
        test.run(result)
865
840
        # it should invoke 'report_known_failure'.
881
856
            descriptions=0,
882
857
            verbosity=2,
883
858
            )
884
 
        test = self.get_passing_test()
885
 
        result.startTest(test)
886
 
        prefix = len(result_stream.getvalue())
887
 
        # the err parameter has the shape:
888
 
        # (class, exception object, traceback)
889
 
        # KnownFailures dont get their tracebacks shown though, so we
890
 
        # can skip that.
891
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
892
 
        result.report_known_failure(test, err)
893
 
        output = result_stream.getvalue()[prefix:]
894
 
        lines = output.splitlines()
895
 
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
896
 
        if sys.version_info > (2, 7):
897
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
898
 
                self.assertNotEqual, lines[1], '    ')
899
 
        self.assertEqual(lines[1], '    foo')
900
 
        self.assertEqual(2, len(lines))
 
859
        _get_test("test_xfail").run(result)
 
860
        self.assertContainsRe(result_stream.getvalue(),
 
861
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
862
            "\\s*(?:Text attachment: )?reason"
 
863
            "(?:\n-+\n|: {{{)"
 
864
            "this_fails"
 
865
            "(?:\n-+\n|}}}\n)")
901
866
 
902
867
    def get_passing_test(self):
903
868
        """Return a test object that can't be run usefully."""
914
879
                self._call = test, feature
915
880
        result = InstrumentedTestResult(None, None, None, None)
916
881
        test = SampleTestCase('_test_pass')
917
 
        feature = tests.Feature()
 
882
        feature = features.Feature()
918
883
        result.startTest(test)
919
884
        result.addNotSupported(test, feature)
920
885
        # it should invoke 'report_unsupported'.
939
904
            verbosity=2,
940
905
            )
941
906
        test = self.get_passing_test()
942
 
        feature = tests.Feature()
 
907
        feature = features.Feature()
943
908
        result.startTest(test)
944
909
        prefix = len(result_stream.getvalue())
945
910
        result.report_unsupported(test, feature)
958
923
            def addNotSupported(self, test, feature):
959
924
                self._call = test, feature
960
925
        result = InstrumentedTestResult(None, None, None, None)
961
 
        feature = tests.Feature()
 
926
        feature = features.Feature()
962
927
        class Test(tests.TestCase):
963
928
            def test_function(self):
964
929
                raise tests.UnavailableFeature(feature)
983
948
    def test_strict_with_known_failure(self):
984
949
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
985
950
                                             verbosity=1)
986
 
        test = self.get_passing_test()
987
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
988
 
        result.addExpectedFailure(test, err)
 
951
        test = _get_test("test_xfail")
 
952
        test.run(result)
989
953
        self.assertFalse(result.wasStrictlySuccessful())
990
954
        self.assertEqual(None, result._extractBenchmarkTime(test))
991
955
 
1023
987
        self.assertEquals(2, result.count)
1024
988
 
1025
989
 
1026
 
class TestUnicodeFilenameFeature(tests.TestCase):
1027
 
 
1028
 
    def test_probe_passes(self):
1029
 
        """UnicodeFilenameFeature._probe passes."""
1030
 
        # We can't test much more than that because the behaviour depends
1031
 
        # on the platform.
1032
 
        tests.UnicodeFilenameFeature._probe()
1033
 
 
1034
 
 
1035
990
class TestRunner(tests.TestCase):
1036
991
 
1037
992
    def dummy_test(self):
1063
1018
        test = unittest.TestSuite()
1064
1019
        test.addTest(Test("known_failure_test"))
1065
1020
        def failing_test():
1066
 
            self.fail('foo')
 
1021
            raise AssertionError('foo')
1067
1022
        test.addTest(unittest.FunctionTestCase(failing_test))
1068
1023
        stream = StringIO()
1069
1024
        runner = tests.TextTestRunner(stream=stream)
1077
1032
            '^----------------------------------------------------------------------\n'
1078
1033
            'Traceback \\(most recent call last\\):\n'
1079
1034
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1080
 
            '    self.fail\\(\'foo\'\\)\n'
 
1035
            '    raise AssertionError\\(\'foo\'\\)\n'
1081
1036
            '.*'
1082
1037
            '^----------------------------------------------------------------------\n'
1083
1038
            '.*'
1089
1044
        # the final output.
1090
1045
        class Test(tests.TestCase):
1091
1046
            def known_failure_test(self):
1092
 
                self.expectFailure('failed', self.assertTrue, False)
 
1047
                self.knownFailure("Never works...")
1093
1048
        test = Test("known_failure_test")
1094
1049
        stream = StringIO()
1095
1050
        runner = tests.TextTestRunner(stream=stream)
1101
1056
            '\n'
1102
1057
            'OK \\(known_failures=1\\)\n')
1103
1058
 
 
1059
    def test_unexpected_success_bad(self):
 
1060
        class Test(tests.TestCase):
 
1061
            def test_truth(self):
 
1062
                self.expectFailure("No absolute truth", self.assertTrue, True)
 
1063
        runner = tests.TextTestRunner(stream=StringIO())
 
1064
        result = self.run_test_runner(runner, Test("test_truth"))
 
1065
        self.assertContainsRe(runner.stream.getvalue(),
 
1066
            "=+\n"
 
1067
            "FAIL: \\S+\.test_truth\n"
 
1068
            "-+\n"
 
1069
            "(?:.*\n)*"
 
1070
            "\\s*(?:Text attachment: )?reason"
 
1071
            "(?:\n-+\n|: {{{)"
 
1072
            "No absolute truth"
 
1073
            "(?:\n-+\n|}}}\n)"
 
1074
            "(?:.*\n)*"
 
1075
            "-+\n"
 
1076
            "Ran 1 test in .*\n"
 
1077
            "\n"
 
1078
            "FAILED \\(failures=1\\)\n\\Z")
 
1079
 
1104
1080
    def test_result_decorator(self):
1105
1081
        # decorate results
1106
1082
        calls = []
1107
 
        class LoggingDecorator(tests.ForwardingResult):
 
1083
        class LoggingDecorator(ExtendedToOriginalDecorator):
1108
1084
            def startTest(self, test):
1109
 
                tests.ForwardingResult.startTest(self, test)
 
1085
                ExtendedToOriginalDecorator.startTest(self, test)
1110
1086
                calls.append('start')
1111
1087
        test = unittest.FunctionTestCase(lambda:None)
1112
1088
        stream = StringIO()
1190
1166
 
1191
1167
    def test_unsupported_features_listed(self):
1192
1168
        """When unsupported features are encountered they are detailed."""
1193
 
        class Feature1(tests.Feature):
 
1169
        class Feature1(features.Feature):
1194
1170
            def _probe(self): return False
1195
 
        class Feature2(tests.Feature):
 
1171
        class Feature2(features.Feature):
1196
1172
            def _probe(self): return False
1197
1173
        # create sample tests
1198
1174
        test1 = SampleTestCase('_test_pass')
1213
1189
            ],
1214
1190
            lines[-3:])
1215
1191
 
1216
 
    def _patch_get_bzr_source_tree(self):
1217
 
        # Reading from the actual source tree breaks isolation, but we don't
1218
 
        # want to assume that thats *all* that would happen.
1219
 
        self._get_source_tree_calls = []
1220
 
        def new_get():
1221
 
            self._get_source_tree_calls.append("called")
1222
 
            return None
1223
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
1224
 
 
1225
 
    def test_bench_history(self):
1226
 
        # tests that the running the benchmark passes bench_history into
1227
 
        # the test result object. We can tell that happens if
1228
 
        # _get_bzr_source_tree is called.
1229
 
        self._patch_get_bzr_source_tree()
1230
 
        test = TestRunner('dummy_test')
1231
 
        output = StringIO()
1232
 
        runner = tests.TextTestRunner(stream=self._log_file,
1233
 
                                      bench_history=output)
1234
 
        result = self.run_test_runner(runner, test)
1235
 
        output_string = output.getvalue()
1236
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
1237
 
        self.assertLength(1, self._get_source_tree_calls)
1238
 
 
1239
1192
    def test_verbose_test_count(self):
1240
1193
        """A verbose test run reports the right test count at the start"""
1241
1194
        suite = TestUtil.TestSuite([
1251
1204
    def test_startTestRun(self):
1252
1205
        """run should call result.startTestRun()"""
1253
1206
        calls = []
1254
 
        class LoggingDecorator(tests.ForwardingResult):
 
1207
        class LoggingDecorator(ExtendedToOriginalDecorator):
1255
1208
            def startTestRun(self):
1256
 
                tests.ForwardingResult.startTestRun(self)
 
1209
                ExtendedToOriginalDecorator.startTestRun(self)
1257
1210
                calls.append('startTestRun')
1258
1211
        test = unittest.FunctionTestCase(lambda:None)
1259
1212
        stream = StringIO()
1265
1218
    def test_stopTestRun(self):
1266
1219
        """run should call result.stopTestRun()"""
1267
1220
        calls = []
1268
 
        class LoggingDecorator(tests.ForwardingResult):
 
1221
        class LoggingDecorator(ExtendedToOriginalDecorator):
1269
1222
            def stopTestRun(self):
1270
 
                tests.ForwardingResult.stopTestRun(self)
 
1223
                ExtendedToOriginalDecorator.stopTestRun(self)
1271
1224
                calls.append('stopTestRun')
1272
1225
        test = unittest.FunctionTestCase(lambda:None)
1273
1226
        stream = StringIO()
1276
1229
        result = self.run_test_runner(runner, test)
1277
1230
        self.assertLength(1, calls)
1278
1231
 
 
1232
    def test_unicode_test_output_on_ascii_stream(self):
 
1233
        """Showing results should always succeed even on an ascii console"""
 
1234
        class FailureWithUnicode(tests.TestCase):
 
1235
            def test_log_unicode(self):
 
1236
                self.log(u"\u2606")
 
1237
                self.fail("Now print that log!")
 
1238
        out = StringIO()
 
1239
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1240
            lambda trace=False: "ascii")
 
1241
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1242
            FailureWithUnicode("test_log_unicode"))
 
1243
        self.assertContainsRe(out.getvalue(),
 
1244
            "(?:Text attachment: )?log"
 
1245
            "(?:\n-+\n|: {{{)"
 
1246
            "\d+\.\d+  \\\\u2606"
 
1247
            "(?:\n-+\n|}}}\n)")
 
1248
 
1279
1249
 
1280
1250
class SampleTestCase(tests.TestCase):
1281
1251
 
1469
1439
        # Note this test won't fail with hooks that the core library doesn't
1470
1440
        # use - but it trigger with a plugin that adds hooks, so its still a
1471
1441
        # useful warning in that case.
1472
 
        self.assertEqual(bzrlib.branch.BranchHooks(),
1473
 
            bzrlib.branch.Branch.hooks)
1474
 
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1442
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1443
        self.assertEqual(
 
1444
            bzrlib.smart.server.SmartServerHooks(),
1475
1445
            bzrlib.smart.server.SmartTCPServer.hooks)
1476
 
        self.assertEqual(bzrlib.commands.CommandHooks(),
1477
 
            bzrlib.commands.Command.hooks)
 
1446
        self.assertEqual(
 
1447
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1478
1448
 
1479
1449
    def test__gather_lsprof_in_benchmarks(self):
1480
1450
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1481
1451
 
1482
1452
        Each self.time() call is individually and separately profiled.
1483
1453
        """
1484
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
1454
        self.requireFeature(features.lsprof_feature)
1485
1455
        # overrides the class member with an instance member so no cleanup
1486
1456
        # needed.
1487
1457
        self._gather_lsprof_in_benchmarks = True
1506
1476
        transport_server = memory.MemoryServer()
1507
1477
        transport_server.start_server()
1508
1478
        self.addCleanup(transport_server.stop_server)
1509
 
        t = transport.get_transport(transport_server.get_url())
 
1479
        t = transport.get_transport_from_url(transport_server.get_url())
1510
1480
        bzrdir.BzrDir.create(t.base)
1511
1481
        self.assertRaises(errors.BzrError,
1512
1482
            bzrdir.BzrDir.open_from_transport, t)
1517
1487
 
1518
1488
    def test_requireFeature_available(self):
1519
1489
        """self.requireFeature(available) is a no-op."""
1520
 
        class Available(tests.Feature):
 
1490
        class Available(features.Feature):
1521
1491
            def _probe(self):return True
1522
1492
        feature = Available()
1523
1493
        self.requireFeature(feature)
1524
1494
 
1525
1495
    def test_requireFeature_unavailable(self):
1526
1496
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1527
 
        class Unavailable(tests.Feature):
 
1497
        class Unavailable(features.Feature):
1528
1498
            def _probe(self):return False
1529
1499
        feature = Unavailable()
1530
1500
        self.assertRaises(tests.UnavailableFeature,
1689
1659
        test.run(unittest.TestResult())
1690
1660
        self.assertEqual('original', obj.test_attr)
1691
1661
 
1692
 
 
1693
 
class _MissingFeature(tests.Feature):
 
1662
    def test_recordCalls(self):
 
1663
        from bzrlib.tests import test_selftest
 
1664
        calls = self.recordCalls(
 
1665
            test_selftest, '_add_numbers')
 
1666
        self.assertEqual(test_selftest._add_numbers(2, 10),
 
1667
            12)
 
1668
        self.assertEquals(calls, [((2, 10), {})])
 
1669
 
 
1670
 
 
1671
def _add_numbers(a, b):
 
1672
    return a + b
 
1673
 
 
1674
 
 
1675
class _MissingFeature(features.Feature):
1694
1676
    def _probe(self):
1695
1677
        return False
1696
1678
missing_feature = _MissingFeature()
1747
1729
        result = self._run_test('test_fail')
1748
1730
        self.assertEqual(1, len(result.failures))
1749
1731
        result_content = result.failures[0][1]
1750
 
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1732
        self.assertContainsRe(result_content,
 
1733
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1751
1734
        self.assertContainsRe(result_content, 'this was a failing test')
1752
1735
 
1753
1736
    def test_error_has_log(self):
1754
1737
        result = self._run_test('test_error')
1755
1738
        self.assertEqual(1, len(result.errors))
1756
1739
        result_content = result.errors[0][1]
1757
 
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1740
        self.assertContainsRe(result_content,
 
1741
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1758
1742
        self.assertContainsRe(result_content, 'this test errored')
1759
1743
 
1760
1744
    def test_skip_has_no_log(self):
1779
1763
        result = self._run_test('test_xfail')
1780
1764
        self.assertEqual(1, len(result.expectedFailures))
1781
1765
        result_content = result.expectedFailures[0][1]
1782
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1766
        self.assertNotContainsRe(result_content,
 
1767
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1783
1768
        self.assertNotContainsRe(result_content, 'test with expected failure')
1784
1769
 
1785
1770
    def test_unexpected_success_has_log(self):
1958
1943
    def test_make_branch_and_tree_with_format(self):
1959
1944
        # we should be able to supply a format to make_branch_and_tree
1960
1945
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1961
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1962
1946
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1963
1947
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1964
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1965
 
                              bzrlib.bzrdir.BzrDirFormat6)
1966
1948
 
1967
1949
    def test_make_branch_and_memory_tree(self):
1968
1950
        # we should be able to get a new branch and a mutable tree from
2045
2027
        self.assertLength(2, output.readlines())
2046
2028
 
2047
2029
    def test_lsprof_tests(self):
2048
 
        self.requireFeature(test_lsprof.LSProfFeature)
2049
 
        calls = []
 
2030
        self.requireFeature(features.lsprof_feature)
 
2031
        results = []
2050
2032
        class Test(object):
2051
2033
            def __call__(test, result):
2052
2034
                test.run(result)
2053
2035
            def run(test, result):
2054
 
                self.assertIsInstance(result, tests.ForwardingResult)
2055
 
                calls.append("called")
 
2036
                results.append(result)
2056
2037
            def countTestCases(self):
2057
2038
                return 1
2058
2039
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2059
 
        self.assertLength(1, calls)
 
2040
        self.assertLength(1, results)
 
2041
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
2060
2042
 
2061
2043
    def test_random(self):
2062
2044
        # test randomising by listing a number of tests.
2197
2179
        self.assertNotContainsRe(content, 'test with expected failure')
2198
2180
        self.assertEqual(1, len(result.expectedFailures))
2199
2181
        result_content = result.expectedFailures[0][1]
2200
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2182
        self.assertNotContainsRe(result_content,
 
2183
            '(?m)^(?:Text attachment: )?log(?:$|: )')
2201
2184
        self.assertNotContainsRe(result_content, 'test with expected failure')
2202
2185
 
2203
2186
    def test_unexpected_success_has_log(self):
2204
2187
        content, result = self.run_subunit_stream('test_unexpected_success')
2205
2188
        self.assertContainsRe(content, '(?m)^log$')
2206
2189
        self.assertContainsRe(content, 'test with unexpected success')
2207
 
        self.expectFailure('subunit treats "unexpectedSuccess"'
2208
 
                           ' as a plain success',
2209
 
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2190
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
 
2191
        #                success, if a min version check is added remove this
 
2192
        from subunit import TestProtocolClient as _Client
 
2193
        if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
 
2194
            self.expectFailure('subunit treats "unexpectedSuccess"'
 
2195
                               ' as a plain success',
 
2196
                self.assertEqual, 1, len(result.unexpectedSuccesses))
2210
2197
        self.assertEqual(1, len(result.unexpectedSuccesses))
2211
2198
        test = result.unexpectedSuccesses[0]
2212
2199
        # RemotedTestCase doesn't preserve the "details"
2347
2334
        # stdout and stderr of the invoked run_bzr
2348
2335
        current_factory = bzrlib.ui.ui_factory
2349
2336
        self.run_bzr(['foo'])
2350
 
        self.failIf(current_factory is self.factory)
 
2337
        self.assertFalse(current_factory is self.factory)
2351
2338
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2352
2339
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2353
2340
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2510
2497
 
2511
2498
 
2512
2499
class TestStartBzrSubProcess(tests.TestCase):
 
2500
    """Stub test start_bzr_subprocess."""
2513
2501
 
2514
 
    def check_popen_state(self):
2515
 
        """Replace to make assertions when popen is called."""
 
2502
    def _subprocess_log_cleanup(self):
 
2503
        """Inhibits the base version as we don't produce a log file."""
2516
2504
 
2517
2505
    def _popen(self, *args, **kwargs):
2518
 
        """Record the command that is run, so that we can ensure it is correct"""
 
2506
        """Override the base version to record the command that is run.
 
2507
 
 
2508
        From there we can ensure it is correct without spawning a real process.
 
2509
        """
2519
2510
        self.check_popen_state()
2520
2511
        self._popen_args = args
2521
2512
        self._popen_kwargs = kwargs
2522
2513
        raise _DontSpawnProcess()
2523
2514
 
 
2515
    def check_popen_state(self):
 
2516
        """Replace to make assertions when popen is called."""
 
2517
 
2524
2518
    def test_run_bzr_subprocess_no_plugins(self):
2525
2519
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2526
2520
        command = self._popen_args[0]
2530
2524
 
2531
2525
    def test_allow_plugins(self):
2532
2526
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2533
 
            allow_plugins=True)
 
2527
                          allow_plugins=True)
2534
2528
        command = self._popen_args[0]
2535
2529
        self.assertEqual([], command[2:])
2536
2530
 
2537
2531
    def test_set_env(self):
2538
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2532
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2539
2533
        # set in the child
2540
2534
        def check_environment():
2541
2535
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2542
2536
        self.check_popen_state = check_environment
2543
2537
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2544
 
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2538
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
2545
2539
        # not set in theparent
2546
2540
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2547
2541
 
2548
2542
    def test_run_bzr_subprocess_env_del(self):
2549
2543
        """run_bzr_subprocess can remove environment variables too."""
2550
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2544
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2551
2545
        def check_environment():
2552
2546
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2553
2547
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2554
2548
        self.check_popen_state = check_environment
2555
2549
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2556
 
            env_changes={'EXISTANT_ENV_VAR':None})
 
2550
                          env_changes={'EXISTANT_ENV_VAR':None})
2557
2551
        # Still set in parent
2558
2552
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2559
2553
        del os.environ['EXISTANT_ENV_VAR']
2560
2554
 
2561
2555
    def test_env_del_missing(self):
2562
 
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
 
2556
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2563
2557
        def check_environment():
2564
2558
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2565
2559
        self.check_popen_state = check_environment
2566
2560
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2567
 
            env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2561
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
2568
2562
 
2569
2563
    def test_working_dir(self):
2570
2564
        """Test that we can specify the working dir for the child"""
2573
2567
        chdirs = []
2574
2568
        def chdir(path):
2575
2569
            chdirs.append(path)
2576
 
        os.chdir = chdir
2577
 
        try:
2578
 
            def getcwd():
2579
 
                return 'current'
2580
 
            osutils.getcwd = getcwd
2581
 
            try:
2582
 
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2583
 
                    working_dir='foo')
2584
 
            finally:
2585
 
                osutils.getcwd = orig_getcwd
2586
 
        finally:
2587
 
            os.chdir = orig_chdir
 
2570
        self.overrideAttr(os, 'chdir', chdir)
 
2571
        def getcwd():
 
2572
            return 'current'
 
2573
        self.overrideAttr(osutils, 'getcwd', getcwd)
 
2574
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2575
                          working_dir='foo')
2588
2576
        self.assertEqual(['foo', 'current'], chdirs)
2589
2577
 
2590
2578
    def test_get_bzr_path_with_cwd_bzrlib(self):
2610
2598
        self.assertEqual('bzr: interrupted\n', result[1])
2611
2599
 
2612
2600
 
2613
 
class TestFeature(tests.TestCase):
2614
 
 
2615
 
    def test_caching(self):
2616
 
        """Feature._probe is called by the feature at most once."""
2617
 
        class InstrumentedFeature(tests.Feature):
2618
 
            def __init__(self):
2619
 
                super(InstrumentedFeature, self).__init__()
2620
 
                self.calls = []
2621
 
            def _probe(self):
2622
 
                self.calls.append('_probe')
2623
 
                return False
2624
 
        feature = InstrumentedFeature()
2625
 
        feature.available()
2626
 
        self.assertEqual(['_probe'], feature.calls)
2627
 
        feature.available()
2628
 
        self.assertEqual(['_probe'], feature.calls)
2629
 
 
2630
 
    def test_named_str(self):
2631
 
        """Feature.__str__ should thunk to feature_name()."""
2632
 
        class NamedFeature(tests.Feature):
2633
 
            def feature_name(self):
2634
 
                return 'symlinks'
2635
 
        feature = NamedFeature()
2636
 
        self.assertEqual('symlinks', str(feature))
2637
 
 
2638
 
    def test_default_str(self):
2639
 
        """Feature.__str__ should default to __class__.__name__."""
2640
 
        class NamedFeature(tests.Feature):
2641
 
            pass
2642
 
        feature = NamedFeature()
2643
 
        self.assertEqual('NamedFeature', str(feature))
2644
 
 
2645
 
 
2646
 
class TestUnavailableFeature(tests.TestCase):
2647
 
 
2648
 
    def test_access_feature(self):
2649
 
        feature = tests.Feature()
2650
 
        exception = tests.UnavailableFeature(feature)
2651
 
        self.assertIs(feature, exception.args[0])
2652
 
 
2653
 
 
2654
 
simple_thunk_feature = tests._CompatabilityThunkFeature(
2655
 
    deprecated_in((2, 1, 0)),
2656
 
    'bzrlib.tests.test_selftest',
2657
 
    'simple_thunk_feature','UnicodeFilename',
2658
 
    replacement_module='bzrlib.tests'
2659
 
    )
2660
 
 
2661
 
class Test_CompatibilityFeature(tests.TestCase):
2662
 
 
2663
 
    def test_does_thunk(self):
2664
 
        res = self.callDeprecated(
2665
 
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2666
 
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2667
 
            simple_thunk_feature.available)
2668
 
        self.assertEqual(tests.UnicodeFilename.available(), res)
2669
 
 
2670
 
 
2671
 
class TestModuleAvailableFeature(tests.TestCase):
2672
 
 
2673
 
    def test_available_module(self):
2674
 
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
2675
 
        self.assertEqual('bzrlib.tests', feature.module_name)
2676
 
        self.assertEqual('bzrlib.tests', str(feature))
2677
 
        self.assertTrue(feature.available())
2678
 
        self.assertIs(tests, feature.module)
2679
 
 
2680
 
    def test_unavailable_module(self):
2681
 
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2682
 
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2683
 
        self.assertFalse(feature.available())
2684
 
        self.assertIs(None, feature.module)
2685
 
 
2686
 
 
2687
2601
class TestSelftestFiltering(tests.TestCase):
2688
2602
 
2689
2603
    def setUp(self):
2840
2754
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2841
2755
 
2842
2756
 
2843
 
class TestCheckInventoryShape(tests.TestCaseWithTransport):
 
2757
class TestCheckTreeShape(tests.TestCaseWithTransport):
2844
2758
 
2845
 
    def test_check_inventory_shape(self):
 
2759
    def test_check_tree_shape(self):
2846
2760
        files = ['a', 'b/', 'b/c']
2847
2761
        tree = self.make_branch_and_tree('.')
2848
2762
        self.build_tree(files)
2849
2763
        tree.add(files)
2850
2764
        tree.lock_read()
2851
2765
        try:
2852
 
            self.check_inventory_shape(tree.inventory, files)
 
2766
            self.check_tree_shape(tree, files)
2853
2767
        finally:
2854
2768
            tree.unlock()
2855
2769
 
3187
3101
        tpr.register('bar', 'bBB.aAA.rRR')
3188
3102
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3189
3103
        self.assertThat(self.get_log(),
3190
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3104
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3105
                           doctest.ELLIPSIS))
3191
3106
 
3192
3107
    def test_get_unknown_prefix(self):
3193
3108
        tpr = self._get_registry()
3228
3143
        class Test(unittest.TestCase):
3229
3144
            def runTest(self):
3230
3145
                pass
3231
 
            addCleanup = None # for when on Python 2.7 with native addCleanup
3232
3146
        result = self.LeakRecordingResult()
3233
3147
        test = Test()
3234
 
        self.assertIs(getattr(test, "addCleanup", None), None)
3235
3148
        result.startTestRun()
3236
3149
        test.run(result)
3237
3150
        result.stopTestRun()
3301
3214
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3302
3215
 
3303
3216
 
 
3217
class TestPostMortemDebugging(tests.TestCase):
 
3218
    """Check post mortem debugging works when tests fail or error"""
 
3219
 
 
3220
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3221
        def __init__(self):
 
3222
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3223
            self.postcode = None
 
3224
        def _post_mortem(self, tb=None):
 
3225
            """Record the code object at the end of the current traceback"""
 
3226
            tb = tb or sys.exc_info()[2]
 
3227
            if tb is not None:
 
3228
                next = tb.tb_next
 
3229
                while next is not None:
 
3230
                    tb = next
 
3231
                    next = next.tb_next
 
3232
                self.postcode = tb.tb_frame.f_code
 
3233
        def report_error(self, test, err):
 
3234
            pass
 
3235
        def report_failure(self, test, err):
 
3236
            pass
 
3237
 
 
3238
    def test_location_unittest_error(self):
 
3239
        """Needs right post mortem traceback with erroring unittest case"""
 
3240
        class Test(unittest.TestCase):
 
3241
            def runTest(self):
 
3242
                raise RuntimeError
 
3243
        result = self.TracebackRecordingResult()
 
3244
        Test().run(result)
 
3245
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3246
 
 
3247
    def test_location_unittest_failure(self):
 
3248
        """Needs right post mortem traceback with failing unittest case"""
 
3249
        class Test(unittest.TestCase):
 
3250
            def runTest(self):
 
3251
                raise self.failureException
 
3252
        result = self.TracebackRecordingResult()
 
3253
        Test().run(result)
 
3254
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3255
 
 
3256
    def test_location_bt_error(self):
 
3257
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3258
        class Test(tests.TestCase):
 
3259
            def test_error(self):
 
3260
                raise RuntimeError
 
3261
        result = self.TracebackRecordingResult()
 
3262
        Test("test_error").run(result)
 
3263
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3264
 
 
3265
    def test_location_bt_failure(self):
 
3266
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3267
        class Test(tests.TestCase):
 
3268
            def test_failure(self):
 
3269
                raise self.failureException
 
3270
        result = self.TracebackRecordingResult()
 
3271
        Test("test_failure").run(result)
 
3272
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3273
 
 
3274
    def test_env_var_triggers_post_mortem(self):
 
3275
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3276
        import pdb
 
3277
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3278
        post_mortem_calls = []
 
3279
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3280
        self.overrideEnv('BZR_TEST_PDB', None)
 
3281
        result._post_mortem(1)
 
3282
        self.overrideEnv('BZR_TEST_PDB', 'on')
 
3283
        result._post_mortem(2)
 
3284
        self.assertEqual([2], post_mortem_calls)
 
3285
 
 
3286
 
3304
3287
class TestRunSuite(tests.TestCase):
3305
3288
 
3306
3289
    def test_runner_class(self):
3317
3300
                                                self.verbosity)
3318
3301
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3319
3302
        self.assertLength(1, calls)
 
3303
 
 
3304
 
 
3305
class _Selftest(object):
 
3306
    """Mixin for tests needing full selftest output"""
 
3307
 
 
3308
    def _inject_stream_into_subunit(self, stream):
 
3309
        """To be overridden by subclasses that run tests out of process"""
 
3310
 
 
3311
    def _run_selftest(self, **kwargs):
 
3312
        sio = StringIO()
 
3313
        self._inject_stream_into_subunit(sio)
 
3314
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
 
3315
        return sio.getvalue()
 
3316
 
 
3317
 
 
3318
class _ForkedSelftest(_Selftest):
 
3319
    """Mixin for tests needing full selftest output with forked children"""
 
3320
 
 
3321
    _test_needs_features = [features.subunit]
 
3322
 
 
3323
    def _inject_stream_into_subunit(self, stream):
 
3324
        """Monkey-patch subunit so the extra output goes to stream not stdout
 
3325
 
 
3326
        Some APIs need rewriting so this kind of bogus hackery can be replaced
 
3327
        by passing the stream param from run_tests down into ProtocolTestCase.
 
3328
        """
 
3329
        from subunit import ProtocolTestCase
 
3330
        _original_init = ProtocolTestCase.__init__
 
3331
        def _init_with_passthrough(self, *args, **kwargs):
 
3332
            _original_init(self, *args, **kwargs)
 
3333
            self._passthrough = stream
 
3334
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
 
3335
 
 
3336
    def _run_selftest(self, **kwargs):
 
3337
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
 
3338
        if getattr(os, "fork", None) is None:
 
3339
            raise tests.TestNotApplicable("Platform doesn't support forking")
 
3340
        # Make sure the fork code is actually invoked by claiming two cores
 
3341
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
 
3342
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
 
3343
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3344
 
 
3345
 
 
3346
class TestParallelFork(_ForkedSelftest, tests.TestCase):
 
3347
    """Check operation of --parallel=fork selftest option"""
 
3348
 
 
3349
    def test_error_in_child_during_fork(self):
 
3350
        """Error in a forked child during test setup should get reported"""
 
3351
        class Test(tests.TestCase):
 
3352
            def testMethod(self):
 
3353
                pass
 
3354
        # We don't care what, just break something that a child will run
 
3355
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
 
3356
        out = self._run_selftest(test_suite_factory=Test)
 
3357
        # Lines from the tracebacks of the two child processes may be mixed
 
3358
        # together due to the way subunit parses and forwards the streams,
 
3359
        # so permit extra lines between each part of the error output.
 
3360
        self.assertContainsRe(out,
 
3361
            "Traceback.*:\n"
 
3362
            "(?:.*\n)*"
 
3363
            ".+ in fork_for_tests\n"
 
3364
            "(?:.*\n)*"
 
3365
            "\s*workaround_zealous_crypto_random\(\)\n"
 
3366
            "(?:.*\n)*"
 
3367
            "TypeError:")
 
3368
 
 
3369
 
 
3370
class TestUncollectedWarnings(_Selftest, tests.TestCase):
 
3371
    """Check a test case still alive after being run emits a warning"""
 
3372
 
 
3373
    class Test(tests.TestCase):
 
3374
        def test_pass(self):
 
3375
            pass
 
3376
        def test_self_ref(self):
 
3377
            self.also_self = self.test_self_ref
 
3378
        def test_skip(self):
 
3379
            self.skip("Don't need")
 
3380
 
 
3381
    def _get_suite(self):
 
3382
        return TestUtil.TestSuite([
 
3383
            self.Test("test_pass"),
 
3384
            self.Test("test_self_ref"),
 
3385
            self.Test("test_skip"),
 
3386
            ])
 
3387
 
 
3388
    def _run_selftest_with_suite(self, **kwargs):
 
3389
        old_flags = tests.selftest_debug_flags
 
3390
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
 
3391
        gc_on = gc.isenabled()
 
3392
        if gc_on:
 
3393
            gc.disable()
 
3394
        try:
 
3395
            output = self._run_selftest(test_suite_factory=self._get_suite,
 
3396
                **kwargs)
 
3397
        finally:
 
3398
            if gc_on:
 
3399
                gc.enable()
 
3400
            tests.selftest_debug_flags = old_flags
 
3401
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
 
3402
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
 
3403
        return output
 
3404
 
 
3405
    def test_testsuite(self):
 
3406
        self._run_selftest_with_suite()
 
3407
 
 
3408
    def test_pattern(self):
 
3409
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
 
3410
        self.assertNotContainsRe(out, "test_skip")
 
3411
 
 
3412
    def test_exclude_pattern(self):
 
3413
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
 
3414
        self.assertNotContainsRe(out, "test_skip")
 
3415
 
 
3416
    def test_random_seed(self):
 
3417
        self._run_selftest_with_suite(random_seed="now")
 
3418
 
 
3419
    def test_matching_tests_first(self):
 
3420
        self._run_selftest_with_suite(matching_tests_first=True,
 
3421
            pattern="test_self_ref$")
 
3422
 
 
3423
    def test_starting_with_and_exclude(self):
 
3424
        out = self._run_selftest_with_suite(starting_with=["bt."],
 
3425
            exclude_pattern="test_skip$")
 
3426
        self.assertNotContainsRe(out, "test_skip")
 
3427
 
 
3428
    def test_additonal_decorator(self):
 
3429
        out = self._run_selftest_with_suite(
 
3430
            suite_decorators=[tests.TestDecorator])
 
3431
 
 
3432
 
 
3433
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
 
3434
    """Check warnings from tests staying alive are emitted with subunit"""
 
3435
 
 
3436
    _test_needs_features = [features.subunit]
 
3437
 
 
3438
    def _run_selftest_with_suite(self, **kwargs):
 
3439
        return TestUncollectedWarnings._run_selftest_with_suite(self,
 
3440
            runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3441
 
 
3442
 
 
3443
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
 
3444
    """Check warnings from tests staying alive are emitted when forking"""
 
3445
 
 
3446
 
 
3447
class TestEnvironHandling(tests.TestCase):
 
3448
 
 
3449
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
 
3450
        self.assertFalse('MYVAR' in os.environ)
 
3451
        self.overrideEnv('MYVAR', '42')
 
3452
        # We use an embedded test to make sure we fix the _captureVar bug
 
3453
        class Test(tests.TestCase):
 
3454
            def test_me(self):
 
3455
                # The first call save the 42 value
 
3456
                self.overrideEnv('MYVAR', None)
 
3457
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3458
                # Make sure we can call it twice
 
3459
                self.overrideEnv('MYVAR', None)
 
3460
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3461
        output = StringIO()
 
3462
        result = tests.TextTestResult(output, 0, 1)
 
3463
        Test('test_me').run(result)
 
3464
        if not result.wasStrictlySuccessful():
 
3465
            self.fail(output.getvalue())
 
3466
        # We get our value back
 
3467
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3468
 
 
3469
 
 
3470
class TestIsolatedEnv(tests.TestCase):
 
3471
    """Test isolating tests from os.environ.
 
3472
 
 
3473
    Since we use tests that are already isolated from os.environ a bit of care
 
3474
    should be taken when designing the tests to avoid bootstrap side-effects.
 
3475
    The tests start an already clean os.environ which allow doing valid
 
3476
    assertions about which variables are present or not and design tests around
 
3477
    these assertions.
 
3478
    """
 
3479
 
 
3480
    class ScratchMonkey(tests.TestCase):
 
3481
 
 
3482
        def test_me(self):
 
3483
            pass
 
3484
 
 
3485
    def test_basics(self):
 
3486
        # Make sure we know the definition of BZR_HOME: not part of os.environ
 
3487
        # for tests.TestCase.
 
3488
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
 
3489
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
 
3490
        # Being part of isolated_environ, BZR_HOME should not appear here
 
3491
        self.assertFalse('BZR_HOME' in os.environ)
 
3492
        # Make sure we know the definition of LINES: part of os.environ for
 
3493
        # tests.TestCase
 
3494
        self.assertTrue('LINES' in tests.isolated_environ)
 
3495
        self.assertEquals('25', tests.isolated_environ['LINES'])
 
3496
        self.assertEquals('25', os.environ['LINES'])
 
3497
 
 
3498
    def test_injecting_unknown_variable(self):
 
3499
        # BZR_HOME is known to be absent from os.environ
 
3500
        test = self.ScratchMonkey('test_me')
 
3501
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
 
3502
        self.assertEquals('foo', os.environ['BZR_HOME'])
 
3503
        tests.restore_os_environ(test)
 
3504
        self.assertFalse('BZR_HOME' in os.environ)
 
3505
 
 
3506
    def test_injecting_known_variable(self):
 
3507
        test = self.ScratchMonkey('test_me')
 
3508
        # LINES is known to be present in os.environ
 
3509
        tests.override_os_environ(test, {'LINES': '42'})
 
3510
        self.assertEquals('42', os.environ['LINES'])
 
3511
        tests.restore_os_environ(test)
 
3512
        self.assertEquals('25', os.environ['LINES'])
 
3513
 
 
3514
    def test_deleting_variable(self):
 
3515
        test = self.ScratchMonkey('test_me')
 
3516
        # LINES is known to be present in os.environ
 
3517
        tests.override_os_environ(test, {'LINES': None})
 
3518
        self.assertTrue('LINES' not in os.environ)
 
3519
        tests.restore_os_environ(test)
 
3520
        self.assertEquals('25', os.environ['LINES'])
 
3521
 
 
3522
 
 
3523
class TestDocTestSuiteIsolation(tests.TestCase):
 
3524
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
 
3525
 
 
3526
    Since tests.TestCase alreay provides an isolation from os.environ, we use
 
3527
    the clean environment as a base for testing. To precisely capture the
 
3528
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
 
3529
    compare against.
 
3530
 
 
3531
    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
 
3532
    not `os.environ` so each test overrides it to suit its needs.
 
3533
 
 
3534
    """
 
3535
 
 
3536
    def get_doctest_suite_for_string(self, klass, string):
 
3537
        class Finder(doctest.DocTestFinder):
 
3538
 
 
3539
            def find(*args, **kwargs):
 
3540
                test = doctest.DocTestParser().get_doctest(
 
3541
                    string, {}, 'foo', 'foo.py', 0)
 
3542
                return [test]
 
3543
 
 
3544
        suite = klass(test_finder=Finder())
 
3545
        return suite
 
3546
 
 
3547
    def run_doctest_suite_for_string(self, klass, string):
 
3548
        suite = self.get_doctest_suite_for_string(klass, string)
 
3549
        output = StringIO()
 
3550
        result = tests.TextTestResult(output, 0, 1)
 
3551
        suite.run(result)
 
3552
        return result, output
 
3553
 
 
3554
    def assertDocTestStringSucceds(self, klass, string):
 
3555
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3556
        if not result.wasStrictlySuccessful():
 
3557
            self.fail(output.getvalue())
 
3558
 
 
3559
    def assertDocTestStringFails(self, klass, string):
 
3560
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3561
        if result.wasStrictlySuccessful():
 
3562
            self.fail(output.getvalue())
 
3563
 
 
3564
    def test_injected_variable(self):
 
3565
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
 
3566
        test = """
 
3567
            >>> import os
 
3568
            >>> os.environ['LINES']
 
3569
            '42'
 
3570
            """
 
3571
        # doctest.DocTestSuite fails as it sees '25'
 
3572
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3573
        # tests.DocTestSuite sees '42'
 
3574
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3575
 
 
3576
    def test_deleted_variable(self):
 
3577
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
 
3578
        test = """
 
3579
            >>> import os
 
3580
            >>> os.environ.get('LINES')
 
3581
            """
 
3582
        # doctest.DocTestSuite fails as it sees '25'
 
3583
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3584
        # tests.DocTestSuite sees None
 
3585
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3586
 
 
3587
 
 
3588
class TestSelftestExcludePatterns(tests.TestCase):
 
3589
 
 
3590
    def setUp(self):
 
3591
        super(TestSelftestExcludePatterns, self).setUp()
 
3592
        self.overrideAttr(tests, 'test_suite', self.suite_factory)
 
3593
 
 
3594
    def suite_factory(self, keep_only=None, starting_with=None):
 
3595
        """A test suite factory with only a few tests."""
 
3596
        class Test(tests.TestCase):
 
3597
            def id(self):
 
3598
                # We don't need the full class path
 
3599
                return self._testMethodName
 
3600
            def a(self):
 
3601
                pass
 
3602
            def b(self):
 
3603
                pass
 
3604
            def c(self):
 
3605
                pass
 
3606
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
 
3607
 
 
3608
    def assertTestList(self, expected, *selftest_args):
 
3609
        # We rely on setUp installing the right test suite factory so we can
 
3610
        # test at the command level without loading the whole test suite
 
3611
        out, err = self.run_bzr(('selftest', '--list') + selftest_args)
 
3612
        actual = out.splitlines()
 
3613
        self.assertEquals(expected, actual)
 
3614
 
 
3615
    def test_full_list(self):
 
3616
        self.assertTestList(['a', 'b', 'c'])
 
3617
 
 
3618
    def test_single_exclude(self):
 
3619
        self.assertTestList(['b', 'c'], '-x', 'a')
 
3620
 
 
3621
    def test_mutiple_excludes(self):
 
3622
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
 
3623
 
 
3624
 
 
3625
class TestCounterHooks(tests.TestCase, SelfTestHelper):
 
3626
 
 
3627
    _test_needs_features = [features.subunit]
 
3628
 
 
3629
    def setUp(self):
 
3630
        super(TestCounterHooks, self).setUp()
 
3631
        class Test(tests.TestCase):
 
3632
 
 
3633
            def setUp(self):
 
3634
                super(Test, self).setUp()
 
3635
                self.hooks = hooks.Hooks()
 
3636
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
 
3637
                self.install_counter_hook(self.hooks, 'myhook')
 
3638
 
 
3639
            def no_hook(self):
 
3640
                pass
 
3641
 
 
3642
            def run_hook_once(self):
 
3643
                for hook in self.hooks['myhook']:
 
3644
                    hook(self)
 
3645
 
 
3646
        self.test_class = Test
 
3647
 
 
3648
    def assertHookCalls(self, expected_calls, test_name):
 
3649
        test = self.test_class(test_name)
 
3650
        result = unittest.TestResult()
 
3651
        test.run(result)
 
3652
        self.assertTrue(hasattr(test, '_counters'))
 
3653
        self.assertTrue(test._counters.has_key('myhook'))
 
3654
        self.assertEquals(expected_calls, test._counters['myhook'])
 
3655
 
 
3656
    def test_no_hook(self):
 
3657
        self.assertHookCalls(0, 'no_hook')
 
3658
 
 
3659
    def test_run_hook_once(self):
 
3660
        tt = features.testtools
 
3661
        if tt.module.__version__ < (0, 9, 8):
 
3662
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
 
3663
        self.assertHookCalls(1, 'run_hook_once')