~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Andrew Bennetts
  • Date: 2010-07-29 11:17:57 UTC
  • mfrom: (5050.3.17 2.2)
  • mto: This revision was merged to the branch mainline in revision 5365.
  • Revision ID: andrew.bennetts@canonical.com-20100729111757-018h3pcefo7z0dnq
Merge lp:bzr/2.2 into lp:bzr.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
import gc
21
 
import doctest
 
20
from doctest import ELLIPSIS
22
21
import os
23
22
import signal
24
23
import sys
25
 
import threading
26
24
import time
27
25
import unittest
28
26
import warnings
29
27
 
30
 
from testtools import (
31
 
    ExtendedToOriginalDecorator,
32
 
    MultiTestResult,
33
 
    __version__ as testtools_version,
34
 
    )
35
 
from testtools.content import Content
 
28
from testtools import MultiTestResult
36
29
from testtools.content_type import ContentType
37
30
from testtools.matchers import (
38
31
    DocTestMatches,
39
32
    Equals,
40
33
    )
41
 
import testtools.testresult.doubles
 
34
import testtools.tests.helpers
42
35
 
43
36
import bzrlib
44
37
from bzrlib import (
45
38
    branchbuilder,
46
39
    bzrdir,
 
40
    debug,
47
41
    errors,
48
 
    hooks,
49
42
    lockdir,
50
43
    memorytree,
51
44
    osutils,
 
45
    progress,
52
46
    remote,
53
47
    repository,
54
48
    symbol_versioning,
55
49
    tests,
56
50
    transport,
57
51
    workingtree,
58
 
    workingtree_3,
59
 
    workingtree_4,
60
52
    )
61
53
from bzrlib.repofmt import (
62
54
    groupcompress_repo,
 
55
    pack_repo,
 
56
    weaverepo,
63
57
    )
64
58
from bzrlib.symbol_versioning import (
65
59
    deprecated_function,
70
64
    features,
71
65
    test_lsprof,
72
66
    test_server,
 
67
    test_sftp_transport,
73
68
    TestUtil,
74
69
    )
75
 
from bzrlib.trace import note, mutter
 
70
from bzrlib.trace import note
76
71
from bzrlib.transport import memory
 
72
from bzrlib.version import _get_bzr_source_tree
77
73
 
78
74
 
79
75
def _test_ids(test_suite):
81
77
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
82
78
 
83
79
 
 
80
class SelftestTests(tests.TestCase):
 
81
 
 
82
    def test_import_tests(self):
 
83
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
 
84
        self.assertEqual(mod.SelftestTests, SelftestTests)
 
85
 
 
86
    def test_import_test_failure(self):
 
87
        self.assertRaises(ImportError,
 
88
                          TestUtil._load_module_by_name,
 
89
                          'bzrlib.no-name-yet')
 
90
 
 
91
 
84
92
class MetaTestLog(tests.TestCase):
85
93
 
86
94
    def test_logging(self):
92
100
            "text", "plain", {"charset": "utf8"})))
93
101
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
94
102
        self.assertThat(self.get_log(),
95
 
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
103
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
104
 
 
105
 
 
106
class TestUnicodeFilename(tests.TestCase):
 
107
 
 
108
    def test_probe_passes(self):
 
109
        """UnicodeFilename._probe passes."""
 
110
        # We can't test much more than that because the behaviour depends
 
111
        # on the platform.
 
112
        tests.UnicodeFilename._probe()
96
113
 
97
114
 
98
115
class TestTreeShape(tests.TestCaseInTempDir):
99
116
 
100
117
    def test_unicode_paths(self):
101
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
118
        self.requireFeature(tests.UnicodeFilename)
102
119
 
103
120
        filename = u'hell\u00d8'
104
121
        self.build_tree_contents([(filename, 'contents of hello')])
105
 
        self.assertPathExists(filename)
106
 
 
107
 
 
108
 
class TestClassesAvailable(tests.TestCase):
109
 
    """As a convenience we expose Test* classes from bzrlib.tests"""
110
 
 
111
 
    def test_test_case(self):
112
 
        from bzrlib.tests import TestCase
113
 
 
114
 
    def test_test_loader(self):
115
 
        from bzrlib.tests import TestLoader
116
 
 
117
 
    def test_test_suite(self):
118
 
        from bzrlib.tests import TestSuite
 
122
        self.failUnlessExists(filename)
119
123
 
120
124
 
121
125
class TestTransportScenarios(tests.TestCase):
204
208
    def test_scenarios(self):
205
209
        # check that constructor parameters are passed through to the adapted
206
210
        # test.
207
 
        from bzrlib.tests.per_controldir import make_scenarios
 
211
        from bzrlib.tests.per_bzrdir import make_scenarios
208
212
        vfs_factory = "v"
209
213
        server1 = "a"
210
214
        server2 = "b"
308
312
        from bzrlib.tests.per_interrepository import make_scenarios
309
313
        server1 = "a"
310
314
        server2 = "b"
311
 
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
 
315
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
312
316
        scenarios = make_scenarios(server1, server2, formats)
313
317
        self.assertEqual([
314
318
            ('C0,str,str',
315
319
             {'repository_format': 'C1',
316
320
              'repository_format_to': 'C2',
317
321
              'transport_readonly_server': 'b',
318
 
              'transport_server': 'a',
319
 
              'extra_setup': 'C3'}),
 
322
              'transport_server': 'a'}),
320
323
            ('D0,str,str',
321
324
             {'repository_format': 'D1',
322
325
              'repository_format_to': 'D2',
323
326
              'transport_readonly_server': 'b',
324
 
              'transport_server': 'a',
325
 
              'extra_setup': 'D3'})],
 
327
              'transport_server': 'a'})],
326
328
            scenarios)
327
329
 
328
330
 
334
336
        from bzrlib.tests.per_workingtree import make_scenarios
335
337
        server1 = "a"
336
338
        server2 = "b"
337
 
        formats = [workingtree_4.WorkingTreeFormat4(),
338
 
                   workingtree_3.WorkingTreeFormat3(),]
 
339
        formats = [workingtree.WorkingTreeFormat2(),
 
340
                   workingtree.WorkingTreeFormat3(),]
339
341
        scenarios = make_scenarios(server1, server2, formats)
340
342
        self.assertEqual([
341
 
            ('WorkingTreeFormat4',
 
343
            ('WorkingTreeFormat2',
342
344
             {'bzrdir_format': formats[0]._matchingbzrdir,
343
345
              'transport_readonly_server': 'b',
344
346
              'transport_server': 'a',
371
373
            )
372
374
        server1 = "a"
373
375
        server2 = "b"
374
 
        formats = [workingtree_4.WorkingTreeFormat4(),
375
 
                   workingtree_3.WorkingTreeFormat3(),]
 
376
        formats = [workingtree.WorkingTreeFormat2(),
 
377
                   workingtree.WorkingTreeFormat3(),]
376
378
        scenarios = make_scenarios(server1, server2, formats)
377
379
        self.assertEqual(7, len(scenarios))
378
 
        default_wt_format = workingtree.format_registry.get_default()
379
 
        wt4_format = workingtree_4.WorkingTreeFormat4()
380
 
        wt5_format = workingtree_4.WorkingTreeFormat5()
 
380
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
 
381
        wt4_format = workingtree.WorkingTreeFormat4()
 
382
        wt5_format = workingtree.WorkingTreeFormat5()
381
383
        expected_scenarios = [
382
 
            ('WorkingTreeFormat4',
 
384
            ('WorkingTreeFormat2',
383
385
             {'bzrdir_format': formats[0]._matchingbzrdir,
384
386
              'transport_readonly_server': 'b',
385
387
              'transport_server': 'a',
445
447
        # ones to add.
446
448
        from bzrlib.tests.per_tree import (
447
449
            return_parameter,
 
450
            revision_tree_from_workingtree
448
451
            )
449
452
        from bzrlib.tests.per_intertree import (
450
453
            make_scenarios,
451
454
            )
452
 
        from bzrlib.workingtree_3 import WorkingTreeFormat3
453
 
        from bzrlib.workingtree_4 import WorkingTreeFormat4
 
455
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
454
456
        input_test = TestInterTreeScenarios(
455
457
            "test_scenarios")
456
458
        server1 = "a"
457
459
        server2 = "b"
458
 
        format1 = WorkingTreeFormat4()
 
460
        format1 = WorkingTreeFormat2()
459
461
        format2 = WorkingTreeFormat3()
460
462
        formats = [("1", str, format1, format2, "converter1"),
461
463
            ("2", int, format2, format1, "converter2")]
507
509
        self.assertRaises(AssertionError, self.assertEqualStat,
508
510
            os.lstat("foo"), os.lstat("longname"))
509
511
 
510
 
    def test_failUnlessExists(self):
511
 
        """Deprecated failUnlessExists and failIfExists"""
512
 
        self.applyDeprecated(
513
 
            deprecated_in((2, 4)),
514
 
            self.failUnlessExists, '.')
515
 
        self.build_tree(['foo/', 'foo/bar'])
516
 
        self.applyDeprecated(
517
 
            deprecated_in((2, 4)),
518
 
            self.failUnlessExists, 'foo/bar')
519
 
        self.applyDeprecated(
520
 
            deprecated_in((2, 4)),
521
 
            self.failIfExists, 'foo/foo')
522
 
 
523
 
    def test_assertPathExists(self):
524
 
        self.assertPathExists('.')
525
 
        self.build_tree(['foo/', 'foo/bar'])
526
 
        self.assertPathExists('foo/bar')
527
 
        self.assertPathDoesNotExist('foo/foo')
528
 
 
529
512
 
530
513
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
531
514
 
565
548
        tree = self.make_branch_and_memory_tree('dir')
566
549
        # Guard against regression into MemoryTransport leaking
567
550
        # files to disk instead of keeping them in memory.
568
 
        self.assertFalse(osutils.lexists('dir'))
 
551
        self.failIf(osutils.lexists('dir'))
569
552
        self.assertIsInstance(tree, memorytree.MemoryTree)
570
553
 
571
554
    def test_make_branch_and_memory_tree_with_format(self):
572
555
        """make_branch_and_memory_tree should accept a format option."""
573
556
        format = bzrdir.BzrDirMetaFormat1()
574
 
        format.repository_format = repository.format_registry.get_default()
 
557
        format.repository_format = weaverepo.RepositoryFormat7()
575
558
        tree = self.make_branch_and_memory_tree('dir', format=format)
576
559
        # Guard against regression into MemoryTransport leaking
577
560
        # files to disk instead of keeping them in memory.
578
 
        self.assertFalse(osutils.lexists('dir'))
 
561
        self.failIf(osutils.lexists('dir'))
579
562
        self.assertIsInstance(tree, memorytree.MemoryTree)
580
563
        self.assertEqual(format.repository_format.__class__,
581
564
            tree.branch.repository._format.__class__)
585
568
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
586
569
        # Guard against regression into MemoryTransport leaking
587
570
        # files to disk instead of keeping them in memory.
588
 
        self.assertFalse(osutils.lexists('dir'))
 
571
        self.failIf(osutils.lexists('dir'))
589
572
 
590
573
    def test_make_branch_builder_with_format(self):
591
574
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
592
575
        # that the format objects are used.
593
576
        format = bzrdir.BzrDirMetaFormat1()
594
 
        repo_format = repository.format_registry.get_default()
 
577
        repo_format = weaverepo.RepositoryFormat7()
595
578
        format.repository_format = repo_format
596
579
        builder = self.make_branch_builder('dir', format=format)
597
580
        the_branch = builder.get_branch()
598
581
        # Guard against regression into MemoryTransport leaking
599
582
        # files to disk instead of keeping them in memory.
600
 
        self.assertFalse(osutils.lexists('dir'))
 
583
        self.failIf(osutils.lexists('dir'))
601
584
        self.assertEqual(format.repository_format.__class__,
602
585
                         the_branch.repository._format.__class__)
603
586
        self.assertEqual(repo_format.get_format_string(),
609
592
        the_branch = builder.get_branch()
610
593
        # Guard against regression into MemoryTransport leaking
611
594
        # files to disk instead of keeping them in memory.
612
 
        self.assertFalse(osutils.lexists('dir'))
 
595
        self.failIf(osutils.lexists('dir'))
613
596
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
614
597
        self.assertEqual(dir_format.repository_format.__class__,
615
598
                         the_branch.repository._format.__class__)
620
603
    def test_dangling_locks_cause_failures(self):
621
604
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
622
605
            def test_function(self):
623
 
                t = self.get_transport_from_path('.')
 
606
                t = self.get_transport('.')
624
607
                l = lockdir.LockDir(t, 'lock')
625
608
                l.create()
626
609
                l.attempt_lock()
628
611
        result = test.run()
629
612
        total_failures = result.errors + result.failures
630
613
        if self._lock_check_thorough:
631
 
            self.assertEqual(1, len(total_failures))
 
614
            self.assertLength(1, total_failures)
632
615
        else:
633
616
            # When _lock_check_thorough is disabled, then we don't trigger a
634
617
            # failure
635
 
            self.assertEqual(0, len(total_failures))
 
618
            self.assertLength(0, total_failures)
636
619
 
637
620
 
638
621
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
646
629
        # for the server
647
630
        url = self.get_readonly_url()
648
631
        url2 = self.get_readonly_url('foo/bar')
649
 
        t = transport.get_transport_from_url(url)
650
 
        t2 = transport.get_transport_from_url(url2)
651
 
        self.assertIsInstance(t, ReadonlyTransportDecorator)
652
 
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
 
632
        t = transport.get_transport(url)
 
633
        t2 = transport.get_transport(url2)
 
634
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
 
635
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
653
636
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
637
 
655
638
    def test_get_readonly_url_http(self):
661
644
        url = self.get_readonly_url()
662
645
        url2 = self.get_readonly_url('foo/bar')
663
646
        # the transport returned may be any HttpTransportBase subclass
664
 
        t = transport.get_transport_from_url(url)
665
 
        t2 = transport.get_transport_from_url(url2)
666
 
        self.assertIsInstance(t, HttpTransportBase)
667
 
        self.assertIsInstance(t2, HttpTransportBase)
 
647
        t = transport.get_transport(url)
 
648
        t2 = transport.get_transport(url2)
 
649
        self.failUnless(isinstance(t, HttpTransportBase))
 
650
        self.failUnless(isinstance(t2, HttpTransportBase))
668
651
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
669
652
 
670
653
    def test_is_directory(self):
678
661
    def test_make_branch_builder(self):
679
662
        builder = self.make_branch_builder('dir')
680
663
        rev_id = builder.build_commit()
681
 
        self.assertPathExists('dir')
 
664
        self.failUnlessExists('dir')
682
665
        a_dir = bzrdir.BzrDir.open('dir')
683
666
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
684
667
        a_branch = a_dir.open_branch()
700
683
        self.assertIsInstance(result_bzrdir.transport,
701
684
                              memory.MemoryTransport)
702
685
        # should not be on disk, should only be in memory
703
 
        self.assertPathDoesNotExist('subdir')
 
686
        self.failIfExists('subdir')
704
687
 
705
688
 
706
689
class TestChrootedTest(tests.ChrootedTestCase):
707
690
 
708
691
    def test_root_is_root(self):
709
 
        t = transport.get_transport_from_url(self.get_readonly_url())
 
692
        t = transport.get_transport(self.get_readonly_url())
710
693
        url = t.base
711
694
        self.assertEqual(url, t.clone('..').base)
712
695
 
714
697
class TestProfileResult(tests.TestCase):
715
698
 
716
699
    def test_profiles_tests(self):
717
 
        self.requireFeature(features.lsprof_feature)
718
 
        terminal = testtools.testresult.doubles.ExtendedTestResult()
 
700
        self.requireFeature(test_lsprof.LSProfFeature)
 
701
        terminal = testtools.tests.helpers.ExtendedTestResult()
719
702
        result = tests.ProfileResult(terminal)
720
703
        class Sample(tests.TestCase):
721
704
            def a(self):
738
721
                descriptions=0,
739
722
                verbosity=1,
740
723
                )
741
 
        capture = testtools.testresult.doubles.ExtendedTestResult()
 
724
        capture = testtools.tests.helpers.ExtendedTestResult()
742
725
        test_case.run(MultiTestResult(result, capture))
743
726
        run_case = capture._events[0][1]
744
727
        timed_string = result._testTimeString(run_case)
765
748
        self.check_timing(ShortDelayTestCase('test_short_delay'),
766
749
                          r"^ +[0-9]+ms$")
767
750
 
 
751
    def _patch_get_bzr_source_tree(self):
 
752
        # Reading from the actual source tree breaks isolation, but we don't
 
753
        # want to assume that thats *all* that would happen.
 
754
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
 
755
 
 
756
    def test_assigned_benchmark_file_stores_date(self):
 
757
        self._patch_get_bzr_source_tree()
 
758
        output = StringIO()
 
759
        result = bzrlib.tests.TextTestResult(self._log_file,
 
760
                                        descriptions=0,
 
761
                                        verbosity=1,
 
762
                                        bench_history=output
 
763
                                        )
 
764
        output_string = output.getvalue()
 
765
        # if you are wondering about the regexp please read the comment in
 
766
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
 
767
        # XXX: what comment?  -- Andrew Bennetts
 
768
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
769
 
 
770
    def test_benchhistory_records_test_times(self):
 
771
        self._patch_get_bzr_source_tree()
 
772
        result_stream = StringIO()
 
773
        result = bzrlib.tests.TextTestResult(
 
774
            self._log_file,
 
775
            descriptions=0,
 
776
            verbosity=1,
 
777
            bench_history=result_stream
 
778
            )
 
779
 
 
780
        # we want profile a call and check that its test duration is recorded
 
781
        # make a new test instance that when run will generate a benchmark
 
782
        example_test_case = TestTestResult("_time_hello_world_encoding")
 
783
        # execute the test, which should succeed and record times
 
784
        example_test_case.run(result)
 
785
        lines = result_stream.getvalue().splitlines()
 
786
        self.assertEqual(2, len(lines))
 
787
        self.assertContainsRe(lines[1],
 
788
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
 
789
            "._time_hello_world_encoding")
 
790
 
768
791
    def _time_hello_world_encoding(self):
769
792
        """Profile two sleep calls
770
793
 
775
798
 
776
799
    def test_lsprofiling(self):
777
800
        """Verbose test result prints lsprof statistics from test cases."""
778
 
        self.requireFeature(features.lsprof_feature)
 
801
        self.requireFeature(test_lsprof.LSProfFeature)
779
802
        result_stream = StringIO()
780
803
        result = bzrlib.tests.VerboseTestResult(
781
 
            result_stream,
 
804
            unittest._WritelnDecorator(result_stream),
782
805
            descriptions=0,
783
806
            verbosity=2,
784
807
            )
810
833
        self.assertContainsRe(output,
811
834
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
812
835
 
813
 
    def test_uses_time_from_testtools(self):
814
 
        """Test case timings in verbose results should use testtools times"""
815
 
        import datetime
816
 
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
817
 
            def startTest(self, test):
818
 
                self.time(datetime.datetime.utcfromtimestamp(1.145))
819
 
                super(TimeAddedVerboseTestResult, self).startTest(test)
820
 
            def addSuccess(self, test):
821
 
                self.time(datetime.datetime.utcfromtimestamp(51.147))
822
 
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
823
 
            def report_tests_starting(self): pass
824
 
        sio = StringIO()
825
 
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
826
 
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
827
 
 
828
836
    def test_known_failure(self):
829
 
        """Using knownFailure should trigger several result actions."""
 
837
        """A KnownFailure being raised should trigger several result actions."""
830
838
        class InstrumentedTestResult(tests.ExtendedTestResult):
831
839
            def stopTestRun(self): pass
832
 
            def report_tests_starting(self): pass
 
840
            def startTests(self): pass
 
841
            def report_test_start(self, test): pass
833
842
            def report_known_failure(self, test, err=None, details=None):
834
843
                self._call = test, 'known failure'
835
844
        result = InstrumentedTestResult(None, None, None, None)
836
845
        class Test(tests.TestCase):
837
846
            def test_function(self):
838
 
                self.knownFailure('failed!')
 
847
                raise tests.KnownFailure('failed!')
839
848
        test = Test("test_function")
840
849
        test.run(result)
841
850
        # it should invoke 'report_known_failure'.
853
862
        # verbose test output formatting
854
863
        result_stream = StringIO()
855
864
        result = bzrlib.tests.VerboseTestResult(
856
 
            result_stream,
 
865
            unittest._WritelnDecorator(result_stream),
857
866
            descriptions=0,
858
867
            verbosity=2,
859
868
            )
860
 
        _get_test("test_xfail").run(result)
861
 
        self.assertContainsRe(result_stream.getvalue(),
862
 
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
863
 
            "\\s*(?:Text attachment: )?reason"
864
 
            "(?:\n-+\n|: {{{)"
865
 
            "this_fails"
866
 
            "(?:\n-+\n|}}}\n)")
 
869
        test = self.get_passing_test()
 
870
        result.startTest(test)
 
871
        prefix = len(result_stream.getvalue())
 
872
        # the err parameter has the shape:
 
873
        # (class, exception object, traceback)
 
874
        # KnownFailures dont get their tracebacks shown though, so we
 
875
        # can skip that.
 
876
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
877
        result.report_known_failure(test, err)
 
878
        output = result_stream.getvalue()[prefix:]
 
879
        lines = output.splitlines()
 
880
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
881
        self.assertEqual(lines[1], '    foo')
 
882
        self.assertEqual(2, len(lines))
867
883
 
868
884
    def get_passing_test(self):
869
885
        """Return a test object that can't be run usefully."""
875
891
        """Test the behaviour of invoking addNotSupported."""
876
892
        class InstrumentedTestResult(tests.ExtendedTestResult):
877
893
            def stopTestRun(self): pass
878
 
            def report_tests_starting(self): pass
 
894
            def startTests(self): pass
 
895
            def report_test_start(self, test): pass
879
896
            def report_unsupported(self, test, feature):
880
897
                self._call = test, feature
881
898
        result = InstrumentedTestResult(None, None, None, None)
882
899
        test = SampleTestCase('_test_pass')
883
 
        feature = features.Feature()
 
900
        feature = tests.Feature()
884
901
        result.startTest(test)
885
902
        result.addNotSupported(test, feature)
886
903
        # it should invoke 'report_unsupported'.
900
917
        # verbose test output formatting
901
918
        result_stream = StringIO()
902
919
        result = bzrlib.tests.VerboseTestResult(
903
 
            result_stream,
 
920
            unittest._WritelnDecorator(result_stream),
904
921
            descriptions=0,
905
922
            verbosity=2,
906
923
            )
907
924
        test = self.get_passing_test()
908
 
        feature = features.Feature()
 
925
        feature = tests.Feature()
909
926
        result.startTest(test)
910
927
        prefix = len(result_stream.getvalue())
911
928
        result.report_unsupported(test, feature)
920
937
        """An UnavailableFeature being raised should invoke addNotSupported."""
921
938
        class InstrumentedTestResult(tests.ExtendedTestResult):
922
939
            def stopTestRun(self): pass
923
 
            def report_tests_starting(self): pass
 
940
            def startTests(self): pass
 
941
            def report_test_start(self, test): pass
924
942
            def addNotSupported(self, test, feature):
925
943
                self._call = test, feature
926
944
        result = InstrumentedTestResult(None, None, None, None)
927
 
        feature = features.Feature()
 
945
        feature = tests.Feature()
928
946
        class Test(tests.TestCase):
929
947
            def test_function(self):
930
948
                raise tests.UnavailableFeature(feature)
949
967
    def test_strict_with_known_failure(self):
950
968
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
951
969
                                             verbosity=1)
952
 
        test = _get_test("test_xfail")
953
 
        test.run(result)
 
970
        test = self.get_passing_test()
 
971
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
972
        result.addExpectedFailure(test, err)
954
973
        self.assertFalse(result.wasStrictlySuccessful())
955
974
        self.assertEqual(None, result._extractBenchmarkTime(test))
956
975
 
967
986
        class InstrumentedTestResult(tests.ExtendedTestResult):
968
987
            calls = 0
969
988
            def startTests(self): self.calls += 1
 
989
            def report_test_start(self, test): pass
970
990
        result = InstrumentedTestResult(None, None, None, None)
971
991
        def test_function():
972
992
            pass
974
994
        test.run(result)
975
995
        self.assertEquals(1, result.calls)
976
996
 
977
 
    def test_startTests_only_once(self):
978
 
        """With multiple tests startTests should still only be called once"""
979
 
        class InstrumentedTestResult(tests.ExtendedTestResult):
980
 
            calls = 0
981
 
            def startTests(self): self.calls += 1
982
 
        result = InstrumentedTestResult(None, None, None, None)
983
 
        suite = unittest.TestSuite([
984
 
            unittest.FunctionTestCase(lambda: None),
985
 
            unittest.FunctionTestCase(lambda: None)])
986
 
        suite.run(result)
987
 
        self.assertEquals(1, result.calls)
988
 
        self.assertEquals(2, result.count)
 
997
 
 
998
class TestUnicodeFilenameFeature(tests.TestCase):
 
999
 
 
1000
    def test_probe_passes(self):
 
1001
        """UnicodeFilenameFeature._probe passes."""
 
1002
        # We can't test much more than that because the behaviour depends
 
1003
        # on the platform.
 
1004
        tests.UnicodeFilenameFeature._probe()
989
1005
 
990
1006
 
991
1007
class TestRunner(tests.TestCase):
1004
1020
        because of our use of global state.
1005
1021
        """
1006
1022
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1023
        old_leak = tests.TestCase._first_thread_leaker_id
1007
1024
        try:
1008
1025
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1026
            tests.TestCase._first_thread_leaker_id = None
1009
1027
            return testrunner.run(test)
1010
1028
        finally:
1011
1029
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1030
            tests.TestCase._first_thread_leaker_id = old_leak
1012
1031
 
1013
1032
    def test_known_failure_failed_run(self):
1014
1033
        # run a test that generates a known failure which should be printed in
1019
1038
        test = unittest.TestSuite()
1020
1039
        test.addTest(Test("known_failure_test"))
1021
1040
        def failing_test():
1022
 
            raise AssertionError('foo')
 
1041
            self.fail('foo')
1023
1042
        test.addTest(unittest.FunctionTestCase(failing_test))
1024
1043
        stream = StringIO()
1025
1044
        runner = tests.TextTestRunner(stream=stream)
1033
1052
            '^----------------------------------------------------------------------\n'
1034
1053
            'Traceback \\(most recent call last\\):\n'
1035
1054
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1036
 
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1055
            '    self.fail\\(\'foo\'\\)\n'
1037
1056
            '.*'
1038
1057
            '^----------------------------------------------------------------------\n'
1039
1058
            '.*'
1045
1064
        # the final output.
1046
1065
        class Test(tests.TestCase):
1047
1066
            def known_failure_test(self):
1048
 
                self.knownFailure("Never works...")
 
1067
                self.expectFailure('failed', self.assertTrue, False)
1049
1068
        test = Test("known_failure_test")
1050
1069
        stream = StringIO()
1051
1070
        runner = tests.TextTestRunner(stream=stream)
1057
1076
            '\n'
1058
1077
            'OK \\(known_failures=1\\)\n')
1059
1078
 
1060
 
    def test_unexpected_success_bad(self):
1061
 
        class Test(tests.TestCase):
1062
 
            def test_truth(self):
1063
 
                self.expectFailure("No absolute truth", self.assertTrue, True)
1064
 
        runner = tests.TextTestRunner(stream=StringIO())
1065
 
        result = self.run_test_runner(runner, Test("test_truth"))
1066
 
        if testtools_version[:3] <= (0, 9, 11):
1067
 
            self.assertContainsRe(runner.stream.getvalue(),
1068
 
                "=+\n"
1069
 
                "FAIL: \\S+\.test_truth\n"
1070
 
                "-+\n"
1071
 
                "(?:.*\n)*"
1072
 
                "No absolute truth\n"
1073
 
                "(?:.*\n)*"
1074
 
                "-+\n"
1075
 
                "Ran 1 test in .*\n"
1076
 
                "\n"
1077
 
                "FAILED \\(failures=1\\)\n\\Z")
1078
 
        else:
1079
 
            self.assertContainsRe(runner.stream.getvalue(),
1080
 
                "=+\n"
1081
 
                "FAIL: \\S+\.test_truth\n"
1082
 
                "-+\n"
1083
 
                "Empty attachments:\n"
1084
 
                "  log\n"
1085
 
                "\n"
1086
 
                "reason: {{{No absolute truth}}}\n"
1087
 
                "-+\n"
1088
 
                "Ran 1 test in .*\n"
1089
 
                "\n"
1090
 
                "FAILED \\(failures=1\\)\n\\Z")
1091
 
 
1092
1079
    def test_result_decorator(self):
1093
1080
        # decorate results
1094
1081
        calls = []
1095
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1082
        class LoggingDecorator(tests.ForwardingResult):
1096
1083
            def startTest(self, test):
1097
 
                ExtendedToOriginalDecorator.startTest(self, test)
 
1084
                tests.ForwardingResult.startTest(self, test)
1098
1085
                calls.append('start')
1099
1086
        test = unittest.FunctionTestCase(lambda:None)
1100
1087
        stream = StringIO()
1178
1165
 
1179
1166
    def test_unsupported_features_listed(self):
1180
1167
        """When unsupported features are encountered they are detailed."""
1181
 
        class Feature1(features.Feature):
 
1168
        class Feature1(tests.Feature):
1182
1169
            def _probe(self): return False
1183
 
        class Feature2(features.Feature):
 
1170
        class Feature2(tests.Feature):
1184
1171
            def _probe(self): return False
1185
1172
        # create sample tests
1186
1173
        test1 = SampleTestCase('_test_pass')
1201
1188
            ],
1202
1189
            lines[-3:])
1203
1190
 
1204
 
    def test_verbose_test_count(self):
1205
 
        """A verbose test run reports the right test count at the start"""
1206
 
        suite = TestUtil.TestSuite([
1207
 
            unittest.FunctionTestCase(lambda:None),
1208
 
            unittest.FunctionTestCase(lambda:None)])
1209
 
        self.assertEqual(suite.countTestCases(), 2)
1210
 
        stream = StringIO()
1211
 
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
1212
 
        # Need to use the CountingDecorator as that's what sets num_tests
1213
 
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1214
 
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1191
    def _patch_get_bzr_source_tree(self):
 
1192
        # Reading from the actual source tree breaks isolation, but we don't
 
1193
        # want to assume that thats *all* that would happen.
 
1194
        self._get_source_tree_calls = []
 
1195
        def new_get():
 
1196
            self._get_source_tree_calls.append("called")
 
1197
            return None
 
1198
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
 
1199
 
 
1200
    def test_bench_history(self):
 
1201
        # tests that the running the benchmark passes bench_history into
 
1202
        # the test result object. We can tell that happens if
 
1203
        # _get_bzr_source_tree is called.
 
1204
        self._patch_get_bzr_source_tree()
 
1205
        test = TestRunner('dummy_test')
 
1206
        output = StringIO()
 
1207
        runner = tests.TextTestRunner(stream=self._log_file,
 
1208
                                      bench_history=output)
 
1209
        result = self.run_test_runner(runner, test)
 
1210
        output_string = output.getvalue()
 
1211
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
1212
        self.assertLength(1, self._get_source_tree_calls)
1215
1213
 
1216
1214
    def test_startTestRun(self):
1217
1215
        """run should call result.startTestRun()"""
1218
1216
        calls = []
1219
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1217
        class LoggingDecorator(tests.ForwardingResult):
1220
1218
            def startTestRun(self):
1221
 
                ExtendedToOriginalDecorator.startTestRun(self)
 
1219
                tests.ForwardingResult.startTestRun(self)
1222
1220
                calls.append('startTestRun')
1223
1221
        test = unittest.FunctionTestCase(lambda:None)
1224
1222
        stream = StringIO()
1230
1228
    def test_stopTestRun(self):
1231
1229
        """run should call result.stopTestRun()"""
1232
1230
        calls = []
1233
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1231
        class LoggingDecorator(tests.ForwardingResult):
1234
1232
            def stopTestRun(self):
1235
 
                ExtendedToOriginalDecorator.stopTestRun(self)
 
1233
                tests.ForwardingResult.stopTestRun(self)
1236
1234
                calls.append('stopTestRun')
1237
1235
        test = unittest.FunctionTestCase(lambda:None)
1238
1236
        stream = StringIO()
1241
1239
        result = self.run_test_runner(runner, test)
1242
1240
        self.assertLength(1, calls)
1243
1241
 
1244
 
    def test_unicode_test_output_on_ascii_stream(self):
1245
 
        """Showing results should always succeed even on an ascii console"""
1246
 
        class FailureWithUnicode(tests.TestCase):
1247
 
            def test_log_unicode(self):
1248
 
                self.log(u"\u2606")
1249
 
                self.fail("Now print that log!")
1250
 
        out = StringIO()
1251
 
        self.overrideAttr(osutils, "get_terminal_encoding",
1252
 
            lambda trace=False: "ascii")
1253
 
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1254
 
            FailureWithUnicode("test_log_unicode"))
1255
 
        if testtools_version[:3] > (0, 9, 11):
1256
 
            self.assertContainsRe(out.getvalue(), "log: {{{\d+\.\d+  \\\\u2606}}}")
1257
 
        else:
1258
 
            self.assertContainsRe(out.getvalue(),
1259
 
                "Text attachment: log\n"
1260
 
                "-+\n"
1261
 
                "\d+\.\d+  \\\\u2606\n"
1262
 
                "-+\n")
1263
 
 
1264
1242
 
1265
1243
class SampleTestCase(tests.TestCase):
1266
1244
 
1441
1419
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1442
1420
        output_stream = StringIO()
1443
1421
        result = bzrlib.tests.VerboseTestResult(
1444
 
            output_stream,
 
1422
            unittest._WritelnDecorator(output_stream),
1445
1423
            descriptions=0,
1446
1424
            verbosity=2)
1447
1425
        sample_test.run(result)
1454
1432
        # Note this test won't fail with hooks that the core library doesn't
1455
1433
        # use - but it trigger with a plugin that adds hooks, so its still a
1456
1434
        # useful warning in that case.
1457
 
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
1458
 
        self.assertEqual(
1459
 
            bzrlib.smart.server.SmartServerHooks(),
 
1435
        self.assertEqual(bzrlib.branch.BranchHooks(),
 
1436
            bzrlib.branch.Branch.hooks)
 
1437
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
1460
1438
            bzrlib.smart.server.SmartTCPServer.hooks)
1461
 
        self.assertEqual(
1462
 
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
 
1439
        self.assertEqual(bzrlib.commands.CommandHooks(),
 
1440
            bzrlib.commands.Command.hooks)
1463
1441
 
1464
1442
    def test__gather_lsprof_in_benchmarks(self):
1465
1443
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1466
1444
 
1467
1445
        Each self.time() call is individually and separately profiled.
1468
1446
        """
1469
 
        self.requireFeature(features.lsprof_feature)
 
1447
        self.requireFeature(test_lsprof.LSProfFeature)
1470
1448
        # overrides the class member with an instance member so no cleanup
1471
1449
        # needed.
1472
1450
        self._gather_lsprof_in_benchmarks = True
1491
1469
        transport_server = memory.MemoryServer()
1492
1470
        transport_server.start_server()
1493
1471
        self.addCleanup(transport_server.stop_server)
1494
 
        t = transport.get_transport_from_url(transport_server.get_url())
 
1472
        t = transport.get_transport(transport_server.get_url())
1495
1473
        bzrdir.BzrDir.create(t.base)
1496
1474
        self.assertRaises(errors.BzrError,
1497
1475
            bzrdir.BzrDir.open_from_transport, t)
1502
1480
 
1503
1481
    def test_requireFeature_available(self):
1504
1482
        """self.requireFeature(available) is a no-op."""
1505
 
        class Available(features.Feature):
 
1483
        class Available(tests.Feature):
1506
1484
            def _probe(self):return True
1507
1485
        feature = Available()
1508
1486
        self.requireFeature(feature)
1509
1487
 
1510
1488
    def test_requireFeature_unavailable(self):
1511
1489
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1512
 
        class Unavailable(features.Feature):
 
1490
        class Unavailable(tests.Feature):
1513
1491
            def _probe(self):return False
1514
1492
        feature = Unavailable()
1515
1493
        self.assertRaises(tests.UnavailableFeature,
1674
1652
        test.run(unittest.TestResult())
1675
1653
        self.assertEqual('original', obj.test_attr)
1676
1654
 
1677
 
    def test_recordCalls(self):
1678
 
        from bzrlib.tests import test_selftest
1679
 
        calls = self.recordCalls(
1680
 
            test_selftest, '_add_numbers')
1681
 
        self.assertEqual(test_selftest._add_numbers(2, 10),
1682
 
            12)
1683
 
        self.assertEquals(calls, [((2, 10), {})])
1684
 
 
1685
 
 
1686
 
def _add_numbers(a, b):
1687
 
    return a + b
1688
 
 
1689
 
 
1690
 
class _MissingFeature(features.Feature):
1691
 
    def _probe(self):
1692
 
        return False
1693
 
missing_feature = _MissingFeature()
1694
 
 
1695
 
 
1696
 
def _get_test(name):
1697
 
    """Get an instance of a specific example test.
1698
 
 
1699
 
    We protect this in a function so that they don't auto-run in the test
1700
 
    suite.
1701
 
    """
1702
 
 
1703
 
    class ExampleTests(tests.TestCase):
1704
 
 
1705
 
        def test_fail(self):
1706
 
            mutter('this was a failing test')
1707
 
            self.fail('this test will fail')
1708
 
 
1709
 
        def test_error(self):
1710
 
            mutter('this test errored')
1711
 
            raise RuntimeError('gotcha')
1712
 
 
1713
 
        def test_missing_feature(self):
1714
 
            mutter('missing the feature')
1715
 
            self.requireFeature(missing_feature)
1716
 
 
1717
 
        def test_skip(self):
1718
 
            mutter('this test will be skipped')
1719
 
            raise tests.TestSkipped('reason')
1720
 
 
1721
 
        def test_success(self):
1722
 
            mutter('this test succeeds')
1723
 
 
1724
 
        def test_xfail(self):
1725
 
            mutter('test with expected failure')
1726
 
            self.knownFailure('this_fails')
1727
 
 
1728
 
        def test_unexpected_success(self):
1729
 
            mutter('test with unexpected success')
1730
 
            self.expectFailure('should_fail', lambda: None)
1731
 
 
1732
 
    return ExampleTests(name)
1733
 
 
1734
 
 
1735
 
class TestTestCaseLogDetails(tests.TestCase):
1736
 
 
1737
 
    def _run_test(self, test_name):
1738
 
        test = _get_test(test_name)
1739
 
        result = testtools.TestResult()
1740
 
        test.run(result)
1741
 
        return result
1742
 
 
1743
 
    def test_fail_has_log(self):
1744
 
        result = self._run_test('test_fail')
1745
 
        self.assertEqual(1, len(result.failures))
1746
 
        result_content = result.failures[0][1]
1747
 
        if testtools_version < (0, 9, 12):
1748
 
            self.assertContainsRe(result_content, 'Text attachment: log')
1749
 
        self.assertContainsRe(result_content, 'this was a failing test')
1750
 
 
1751
 
    def test_error_has_log(self):
1752
 
        result = self._run_test('test_error')
1753
 
        self.assertEqual(1, len(result.errors))
1754
 
        result_content = result.errors[0][1]
1755
 
        if testtools_version < (0, 9, 12):
1756
 
            self.assertContainsRe(result_content, 'Text attachment: log')
1757
 
        self.assertContainsRe(result_content, 'this test errored')
1758
 
 
1759
 
    def test_skip_has_no_log(self):
1760
 
        result = self._run_test('test_skip')
1761
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
1762
 
        skips = result.skip_reasons['reason']
1763
 
        self.assertEqual(1, len(skips))
1764
 
        test = skips[0]
1765
 
        self.assertFalse('log' in test.getDetails())
1766
 
 
1767
 
    def test_missing_feature_has_no_log(self):
1768
 
        # testtools doesn't know about addNotSupported, so it just gets
1769
 
        # considered as a skip
1770
 
        result = self._run_test('test_missing_feature')
1771
 
        self.assertEqual([missing_feature], result.skip_reasons.keys())
1772
 
        skips = result.skip_reasons[missing_feature]
1773
 
        self.assertEqual(1, len(skips))
1774
 
        test = skips[0]
1775
 
        self.assertFalse('log' in test.getDetails())
1776
 
 
1777
 
    def test_xfail_has_no_log(self):
1778
 
        result = self._run_test('test_xfail')
1779
 
        self.assertEqual(1, len(result.expectedFailures))
1780
 
        result_content = result.expectedFailures[0][1]
1781
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
1782
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
1783
 
 
1784
 
    def test_unexpected_success_has_log(self):
1785
 
        result = self._run_test('test_unexpected_success')
1786
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
1787
 
        # Inconsistency, unexpectedSuccesses is a list of tests,
1788
 
        # expectedFailures is a list of reasons?
1789
 
        test = result.unexpectedSuccesses[0]
1790
 
        details = test.getDetails()
1791
 
        self.assertTrue('log' in details)
1792
 
 
1793
 
 
1794
 
class TestTestCloning(tests.TestCase):
1795
 
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
1796
 
 
1797
 
    def test_cloned_testcase_does_not_share_details(self):
1798
 
        """A TestCase cloned with clone_test does not share mutable attributes
1799
 
        such as details or cleanups.
1800
 
        """
1801
 
        class Test(tests.TestCase):
1802
 
            def test_foo(self):
1803
 
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1804
 
        orig_test = Test('test_foo')
1805
 
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1806
 
        orig_test.run(unittest.TestResult())
1807
 
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1808
 
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
1809
 
 
1810
 
    def test_double_apply_scenario_preserves_first_scenario(self):
1811
 
        """Applying two levels of scenarios to a test preserves the attributes
1812
 
        added by both scenarios.
1813
 
        """
1814
 
        class Test(tests.TestCase):
1815
 
            def test_foo(self):
1816
 
                pass
1817
 
        test = Test('test_foo')
1818
 
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1819
 
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1820
 
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1821
 
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1822
 
        all_tests = list(tests.iter_suite_tests(suite))
1823
 
        self.assertLength(4, all_tests)
1824
 
        all_xys = sorted((t.x, t.y) for t in all_tests)
1825
 
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1826
 
 
1827
1655
 
1828
1656
# NB: Don't delete this; it's not actually from 0.11!
1829
1657
@deprecated_function(deprecated_in((0, 11, 0)))
1957
1785
    def test_make_branch_and_tree_with_format(self):
1958
1786
        # we should be able to supply a format to make_branch_and_tree
1959
1787
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
 
1788
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1960
1789
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1961
1790
                              bzrlib.bzrdir.BzrDirMetaFormat1)
 
1791
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
 
1792
                              bzrlib.bzrdir.BzrDirFormat6)
1962
1793
 
1963
1794
    def test_make_branch_and_memory_tree(self):
1964
1795
        # we should be able to get a new branch and a mutable tree from
1981
1812
                tree.branch.repository.bzrdir.root_transport)
1982
1813
 
1983
1814
 
1984
 
class SelfTestHelper(object):
 
1815
class SelfTestHelper:
1985
1816
 
1986
1817
    def run_selftest(self, **kwargs):
1987
1818
        """Run selftest returning its output."""
2041
1872
        self.assertLength(2, output.readlines())
2042
1873
 
2043
1874
    def test_lsprof_tests(self):
2044
 
        self.requireFeature(features.lsprof_feature)
2045
 
        results = []
 
1875
        self.requireFeature(test_lsprof.LSProfFeature)
 
1876
        calls = []
2046
1877
        class Test(object):
2047
1878
            def __call__(test, result):
2048
1879
                test.run(result)
2049
1880
            def run(test, result):
2050
 
                results.append(result)
 
1881
                self.assertIsInstance(result, tests.ForwardingResult)
 
1882
                calls.append("called")
2051
1883
            def countTestCases(self):
2052
1884
                return 1
2053
1885
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2054
 
        self.assertLength(1, results)
2055
 
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
 
1886
        self.assertLength(1, calls)
2056
1887
 
2057
1888
    def test_random(self):
2058
1889
        # test randomising by listing a number of tests.
2138
1969
            load_list='missing file name', list_only=True)
2139
1970
 
2140
1971
 
2141
 
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2142
 
 
2143
 
    _test_needs_features = [features.subunit]
2144
 
 
2145
 
    def run_subunit_stream(self, test_name):
2146
 
        from subunit import ProtocolTestCase
2147
 
        def factory():
2148
 
            return TestUtil.TestSuite([_get_test(test_name)])
2149
 
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2150
 
            test_suite_factory=factory)
2151
 
        test = ProtocolTestCase(stream)
2152
 
        result = testtools.TestResult()
2153
 
        test.run(result)
2154
 
        content = stream.getvalue()
2155
 
        return content, result
2156
 
 
2157
 
    def test_fail_has_log(self):
2158
 
        content, result = self.run_subunit_stream('test_fail')
2159
 
        self.assertEqual(1, len(result.failures))
2160
 
        self.assertContainsRe(content, '(?m)^log$')
2161
 
        self.assertContainsRe(content, 'this test will fail')
2162
 
 
2163
 
    def test_error_has_log(self):
2164
 
        content, result = self.run_subunit_stream('test_error')
2165
 
        self.assertContainsRe(content, '(?m)^log$')
2166
 
        self.assertContainsRe(content, 'this test errored')
2167
 
 
2168
 
    def test_skip_has_no_log(self):
2169
 
        content, result = self.run_subunit_stream('test_skip')
2170
 
        self.assertNotContainsRe(content, '(?m)^log$')
2171
 
        self.assertNotContainsRe(content, 'this test will be skipped')
2172
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
2173
 
        skips = result.skip_reasons['reason']
2174
 
        self.assertEqual(1, len(skips))
2175
 
        test = skips[0]
2176
 
        # RemotedTestCase doesn't preserve the "details"
2177
 
        ## self.assertFalse('log' in test.getDetails())
2178
 
 
2179
 
    def test_missing_feature_has_no_log(self):
2180
 
        content, result = self.run_subunit_stream('test_missing_feature')
2181
 
        self.assertNotContainsRe(content, '(?m)^log$')
2182
 
        self.assertNotContainsRe(content, 'missing the feature')
2183
 
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2184
 
        skips = result.skip_reasons['_MissingFeature\n']
2185
 
        self.assertEqual(1, len(skips))
2186
 
        test = skips[0]
2187
 
        # RemotedTestCase doesn't preserve the "details"
2188
 
        ## self.assertFalse('log' in test.getDetails())
2189
 
 
2190
 
    def test_xfail_has_no_log(self):
2191
 
        content, result = self.run_subunit_stream('test_xfail')
2192
 
        self.assertNotContainsRe(content, '(?m)^log$')
2193
 
        self.assertNotContainsRe(content, 'test with expected failure')
2194
 
        self.assertEqual(1, len(result.expectedFailures))
2195
 
        result_content = result.expectedFailures[0][1]
2196
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
2197
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
2198
 
 
2199
 
    def test_unexpected_success_has_log(self):
2200
 
        content, result = self.run_subunit_stream('test_unexpected_success')
2201
 
        self.assertContainsRe(content, '(?m)^log$')
2202
 
        self.assertContainsRe(content, 'test with unexpected success')
2203
 
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2204
 
        #                success, if a min version check is added remove this
2205
 
        from subunit import TestProtocolClient as _Client
2206
 
        if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
2207
 
            self.expectFailure('subunit treats "unexpectedSuccess"'
2208
 
                               ' as a plain success',
2209
 
                self.assertEqual, 1, len(result.unexpectedSuccesses))
2210
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
2211
 
        test = result.unexpectedSuccesses[0]
2212
 
        # RemotedTestCase doesn't preserve the "details"
2213
 
        ## self.assertTrue('log' in test.getDetails())
2214
 
 
2215
 
    def test_success_has_no_log(self):
2216
 
        content, result = self.run_subunit_stream('test_success')
2217
 
        self.assertEqual(1, result.testsRun)
2218
 
        self.assertNotContainsRe(content, '(?m)^log$')
2219
 
        self.assertNotContainsRe(content, 'this test succeeds')
2220
 
 
2221
 
 
2222
1972
class TestRunBzr(tests.TestCase):
2223
1973
 
2224
1974
    out = ''
2347
2097
        # stdout and stderr of the invoked run_bzr
2348
2098
        current_factory = bzrlib.ui.ui_factory
2349
2099
        self.run_bzr(['foo'])
2350
 
        self.assertFalse(current_factory is self.factory)
 
2100
        self.failIf(current_factory is self.factory)
2351
2101
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2352
2102
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2353
2103
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2510
2260
 
2511
2261
 
2512
2262
class TestStartBzrSubProcess(tests.TestCase):
2513
 
    """Stub test start_bzr_subprocess."""
2514
2263
 
2515
 
    def _subprocess_log_cleanup(self):
2516
 
        """Inhibits the base version as we don't produce a log file."""
 
2264
    def check_popen_state(self):
 
2265
        """Replace to make assertions when popen is called."""
2517
2266
 
2518
2267
    def _popen(self, *args, **kwargs):
2519
 
        """Override the base version to record the command that is run.
2520
 
 
2521
 
        From there we can ensure it is correct without spawning a real process.
2522
 
        """
 
2268
        """Record the command that is run, so that we can ensure it is correct"""
2523
2269
        self.check_popen_state()
2524
2270
        self._popen_args = args
2525
2271
        self._popen_kwargs = kwargs
2526
2272
        raise _DontSpawnProcess()
2527
2273
 
2528
 
    def check_popen_state(self):
2529
 
        """Replace to make assertions when popen is called."""
2530
 
 
2531
2274
    def test_run_bzr_subprocess_no_plugins(self):
2532
2275
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2533
2276
        command = self._popen_args[0]
2537
2280
 
2538
2281
    def test_allow_plugins(self):
2539
2282
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2540
 
                          allow_plugins=True)
 
2283
            allow_plugins=True)
2541
2284
        command = self._popen_args[0]
2542
2285
        self.assertEqual([], command[2:])
2543
2286
 
2544
2287
    def test_set_env(self):
2545
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2288
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2546
2289
        # set in the child
2547
2290
        def check_environment():
2548
2291
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2549
2292
        self.check_popen_state = check_environment
2550
2293
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2551
 
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2294
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
2552
2295
        # not set in theparent
2553
2296
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2554
2297
 
2555
2298
    def test_run_bzr_subprocess_env_del(self):
2556
2299
        """run_bzr_subprocess can remove environment variables too."""
2557
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2300
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2558
2301
        def check_environment():
2559
2302
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2560
2303
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2561
2304
        self.check_popen_state = check_environment
2562
2305
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2563
 
                          env_changes={'EXISTANT_ENV_VAR':None})
 
2306
            env_changes={'EXISTANT_ENV_VAR':None})
2564
2307
        # Still set in parent
2565
2308
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2566
2309
        del os.environ['EXISTANT_ENV_VAR']
2567
2310
 
2568
2311
    def test_env_del_missing(self):
2569
 
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
 
2312
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2570
2313
        def check_environment():
2571
2314
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2572
2315
        self.check_popen_state = check_environment
2573
2316
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2574
 
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2317
            env_changes={'NON_EXISTANT_ENV_VAR':None})
2575
2318
 
2576
2319
    def test_working_dir(self):
2577
2320
        """Test that we can specify the working dir for the child"""
2580
2323
        chdirs = []
2581
2324
        def chdir(path):
2582
2325
            chdirs.append(path)
2583
 
        self.overrideAttr(os, 'chdir', chdir)
2584
 
        def getcwd():
2585
 
            return 'current'
2586
 
        self.overrideAttr(osutils, 'getcwd', getcwd)
2587
 
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2588
 
                          working_dir='foo')
 
2326
        os.chdir = chdir
 
2327
        try:
 
2328
            def getcwd():
 
2329
                return 'current'
 
2330
            osutils.getcwd = getcwd
 
2331
            try:
 
2332
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2333
                    working_dir='foo')
 
2334
            finally:
 
2335
                osutils.getcwd = orig_getcwd
 
2336
        finally:
 
2337
            os.chdir = orig_chdir
2589
2338
        self.assertEqual(['foo', 'current'], chdirs)
2590
2339
 
2591
2340
    def test_get_bzr_path_with_cwd_bzrlib(self):
2611
2360
        self.assertEqual('bzr: interrupted\n', result[1])
2612
2361
 
2613
2362
 
 
2363
class TestFeature(tests.TestCase):
 
2364
 
 
2365
    def test_caching(self):
 
2366
        """Feature._probe is called by the feature at most once."""
 
2367
        class InstrumentedFeature(tests.Feature):
 
2368
            def __init__(self):
 
2369
                super(InstrumentedFeature, self).__init__()
 
2370
                self.calls = []
 
2371
            def _probe(self):
 
2372
                self.calls.append('_probe')
 
2373
                return False
 
2374
        feature = InstrumentedFeature()
 
2375
        feature.available()
 
2376
        self.assertEqual(['_probe'], feature.calls)
 
2377
        feature.available()
 
2378
        self.assertEqual(['_probe'], feature.calls)
 
2379
 
 
2380
    def test_named_str(self):
 
2381
        """Feature.__str__ should thunk to feature_name()."""
 
2382
        class NamedFeature(tests.Feature):
 
2383
            def feature_name(self):
 
2384
                return 'symlinks'
 
2385
        feature = NamedFeature()
 
2386
        self.assertEqual('symlinks', str(feature))
 
2387
 
 
2388
    def test_default_str(self):
 
2389
        """Feature.__str__ should default to __class__.__name__."""
 
2390
        class NamedFeature(tests.Feature):
 
2391
            pass
 
2392
        feature = NamedFeature()
 
2393
        self.assertEqual('NamedFeature', str(feature))
 
2394
 
 
2395
 
 
2396
class TestUnavailableFeature(tests.TestCase):
 
2397
 
 
2398
    def test_access_feature(self):
 
2399
        feature = tests.Feature()
 
2400
        exception = tests.UnavailableFeature(feature)
 
2401
        self.assertIs(feature, exception.args[0])
 
2402
 
 
2403
 
 
2404
simple_thunk_feature = tests._CompatabilityThunkFeature(
 
2405
    deprecated_in((2, 1, 0)),
 
2406
    'bzrlib.tests.test_selftest',
 
2407
    'simple_thunk_feature','UnicodeFilename',
 
2408
    replacement_module='bzrlib.tests'
 
2409
    )
 
2410
 
 
2411
class Test_CompatibilityFeature(tests.TestCase):
 
2412
 
 
2413
    def test_does_thunk(self):
 
2414
        res = self.callDeprecated(
 
2415
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
 
2416
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
 
2417
            simple_thunk_feature.available)
 
2418
        self.assertEqual(tests.UnicodeFilename.available(), res)
 
2419
 
 
2420
 
 
2421
class TestModuleAvailableFeature(tests.TestCase):
 
2422
 
 
2423
    def test_available_module(self):
 
2424
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
 
2425
        self.assertEqual('bzrlib.tests', feature.module_name)
 
2426
        self.assertEqual('bzrlib.tests', str(feature))
 
2427
        self.assertTrue(feature.available())
 
2428
        self.assertIs(tests, feature.module)
 
2429
 
 
2430
    def test_unavailable_module(self):
 
2431
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
 
2432
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
 
2433
        self.assertFalse(feature.available())
 
2434
        self.assertIs(None, feature.module)
 
2435
 
 
2436
 
2614
2437
class TestSelftestFiltering(tests.TestCase):
2615
2438
 
2616
2439
    def setUp(self):
2767
2590
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2768
2591
 
2769
2592
 
2770
 
class TestCheckTreeShape(tests.TestCaseWithTransport):
 
2593
class TestCheckInventoryShape(tests.TestCaseWithTransport):
2771
2594
 
2772
 
    def test_check_tree_shape(self):
 
2595
    def test_check_inventory_shape(self):
2773
2596
        files = ['a', 'b/', 'b/c']
2774
2597
        tree = self.make_branch_and_tree('.')
2775
2598
        self.build_tree(files)
2776
2599
        tree.add(files)
2777
2600
        tree.lock_read()
2778
2601
        try:
2779
 
            self.check_tree_shape(tree, files)
 
2602
            self.check_inventory_shape(tree.inventory, files)
2780
2603
        finally:
2781
2604
            tree.unlock()
2782
2605
 
3114
2937
        tpr.register('bar', 'bBB.aAA.rRR')
3115
2938
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3116
2939
        self.assertThat(self.get_log(),
3117
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3118
 
                           doctest.ELLIPSIS))
 
2940
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
3119
2941
 
3120
2942
    def test_get_unknown_prefix(self):
3121
2943
        tpr = self._get_registry()
3141
2963
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3142
2964
 
3143
2965
 
3144
 
class TestThreadLeakDetection(tests.TestCase):
3145
 
    """Ensure when tests leak threads we detect and report it"""
3146
 
 
3147
 
    class LeakRecordingResult(tests.ExtendedTestResult):
3148
 
        def __init__(self):
3149
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3150
 
            self.leaks = []
3151
 
        def _report_thread_leak(self, test, leaks, alive):
3152
 
            self.leaks.append((test, leaks))
3153
 
 
3154
 
    def test_testcase_without_addCleanups(self):
3155
 
        """Check old TestCase instances don't break with leak detection"""
3156
 
        class Test(unittest.TestCase):
3157
 
            def runTest(self):
3158
 
                pass
3159
 
        result = self.LeakRecordingResult()
3160
 
        test = Test()
3161
 
        result.startTestRun()
3162
 
        test.run(result)
3163
 
        result.stopTestRun()
3164
 
        self.assertEqual(result._tests_leaking_threads_count, 0)
3165
 
        self.assertEqual(result.leaks, [])
3166
 
        
3167
 
    def test_thread_leak(self):
3168
 
        """Ensure a thread that outlives the running of a test is reported
3169
 
 
3170
 
        Uses a thread that blocks on an event, and is started by the inner
3171
 
        test case. As the thread outlives the inner case's run, it should be
3172
 
        detected as a leak, but the event is then set so that the thread can
3173
 
        be safely joined in cleanup so it's not leaked for real.
3174
 
        """
3175
 
        event = threading.Event()
3176
 
        thread = threading.Thread(name="Leaker", target=event.wait)
3177
 
        class Test(tests.TestCase):
3178
 
            def test_leak(self):
3179
 
                thread.start()
3180
 
        result = self.LeakRecordingResult()
3181
 
        test = Test("test_leak")
3182
 
        self.addCleanup(thread.join)
3183
 
        self.addCleanup(event.set)
3184
 
        result.startTestRun()
3185
 
        test.run(result)
3186
 
        result.stopTestRun()
3187
 
        self.assertEqual(result._tests_leaking_threads_count, 1)
3188
 
        self.assertEqual(result._first_thread_leaker_id, test.id())
3189
 
        self.assertEqual(result.leaks, [(test, set([thread]))])
3190
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3191
 
 
3192
 
    def test_multiple_leaks(self):
3193
 
        """Check multiple leaks are blamed on the test cases at fault
3194
 
 
3195
 
        Same concept as the previous test, but has one inner test method that
3196
 
        leaks two threads, and one that doesn't leak at all.
3197
 
        """
3198
 
        event = threading.Event()
3199
 
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
3200
 
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
3201
 
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
3202
 
        class Test(tests.TestCase):
3203
 
            def test_first_leak(self):
3204
 
                thread_b.start()
3205
 
            def test_second_no_leak(self):
3206
 
                pass
3207
 
            def test_third_leak(self):
3208
 
                thread_c.start()
3209
 
                thread_a.start()
3210
 
        result = self.LeakRecordingResult()
3211
 
        first_test = Test("test_first_leak")
3212
 
        third_test = Test("test_third_leak")
3213
 
        self.addCleanup(thread_a.join)
3214
 
        self.addCleanup(thread_b.join)
3215
 
        self.addCleanup(thread_c.join)
3216
 
        self.addCleanup(event.set)
3217
 
        result.startTestRun()
3218
 
        unittest.TestSuite(
3219
 
            [first_test, Test("test_second_no_leak"), third_test]
3220
 
            ).run(result)
3221
 
        result.stopTestRun()
3222
 
        self.assertEqual(result._tests_leaking_threads_count, 2)
3223
 
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
3224
 
        self.assertEqual(result.leaks, [
3225
 
            (first_test, set([thread_b])),
3226
 
            (third_test, set([thread_a, thread_c]))])
3227
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3228
 
 
3229
 
 
3230
 
class TestPostMortemDebugging(tests.TestCase):
3231
 
    """Check post mortem debugging works when tests fail or error"""
3232
 
 
3233
 
    class TracebackRecordingResult(tests.ExtendedTestResult):
3234
 
        def __init__(self):
3235
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3236
 
            self.postcode = None
3237
 
        def _post_mortem(self, tb=None):
3238
 
            """Record the code object at the end of the current traceback"""
3239
 
            tb = tb or sys.exc_info()[2]
3240
 
            if tb is not None:
3241
 
                next = tb.tb_next
3242
 
                while next is not None:
3243
 
                    tb = next
3244
 
                    next = next.tb_next
3245
 
                self.postcode = tb.tb_frame.f_code
3246
 
        def report_error(self, test, err):
3247
 
            pass
3248
 
        def report_failure(self, test, err):
3249
 
            pass
3250
 
 
3251
 
    def test_location_unittest_error(self):
3252
 
        """Needs right post mortem traceback with erroring unittest case"""
3253
 
        class Test(unittest.TestCase):
3254
 
            def runTest(self):
3255
 
                raise RuntimeError
3256
 
        result = self.TracebackRecordingResult()
3257
 
        Test().run(result)
3258
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3259
 
 
3260
 
    def test_location_unittest_failure(self):
3261
 
        """Needs right post mortem traceback with failing unittest case"""
3262
 
        class Test(unittest.TestCase):
3263
 
            def runTest(self):
3264
 
                raise self.failureException
3265
 
        result = self.TracebackRecordingResult()
3266
 
        Test().run(result)
3267
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3268
 
 
3269
 
    def test_location_bt_error(self):
3270
 
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
3271
 
        class Test(tests.TestCase):
3272
 
            def test_error(self):
3273
 
                raise RuntimeError
3274
 
        result = self.TracebackRecordingResult()
3275
 
        Test("test_error").run(result)
3276
 
        self.assertEqual(result.postcode, Test.test_error.func_code)
3277
 
 
3278
 
    def test_location_bt_failure(self):
3279
 
        """Needs right post mortem traceback with failing bzrlib.tests case"""
3280
 
        class Test(tests.TestCase):
3281
 
            def test_failure(self):
3282
 
                raise self.failureException
3283
 
        result = self.TracebackRecordingResult()
3284
 
        Test("test_failure").run(result)
3285
 
        self.assertEqual(result.postcode, Test.test_failure.func_code)
3286
 
 
3287
 
    def test_env_var_triggers_post_mortem(self):
3288
 
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3289
 
        import pdb
3290
 
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
3291
 
        post_mortem_calls = []
3292
 
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3293
 
        self.overrideEnv('BZR_TEST_PDB', None)
3294
 
        result._post_mortem(1)
3295
 
        self.overrideEnv('BZR_TEST_PDB', 'on')
3296
 
        result._post_mortem(2)
3297
 
        self.assertEqual([2], post_mortem_calls)
3298
 
 
3299
 
 
3300
2966
class TestRunSuite(tests.TestCase):
3301
2967
 
3302
2968
    def test_runner_class(self):
3313
2979
                                                self.verbosity)
3314
2980
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3315
2981
        self.assertLength(1, calls)
3316
 
 
3317
 
 
3318
 
class TestUncollectedWarnings(tests.TestCase):
3319
 
    """Check a test case still alive after being run emits a warning"""
3320
 
 
3321
 
    class Test(tests.TestCase):
3322
 
        def test_pass(self):
3323
 
            pass
3324
 
        def test_self_ref(self):
3325
 
            self.also_self = self.test_self_ref
3326
 
        def test_skip(self):
3327
 
            self.skip("Don't need")
3328
 
 
3329
 
    def _get_suite(self):
3330
 
        return TestUtil.TestSuite([
3331
 
            self.Test("test_pass"),
3332
 
            self.Test("test_self_ref"),
3333
 
            self.Test("test_skip"),
3334
 
            ])
3335
 
 
3336
 
    def _inject_stream_into_subunit(self, stream):
3337
 
        """To be overridden by subclasses that run tests out of process"""
3338
 
 
3339
 
    def _run_selftest_with_suite(self, **kwargs):
3340
 
        sio = StringIO()
3341
 
        self._inject_stream_into_subunit(sio)
3342
 
        old_flags = tests.selftest_debug_flags
3343
 
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3344
 
        gc_on = gc.isenabled()
3345
 
        if gc_on:
3346
 
            gc.disable()
3347
 
        try:
3348
 
            tests.selftest(test_suite_factory=self._get_suite, stream=sio,
3349
 
                stop_on_failure=False, **kwargs)
3350
 
        finally:
3351
 
            if gc_on:
3352
 
                gc.enable()
3353
 
            tests.selftest_debug_flags = old_flags
3354
 
        output = sio.getvalue()
3355
 
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3356
 
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3357
 
        return output
3358
 
 
3359
 
    def test_testsuite(self):
3360
 
        self._run_selftest_with_suite()
3361
 
 
3362
 
    def test_pattern(self):
3363
 
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3364
 
        self.assertNotContainsRe(out, "test_skip")
3365
 
 
3366
 
    def test_exclude_pattern(self):
3367
 
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3368
 
        self.assertNotContainsRe(out, "test_skip")
3369
 
 
3370
 
    def test_random_seed(self):
3371
 
        self._run_selftest_with_suite(random_seed="now")
3372
 
 
3373
 
    def test_matching_tests_first(self):
3374
 
        self._run_selftest_with_suite(matching_tests_first=True,
3375
 
            pattern="test_self_ref$")
3376
 
 
3377
 
    def test_starting_with_and_exclude(self):
3378
 
        out = self._run_selftest_with_suite(starting_with=["bt."],
3379
 
            exclude_pattern="test_skip$")
3380
 
        self.assertNotContainsRe(out, "test_skip")
3381
 
 
3382
 
    def test_additonal_decorator(self):
3383
 
        out = self._run_selftest_with_suite(
3384
 
            suite_decorators=[tests.TestDecorator])
3385
 
 
3386
 
 
3387
 
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3388
 
    """Check warnings from tests staying alive are emitted with subunit"""
3389
 
 
3390
 
    _test_needs_features = [features.subunit]
3391
 
 
3392
 
    def _run_selftest_with_suite(self, **kwargs):
3393
 
        return TestUncollectedWarnings._run_selftest_with_suite(self,
3394
 
            runner_class=tests.SubUnitBzrRunner, **kwargs)
3395
 
 
3396
 
 
3397
 
class TestUncollectedWarningsForking(TestUncollectedWarnings):
3398
 
    """Check warnings from tests staying alive are emitted when forking"""
3399
 
 
3400
 
    _test_needs_features = [features.subunit]
3401
 
 
3402
 
    def _inject_stream_into_subunit(self, stream):
3403
 
        """Monkey-patch subunit so the extra output goes to stream not stdout
3404
 
 
3405
 
        Some APIs need rewriting so this kind of bogus hackery can be replaced
3406
 
        by passing the stream param from run_tests down into ProtocolTestCase.
3407
 
        """
3408
 
        from subunit import ProtocolTestCase
3409
 
        _original_init = ProtocolTestCase.__init__
3410
 
        def _init_with_passthrough(self, *args, **kwargs):
3411
 
            _original_init(self, *args, **kwargs)
3412
 
            self._passthrough = stream
3413
 
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3414
 
 
3415
 
    def _run_selftest_with_suite(self, **kwargs):
3416
 
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3417
 
        if getattr(os, "fork", None) is None:
3418
 
            raise tests.TestNotApplicable("Platform doesn't support forking")
3419
 
        # Make sure the fork code is actually invoked by claiming two cores
3420
 
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3421
 
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3422
 
        return TestUncollectedWarnings._run_selftest_with_suite(self, **kwargs)
3423
 
 
3424
 
 
3425
 
class TestEnvironHandling(tests.TestCase):
3426
 
 
3427
 
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
3428
 
        self.assertFalse('MYVAR' in os.environ)
3429
 
        self.overrideEnv('MYVAR', '42')
3430
 
        # We use an embedded test to make sure we fix the _captureVar bug
3431
 
        class Test(tests.TestCase):
3432
 
            def test_me(self):
3433
 
                # The first call save the 42 value
3434
 
                self.overrideEnv('MYVAR', None)
3435
 
                self.assertEquals(None, os.environ.get('MYVAR'))
3436
 
                # Make sure we can call it twice
3437
 
                self.overrideEnv('MYVAR', None)
3438
 
                self.assertEquals(None, os.environ.get('MYVAR'))
3439
 
        output = StringIO()
3440
 
        result = tests.TextTestResult(output, 0, 1)
3441
 
        Test('test_me').run(result)
3442
 
        if not result.wasStrictlySuccessful():
3443
 
            self.fail(output.getvalue())
3444
 
        # We get our value back
3445
 
        self.assertEquals('42', os.environ.get('MYVAR'))
3446
 
 
3447
 
 
3448
 
class TestIsolatedEnv(tests.TestCase):
3449
 
    """Test isolating tests from os.environ.
3450
 
 
3451
 
    Since we use tests that are already isolated from os.environ a bit of care
3452
 
    should be taken when designing the tests to avoid bootstrap side-effects.
3453
 
    The tests start an already clean os.environ which allow doing valid
3454
 
    assertions about which variables are present or not and design tests around
3455
 
    these assertions.
3456
 
    """
3457
 
 
3458
 
    class ScratchMonkey(tests.TestCase):
3459
 
 
3460
 
        def test_me(self):
3461
 
            pass
3462
 
 
3463
 
    def test_basics(self):
3464
 
        # Make sure we know the definition of BZR_HOME: not part of os.environ
3465
 
        # for tests.TestCase.
3466
 
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
3467
 
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3468
 
        # Being part of isolated_environ, BZR_HOME should not appear here
3469
 
        self.assertFalse('BZR_HOME' in os.environ)
3470
 
        # Make sure we know the definition of LINES: part of os.environ for
3471
 
        # tests.TestCase
3472
 
        self.assertTrue('LINES' in tests.isolated_environ)
3473
 
        self.assertEquals('25', tests.isolated_environ['LINES'])
3474
 
        self.assertEquals('25', os.environ['LINES'])
3475
 
 
3476
 
    def test_injecting_unknown_variable(self):
3477
 
        # BZR_HOME is known to be absent from os.environ
3478
 
        test = self.ScratchMonkey('test_me')
3479
 
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3480
 
        self.assertEquals('foo', os.environ['BZR_HOME'])
3481
 
        tests.restore_os_environ(test)
3482
 
        self.assertFalse('BZR_HOME' in os.environ)
3483
 
 
3484
 
    def test_injecting_known_variable(self):
3485
 
        test = self.ScratchMonkey('test_me')
3486
 
        # LINES is known to be present in os.environ
3487
 
        tests.override_os_environ(test, {'LINES': '42'})
3488
 
        self.assertEquals('42', os.environ['LINES'])
3489
 
        tests.restore_os_environ(test)
3490
 
        self.assertEquals('25', os.environ['LINES'])
3491
 
 
3492
 
    def test_deleting_variable(self):
3493
 
        test = self.ScratchMonkey('test_me')
3494
 
        # LINES is known to be present in os.environ
3495
 
        tests.override_os_environ(test, {'LINES': None})
3496
 
        self.assertTrue('LINES' not in os.environ)
3497
 
        tests.restore_os_environ(test)
3498
 
        self.assertEquals('25', os.environ['LINES'])
3499
 
 
3500
 
 
3501
 
class TestDocTestSuiteIsolation(tests.TestCase):
3502
 
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3503
 
 
3504
 
    Since tests.TestCase alreay provides an isolation from os.environ, we use
3505
 
    the clean environment as a base for testing. To precisely capture the
3506
 
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3507
 
    compare against.
3508
 
 
3509
 
    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3510
 
    not `os.environ` so each test overrides it to suit its needs.
3511
 
 
3512
 
    """
3513
 
 
3514
 
    def get_doctest_suite_for_string(self, klass, string):
3515
 
        class Finder(doctest.DocTestFinder):
3516
 
 
3517
 
            def find(*args, **kwargs):
3518
 
                test = doctest.DocTestParser().get_doctest(
3519
 
                    string, {}, 'foo', 'foo.py', 0)
3520
 
                return [test]
3521
 
 
3522
 
        suite = klass(test_finder=Finder())
3523
 
        return suite
3524
 
 
3525
 
    def run_doctest_suite_for_string(self, klass, string):
3526
 
        suite = self.get_doctest_suite_for_string(klass, string)
3527
 
        output = StringIO()
3528
 
        result = tests.TextTestResult(output, 0, 1)
3529
 
        suite.run(result)
3530
 
        return result, output
3531
 
 
3532
 
    def assertDocTestStringSucceds(self, klass, string):
3533
 
        result, output = self.run_doctest_suite_for_string(klass, string)
3534
 
        if not result.wasStrictlySuccessful():
3535
 
            self.fail(output.getvalue())
3536
 
 
3537
 
    def assertDocTestStringFails(self, klass, string):
3538
 
        result, output = self.run_doctest_suite_for_string(klass, string)
3539
 
        if result.wasStrictlySuccessful():
3540
 
            self.fail(output.getvalue())
3541
 
 
3542
 
    def test_injected_variable(self):
3543
 
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3544
 
        test = """
3545
 
            >>> import os
3546
 
            >>> os.environ['LINES']
3547
 
            '42'
3548
 
            """
3549
 
        # doctest.DocTestSuite fails as it sees '25'
3550
 
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3551
 
        # tests.DocTestSuite sees '42'
3552
 
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3553
 
 
3554
 
    def test_deleted_variable(self):
3555
 
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3556
 
        test = """
3557
 
            >>> import os
3558
 
            >>> os.environ.get('LINES')
3559
 
            """
3560
 
        # doctest.DocTestSuite fails as it sees '25'
3561
 
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3562
 
        # tests.DocTestSuite sees None
3563
 
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3564
 
 
3565
 
 
3566
 
class TestSelftestExcludePatterns(tests.TestCase):
3567
 
 
3568
 
    def setUp(self):
3569
 
        super(TestSelftestExcludePatterns, self).setUp()
3570
 
        self.overrideAttr(tests, 'test_suite', self.suite_factory)
3571
 
 
3572
 
    def suite_factory(self, keep_only=None, starting_with=None):
3573
 
        """A test suite factory with only a few tests."""
3574
 
        class Test(tests.TestCase):
3575
 
            def id(self):
3576
 
                # We don't need the full class path
3577
 
                return self._testMethodName
3578
 
            def a(self):
3579
 
                pass
3580
 
            def b(self):
3581
 
                pass
3582
 
            def c(self):
3583
 
                pass
3584
 
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3585
 
 
3586
 
    def assertTestList(self, expected, *selftest_args):
3587
 
        # We rely on setUp installing the right test suite factory so we can
3588
 
        # test at the command level without loading the whole test suite
3589
 
        out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3590
 
        actual = out.splitlines()
3591
 
        self.assertEquals(expected, actual)
3592
 
 
3593
 
    def test_full_list(self):
3594
 
        self.assertTestList(['a', 'b', 'c'])
3595
 
 
3596
 
    def test_single_exclude(self):
3597
 
        self.assertTestList(['b', 'c'], '-x', 'a')
3598
 
 
3599
 
    def test_mutiple_excludes(self):
3600
 
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3601
 
 
3602
 
 
3603
 
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3604
 
 
3605
 
    _test_needs_features = [features.subunit]
3606
 
 
3607
 
    def setUp(self):
3608
 
        super(TestCounterHooks, self).setUp()
3609
 
        class Test(tests.TestCase):
3610
 
 
3611
 
            def setUp(self):
3612
 
                super(Test, self).setUp()
3613
 
                self.hooks = hooks.Hooks()
3614
 
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3615
 
                self.install_counter_hook(self.hooks, 'myhook')
3616
 
 
3617
 
            def no_hook(self):
3618
 
                pass
3619
 
 
3620
 
            def run_hook_once(self):
3621
 
                for hook in self.hooks['myhook']:
3622
 
                    hook(self)
3623
 
 
3624
 
        self.test_class = Test
3625
 
 
3626
 
    def assertHookCalls(self, expected_calls, test_name):
3627
 
        test = self.test_class(test_name)
3628
 
        result = unittest.TestResult()
3629
 
        test.run(result)
3630
 
        self.assertTrue(hasattr(test, '_counters'))
3631
 
        self.assertTrue(test._counters.has_key('myhook'))
3632
 
        self.assertEquals(expected_calls, test._counters['myhook'])
3633
 
 
3634
 
    def test_no_hook(self):
3635
 
        self.assertHookCalls(0, 'no_hook')
3636
 
 
3637
 
    def test_run_hook_once(self):
3638
 
        tt = features.testtools
3639
 
        if tt.module.__version__ < (0, 9, 8):
3640
 
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3641
 
        self.assertHookCalls(1, 'run_hook_once')