~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2012-01-24 13:14:06 UTC
  • mto: (6445.4.5 nested-trees-spec)
  • mto: This revision was merged to the branch mainline in revision 6518.
  • Revision ID: jelmer@samba.org-20120124131406-wedftkorbpv37bm0
Import nested tree doc from devnotes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import gc
 
21
import doctest
21
22
import os
22
23
import signal
23
24
import sys
 
25
import threading
24
26
import time
25
27
import unittest
26
28
import warnings
27
29
 
28
 
from testtools import MultiTestResult
 
30
from testtools import (
 
31
    ExtendedToOriginalDecorator,
 
32
    MultiTestResult,
 
33
    )
 
34
from testtools.content import Content
29
35
from testtools.content_type import ContentType
30
36
from testtools.matchers import (
31
37
    DocTestMatches,
32
38
    Equals,
33
39
    )
34
 
import testtools.tests.helpers
 
40
import testtools.testresult.doubles
35
41
 
36
42
import bzrlib
37
43
from bzrlib import (
38
44
    branchbuilder,
39
45
    bzrdir,
40
 
    debug,
41
46
    errors,
 
47
    hooks,
42
48
    lockdir,
43
49
    memorytree,
44
50
    osutils,
45
 
    progress,
46
51
    remote,
47
52
    repository,
48
53
    symbol_versioning,
49
54
    tests,
50
55
    transport,
51
56
    workingtree,
 
57
    workingtree_3,
 
58
    workingtree_4,
52
59
    )
53
60
from bzrlib.repofmt import (
54
61
    groupcompress_repo,
55
 
    pack_repo,
56
 
    weaverepo,
57
62
    )
58
63
from bzrlib.symbol_versioning import (
59
64
    deprecated_function,
64
69
    features,
65
70
    test_lsprof,
66
71
    test_server,
67
 
    test_sftp_transport,
68
72
    TestUtil,
69
73
    )
70
 
from bzrlib.trace import note
 
74
from bzrlib.trace import note, mutter
71
75
from bzrlib.transport import memory
72
 
from bzrlib.version import _get_bzr_source_tree
73
76
 
74
77
 
75
78
def _test_ids(test_suite):
77
80
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
81
 
79
82
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
83
class MetaTestLog(tests.TestCase):
93
84
 
94
85
    def test_logging(self):
100
91
            "text", "plain", {"charset": "utf8"})))
101
92
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
102
93
        self.assertThat(self.get_log(),
103
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
104
 
 
105
 
 
106
 
class TestUnicodeFilename(tests.TestCase):
107
 
 
108
 
    def test_probe_passes(self):
109
 
        """UnicodeFilename._probe passes."""
110
 
        # We can't test much more than that because the behaviour depends
111
 
        # on the platform.
112
 
        tests.UnicodeFilename._probe()
 
94
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
113
95
 
114
96
 
115
97
class TestTreeShape(tests.TestCaseInTempDir):
116
98
 
117
99
    def test_unicode_paths(self):
118
 
        self.requireFeature(tests.UnicodeFilename)
 
100
        self.requireFeature(features.UnicodeFilenameFeature)
119
101
 
120
102
        filename = u'hell\u00d8'
121
103
        self.build_tree_contents([(filename, 'contents of hello')])
122
 
        self.failUnlessExists(filename)
 
104
        self.assertPathExists(filename)
 
105
 
 
106
 
 
107
class TestClassesAvailable(tests.TestCase):
 
108
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
109
 
 
110
    def test_test_case(self):
 
111
        from bzrlib.tests import TestCase
 
112
 
 
113
    def test_test_loader(self):
 
114
        from bzrlib.tests import TestLoader
 
115
 
 
116
    def test_test_suite(self):
 
117
        from bzrlib.tests import TestSuite
123
118
 
124
119
 
125
120
class TestTransportScenarios(tests.TestCase):
312
307
        from bzrlib.tests.per_interrepository import make_scenarios
313
308
        server1 = "a"
314
309
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
310
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
311
        scenarios = make_scenarios(server1, server2, formats)
317
312
        self.assertEqual([
318
313
            ('C0,str,str',
319
314
             {'repository_format': 'C1',
320
315
              'repository_format_to': 'C2',
321
316
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
317
              'transport_server': 'a',
 
318
              'extra_setup': 'C3'}),
323
319
            ('D0,str,str',
324
320
             {'repository_format': 'D1',
325
321
              'repository_format_to': 'D2',
326
322
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
323
              'transport_server': 'a',
 
324
              'extra_setup': 'D3'})],
328
325
            scenarios)
329
326
 
330
327
 
336
333
        from bzrlib.tests.per_workingtree import make_scenarios
337
334
        server1 = "a"
338
335
        server2 = "b"
339
 
        formats = [workingtree.WorkingTreeFormat2(),
340
 
                   workingtree.WorkingTreeFormat3(),]
 
336
        formats = [workingtree_4.WorkingTreeFormat4(),
 
337
                   workingtree_3.WorkingTreeFormat3(),]
341
338
        scenarios = make_scenarios(server1, server2, formats)
342
339
        self.assertEqual([
343
 
            ('WorkingTreeFormat2',
 
340
            ('WorkingTreeFormat4',
344
341
             {'bzrdir_format': formats[0]._matchingbzrdir,
345
342
              'transport_readonly_server': 'b',
346
343
              'transport_server': 'a',
373
370
            )
374
371
        server1 = "a"
375
372
        server2 = "b"
376
 
        formats = [workingtree.WorkingTreeFormat2(),
377
 
                   workingtree.WorkingTreeFormat3(),]
 
373
        formats = [workingtree_4.WorkingTreeFormat4(),
 
374
                   workingtree_3.WorkingTreeFormat3(),]
378
375
        scenarios = make_scenarios(server1, server2, formats)
379
376
        self.assertEqual(7, len(scenarios))
380
 
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
381
 
        wt4_format = workingtree.WorkingTreeFormat4()
382
 
        wt5_format = workingtree.WorkingTreeFormat5()
 
377
        default_wt_format = workingtree.format_registry.get_default()
 
378
        wt4_format = workingtree_4.WorkingTreeFormat4()
 
379
        wt5_format = workingtree_4.WorkingTreeFormat5()
383
380
        expected_scenarios = [
384
 
            ('WorkingTreeFormat2',
 
381
            ('WorkingTreeFormat4',
385
382
             {'bzrdir_format': formats[0]._matchingbzrdir,
386
383
              'transport_readonly_server': 'b',
387
384
              'transport_server': 'a',
447
444
        # ones to add.
448
445
        from bzrlib.tests.per_tree import (
449
446
            return_parameter,
450
 
            revision_tree_from_workingtree
451
447
            )
452
448
        from bzrlib.tests.per_intertree import (
453
449
            make_scenarios,
454
450
            )
455
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
451
        from bzrlib.workingtree_3 import WorkingTreeFormat3
 
452
        from bzrlib.workingtree_4 import WorkingTreeFormat4
456
453
        input_test = TestInterTreeScenarios(
457
454
            "test_scenarios")
458
455
        server1 = "a"
459
456
        server2 = "b"
460
 
        format1 = WorkingTreeFormat2()
 
457
        format1 = WorkingTreeFormat4()
461
458
        format2 = WorkingTreeFormat3()
462
459
        formats = [("1", str, format1, format2, "converter1"),
463
460
            ("2", int, format2, format1, "converter2")]
509
506
        self.assertRaises(AssertionError, self.assertEqualStat,
510
507
            os.lstat("foo"), os.lstat("longname"))
511
508
 
 
509
    def test_failUnlessExists(self):
 
510
        """Deprecated failUnlessExists and failIfExists"""
 
511
        self.applyDeprecated(
 
512
            deprecated_in((2, 4)),
 
513
            self.failUnlessExists, '.')
 
514
        self.build_tree(['foo/', 'foo/bar'])
 
515
        self.applyDeprecated(
 
516
            deprecated_in((2, 4)),
 
517
            self.failUnlessExists, 'foo/bar')
 
518
        self.applyDeprecated(
 
519
            deprecated_in((2, 4)),
 
520
            self.failIfExists, 'foo/foo')
 
521
 
 
522
    def test_assertPathExists(self):
 
523
        self.assertPathExists('.')
 
524
        self.build_tree(['foo/', 'foo/bar'])
 
525
        self.assertPathExists('foo/bar')
 
526
        self.assertPathDoesNotExist('foo/foo')
 
527
 
512
528
 
513
529
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
514
530
 
548
564
        tree = self.make_branch_and_memory_tree('dir')
549
565
        # Guard against regression into MemoryTransport leaking
550
566
        # files to disk instead of keeping them in memory.
551
 
        self.failIf(osutils.lexists('dir'))
 
567
        self.assertFalse(osutils.lexists('dir'))
552
568
        self.assertIsInstance(tree, memorytree.MemoryTree)
553
569
 
554
570
    def test_make_branch_and_memory_tree_with_format(self):
555
571
        """make_branch_and_memory_tree should accept a format option."""
556
572
        format = bzrdir.BzrDirMetaFormat1()
557
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
573
        format.repository_format = repository.format_registry.get_default()
558
574
        tree = self.make_branch_and_memory_tree('dir', format=format)
559
575
        # Guard against regression into MemoryTransport leaking
560
576
        # files to disk instead of keeping them in memory.
561
 
        self.failIf(osutils.lexists('dir'))
 
577
        self.assertFalse(osutils.lexists('dir'))
562
578
        self.assertIsInstance(tree, memorytree.MemoryTree)
563
579
        self.assertEqual(format.repository_format.__class__,
564
580
            tree.branch.repository._format.__class__)
568
584
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
569
585
        # Guard against regression into MemoryTransport leaking
570
586
        # files to disk instead of keeping them in memory.
571
 
        self.failIf(osutils.lexists('dir'))
 
587
        self.assertFalse(osutils.lexists('dir'))
572
588
 
573
589
    def test_make_branch_builder_with_format(self):
574
590
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
575
591
        # that the format objects are used.
576
592
        format = bzrdir.BzrDirMetaFormat1()
577
 
        repo_format = weaverepo.RepositoryFormat7()
 
593
        repo_format = repository.format_registry.get_default()
578
594
        format.repository_format = repo_format
579
595
        builder = self.make_branch_builder('dir', format=format)
580
596
        the_branch = builder.get_branch()
581
597
        # Guard against regression into MemoryTransport leaking
582
598
        # files to disk instead of keeping them in memory.
583
 
        self.failIf(osutils.lexists('dir'))
 
599
        self.assertFalse(osutils.lexists('dir'))
584
600
        self.assertEqual(format.repository_format.__class__,
585
601
                         the_branch.repository._format.__class__)
586
602
        self.assertEqual(repo_format.get_format_string(),
592
608
        the_branch = builder.get_branch()
593
609
        # Guard against regression into MemoryTransport leaking
594
610
        # files to disk instead of keeping them in memory.
595
 
        self.failIf(osutils.lexists('dir'))
 
611
        self.assertFalse(osutils.lexists('dir'))
596
612
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
597
613
        self.assertEqual(dir_format.repository_format.__class__,
598
614
                         the_branch.repository._format.__class__)
603
619
    def test_dangling_locks_cause_failures(self):
604
620
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
605
621
            def test_function(self):
606
 
                t = self.get_transport('.')
 
622
                t = self.get_transport_from_path('.')
607
623
                l = lockdir.LockDir(t, 'lock')
608
624
                l.create()
609
625
                l.attempt_lock()
611
627
        result = test.run()
612
628
        total_failures = result.errors + result.failures
613
629
        if self._lock_check_thorough:
614
 
            self.assertLength(1, total_failures)
 
630
            self.assertEqual(1, len(total_failures))
615
631
        else:
616
632
            # When _lock_check_thorough is disabled, then we don't trigger a
617
633
            # failure
618
 
            self.assertLength(0, total_failures)
 
634
            self.assertEqual(0, len(total_failures))
619
635
 
620
636
 
621
637
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
629
645
        # for the server
630
646
        url = self.get_readonly_url()
631
647
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = transport.get_transport(url)
633
 
        t2 = transport.get_transport(url2)
634
 
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
 
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
 
648
        t = transport.get_transport_from_url(url)
 
649
        t2 = transport.get_transport_from_url(url2)
 
650
        self.assertIsInstance(t, ReadonlyTransportDecorator)
 
651
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
653
 
638
654
    def test_get_readonly_url_http(self):
644
660
        url = self.get_readonly_url()
645
661
        url2 = self.get_readonly_url('foo/bar')
646
662
        # the transport returned may be any HttpTransportBase subclass
647
 
        t = transport.get_transport(url)
648
 
        t2 = transport.get_transport(url2)
649
 
        self.failUnless(isinstance(t, HttpTransportBase))
650
 
        self.failUnless(isinstance(t2, HttpTransportBase))
 
663
        t = transport.get_transport_from_url(url)
 
664
        t2 = transport.get_transport_from_url(url2)
 
665
        self.assertIsInstance(t, HttpTransportBase)
 
666
        self.assertIsInstance(t2, HttpTransportBase)
651
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
652
668
 
653
669
    def test_is_directory(self):
661
677
    def test_make_branch_builder(self):
662
678
        builder = self.make_branch_builder('dir')
663
679
        rev_id = builder.build_commit()
664
 
        self.failUnlessExists('dir')
 
680
        self.assertPathExists('dir')
665
681
        a_dir = bzrdir.BzrDir.open('dir')
666
682
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
667
683
        a_branch = a_dir.open_branch()
683
699
        self.assertIsInstance(result_bzrdir.transport,
684
700
                              memory.MemoryTransport)
685
701
        # should not be on disk, should only be in memory
686
 
        self.failIfExists('subdir')
 
702
        self.assertPathDoesNotExist('subdir')
687
703
 
688
704
 
689
705
class TestChrootedTest(tests.ChrootedTestCase):
690
706
 
691
707
    def test_root_is_root(self):
692
 
        t = transport.get_transport(self.get_readonly_url())
 
708
        t = transport.get_transport_from_url(self.get_readonly_url())
693
709
        url = t.base
694
710
        self.assertEqual(url, t.clone('..').base)
695
711
 
697
713
class TestProfileResult(tests.TestCase):
698
714
 
699
715
    def test_profiles_tests(self):
700
 
        self.requireFeature(test_lsprof.LSProfFeature)
701
 
        terminal = testtools.tests.helpers.ExtendedTestResult()
 
716
        self.requireFeature(features.lsprof_feature)
 
717
        terminal = testtools.testresult.doubles.ExtendedTestResult()
702
718
        result = tests.ProfileResult(terminal)
703
719
        class Sample(tests.TestCase):
704
720
            def a(self):
721
737
                descriptions=0,
722
738
                verbosity=1,
723
739
                )
724
 
        capture = testtools.tests.helpers.ExtendedTestResult()
 
740
        capture = testtools.testresult.doubles.ExtendedTestResult()
725
741
        test_case.run(MultiTestResult(result, capture))
726
742
        run_case = capture._events[0][1]
727
743
        timed_string = result._testTimeString(run_case)
748
764
        self.check_timing(ShortDelayTestCase('test_short_delay'),
749
765
                          r"^ +[0-9]+ms$")
750
766
 
751
 
    def _patch_get_bzr_source_tree(self):
752
 
        # Reading from the actual source tree breaks isolation, but we don't
753
 
        # want to assume that thats *all* that would happen.
754
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
755
 
 
756
 
    def test_assigned_benchmark_file_stores_date(self):
757
 
        self._patch_get_bzr_source_tree()
758
 
        output = StringIO()
759
 
        result = bzrlib.tests.TextTestResult(self._log_file,
760
 
                                        descriptions=0,
761
 
                                        verbosity=1,
762
 
                                        bench_history=output
763
 
                                        )
764
 
        output_string = output.getvalue()
765
 
        # if you are wondering about the regexp please read the comment in
766
 
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
767
 
        # XXX: what comment?  -- Andrew Bennetts
768
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
769
 
 
770
 
    def test_benchhistory_records_test_times(self):
771
 
        self._patch_get_bzr_source_tree()
772
 
        result_stream = StringIO()
773
 
        result = bzrlib.tests.TextTestResult(
774
 
            self._log_file,
775
 
            descriptions=0,
776
 
            verbosity=1,
777
 
            bench_history=result_stream
778
 
            )
779
 
 
780
 
        # we want profile a call and check that its test duration is recorded
781
 
        # make a new test instance that when run will generate a benchmark
782
 
        example_test_case = TestTestResult("_time_hello_world_encoding")
783
 
        # execute the test, which should succeed and record times
784
 
        example_test_case.run(result)
785
 
        lines = result_stream.getvalue().splitlines()
786
 
        self.assertEqual(2, len(lines))
787
 
        self.assertContainsRe(lines[1],
788
 
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
789
 
            "._time_hello_world_encoding")
790
 
 
791
767
    def _time_hello_world_encoding(self):
792
768
        """Profile two sleep calls
793
769
 
798
774
 
799
775
    def test_lsprofiling(self):
800
776
        """Verbose test result prints lsprof statistics from test cases."""
801
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
777
        self.requireFeature(features.lsprof_feature)
802
778
        result_stream = StringIO()
803
779
        result = bzrlib.tests.VerboseTestResult(
804
780
            result_stream,
833
809
        self.assertContainsRe(output,
834
810
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
835
811
 
 
812
    def test_uses_time_from_testtools(self):
 
813
        """Test case timings in verbose results should use testtools times"""
 
814
        import datetime
 
815
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
816
            def startTest(self, test):
 
817
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
818
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
819
            def addSuccess(self, test):
 
820
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
821
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
822
            def report_tests_starting(self): pass
 
823
        sio = StringIO()
 
824
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
825
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
826
 
836
827
    def test_known_failure(self):
837
 
        """A KnownFailure being raised should trigger several result actions."""
 
828
        """Using knownFailure should trigger several result actions."""
838
829
        class InstrumentedTestResult(tests.ExtendedTestResult):
839
830
            def stopTestRun(self): pass
840
 
            def startTests(self): pass
841
 
            def report_test_start(self, test): pass
 
831
            def report_tests_starting(self): pass
842
832
            def report_known_failure(self, test, err=None, details=None):
843
833
                self._call = test, 'known failure'
844
834
        result = InstrumentedTestResult(None, None, None, None)
845
835
        class Test(tests.TestCase):
846
836
            def test_function(self):
847
 
                raise tests.KnownFailure('failed!')
 
837
                self.knownFailure('failed!')
848
838
        test = Test("test_function")
849
839
        test.run(result)
850
840
        # it should invoke 'report_known_failure'.
866
856
            descriptions=0,
867
857
            verbosity=2,
868
858
            )
869
 
        test = self.get_passing_test()
870
 
        result.startTest(test)
871
 
        prefix = len(result_stream.getvalue())
872
 
        # the err parameter has the shape:
873
 
        # (class, exception object, traceback)
874
 
        # KnownFailures dont get their tracebacks shown though, so we
875
 
        # can skip that.
876
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
877
 
        result.report_known_failure(test, err)
878
 
        output = result_stream.getvalue()[prefix:]
879
 
        lines = output.splitlines()
880
 
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
881
 
        if sys.version_info > (2, 7):
882
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
883
 
                self.assertNotEqual, lines[1], '    ')
884
 
        self.assertEqual(lines[1], '    foo')
885
 
        self.assertEqual(2, len(lines))
 
859
        _get_test("test_xfail").run(result)
 
860
        self.assertContainsRe(result_stream.getvalue(),
 
861
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
862
            "\\s*(?:Text attachment: )?reason"
 
863
            "(?:\n-+\n|: {{{)"
 
864
            "this_fails"
 
865
            "(?:\n-+\n|}}}\n)")
886
866
 
887
867
    def get_passing_test(self):
888
868
        """Return a test object that can't be run usefully."""
894
874
        """Test the behaviour of invoking addNotSupported."""
895
875
        class InstrumentedTestResult(tests.ExtendedTestResult):
896
876
            def stopTestRun(self): pass
897
 
            def startTests(self): pass
898
 
            def report_test_start(self, test): pass
 
877
            def report_tests_starting(self): pass
899
878
            def report_unsupported(self, test, feature):
900
879
                self._call = test, feature
901
880
        result = InstrumentedTestResult(None, None, None, None)
902
881
        test = SampleTestCase('_test_pass')
903
 
        feature = tests.Feature()
 
882
        feature = features.Feature()
904
883
        result.startTest(test)
905
884
        result.addNotSupported(test, feature)
906
885
        # it should invoke 'report_unsupported'.
925
904
            verbosity=2,
926
905
            )
927
906
        test = self.get_passing_test()
928
 
        feature = tests.Feature()
 
907
        feature = features.Feature()
929
908
        result.startTest(test)
930
909
        prefix = len(result_stream.getvalue())
931
910
        result.report_unsupported(test, feature)
940
919
        """An UnavailableFeature being raised should invoke addNotSupported."""
941
920
        class InstrumentedTestResult(tests.ExtendedTestResult):
942
921
            def stopTestRun(self): pass
943
 
            def startTests(self): pass
944
 
            def report_test_start(self, test): pass
 
922
            def report_tests_starting(self): pass
945
923
            def addNotSupported(self, test, feature):
946
924
                self._call = test, feature
947
925
        result = InstrumentedTestResult(None, None, None, None)
948
 
        feature = tests.Feature()
 
926
        feature = features.Feature()
949
927
        class Test(tests.TestCase):
950
928
            def test_function(self):
951
929
                raise tests.UnavailableFeature(feature)
970
948
    def test_strict_with_known_failure(self):
971
949
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
972
950
                                             verbosity=1)
973
 
        test = self.get_passing_test()
974
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
975
 
        result.addExpectedFailure(test, err)
 
951
        test = _get_test("test_xfail")
 
952
        test.run(result)
976
953
        self.assertFalse(result.wasStrictlySuccessful())
977
954
        self.assertEqual(None, result._extractBenchmarkTime(test))
978
955
 
989
966
        class InstrumentedTestResult(tests.ExtendedTestResult):
990
967
            calls = 0
991
968
            def startTests(self): self.calls += 1
992
 
            def report_test_start(self, test): pass
993
969
        result = InstrumentedTestResult(None, None, None, None)
994
970
        def test_function():
995
971
            pass
997
973
        test.run(result)
998
974
        self.assertEquals(1, result.calls)
999
975
 
1000
 
 
1001
 
class TestUnicodeFilenameFeature(tests.TestCase):
1002
 
 
1003
 
    def test_probe_passes(self):
1004
 
        """UnicodeFilenameFeature._probe passes."""
1005
 
        # We can't test much more than that because the behaviour depends
1006
 
        # on the platform.
1007
 
        tests.UnicodeFilenameFeature._probe()
 
976
    def test_startTests_only_once(self):
 
977
        """With multiple tests startTests should still only be called once"""
 
978
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
979
            calls = 0
 
980
            def startTests(self): self.calls += 1
 
981
        result = InstrumentedTestResult(None, None, None, None)
 
982
        suite = unittest.TestSuite([
 
983
            unittest.FunctionTestCase(lambda: None),
 
984
            unittest.FunctionTestCase(lambda: None)])
 
985
        suite.run(result)
 
986
        self.assertEquals(1, result.calls)
 
987
        self.assertEquals(2, result.count)
1008
988
 
1009
989
 
1010
990
class TestRunner(tests.TestCase):
1023
1003
        because of our use of global state.
1024
1004
        """
1025
1005
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1026
 
        old_leak = tests.TestCase._first_thread_leaker_id
1027
1006
        try:
1028
1007
            tests.TestCaseInTempDir.TEST_ROOT = None
1029
 
            tests.TestCase._first_thread_leaker_id = None
1030
1008
            return testrunner.run(test)
1031
1009
        finally:
1032
1010
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1033
 
            tests.TestCase._first_thread_leaker_id = old_leak
1034
1011
 
1035
1012
    def test_known_failure_failed_run(self):
1036
1013
        # run a test that generates a known failure which should be printed in
1041
1018
        test = unittest.TestSuite()
1042
1019
        test.addTest(Test("known_failure_test"))
1043
1020
        def failing_test():
1044
 
            self.fail('foo')
 
1021
            raise AssertionError('foo')
1045
1022
        test.addTest(unittest.FunctionTestCase(failing_test))
1046
1023
        stream = StringIO()
1047
1024
        runner = tests.TextTestRunner(stream=stream)
1055
1032
            '^----------------------------------------------------------------------\n'
1056
1033
            'Traceback \\(most recent call last\\):\n'
1057
1034
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1058
 
            '    self.fail\\(\'foo\'\\)\n'
 
1035
            '    raise AssertionError\\(\'foo\'\\)\n'
1059
1036
            '.*'
1060
1037
            '^----------------------------------------------------------------------\n'
1061
1038
            '.*'
1067
1044
        # the final output.
1068
1045
        class Test(tests.TestCase):
1069
1046
            def known_failure_test(self):
1070
 
                self.expectFailure('failed', self.assertTrue, False)
 
1047
                self.knownFailure("Never works...")
1071
1048
        test = Test("known_failure_test")
1072
1049
        stream = StringIO()
1073
1050
        runner = tests.TextTestRunner(stream=stream)
1079
1056
            '\n'
1080
1057
            'OK \\(known_failures=1\\)\n')
1081
1058
 
 
1059
    def test_unexpected_success_bad(self):
 
1060
        class Test(tests.TestCase):
 
1061
            def test_truth(self):
 
1062
                self.expectFailure("No absolute truth", self.assertTrue, True)
 
1063
        runner = tests.TextTestRunner(stream=StringIO())
 
1064
        result = self.run_test_runner(runner, Test("test_truth"))
 
1065
        self.assertContainsRe(runner.stream.getvalue(),
 
1066
            "=+\n"
 
1067
            "FAIL: \\S+\.test_truth\n"
 
1068
            "-+\n"
 
1069
            "(?:.*\n)*"
 
1070
            "\\s*(?:Text attachment: )?reason"
 
1071
            "(?:\n-+\n|: {{{)"
 
1072
            "No absolute truth"
 
1073
            "(?:\n-+\n|}}}\n)"
 
1074
            "(?:.*\n)*"
 
1075
            "-+\n"
 
1076
            "Ran 1 test in .*\n"
 
1077
            "\n"
 
1078
            "FAILED \\(failures=1\\)\n\\Z")
 
1079
 
1082
1080
    def test_result_decorator(self):
1083
1081
        # decorate results
1084
1082
        calls = []
1085
 
        class LoggingDecorator(tests.ForwardingResult):
 
1083
        class LoggingDecorator(ExtendedToOriginalDecorator):
1086
1084
            def startTest(self, test):
1087
 
                tests.ForwardingResult.startTest(self, test)
 
1085
                ExtendedToOriginalDecorator.startTest(self, test)
1088
1086
                calls.append('start')
1089
1087
        test = unittest.FunctionTestCase(lambda:None)
1090
1088
        stream = StringIO()
1168
1166
 
1169
1167
    def test_unsupported_features_listed(self):
1170
1168
        """When unsupported features are encountered they are detailed."""
1171
 
        class Feature1(tests.Feature):
 
1169
        class Feature1(features.Feature):
1172
1170
            def _probe(self): return False
1173
 
        class Feature2(tests.Feature):
 
1171
        class Feature2(features.Feature):
1174
1172
            def _probe(self): return False
1175
1173
        # create sample tests
1176
1174
        test1 = SampleTestCase('_test_pass')
1191
1189
            ],
1192
1190
            lines[-3:])
1193
1191
 
1194
 
    def _patch_get_bzr_source_tree(self):
1195
 
        # Reading from the actual source tree breaks isolation, but we don't
1196
 
        # want to assume that thats *all* that would happen.
1197
 
        self._get_source_tree_calls = []
1198
 
        def new_get():
1199
 
            self._get_source_tree_calls.append("called")
1200
 
            return None
1201
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
1202
 
 
1203
 
    def test_bench_history(self):
1204
 
        # tests that the running the benchmark passes bench_history into
1205
 
        # the test result object. We can tell that happens if
1206
 
        # _get_bzr_source_tree is called.
1207
 
        self._patch_get_bzr_source_tree()
1208
 
        test = TestRunner('dummy_test')
1209
 
        output = StringIO()
1210
 
        runner = tests.TextTestRunner(stream=self._log_file,
1211
 
                                      bench_history=output)
1212
 
        result = self.run_test_runner(runner, test)
1213
 
        output_string = output.getvalue()
1214
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
1215
 
        self.assertLength(1, self._get_source_tree_calls)
 
1192
    def test_verbose_test_count(self):
 
1193
        """A verbose test run reports the right test count at the start"""
 
1194
        suite = TestUtil.TestSuite([
 
1195
            unittest.FunctionTestCase(lambda:None),
 
1196
            unittest.FunctionTestCase(lambda:None)])
 
1197
        self.assertEqual(suite.countTestCases(), 2)
 
1198
        stream = StringIO()
 
1199
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1200
        # Need to use the CountingDecorator as that's what sets num_tests
 
1201
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1202
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1216
1203
 
1217
1204
    def test_startTestRun(self):
1218
1205
        """run should call result.startTestRun()"""
1219
1206
        calls = []
1220
 
        class LoggingDecorator(tests.ForwardingResult):
 
1207
        class LoggingDecorator(ExtendedToOriginalDecorator):
1221
1208
            def startTestRun(self):
1222
 
                tests.ForwardingResult.startTestRun(self)
 
1209
                ExtendedToOriginalDecorator.startTestRun(self)
1223
1210
                calls.append('startTestRun')
1224
1211
        test = unittest.FunctionTestCase(lambda:None)
1225
1212
        stream = StringIO()
1231
1218
    def test_stopTestRun(self):
1232
1219
        """run should call result.stopTestRun()"""
1233
1220
        calls = []
1234
 
        class LoggingDecorator(tests.ForwardingResult):
 
1221
        class LoggingDecorator(ExtendedToOriginalDecorator):
1235
1222
            def stopTestRun(self):
1236
 
                tests.ForwardingResult.stopTestRun(self)
 
1223
                ExtendedToOriginalDecorator.stopTestRun(self)
1237
1224
                calls.append('stopTestRun')
1238
1225
        test = unittest.FunctionTestCase(lambda:None)
1239
1226
        stream = StringIO()
1242
1229
        result = self.run_test_runner(runner, test)
1243
1230
        self.assertLength(1, calls)
1244
1231
 
 
1232
    def test_unicode_test_output_on_ascii_stream(self):
 
1233
        """Showing results should always succeed even on an ascii console"""
 
1234
        class FailureWithUnicode(tests.TestCase):
 
1235
            def test_log_unicode(self):
 
1236
                self.log(u"\u2606")
 
1237
                self.fail("Now print that log!")
 
1238
        out = StringIO()
 
1239
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1240
            lambda trace=False: "ascii")
 
1241
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1242
            FailureWithUnicode("test_log_unicode"))
 
1243
        self.assertContainsRe(out.getvalue(),
 
1244
            "(?:Text attachment: )?log"
 
1245
            "(?:\n-+\n|: {{{)"
 
1246
            "\d+\.\d+  \\\\u2606"
 
1247
            "(?:\n-+\n|}}}\n)")
 
1248
 
1245
1249
 
1246
1250
class SampleTestCase(tests.TestCase):
1247
1251
 
1435
1439
        # Note this test won't fail with hooks that the core library doesn't
1436
1440
        # use - but it trigger with a plugin that adds hooks, so its still a
1437
1441
        # useful warning in that case.
1438
 
        self.assertEqual(bzrlib.branch.BranchHooks(),
1439
 
            bzrlib.branch.Branch.hooks)
1440
 
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1442
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1443
        self.assertEqual(
 
1444
            bzrlib.smart.server.SmartServerHooks(),
1441
1445
            bzrlib.smart.server.SmartTCPServer.hooks)
1442
 
        self.assertEqual(bzrlib.commands.CommandHooks(),
1443
 
            bzrlib.commands.Command.hooks)
 
1446
        self.assertEqual(
 
1447
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1444
1448
 
1445
1449
    def test__gather_lsprof_in_benchmarks(self):
1446
1450
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1447
1451
 
1448
1452
        Each self.time() call is individually and separately profiled.
1449
1453
        """
1450
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
1454
        self.requireFeature(features.lsprof_feature)
1451
1455
        # overrides the class member with an instance member so no cleanup
1452
1456
        # needed.
1453
1457
        self._gather_lsprof_in_benchmarks = True
1472
1476
        transport_server = memory.MemoryServer()
1473
1477
        transport_server.start_server()
1474
1478
        self.addCleanup(transport_server.stop_server)
1475
 
        t = transport.get_transport(transport_server.get_url())
 
1479
        t = transport.get_transport_from_url(transport_server.get_url())
1476
1480
        bzrdir.BzrDir.create(t.base)
1477
1481
        self.assertRaises(errors.BzrError,
1478
1482
            bzrdir.BzrDir.open_from_transport, t)
1483
1487
 
1484
1488
    def test_requireFeature_available(self):
1485
1489
        """self.requireFeature(available) is a no-op."""
1486
 
        class Available(tests.Feature):
 
1490
        class Available(features.Feature):
1487
1491
            def _probe(self):return True
1488
1492
        feature = Available()
1489
1493
        self.requireFeature(feature)
1490
1494
 
1491
1495
    def test_requireFeature_unavailable(self):
1492
1496
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1493
 
        class Unavailable(tests.Feature):
 
1497
        class Unavailable(features.Feature):
1494
1498
            def _probe(self):return False
1495
1499
        feature = Unavailable()
1496
1500
        self.assertRaises(tests.UnavailableFeature,
1655
1659
        test.run(unittest.TestResult())
1656
1660
        self.assertEqual('original', obj.test_attr)
1657
1661
 
 
1662
    def test_recordCalls(self):
 
1663
        from bzrlib.tests import test_selftest
 
1664
        calls = self.recordCalls(
 
1665
            test_selftest, '_add_numbers')
 
1666
        self.assertEqual(test_selftest._add_numbers(2, 10),
 
1667
            12)
 
1668
        self.assertEquals(calls, [((2, 10), {})])
 
1669
 
 
1670
 
 
1671
def _add_numbers(a, b):
 
1672
    return a + b
 
1673
 
 
1674
 
 
1675
class _MissingFeature(features.Feature):
    """A feature whose probe always reports it as unavailable."""

    def _probe(self):
        return False

# Single shared instance: required by the example test_missing_feature and
# looked up as a skip reason by the log detail tests.
missing_feature = _MissingFeature()
 
1679
 
 
1680
 
 
1681
def _get_test(name):
    """Get an instance of a specific example test.

    We protect this in a function so that they don't auto-run in the test
    suite.

    :param name: The name of the ExampleTests method to instantiate, e.g.
        'test_fail'.
    :return: An ExampleTests instance for that method.
    """

    class ExampleTests(tests.TestCase):
        """One example test per outcome; each one mutters a distinct line."""

        def test_fail(self):
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_missing_feature(self):
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_skip(self):
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_success(self):
            mutter('this test succeeds')

        def test_xfail(self):
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            mutter('test with unexpected success')
            self.expectFailure('should_fail', lambda: None)

    return ExampleTests(name)
 
1718
 
 
1719
 
 
1720
class TestTestCaseLogDetails(tests.TestCase):
    """Check, per test outcome, whether the 'log' detail is reported.

    Each test runs one of the example tests from _get_test() against a
    plain testtools.TestResult and inspects what was recorded.
    """

    def _run_test(self, test_name):
        """Run the named example test and return the result object."""
        test = _get_test(test_name)
        result = testtools.TestResult()
        test.run(result)
        return result

    def test_fail_has_log(self):
        result = self._run_test('test_fail')
        self.assertLength(1, result.failures)
        result_content = result.failures[0][1]
        self.assertContainsRe(result_content,
            '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertContainsRe(result_content, 'this was a failing test')

    def test_error_has_log(self):
        result = self._run_test('test_error')
        self.assertLength(1, result.errors)
        result_content = result.errors[0][1]
        self.assertContainsRe(result_content,
            '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertContainsRe(result_content, 'this test errored')

    def test_skip_has_no_log(self):
        result = self._run_test('test_skip')
        # list() rather than .keys() so the comparison is against a real
        # list whatever type .keys() returns.
        self.assertEqual(['reason'], list(result.skip_reasons))
        skips = result.skip_reasons['reason']
        self.assertLength(1, skips)
        test = skips[0]
        self.assertFalse('log' in test.getDetails())

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so it just gets
        # considered as a skip
        result = self._run_test('test_missing_feature')
        self.assertEqual([missing_feature], list(result.skip_reasons))
        skips = result.skip_reasons[missing_feature]
        self.assertLength(1, skips)
        test = skips[0]
        self.assertFalse('log' in test.getDetails())

    def test_xfail_has_no_log(self):
        result = self._run_test('test_xfail')
        self.assertLength(1, result.expectedFailures)
        result_content = result.expectedFailures[0][1]
        self.assertNotContainsRe(result_content,
            '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(result_content, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        result = self._run_test('test_unexpected_success')
        self.assertLength(1, result.unexpectedSuccesses)
        # Inconsistency, unexpectedSuccesses is a list of tests,
        # expectedFailures is a list of reasons?
        test = result.unexpectedSuccesses[0]
        details = test.getDetails()
        self.assertTrue('log' in details)
 
1778
 
 
1779
 
 
1780
class TestTestCloning(tests.TestCase):
    """Tests that test cloning of TestCases (as used by multiply_tests)."""

    def test_cloned_testcase_does_not_share_details(self):
        """A TestCase cloned with clone_test does not share mutable attributes
        such as details or cleanups.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
        orig_test = Test('test_foo')
        # Clone before running, so the detail is only added to the original
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
        orig_test.run(unittest.TestResult())
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
        self.assertIs(None, cloned_test.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Applying two levels of scenarios to a test preserves the attributes
        added by both scenarios.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                pass
        test = Test('test_foo')
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
        all_tests = list(tests.iter_suite_tests(suite))
        # 2 x values times 2 y values
        self.assertLength(4, all_tests)
        all_xys = sorted((t.x, t.y) for t in all_tests)
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1812
 
1658
1813
 
1659
1814
# NB: Don't delete this; it's not actually from 0.11!
1660
1815
@deprecated_function(deprecated_in((0, 11, 0)))
1788
1943
    def test_make_branch_and_tree_with_format(self):
1789
1944
        # we should be able to supply a format to make_branch_and_tree
1790
1945
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1791
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1792
1946
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1793
1947
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1794
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1795
 
                              bzrlib.bzrdir.BzrDirFormat6)
1796
1948
 
1797
1949
    def test_make_branch_and_memory_tree(self):
1798
1950
        # we should be able to get a new branch and a mutable tree from
1815
1967
                tree.branch.repository.bzrdir.root_transport)
1816
1968
 
1817
1969
 
1818
 
class SelfTestHelper:
 
1970
class SelfTestHelper(object):
1819
1971
 
1820
1972
    def run_selftest(self, **kwargs):
1821
1973
        """Run selftest returning its output."""
1875
2027
        self.assertLength(2, output.readlines())
1876
2028
 
1877
2029
    def test_lsprof_tests(self):
1878
 
        self.requireFeature(test_lsprof.LSProfFeature)
1879
 
        calls = []
 
2030
        self.requireFeature(features.lsprof_feature)
 
2031
        results = []
1880
2032
        class Test(object):
1881
2033
            def __call__(test, result):
1882
2034
                test.run(result)
1883
2035
            def run(test, result):
1884
 
                self.assertIsInstance(result, tests.ForwardingResult)
1885
 
                calls.append("called")
 
2036
                results.append(result)
1886
2037
            def countTestCases(self):
1887
2038
                return 1
1888
2039
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
1889
 
        self.assertLength(1, calls)
 
2040
        self.assertLength(1, results)
 
2041
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
1890
2042
 
1891
2043
    def test_random(self):
1892
2044
        # test randomising by listing a number of tests.
1972
2124
            load_list='missing file name', list_only=True)
1973
2125
 
1974
2126
 
 
2127
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check, per test outcome, whether the log ends up in subunit output.

    Each test runs one example test from _get_test() through selftest with
    the subunit runner, then inspects both the raw stream content and the
    result obtained by replaying that stream.
    """

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run the named example test via subunit.

        :return: A (content, result) tuple: the raw stream content, and a
            testtools.TestResult filled in by replaying the stream.
        """
        from subunit import ProtocolTestCase
        def factory():
            return TestUtil.TestSuite([_get_test(test_name)])
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=factory)
        test = ProtocolTestCase(stream)
        result = testtools.TestResult()
        test.run(result)
        content = stream.getvalue()
        return content, result

    def test_fail_has_log(self):
        content, result = self.run_subunit_stream('test_fail')
        self.assertLength(1, result.failures)
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test will fail')

    def test_error_has_log(self):
        content, result = self.run_subunit_stream('test_error')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test errored')

    def test_skip_has_no_log(self):
        content, result = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test will be skipped')
        self.assertEqual(['reason'], list(result.skip_reasons))
        skips = result.skip_reasons['reason']
        self.assertLength(1, skips)
        # RemotedTestCase doesn't preserve the "details", so the 'log'
        # detail can't be checked on skips[0] itself.

    def test_missing_feature_has_no_log(self):
        content, result = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'missing the feature')
        self.assertEqual(['_MissingFeature\n'], list(result.skip_reasons))
        skips = result.skip_reasons['_MissingFeature\n']
        self.assertLength(1, skips)
        # RemotedTestCase doesn't preserve the "details", so the 'log'
        # detail can't be checked on skips[0] itself.

    def test_xfail_has_no_log(self):
        content, result = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'test with expected failure')
        self.assertLength(1, result.expectedFailures)
        result_content = result.expectedFailures[0][1]
        self.assertNotContainsRe(result_content,
            '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(result_content, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        content, result = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'test with unexpected success')
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
        #                success, if a min version check is added remove this
        from subunit import TestProtocolClient as _Client
        if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
            self.expectFailure('subunit treats "unexpectedSuccess"'
                               ' as a plain success',
                self.assertEqual, 1, len(result.unexpectedSuccesses))
        self.assertLength(1, result.unexpectedSuccesses)
        # RemotedTestCase doesn't preserve the "details", so the 'log'
        # detail can't be checked on the reported test itself.

    def test_success_has_no_log(self):
        content, result = self.run_subunit_stream('test_success')
        self.assertEqual(1, result.testsRun)
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test succeeds')
 
2207
 
 
2208
 
1975
2209
class TestRunBzr(tests.TestCase):
1976
2210
 
1977
2211
    out = ''
2100
2334
        # stdout and stderr of the invoked run_bzr
2101
2335
        current_factory = bzrlib.ui.ui_factory
2102
2336
        self.run_bzr(['foo'])
2103
 
        self.failIf(current_factory is self.factory)
 
2337
        self.assertFalse(current_factory is self.factory)
2104
2338
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2105
2339
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2106
2340
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2263
2497
 
2264
2498
 
2265
2499
class TestStartBzrSubProcess(tests.TestCase):
 
2500
    """Stub test start_bzr_subprocess."""
2266
2501
 
2267
 
    def check_popen_state(self):
2268
 
        """Replace to make assertions when popen is called."""
 
2502
    def _subprocess_log_cleanup(self):
 
2503
        """Inhibits the base version as we don't produce a log file."""
2269
2504
 
2270
2505
    def _popen(self, *args, **kwargs):
2271
 
        """Record the command that is run, so that we can ensure it is correct"""
 
2506
        """Override the base version to record the command that is run.
 
2507
 
 
2508
        From there we can ensure it is correct without spawning a real process.
 
2509
        """
2272
2510
        self.check_popen_state()
2273
2511
        self._popen_args = args
2274
2512
        self._popen_kwargs = kwargs
2275
2513
        raise _DontSpawnProcess()
2276
2514
 
 
2515
    def check_popen_state(self):
 
2516
        """Replace to make assertions when popen is called."""
 
2517
 
2277
2518
    def test_run_bzr_subprocess_no_plugins(self):
2278
2519
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2279
2520
        command = self._popen_args[0]
2283
2524
 
2284
2525
    def test_allow_plugins(self):
2285
2526
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2286
 
            allow_plugins=True)
 
2527
                          allow_plugins=True)
2287
2528
        command = self._popen_args[0]
2288
2529
        self.assertEqual([], command[2:])
2289
2530
 
2290
2531
    def test_set_env(self):
2291
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2532
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2292
2533
        # set in the child
2293
2534
        def check_environment():
2294
2535
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2295
2536
        self.check_popen_state = check_environment
2296
2537
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2297
 
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2538
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
2298
2539
        # not set in theparent
2299
2540
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
2541
 
2301
2542
    def test_run_bzr_subprocess_env_del(self):
2302
2543
        """run_bzr_subprocess can remove environment variables too."""
2303
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2544
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2304
2545
        def check_environment():
2305
2546
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2306
2547
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2307
2548
        self.check_popen_state = check_environment
2308
2549
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2309
 
            env_changes={'EXISTANT_ENV_VAR':None})
 
2550
                          env_changes={'EXISTANT_ENV_VAR':None})
2310
2551
        # Still set in parent
2311
2552
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2312
2553
        del os.environ['EXISTANT_ENV_VAR']
2313
2554
 
2314
2555
    def test_env_del_missing(self):
2315
 
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
 
2556
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2316
2557
        def check_environment():
2317
2558
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2318
2559
        self.check_popen_state = check_environment
2319
2560
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2320
 
            env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2561
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
2321
2562
 
2322
2563
    def test_working_dir(self):
2323
2564
        """Test that we can specify the working dir for the child"""
2326
2567
        chdirs = []
2327
2568
        def chdir(path):
2328
2569
            chdirs.append(path)
2329
 
        os.chdir = chdir
2330
 
        try:
2331
 
            def getcwd():
2332
 
                return 'current'
2333
 
            osutils.getcwd = getcwd
2334
 
            try:
2335
 
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2336
 
                    working_dir='foo')
2337
 
            finally:
2338
 
                osutils.getcwd = orig_getcwd
2339
 
        finally:
2340
 
            os.chdir = orig_chdir
 
2570
        self.overrideAttr(os, 'chdir', chdir)
 
2571
        def getcwd():
 
2572
            return 'current'
 
2573
        self.overrideAttr(osutils, 'getcwd', getcwd)
 
2574
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2575
                          working_dir='foo')
2341
2576
        self.assertEqual(['foo', 'current'], chdirs)
2342
2577
 
2343
2578
    def test_get_bzr_path_with_cwd_bzrlib(self):
2363
2598
        self.assertEqual('bzr: interrupted\n', result[1])
2364
2599
 
2365
2600
 
2366
 
class TestFeature(tests.TestCase):
    """Tests for probe caching and the string form of Feature."""

    def test_caching(self):
        """Feature._probe is called by the feature at most once."""
        class InstrumentedFeature(tests.Feature):
            def __init__(self):
                super(InstrumentedFeature, self).__init__()
                self.calls = []
            def _probe(self):
                self.calls.append('_probe')
                return False
        feature = InstrumentedFeature()
        feature.available()
        self.assertEqual(['_probe'], feature.calls)
        # Asking a second time must not probe again
        feature.available()
        self.assertEqual(['_probe'], feature.calls)

    def test_named_str(self):
        """Feature.__str__ should thunk to feature_name()."""
        class NamedFeature(tests.Feature):
            def feature_name(self):
                return 'symlinks'
        feature = NamedFeature()
        self.assertEqual('symlinks', str(feature))

    def test_default_str(self):
        """Feature.__str__ should default to __class__.__name__."""
        class NamedFeature(tests.Feature):
            pass
        feature = NamedFeature()
        self.assertEqual('NamedFeature', str(feature))
2397
 
 
2398
 
 
2399
 
class TestUnavailableFeature(tests.TestCase):
    """Tests for the UnavailableFeature exception."""

    def test_access_feature(self):
        """The feature is the exception's first argument."""
        feature = tests.Feature()
        exception = tests.UnavailableFeature(feature)
        self.assertIs(feature, exception.args[0])
2405
 
 
2406
 
 
2407
 
# Deprecated alias for bzrlib.tests.UnicodeFilename, exercised by
# Test_CompatibilityFeature.
simple_thunk_feature = tests._CompatabilityThunkFeature(
    deprecated_in((2, 1, 0)),
    'bzrlib.tests.test_selftest',
    'simple_thunk_feature', 'UnicodeFilename',
    replacement_module='bzrlib.tests',
    )
2413
 
 
2414
 
class Test_CompatibilityFeature(tests.TestCase):
    """Tests for the deprecated thunk feature defined in this module."""

    def test_does_thunk(self):
        """available() warns about the deprecation and gives the same answer
        as the replacement feature.
        """
        res = self.callDeprecated(
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
            simple_thunk_feature.available)
        self.assertEqual(tests.UnicodeFilename.available(), res)
2422
 
 
2423
 
 
2424
 
class TestModuleAvailableFeature(tests.TestCase):
    """Tests for ModuleAvailableFeature with present and absent modules."""

    def test_available_module(self):
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
        self.assertEqual('bzrlib.tests', feature.module_name)
        self.assertEqual('bzrlib.tests', str(feature))
        self.assertTrue(feature.available())
        self.assertIs(tests, feature.module)

    def test_unavailable_module(self):
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
        self.assertFalse(feature.available())
        self.assertIs(None, feature.module)
2438
 
 
2439
 
 
2440
2601
class TestSelftestFiltering(tests.TestCase):
2441
2602
 
2442
2603
    def setUp(self):
2593
2754
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2594
2755
 
2595
2756
 
2596
 
class TestCheckInventoryShape(tests.TestCaseWithTransport):
 
2757
class TestCheckTreeShape(tests.TestCaseWithTransport):
2597
2758
 
2598
 
    def test_check_inventory_shape(self):
 
2759
    def test_check_tree_shape(self):
2599
2760
        files = ['a', 'b/', 'b/c']
2600
2761
        tree = self.make_branch_and_tree('.')
2601
2762
        self.build_tree(files)
2602
2763
        tree.add(files)
2603
2764
        tree.lock_read()
2604
2765
        try:
2605
 
            self.check_inventory_shape(tree.inventory, files)
 
2766
            self.check_tree_shape(tree, files)
2606
2767
        finally:
2607
2768
            tree.unlock()
2608
2769
 
2940
3101
        tpr.register('bar', 'bBB.aAA.rRR')
2941
3102
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
2942
3103
        self.assertThat(self.get_log(),
2943
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3104
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3105
                           doctest.ELLIPSIS))
2944
3106
 
2945
3107
    def test_get_unknown_prefix(self):
2946
3108
        tpr = self._get_registry()
2966
3128
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2967
3129
 
2968
3130
 
 
3131
class TestThreadLeakDetection(tests.TestCase):
    """Verify that threads leaked by a test are detected and reported."""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Test result that keeps a list of every reported thread leak."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            # One (test, leaked_threads) tuple per reported leak.
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            # Only the test and its leaked threads are recorded; ``alive``
            # is not used here.
            record = (test, leaks)
            self.leaks.append(record)

    def test_testcase_without_addCleanups(self):
        """Plain unittest.TestCase instances must survive leak detection."""
        class Test(unittest.TestCase):
            def runTest(self):
                pass
        recorder = self.LeakRecordingResult()
        plain_case = Test()
        recorder.startTestRun()
        plain_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 0)
        self.assertEqual(recorder.leaks, [])

    def test_thread_leak(self):
        """A thread that outlives the test that started it is reported.

        The inner test case starts a thread that blocks on an event. The
        thread is still running when the inner case finishes, so it must be
        reported as a leak. The cleanups registered here then set the event
        and join the thread, so nothing is leaked for real.
        """
        release = threading.Event()
        leaker = threading.Thread(name="Leaker", target=release.wait)
        class Test(tests.TestCase):
            def test_leak(self):
                leaker.start()
        recorder = self.LeakRecordingResult()
        inner_case = Test("test_leak")
        # Registered join-first: cleanups run in reverse order, so the event
        # is set before the thread is joined.
        self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        inner_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 1)
        self.assertEqual(recorder._first_thread_leaker_id, inner_case.id())
        self.assertEqual(recorder.leaks, [(inner_case, set([leaker]))])
        self.assertContainsString(
            recorder.stream.getvalue(), "leaking threads")

    def test_multiple_leaks(self):
        """Each leak is attributed to the test case that caused it.

        Same idea as test_thread_leak, but the inner class has one method
        leaking a single thread, one leaking nothing, and one leaking two
        threads.
        """
        release = threading.Event()
        leaker_a = threading.Thread(name="LeakerA", target=release.wait)
        leaker_b = threading.Thread(name="LeakerB", target=release.wait)
        leaker_c = threading.Thread(name="LeakerC", target=release.wait)
        class Test(tests.TestCase):
            def test_first_leak(self):
                leaker_b.start()
            def test_second_no_leak(self):
                pass
            def test_third_leak(self):
                leaker_c.start()
                leaker_a.start()
        recorder = self.LeakRecordingResult()
        first_case = Test("test_first_leak")
        third_case = Test("test_third_leak")
        # Joins are registered before the event release so that, running in
        # reverse order, the threads are unblocked before being joined.
        self.addCleanup(leaker_a.join)
        self.addCleanup(leaker_b.join)
        self.addCleanup(leaker_c.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        inner_suite = unittest.TestSuite(
            [first_case, Test("test_second_no_leak"), third_case])
        inner_suite.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 2)
        self.assertEqual(recorder._first_thread_leaker_id, first_case.id())
        expected_leaks = [
            (first_case, set([leaker_b])),
            (third_case, set([leaker_a, leaker_c])),
            ]
        self.assertEqual(recorder.leaks, expected_leaks)
        self.assertContainsString(
            recorder.stream.getvalue(), "leaking threads")
 
3215
 
 
3216
 
 
3217
class TestPostMortemDebugging(tests.TestCase):
    """Check post mortem debugging works when tests fail or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result that records where a post mortem session would start.

        ``postcode`` holds the code object of the innermost frame of the
        traceback seen by ``_post_mortem``; it stays None if no traceback
        was available.
        """

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Record the code object at the end of the current traceback"""
            tb = tb or sys.exc_info()[2]
            if tb is None:
                return
            # Walk to the innermost frame, where the exception was raised.
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            pass

        def report_failure(self, test, err):
            pass

    def test_location_unittest_error(self):
        """Needs right post mortem traceback with erroring unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        result = self.TracebackRecordingResult()
        Test().run(result)
        self.assertEqual(result.postcode, Test.runTest.func_code)

    def test_location_unittest_failure(self):
        """Needs right post mortem traceback with failing unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        result = self.TracebackRecordingResult()
        Test().run(result)
        self.assertEqual(result.postcode, Test.runTest.func_code)

    def test_location_bt_error(self):
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        result = self.TracebackRecordingResult()
        Test("test_error").run(result)
        self.assertEqual(result.postcode, Test.test_error.func_code)

    def test_location_bt_failure(self):
        """Needs right post mortem traceback with failing bzrlib.tests case"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        result = self.TracebackRecordingResult()
        Test("test_failure").run(result)
        self.assertEqual(result.postcode, Test.test_failure.func_code)

    def test_env_var_triggers_post_mortem(self):
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
        import pdb
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
        post_mortem_calls = []
        # Replace the real debugger entry point with a recorder.
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
        self.overrideEnv('BZR_TEST_PDB', None)
        result._post_mortem(1)
        self.overrideEnv('BZR_TEST_PDB', 'on')
        result._post_mortem(2)
        # Only the call made while the variable was set reaches pdb.
        self.assertEqual([2], post_mortem_calls)
 
3285
 
 
3286
 
2969
3287
class TestRunSuite(tests.TestCase):
2970
3288
 
2971
3289
    def test_runner_class(self):
2982
3300
                                                self.verbosity)
2983
3301
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2984
3302
        self.assertLength(1, calls)
 
3303
 
 
3304
 
 
3305
class _Selftest(object):
    """Mixin providing a helper that captures the full selftest output."""

    def _inject_stream_into_subunit(self, stream):
        """Hook for subclasses that run tests out of process; no-op here."""

    def _run_selftest(self, **kwargs):
        """Run tests.selftest with ``kwargs`` and return what it wrote.

        The output stream is first handed to _inject_stream_into_subunit so
        that subclasses can redirect additional output into it.
        """
        captured = StringIO()
        self._inject_stream_into_subunit(captured)
        tests.selftest(stream=captured, stop_on_failure=False, **kwargs)
        return captured.getvalue()
 
3316
 
 
3317
 
 
3318
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.
        """
        from subunit import ProtocolTestCase
        _original_init = ProtocolTestCase.__init__
        def _init_with_passthrough(self, *args, **kwargs):
            _original_init(self, *args, **kwargs)
            # Overwrite whatever passthrough the constructor set up.
            self._passthrough = stream
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)

    def _run_selftest(self, **kwargs):
        """Run selftest with tests.fork_decorator appended to the decorators.

        Raises tests.TestNotApplicable when os.fork is unavailable.
        """
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        if getattr(os, "fork", None) is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Make sure the fork code is actually invoked by claiming two cores
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        # Build a fresh list so a caller-supplied suite_decorators list is
        # never mutated in place; an explicit None counts as "no decorators".
        decorators = list(kwargs.get("suite_decorators") or ())
        decorators.append(tests.fork_decorator)
        kwargs["suite_decorators"] = decorators
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3344
 
 
3345
 
 
3346
class TestParallelFork(_ForkedSelftest, tests.TestCase):
    """Check operation of --parallel=fork selftest option"""

    def test_error_in_child_during_fork(self):
        """Error in a forked child during test setup should get reported"""
        class Test(tests.TestCase):
            def testMethod(self):
                pass
        # We don't care what, just break something that a child will run
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
        out = self._run_selftest(test_suite_factory=Test)
        # Lines from the tracebacks of the two child processes may be mixed
        # together due to the way subunit parses and forwards the streams,
        # so permit extra lines between each part of the error output.
        # The fragment holding regex escapes is a raw string so that \s, \(
        # and \) are never treated as string-literal escapes.
        self.assertContainsRe(out,
            "Traceback.*:\n"
            "(?:.*\n)*"
            ".+ in fork_for_tests\n"
            "(?:.*\n)*"
            r"\s*workaround_zealous_crypto_random\(\)" "\n"
            "(?:.*\n)*"
            "TypeError:")
 
3368
 
 
3369
 
 
3370
class TestUncollectedWarnings(_Selftest, tests.TestCase):
    """Check a test case still alive after being run emits a warning"""

    class Test(tests.TestCase):
        def test_pass(self):
            pass
        def test_self_ref(self):
            # Storing a bound method of self on self creates a reference
            # cycle, so this case is not freed by reference counting alone.
            self.also_self = self.test_self_ref
        def test_skip(self):
            self.skip("Don't need")

    def _get_suite(self):
        return TestUtil.TestSuite([
            self.Test("test_pass"),
            self.Test("test_self_ref"),
            self.Test("test_skip"),
            ])

    def _run_selftest_with_suite(self, **kwargs):
        """Run the sample suite with "uncollected_cases" enabled.

        The cyclic garbage collector is disabled for the duration of the run
        and the previous gc state and debug flags are restored afterwards.
        Asserts that test_self_ref is reported as uncollected and test_pass
        is not, then returns the captured output for further checks.
        """
        old_flags = tests.selftest_debug_flags
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
        gc_on = gc.isenabled()
        if gc_on:
            gc.disable()
        try:
            output = self._run_selftest(test_suite_factory=self._get_suite,
                **kwargs)
        finally:
            if gc_on:
                gc.enable()
            tests.selftest_debug_flags = old_flags
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
        return output

    def test_testsuite(self):
        self._run_selftest_with_suite()

    def test_pattern(self):
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
        self.assertNotContainsRe(out, "test_skip")

    def test_exclude_pattern(self):
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_random_seed(self):
        self._run_selftest_with_suite(random_seed="now")

    def test_matching_tests_first(self):
        self._run_selftest_with_suite(matching_tests_first=True,
            pattern="test_self_ref$")

    def test_starting_with_and_exclude(self):
        out = self._run_selftest_with_suite(starting_with=["bt."],
            exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_additonal_decorator(self):
        # The checks made inside _run_selftest_with_suite are all that is
        # needed here, so the returned output is not kept.
        self._run_selftest_with_suite(
            suite_decorators=[tests.TestDecorator])
 
3431
 
 
3432
 
 
3433
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Same uncollected-case checks, run through the subunit runner."""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Delegate to the base implementation with SubUnitBzrRunner."""
        base_run = TestUncollectedWarnings._run_selftest_with_suite
        return base_run(self, runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3441
 
 
3442
 
 
3443
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Same uncollected-case checks, run with forked child processes.

    All behaviour comes from the two base classes; nothing is overridden.
    """
 
3445
 
 
3446
 
 
3447
class TestEnvironHandling(tests.TestCase):
    """Check overrideEnv restores the original value of a variable."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')
        # An embedded test is used to make sure the _captureVar bug is fixed
        class Test(tests.TestCase):
            def test_me(self):
                # The first call saves the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
                # Overriding a second time must work as well
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
        captured = StringIO()
        inner_result = tests.TextTestResult(captured, 0, 1)
        Test('test_me').run(inner_result)
        if not inner_result.wasStrictlySuccessful():
            self.fail(captured.getvalue())
        # Once the embedded test has finished, our own value is back
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3468
 
 
3469
 
 
3470
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    These tests themselves already run isolated from os.environ, so some
    care is needed in their design to avoid bootstrap side-effects. Each
    test starts from an already clean os.environ, which makes it possible to
    assert which variables are present or absent and to build the tests
    around those assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Throwaway test case used as the target of the overrides."""

        def test_me(self):
            pass

    def test_basics(self):
        isolated = tests.isolated_environ
        # BZR_HOME is defined as "not part of os.environ" for tests.TestCase.
        self.assertTrue('BZR_HOME' in isolated)
        self.assertEquals(None, isolated['BZR_HOME'])
        # Being part of isolated_environ, BZR_HOME should not appear here
        self.assertFalse('BZR_HOME' in os.environ)
        # LINES is defined as "part of os.environ" for tests.TestCase.
        self.assertTrue('LINES' in isolated)
        self.assertEquals('25', isolated['LINES'])
        self.assertEquals('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BZR_HOME is known to be absent from os.environ
        scratch = self.ScratchMonkey('test_me')
        tests.override_os_environ(scratch, {'BZR_HOME': 'foo'})
        self.assertEquals('foo', os.environ['BZR_HOME'])
        tests.restore_os_environ(scratch)
        self.assertFalse('BZR_HOME' in os.environ)

    def test_injecting_known_variable(self):
        scratch = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(scratch, {'LINES': '42'})
        self.assertEquals('42', os.environ['LINES'])
        tests.restore_os_environ(scratch)
        self.assertEquals('25', os.environ['LINES'])

    def test_deleting_variable(self):
        scratch = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(scratch, {'LINES': None})
        self.assertTrue('LINES' not in os.environ)
        tests.restore_os_environ(scratch)
        self.assertEquals('25', os.environ['LINES'])
 
3521
 
 
3522
 
 
3523
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.

    Since tests.TestCase already provides isolation from os.environ, the
    clean environment is used as the base for testing. To capture precisely
    the isolation added by tests.DocTestSuite, the plain
    doctest.DocTestSuite is used for comparison.

    We want to make sure `tests.DocTestSuite` respects
    `tests.isolated_environ`, not `os.environ`, so each test overrides it to
    suit its needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a ``klass`` suite whose only doctest is ``string``."""
        class Finder(doctest.DocTestFinder):
            """Finder that ignores its arguments and yields one doctest."""

            def find(*args, **kwargs):
                parser = doctest.DocTestParser()
                return [parser.get_doctest(string, {}, 'foo', 'foo.py', 0)]

        return klass(test_finder=Finder())

    def run_doctest_suite_for_string(self, klass, string):
        """Run the doctest in ``string``; return (result, output stream)."""
        captured = StringIO()
        outcome = tests.TextTestResult(captured, 0, 1)
        self.get_doctest_suite_for_string(klass, string).run(outcome)
        return outcome, captured

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the run output, unless the doctest passes."""
        outcome, captured = self.run_doctest_suite_for_string(klass, string)
        if outcome.wasStrictlySuccessful():
            return
        self.fail(captured.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail, showing the run output, if the doctest passes."""
        outcome, captured = self.run_doctest_suite_for_string(klass, string)
        if not outcome.wasStrictlySuccessful():
            return
        self.fail(captured.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        doctest_source = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_source)
        # tests.DocTestSuite sees '42'
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_source)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        doctest_source = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_source)
        # tests.DocTestSuite sees None
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_source)
 
3586
 
 
3587
 
 
3588
class TestSelftestExcludePatterns(tests.TestCase):
    """Check the -x option of 'bzr selftest --list' against a tiny suite."""

    def setUp(self):
        super(TestSelftestExcludePatterns, self).setUp()
        # Swap in a three-test suite for the duration of each test.
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """A test suite factory with only a few tests."""
        class Test(tests.TestCase):
            def id(self):
                # The bare method name is enough; skip the full class path
                return self._testMethodName
            def a(self):
                pass
            def b(self):
                pass
            def c(self):
                pass
        cases = [Test(method_name) for method_name in ("a", "b", "c")]
        return TestUtil.TestSuite(cases)

    def assertTestList(self, expected, *selftest_args):
        """Assert 'bzr selftest --list' prints exactly ``expected`` ids."""
        # setUp installed the small suite factory, so this works at the
        # command level without loading the whole test suite
        command = ('selftest', '--list') + selftest_args
        out, _unused_err = self.run_bzr(command)
        self.assertEquals(expected, out.splitlines())

    def test_full_list(self):
        self.assertTestList(['a', 'b', 'c'])

    def test_single_exclude(self):
        self.assertTestList(['b', 'c'], '-x', 'a')

    def test_mutiple_excludes(self):
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
 
3623
 
 
3624
 
 
3625
class TestCounterHooks(tests.TestCase, SelfTestHelper):
    """Check the per-test hook call counts recorded in ``_counters``."""

    _test_needs_features = [features.subunit]

    def setUp(self):
        super(TestCounterHooks, self).setUp()
        class Test(tests.TestCase):

            def setUp(self):
                super(Test, self).setUp()
                self.hooks = hooks.Hooks()
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
                self.install_counter_hook(self.hooks, 'myhook')

            def no_hook(self):
                pass

            def run_hook_once(self):
                for hook in self.hooks['myhook']:
                    hook(self)

        self.test_class = Test

    def assertHookCalls(self, expected_calls, test_name):
        """Run ``test_name`` of the inner class and check its hook counter.

        Asserts that the finished test has a ``_counters`` attribute with a
        'myhook' entry equal to ``expected_calls``.
        """
        test = self.test_class(test_name)
        result = unittest.TestResult()
        test.run(result)
        self.assertTrue(hasattr(test, '_counters'))
        # Membership test instead of the deprecated dict.has_key().
        self.assertTrue('myhook' in test._counters)
        self.assertEquals(expected_calls, test._counters['myhook'])

    def test_no_hook(self):
        self.assertHookCalls(0, 'no_hook')

    def test_run_hook_once(self):
        tt = features.testtools
        if tt.module.__version__ < (0, 9, 8):
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
        self.assertHookCalls(1, 'run_hook_once')