~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Patch Queue Manager
  • Date: 2012-03-28 16:13:49 UTC
  • mfrom: (6499.2.3 948339-config-caching)
  • Revision ID: pqm@pqm.ubuntu.com-20120328161349-2gsc0g11fcu43hlc
(vila) Properly share mutable config sections and save the branch config
 only during the final unlock (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import gc
 
21
import doctest
21
22
import os
22
23
import signal
23
24
import sys
 
25
import threading
24
26
import time
25
27
import unittest
26
28
import warnings
27
29
 
28
 
from testtools import MultiTestResult
 
30
from testtools import (
 
31
    ExtendedToOriginalDecorator,
 
32
    MultiTestResult,
 
33
    )
 
34
from testtools.content import Content
29
35
from testtools.content_type import ContentType
30
36
from testtools.matchers import (
31
37
    DocTestMatches,
32
38
    Equals,
33
39
    )
34
 
import testtools.tests.helpers
 
40
import testtools.testresult.doubles
35
41
 
36
42
import bzrlib
37
43
from bzrlib import (
38
44
    branchbuilder,
39
45
    bzrdir,
40
 
    debug,
 
46
    controldir,
41
47
    errors,
 
48
    hooks,
42
49
    lockdir,
43
50
    memorytree,
44
51
    osutils,
45
 
    progress,
46
52
    remote,
47
53
    repository,
48
54
    symbol_versioning,
49
55
    tests,
50
56
    transport,
51
57
    workingtree,
 
58
    workingtree_3,
 
59
    workingtree_4,
52
60
    )
53
61
from bzrlib.repofmt import (
54
62
    groupcompress_repo,
55
 
    pack_repo,
56
 
    weaverepo,
57
63
    )
58
64
from bzrlib.symbol_versioning import (
59
65
    deprecated_function,
63
69
from bzrlib.tests import (
64
70
    features,
65
71
    test_lsprof,
66
 
    test_sftp_transport,
 
72
    test_server,
67
73
    TestUtil,
68
74
    )
69
 
from bzrlib.trace import note
70
 
from bzrlib.transport.memory import MemoryServer, MemoryTransport
71
 
from bzrlib.version import _get_bzr_source_tree
 
75
from bzrlib.trace import note, mutter
 
76
from bzrlib.transport import memory
72
77
 
73
78
 
74
79
def _test_ids(test_suite):
76
81
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
77
82
 
78
83
 
79
 
class SelftestTests(tests.TestCase):
80
 
 
81
 
    def test_import_tests(self):
82
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
83
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
84
 
 
85
 
    def test_import_test_failure(self):
86
 
        self.assertRaises(ImportError,
87
 
                          TestUtil._load_module_by_name,
88
 
                          'bzrlib.no-name-yet')
89
 
 
90
 
 
91
84
class MetaTestLog(tests.TestCase):
92
85
 
93
86
    def test_logging(self):
99
92
            "text", "plain", {"charset": "utf8"})))
100
93
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
101
94
        self.assertThat(self.get_log(),
102
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
103
 
 
104
 
 
105
 
class TestUnicodeFilename(tests.TestCase):
106
 
 
107
 
    def test_probe_passes(self):
108
 
        """UnicodeFilename._probe passes."""
109
 
        # We can't test much more than that because the behaviour depends
110
 
        # on the platform.
111
 
        tests.UnicodeFilename._probe()
 
95
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
112
96
 
113
97
 
114
98
class TestTreeShape(tests.TestCaseInTempDir):
115
99
 
116
100
    def test_unicode_paths(self):
117
 
        self.requireFeature(tests.UnicodeFilename)
 
101
        self.requireFeature(features.UnicodeFilenameFeature)
118
102
 
119
103
        filename = u'hell\u00d8'
120
104
        self.build_tree_contents([(filename, 'contents of hello')])
121
 
        self.failUnlessExists(filename)
 
105
        self.assertPathExists(filename)
 
106
 
 
107
 
 
108
class TestClassesAvailable(tests.TestCase):
 
109
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
110
 
 
111
    def test_test_case(self):
 
112
        from bzrlib.tests import TestCase
 
113
 
 
114
    def test_test_loader(self):
 
115
        from bzrlib.tests import TestLoader
 
116
 
 
117
    def test_test_suite(self):
 
118
        from bzrlib.tests import TestSuite
122
119
 
123
120
 
124
121
class TestTransportScenarios(tests.TestCase):
207
204
    def test_scenarios(self):
208
205
        # check that constructor parameters are passed through to the adapted
209
206
        # test.
210
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
207
        from bzrlib.tests.per_controldir import make_scenarios
211
208
        vfs_factory = "v"
212
209
        server1 = "a"
213
210
        server2 = "b"
311
308
        from bzrlib.tests.per_interrepository import make_scenarios
312
309
        server1 = "a"
313
310
        server2 = "b"
314
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
311
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
315
312
        scenarios = make_scenarios(server1, server2, formats)
316
313
        self.assertEqual([
317
314
            ('C0,str,str',
318
315
             {'repository_format': 'C1',
319
316
              'repository_format_to': 'C2',
320
317
              'transport_readonly_server': 'b',
321
 
              'transport_server': 'a'}),
 
318
              'transport_server': 'a',
 
319
              'extra_setup': 'C3'}),
322
320
            ('D0,str,str',
323
321
             {'repository_format': 'D1',
324
322
              'repository_format_to': 'D2',
325
323
              'transport_readonly_server': 'b',
326
 
              'transport_server': 'a'})],
 
324
              'transport_server': 'a',
 
325
              'extra_setup': 'D3'})],
327
326
            scenarios)
328
327
 
329
328
 
335
334
        from bzrlib.tests.per_workingtree import make_scenarios
336
335
        server1 = "a"
337
336
        server2 = "b"
338
 
        formats = [workingtree.WorkingTreeFormat2(),
339
 
                   workingtree.WorkingTreeFormat3(),]
 
337
        formats = [workingtree_4.WorkingTreeFormat4(),
 
338
                   workingtree_3.WorkingTreeFormat3(),]
340
339
        scenarios = make_scenarios(server1, server2, formats)
341
340
        self.assertEqual([
342
 
            ('WorkingTreeFormat2',
 
341
            ('WorkingTreeFormat4',
343
342
             {'bzrdir_format': formats[0]._matchingbzrdir,
344
343
              'transport_readonly_server': 'b',
345
344
              'transport_server': 'a',
372
371
            )
373
372
        server1 = "a"
374
373
        server2 = "b"
375
 
        formats = [workingtree.WorkingTreeFormat2(),
376
 
                   workingtree.WorkingTreeFormat3(),]
 
374
        formats = [workingtree_4.WorkingTreeFormat4(),
 
375
                   workingtree_3.WorkingTreeFormat3(),]
377
376
        scenarios = make_scenarios(server1, server2, formats)
378
377
        self.assertEqual(7, len(scenarios))
379
 
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
380
 
        wt4_format = workingtree.WorkingTreeFormat4()
381
 
        wt5_format = workingtree.WorkingTreeFormat5()
 
378
        default_wt_format = workingtree.format_registry.get_default()
 
379
        wt4_format = workingtree_4.WorkingTreeFormat4()
 
380
        wt5_format = workingtree_4.WorkingTreeFormat5()
382
381
        expected_scenarios = [
383
 
            ('WorkingTreeFormat2',
 
382
            ('WorkingTreeFormat4',
384
383
             {'bzrdir_format': formats[0]._matchingbzrdir,
385
384
              'transport_readonly_server': 'b',
386
385
              'transport_server': 'a',
446
445
        # ones to add.
447
446
        from bzrlib.tests.per_tree import (
448
447
            return_parameter,
449
 
            revision_tree_from_workingtree
450
448
            )
451
449
        from bzrlib.tests.per_intertree import (
452
450
            make_scenarios,
453
451
            )
454
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
452
        from bzrlib.workingtree_3 import WorkingTreeFormat3
 
453
        from bzrlib.workingtree_4 import WorkingTreeFormat4
455
454
        input_test = TestInterTreeScenarios(
456
455
            "test_scenarios")
457
456
        server1 = "a"
458
457
        server2 = "b"
459
 
        format1 = WorkingTreeFormat2()
 
458
        format1 = WorkingTreeFormat4()
460
459
        format2 = WorkingTreeFormat3()
461
460
        formats = [("1", str, format1, format2, "converter1"),
462
461
            ("2", int, format2, format1, "converter2")]
508
507
        self.assertRaises(AssertionError, self.assertEqualStat,
509
508
            os.lstat("foo"), os.lstat("longname"))
510
509
 
 
510
    def test_failUnlessExists(self):
 
511
        """Deprecated failUnlessExists and failIfExists"""
 
512
        self.applyDeprecated(
 
513
            deprecated_in((2, 4)),
 
514
            self.failUnlessExists, '.')
 
515
        self.build_tree(['foo/', 'foo/bar'])
 
516
        self.applyDeprecated(
 
517
            deprecated_in((2, 4)),
 
518
            self.failUnlessExists, 'foo/bar')
 
519
        self.applyDeprecated(
 
520
            deprecated_in((2, 4)),
 
521
            self.failIfExists, 'foo/foo')
 
522
 
 
523
    def test_assertPathExists(self):
 
524
        self.assertPathExists('.')
 
525
        self.build_tree(['foo/', 'foo/bar'])
 
526
        self.assertPathExists('foo/bar')
 
527
        self.assertPathDoesNotExist('foo/foo')
 
528
 
511
529
 
512
530
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
513
531
 
547
565
        tree = self.make_branch_and_memory_tree('dir')
548
566
        # Guard against regression into MemoryTransport leaking
549
567
        # files to disk instead of keeping them in memory.
550
 
        self.failIf(osutils.lexists('dir'))
 
568
        self.assertFalse(osutils.lexists('dir'))
551
569
        self.assertIsInstance(tree, memorytree.MemoryTree)
552
570
 
553
571
    def test_make_branch_and_memory_tree_with_format(self):
554
572
        """make_branch_and_memory_tree should accept a format option."""
555
573
        format = bzrdir.BzrDirMetaFormat1()
556
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
574
        format.repository_format = repository.format_registry.get_default()
557
575
        tree = self.make_branch_and_memory_tree('dir', format=format)
558
576
        # Guard against regression into MemoryTransport leaking
559
577
        # files to disk instead of keeping them in memory.
560
 
        self.failIf(osutils.lexists('dir'))
 
578
        self.assertFalse(osutils.lexists('dir'))
561
579
        self.assertIsInstance(tree, memorytree.MemoryTree)
562
580
        self.assertEqual(format.repository_format.__class__,
563
581
            tree.branch.repository._format.__class__)
567
585
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
568
586
        # Guard against regression into MemoryTransport leaking
569
587
        # files to disk instead of keeping them in memory.
570
 
        self.failIf(osutils.lexists('dir'))
 
588
        self.assertFalse(osutils.lexists('dir'))
571
589
 
572
590
    def test_make_branch_builder_with_format(self):
573
591
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
574
592
        # that the format objects are used.
575
593
        format = bzrdir.BzrDirMetaFormat1()
576
 
        repo_format = weaverepo.RepositoryFormat7()
 
594
        repo_format = repository.format_registry.get_default()
577
595
        format.repository_format = repo_format
578
596
        builder = self.make_branch_builder('dir', format=format)
579
597
        the_branch = builder.get_branch()
580
598
        # Guard against regression into MemoryTransport leaking
581
599
        # files to disk instead of keeping them in memory.
582
 
        self.failIf(osutils.lexists('dir'))
 
600
        self.assertFalse(osutils.lexists('dir'))
583
601
        self.assertEqual(format.repository_format.__class__,
584
602
                         the_branch.repository._format.__class__)
585
603
        self.assertEqual(repo_format.get_format_string(),
591
609
        the_branch = builder.get_branch()
592
610
        # Guard against regression into MemoryTransport leaking
593
611
        # files to disk instead of keeping them in memory.
594
 
        self.failIf(osutils.lexists('dir'))
595
 
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
 
612
        self.assertFalse(osutils.lexists('dir'))
 
613
        dir_format = controldir.format_registry.make_bzrdir('knit')
596
614
        self.assertEqual(dir_format.repository_format.__class__,
597
615
                         the_branch.repository._format.__class__)
598
616
        self.assertEqual('Bazaar-NG Knit Repository Format 1',
602
620
    def test_dangling_locks_cause_failures(self):
603
621
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
604
622
            def test_function(self):
605
 
                t = self.get_transport('.')
 
623
                t = self.get_transport_from_path('.')
606
624
                l = lockdir.LockDir(t, 'lock')
607
625
                l.create()
608
626
                l.attempt_lock()
609
627
        test = TestDanglingLock('test_function')
610
628
        result = test.run()
 
629
        total_failures = result.errors + result.failures
611
630
        if self._lock_check_thorough:
612
 
            self.assertEqual(1, len(result.errors))
 
631
            self.assertEqual(1, len(total_failures))
613
632
        else:
614
633
            # When _lock_check_thorough is disabled, then we don't trigger a
615
634
            # failure
616
 
            self.assertEqual(0, len(result.errors))
 
635
            self.assertEqual(0, len(total_failures))
617
636
 
618
637
 
619
638
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
620
639
    """Tests for the convenience functions TestCaseWithTransport introduces."""
621
640
 
622
641
    def test_get_readonly_url_none(self):
623
 
        from bzrlib.transport import get_transport
624
 
        from bzrlib.transport.memory import MemoryServer
625
642
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
 
        self.vfs_transport_factory = MemoryServer
 
643
        self.vfs_transport_factory = memory.MemoryServer
627
644
        self.transport_readonly_server = None
628
645
        # calling get_readonly_transport() constructs a decorator on the url
629
646
        # for the server
630
647
        url = self.get_readonly_url()
631
648
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
634
 
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
 
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
 
649
        t = transport.get_transport_from_url(url)
 
650
        t2 = transport.get_transport_from_url(url2)
 
651
        self.assertIsInstance(t, ReadonlyTransportDecorator)
 
652
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
653
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
654
 
638
655
    def test_get_readonly_url_http(self):
639
656
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
 
        from bzrlib.transport.local import LocalURLServer
642
657
        from bzrlib.transport.http import HttpTransportBase
643
 
        self.transport_server = LocalURLServer
 
658
        self.transport_server = test_server.LocalURLServer
644
659
        self.transport_readonly_server = HttpServer
645
660
        # calling get_readonly_transport() gives us a HTTP server instance.
646
661
        url = self.get_readonly_url()
647
662
        url2 = self.get_readonly_url('foo/bar')
648
663
        # the transport returned may be any HttpTransportBase subclass
649
 
        t = get_transport(url)
650
 
        t2 = get_transport(url2)
651
 
        self.failUnless(isinstance(t, HttpTransportBase))
652
 
        self.failUnless(isinstance(t2, HttpTransportBase))
 
664
        t = transport.get_transport_from_url(url)
 
665
        t2 = transport.get_transport_from_url(url2)
 
666
        self.assertIsInstance(t, HttpTransportBase)
 
667
        self.assertIsInstance(t2, HttpTransportBase)
653
668
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
669
 
655
670
    def test_is_directory(self):
663
678
    def test_make_branch_builder(self):
664
679
        builder = self.make_branch_builder('dir')
665
680
        rev_id = builder.build_commit()
666
 
        self.failUnlessExists('dir')
667
 
        a_dir = bzrdir.BzrDir.open('dir')
 
681
        self.assertPathExists('dir')
 
682
        a_dir = controldir.ControlDir.open('dir')
668
683
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
669
684
        a_branch = a_dir.open_branch()
670
685
        builder_branch = builder.get_branch()
677
692
 
678
693
    def setUp(self):
679
694
        super(TestTestCaseTransports, self).setUp()
680
 
        self.vfs_transport_factory = MemoryServer
 
695
        self.vfs_transport_factory = memory.MemoryServer
681
696
 
682
697
    def test_make_bzrdir_preserves_transport(self):
683
698
        t = self.get_transport()
684
699
        result_bzrdir = self.make_bzrdir('subdir')
685
700
        self.assertIsInstance(result_bzrdir.transport,
686
 
                              MemoryTransport)
 
701
                              memory.MemoryTransport)
687
702
        # should not be on disk, should only be in memory
688
 
        self.failIfExists('subdir')
 
703
        self.assertPathDoesNotExist('subdir')
689
704
 
690
705
 
691
706
class TestChrootedTest(tests.ChrootedTestCase):
692
707
 
693
708
    def test_root_is_root(self):
694
 
        from bzrlib.transport import get_transport
695
 
        t = get_transport(self.get_readonly_url())
 
709
        t = transport.get_transport_from_url(self.get_readonly_url())
696
710
        url = t.base
697
711
        self.assertEqual(url, t.clone('..').base)
698
712
 
700
714
class TestProfileResult(tests.TestCase):
701
715
 
702
716
    def test_profiles_tests(self):
703
 
        self.requireFeature(test_lsprof.LSProfFeature)
704
 
        terminal = testtools.tests.helpers.ExtendedTestResult()
 
717
        self.requireFeature(features.lsprof_feature)
 
718
        terminal = testtools.testresult.doubles.ExtendedTestResult()
705
719
        result = tests.ProfileResult(terminal)
706
720
        class Sample(tests.TestCase):
707
721
            def a(self):
724
738
                descriptions=0,
725
739
                verbosity=1,
726
740
                )
727
 
        capture = testtools.tests.helpers.ExtendedTestResult()
 
741
        capture = testtools.testresult.doubles.ExtendedTestResult()
728
742
        test_case.run(MultiTestResult(result, capture))
729
743
        run_case = capture._events[0][1]
730
744
        timed_string = result._testTimeString(run_case)
751
765
        self.check_timing(ShortDelayTestCase('test_short_delay'),
752
766
                          r"^ +[0-9]+ms$")
753
767
 
754
 
    def _patch_get_bzr_source_tree(self):
755
 
        # Reading from the actual source tree breaks isolation, but we don't
756
 
        # want to assume that thats *all* that would happen.
757
 
        def _get_bzr_source_tree():
758
 
            return None
759
 
        orig_get_bzr_source_tree = bzrlib.version._get_bzr_source_tree
760
 
        bzrlib.version._get_bzr_source_tree = _get_bzr_source_tree
761
 
        def restore():
762
 
            bzrlib.version._get_bzr_source_tree = orig_get_bzr_source_tree
763
 
        self.addCleanup(restore)
764
 
 
765
 
    def test_assigned_benchmark_file_stores_date(self):
766
 
        self._patch_get_bzr_source_tree()
767
 
        output = StringIO()
768
 
        result = bzrlib.tests.TextTestResult(self._log_file,
769
 
                                        descriptions=0,
770
 
                                        verbosity=1,
771
 
                                        bench_history=output
772
 
                                        )
773
 
        output_string = output.getvalue()
774
 
        # if you are wondering about the regexp please read the comment in
775
 
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
776
 
        # XXX: what comment?  -- Andrew Bennetts
777
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
778
 
 
779
 
    def test_benchhistory_records_test_times(self):
780
 
        self._patch_get_bzr_source_tree()
781
 
        result_stream = StringIO()
782
 
        result = bzrlib.tests.TextTestResult(
783
 
            self._log_file,
784
 
            descriptions=0,
785
 
            verbosity=1,
786
 
            bench_history=result_stream
787
 
            )
788
 
 
789
 
        # we want profile a call and check that its test duration is recorded
790
 
        # make a new test instance that when run will generate a benchmark
791
 
        example_test_case = TestTestResult("_time_hello_world_encoding")
792
 
        # execute the test, which should succeed and record times
793
 
        example_test_case.run(result)
794
 
        lines = result_stream.getvalue().splitlines()
795
 
        self.assertEqual(2, len(lines))
796
 
        self.assertContainsRe(lines[1],
797
 
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
798
 
            "._time_hello_world_encoding")
799
 
 
800
768
    def _time_hello_world_encoding(self):
801
769
        """Profile two sleep calls
802
770
 
807
775
 
808
776
    def test_lsprofiling(self):
809
777
        """Verbose test result prints lsprof statistics from test cases."""
810
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
778
        self.requireFeature(features.lsprof_feature)
811
779
        result_stream = StringIO()
812
780
        result = bzrlib.tests.VerboseTestResult(
813
 
            unittest._WritelnDecorator(result_stream),
 
781
            result_stream,
814
782
            descriptions=0,
815
783
            verbosity=2,
816
784
            )
842
810
        self.assertContainsRe(output,
843
811
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
844
812
 
 
813
    def test_uses_time_from_testtools(self):
 
814
        """Test case timings in verbose results should use testtools times"""
 
815
        import datetime
 
816
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
817
            def startTest(self, test):
 
818
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
819
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
820
            def addSuccess(self, test):
 
821
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
822
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
823
            def report_tests_starting(self): pass
 
824
        sio = StringIO()
 
825
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
826
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
827
 
845
828
    def test_known_failure(self):
846
 
        """A KnownFailure being raised should trigger several result actions."""
 
829
        """Using knownFailure should trigger several result actions."""
847
830
        class InstrumentedTestResult(tests.ExtendedTestResult):
848
831
            def stopTestRun(self): pass
849
 
            def startTests(self): pass
850
 
            def report_test_start(self, test): pass
 
832
            def report_tests_starting(self): pass
851
833
            def report_known_failure(self, test, err=None, details=None):
852
834
                self._call = test, 'known failure'
853
835
        result = InstrumentedTestResult(None, None, None, None)
854
836
        class Test(tests.TestCase):
855
837
            def test_function(self):
856
 
                raise tests.KnownFailure('failed!')
 
838
                self.knownFailure('failed!')
857
839
        test = Test("test_function")
858
840
        test.run(result)
859
841
        # it should invoke 'report_known_failure'.
871
853
        # verbose test output formatting
872
854
        result_stream = StringIO()
873
855
        result = bzrlib.tests.VerboseTestResult(
874
 
            unittest._WritelnDecorator(result_stream),
 
856
            result_stream,
875
857
            descriptions=0,
876
858
            verbosity=2,
877
859
            )
878
 
        test = self.get_passing_test()
879
 
        result.startTest(test)
880
 
        prefix = len(result_stream.getvalue())
881
 
        # the err parameter has the shape:
882
 
        # (class, exception object, traceback)
883
 
        # KnownFailures dont get their tracebacks shown though, so we
884
 
        # can skip that.
885
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
886
 
        result.report_known_failure(test, err)
887
 
        output = result_stream.getvalue()[prefix:]
888
 
        lines = output.splitlines()
889
 
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
890
 
        self.assertEqual(lines[1], '    foo')
891
 
        self.assertEqual(2, len(lines))
 
860
        _get_test("test_xfail").run(result)
 
861
        self.assertContainsRe(result_stream.getvalue(),
 
862
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
863
            "\\s*(?:Text attachment: )?reason"
 
864
            "(?:\n-+\n|: {{{)"
 
865
            "this_fails"
 
866
            "(?:\n-+\n|}}}\n)")
892
867
 
893
868
    def get_passing_test(self):
894
869
        """Return a test object that can't be run usefully."""
900
875
        """Test the behaviour of invoking addNotSupported."""
901
876
        class InstrumentedTestResult(tests.ExtendedTestResult):
902
877
            def stopTestRun(self): pass
903
 
            def startTests(self): pass
904
 
            def report_test_start(self, test): pass
 
878
            def report_tests_starting(self): pass
905
879
            def report_unsupported(self, test, feature):
906
880
                self._call = test, feature
907
881
        result = InstrumentedTestResult(None, None, None, None)
908
882
        test = SampleTestCase('_test_pass')
909
 
        feature = tests.Feature()
 
883
        feature = features.Feature()
910
884
        result.startTest(test)
911
885
        result.addNotSupported(test, feature)
912
886
        # it should invoke 'report_unsupported'.
926
900
        # verbose test output formatting
927
901
        result_stream = StringIO()
928
902
        result = bzrlib.tests.VerboseTestResult(
929
 
            unittest._WritelnDecorator(result_stream),
 
903
            result_stream,
930
904
            descriptions=0,
931
905
            verbosity=2,
932
906
            )
933
907
        test = self.get_passing_test()
934
 
        feature = tests.Feature()
 
908
        feature = features.Feature()
935
909
        result.startTest(test)
936
910
        prefix = len(result_stream.getvalue())
937
911
        result.report_unsupported(test, feature)
946
920
        """An UnavailableFeature being raised should invoke addNotSupported."""
947
921
        class InstrumentedTestResult(tests.ExtendedTestResult):
948
922
            def stopTestRun(self): pass
949
 
            def startTests(self): pass
950
 
            def report_test_start(self, test): pass
 
923
            def report_tests_starting(self): pass
951
924
            def addNotSupported(self, test, feature):
952
925
                self._call = test, feature
953
926
        result = InstrumentedTestResult(None, None, None, None)
954
 
        feature = tests.Feature()
 
927
        feature = features.Feature()
955
928
        class Test(tests.TestCase):
956
929
            def test_function(self):
957
930
                raise tests.UnavailableFeature(feature)
976
949
    def test_strict_with_known_failure(self):
977
950
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
978
951
                                             verbosity=1)
979
 
        test = self.get_passing_test()
980
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
981
 
        result.addExpectedFailure(test, err)
 
952
        test = _get_test("test_xfail")
 
953
        test.run(result)
982
954
        self.assertFalse(result.wasStrictlySuccessful())
983
955
        self.assertEqual(None, result._extractBenchmarkTime(test))
984
956
 
995
967
        class InstrumentedTestResult(tests.ExtendedTestResult):
996
968
            calls = 0
997
969
            def startTests(self): self.calls += 1
998
 
            def report_test_start(self, test): pass
999
970
        result = InstrumentedTestResult(None, None, None, None)
1000
971
        def test_function():
1001
972
            pass
1003
974
        test.run(result)
1004
975
        self.assertEquals(1, result.calls)
1005
976
 
1006
 
 
1007
 
class TestUnicodeFilenameFeature(tests.TestCase):
1008
 
 
1009
 
    def test_probe_passes(self):
1010
 
        """UnicodeFilenameFeature._probe passes."""
1011
 
        # We can't test much more than that because the behaviour depends
1012
 
        # on the platform.
1013
 
        tests.UnicodeFilenameFeature._probe()
 
977
    def test_startTests_only_once(self):
 
978
        """With multiple tests startTests should still only be called once"""
 
979
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
980
            calls = 0
 
981
            def startTests(self): self.calls += 1
 
982
        result = InstrumentedTestResult(None, None, None, None)
 
983
        suite = unittest.TestSuite([
 
984
            unittest.FunctionTestCase(lambda: None),
 
985
            unittest.FunctionTestCase(lambda: None)])
 
986
        suite.run(result)
 
987
        self.assertEquals(1, result.calls)
 
988
        self.assertEquals(2, result.count)
1014
989
 
1015
990
 
1016
991
class TestRunner(tests.TestCase):
1029
1004
        because of our use of global state.
1030
1005
        """
1031
1006
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1032
 
        old_leak = tests.TestCase._first_thread_leaker_id
1033
1007
        try:
1034
1008
            tests.TestCaseInTempDir.TEST_ROOT = None
1035
 
            tests.TestCase._first_thread_leaker_id = None
1036
1009
            return testrunner.run(test)
1037
1010
        finally:
1038
1011
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1039
 
            tests.TestCase._first_thread_leaker_id = old_leak
1040
1012
 
1041
1013
    def test_known_failure_failed_run(self):
1042
1014
        # run a test that generates a known failure which should be printed in
1047
1019
        test = unittest.TestSuite()
1048
1020
        test.addTest(Test("known_failure_test"))
1049
1021
        def failing_test():
1050
 
            self.fail('foo')
 
1022
            raise AssertionError('foo')
1051
1023
        test.addTest(unittest.FunctionTestCase(failing_test))
1052
1024
        stream = StringIO()
1053
1025
        runner = tests.TextTestRunner(stream=stream)
1057
1029
            '(?sm)^bzr selftest.*$'
1058
1030
            '.*'
1059
1031
            '^======================================================================\n'
1060
 
            '^FAIL: unittest.FunctionTestCase \\(failing_test\\)\n'
 
1032
            '^FAIL: failing_test\n'
1061
1033
            '^----------------------------------------------------------------------\n'
1062
1034
            'Traceback \\(most recent call last\\):\n'
1063
1035
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1064
 
            '    self.fail\\(\'foo\'\\)\n'
 
1036
            '    raise AssertionError\\(\'foo\'\\)\n'
1065
1037
            '.*'
1066
1038
            '^----------------------------------------------------------------------\n'
1067
1039
            '.*'
1073
1045
        # the final output.
1074
1046
        class Test(tests.TestCase):
1075
1047
            def known_failure_test(self):
1076
 
                self.expectFailure('failed', self.assertTrue, False)
 
1048
                self.knownFailure("Never works...")
1077
1049
        test = Test("known_failure_test")
1078
1050
        stream = StringIO()
1079
1051
        runner = tests.TextTestRunner(stream=stream)
1085
1057
            '\n'
1086
1058
            'OK \\(known_failures=1\\)\n')
1087
1059
 
 
1060
    def test_unexpected_success_bad(self):
 
1061
        class Test(tests.TestCase):
 
1062
            def test_truth(self):
 
1063
                self.expectFailure("No absolute truth", self.assertTrue, True)
 
1064
        runner = tests.TextTestRunner(stream=StringIO())
 
1065
        result = self.run_test_runner(runner, Test("test_truth"))
 
1066
        self.assertContainsRe(runner.stream.getvalue(),
 
1067
            "=+\n"
 
1068
            "FAIL: \\S+\.test_truth\n"
 
1069
            "-+\n"
 
1070
            "(?:.*\n)*"
 
1071
            "\\s*(?:Text attachment: )?reason"
 
1072
            "(?:\n-+\n|: {{{)"
 
1073
            "No absolute truth"
 
1074
            "(?:\n-+\n|}}}\n)"
 
1075
            "(?:.*\n)*"
 
1076
            "-+\n"
 
1077
            "Ran 1 test in .*\n"
 
1078
            "\n"
 
1079
            "FAILED \\(failures=1\\)\n\\Z")
 
1080
 
1088
1081
    def test_result_decorator(self):
1089
1082
        # decorate results
1090
1083
        calls = []
1091
 
        class LoggingDecorator(tests.ForwardingResult):
 
1084
        class LoggingDecorator(ExtendedToOriginalDecorator):
1092
1085
            def startTest(self, test):
1093
 
                tests.ForwardingResult.startTest(self, test)
 
1086
                ExtendedToOriginalDecorator.startTest(self, test)
1094
1087
                calls.append('start')
1095
1088
        test = unittest.FunctionTestCase(lambda:None)
1096
1089
        stream = StringIO()
1174
1167
 
1175
1168
    def test_unsupported_features_listed(self):
1176
1169
        """When unsupported features are encountered they are detailed."""
1177
 
        class Feature1(tests.Feature):
 
1170
        class Feature1(features.Feature):
1178
1171
            def _probe(self): return False
1179
 
        class Feature2(tests.Feature):
 
1172
        class Feature2(features.Feature):
1180
1173
            def _probe(self): return False
1181
1174
        # create sample tests
1182
1175
        test1 = SampleTestCase('_test_pass')
1197
1190
            ],
1198
1191
            lines[-3:])
1199
1192
 
1200
 
    def _patch_get_bzr_source_tree(self):
1201
 
        # Reading from the actual source tree breaks isolation, but we don't
1202
 
        # want to assume that thats *all* that would happen.
1203
 
        self._get_source_tree_calls = []
1204
 
        def _get_bzr_source_tree():
1205
 
            self._get_source_tree_calls.append("called")
1206
 
            return None
1207
 
        orig_get_bzr_source_tree = bzrlib.version._get_bzr_source_tree
1208
 
        bzrlib.version._get_bzr_source_tree = _get_bzr_source_tree
1209
 
        def restore():
1210
 
            bzrlib.version._get_bzr_source_tree = orig_get_bzr_source_tree
1211
 
        self.addCleanup(restore)
1212
 
 
1213
 
    def test_bench_history(self):
1214
 
        # tests that the running the benchmark passes bench_history into
1215
 
        # the test result object. We can tell that happens if
1216
 
        # _get_bzr_source_tree is called.
1217
 
        self._patch_get_bzr_source_tree()
1218
 
        test = TestRunner('dummy_test')
1219
 
        output = StringIO()
1220
 
        runner = tests.TextTestRunner(stream=self._log_file,
1221
 
                                      bench_history=output)
1222
 
        result = self.run_test_runner(runner, test)
1223
 
        output_string = output.getvalue()
1224
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
1225
 
        self.assertLength(1, self._get_source_tree_calls)
 
1193
    def test_verbose_test_count(self):
 
1194
        """A verbose test run reports the right test count at the start"""
 
1195
        suite = TestUtil.TestSuite([
 
1196
            unittest.FunctionTestCase(lambda:None),
 
1197
            unittest.FunctionTestCase(lambda:None)])
 
1198
        self.assertEqual(suite.countTestCases(), 2)
 
1199
        stream = StringIO()
 
1200
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1201
        # Need to use the CountingDecorator as that's what sets num_tests
 
1202
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1203
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1226
1204
 
1227
1205
    def test_startTestRun(self):
1228
1206
        """run should call result.startTestRun()"""
1229
1207
        calls = []
1230
 
        class LoggingDecorator(tests.ForwardingResult):
 
1208
        class LoggingDecorator(ExtendedToOriginalDecorator):
1231
1209
            def startTestRun(self):
1232
 
                tests.ForwardingResult.startTestRun(self)
 
1210
                ExtendedToOriginalDecorator.startTestRun(self)
1233
1211
                calls.append('startTestRun')
1234
1212
        test = unittest.FunctionTestCase(lambda:None)
1235
1213
        stream = StringIO()
1241
1219
    def test_stopTestRun(self):
1242
1220
        """run should call result.stopTestRun()"""
1243
1221
        calls = []
1244
 
        class LoggingDecorator(tests.ForwardingResult):
 
1222
        class LoggingDecorator(ExtendedToOriginalDecorator):
1245
1223
            def stopTestRun(self):
1246
 
                tests.ForwardingResult.stopTestRun(self)
 
1224
                ExtendedToOriginalDecorator.stopTestRun(self)
1247
1225
                calls.append('stopTestRun')
1248
1226
        test = unittest.FunctionTestCase(lambda:None)
1249
1227
        stream = StringIO()
1252
1230
        result = self.run_test_runner(runner, test)
1253
1231
        self.assertLength(1, calls)
1254
1232
 
 
1233
    def test_unicode_test_output_on_ascii_stream(self):
 
1234
        """Showing results should always succeed even on an ascii console"""
 
1235
        class FailureWithUnicode(tests.TestCase):
 
1236
            def test_log_unicode(self):
 
1237
                self.log(u"\u2606")
 
1238
                self.fail("Now print that log!")
 
1239
        out = StringIO()
 
1240
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1241
            lambda trace=False: "ascii")
 
1242
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1243
            FailureWithUnicode("test_log_unicode"))
 
1244
        self.assertContainsRe(out.getvalue(),
 
1245
            "(?:Text attachment: )?log"
 
1246
            "(?:\n-+\n|: {{{)"
 
1247
            "\d+\.\d+  \\\\u2606"
 
1248
            "(?:\n-+\n|}}}\n)")
 
1249
 
1255
1250
 
1256
1251
class SampleTestCase(tests.TestCase):
1257
1252
 
1322
1317
        self.assertEqual(flags, bzrlib.debug.debug_flags)
1323
1318
 
1324
1319
    def change_selftest_debug_flags(self, new_flags):
1325
 
        orig_selftest_flags = tests.selftest_debug_flags
1326
 
        self.addCleanup(self._restore_selftest_debug_flags, orig_selftest_flags)
1327
 
        tests.selftest_debug_flags = set(new_flags)
1328
 
 
1329
 
    def _restore_selftest_debug_flags(self, flags):
1330
 
        tests.selftest_debug_flags = flags
 
1320
        self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags))
1331
1321
 
1332
1322
    def test_allow_debug_flag(self):
1333
1323
        """The -Eallow_debug flag prevents bzrlib.debug.debug_flags from being
1437
1427
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1438
1428
        output_stream = StringIO()
1439
1429
        result = bzrlib.tests.VerboseTestResult(
1440
 
            unittest._WritelnDecorator(output_stream),
 
1430
            output_stream,
1441
1431
            descriptions=0,
1442
1432
            verbosity=2)
1443
1433
        sample_test.run(result)
1450
1440
        # Note this test won't fail with hooks that the core library doesn't
1451
1441
        # use - but it will trigger with a plugin that adds hooks, so it's still a
1452
1442
        # useful warning in that case.
1453
 
        self.assertEqual(bzrlib.branch.BranchHooks(),
1454
 
            bzrlib.branch.Branch.hooks)
1455
 
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1443
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1444
        self.assertEqual(
 
1445
            bzrlib.smart.server.SmartServerHooks(),
1456
1446
            bzrlib.smart.server.SmartTCPServer.hooks)
1457
 
        self.assertEqual(bzrlib.commands.CommandHooks(),
1458
 
            bzrlib.commands.Command.hooks)
 
1447
        self.assertEqual(
 
1448
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1459
1449
 
1460
1450
    def test__gather_lsprof_in_benchmarks(self):
1461
1451
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1462
1452
 
1463
1453
        Each self.time() call is individually and separately profiled.
1464
1454
        """
1465
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
1455
        self.requireFeature(features.lsprof_feature)
1466
1456
        # overrides the class member with an instance member so no cleanup
1467
1457
        # needed.
1468
1458
        self._gather_lsprof_in_benchmarks = True
1484
1474
        # permitted.
1485
1475
        # Manually set one up (TestCase doesn't and shouldn't provide magic
1486
1476
        # machinery)
1487
 
        transport_server = MemoryServer()
 
1477
        transport_server = memory.MemoryServer()
1488
1478
        transport_server.start_server()
1489
1479
        self.addCleanup(transport_server.stop_server)
1490
 
        t = transport.get_transport(transport_server.get_url())
1491
 
        bzrdir.BzrDir.create(t.base)
 
1480
        t = transport.get_transport_from_url(transport_server.get_url())
 
1481
        controldir.ControlDir.create(t.base)
1492
1482
        self.assertRaises(errors.BzrError,
1493
 
            bzrdir.BzrDir.open_from_transport, t)
 
1483
            controldir.ControlDir.open_from_transport, t)
1494
1484
        # But if we declare this as safe, we can open the bzrdir.
1495
1485
        self.permit_url(t.base)
1496
1486
        self._bzr_selftest_roots.append(t.base)
1497
 
        bzrdir.BzrDir.open_from_transport(t)
 
1487
        controldir.ControlDir.open_from_transport(t)
1498
1488
 
1499
1489
    def test_requireFeature_available(self):
1500
1490
        """self.requireFeature(available) is a no-op."""
1501
 
        class Available(tests.Feature):
 
1491
        class Available(features.Feature):
1502
1492
            def _probe(self):return True
1503
1493
        feature = Available()
1504
1494
        self.requireFeature(feature)
1505
1495
 
1506
1496
    def test_requireFeature_unavailable(self):
1507
1497
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1508
 
        class Unavailable(tests.Feature):
 
1498
        class Unavailable(features.Feature):
1509
1499
            def _probe(self):return False
1510
1500
        feature = Unavailable()
1511
1501
        self.assertRaises(tests.UnavailableFeature,
1572
1562
            result.calls)
1573
1563
 
1574
1564
    def test_start_server_registers_url(self):
1575
 
        transport_server = MemoryServer()
 
1565
        transport_server = memory.MemoryServer()
1576
1566
        # A little strict, but unlikely to be changed soon.
1577
1567
        self.assertEqual([], self._bzr_selftest_roots)
1578
1568
        self.start_server(transport_server)
1634
1624
        self.assertRaises(AssertionError,
1635
1625
            self.assertListRaises, _TestException, success_generator)
1636
1626
 
 
1627
    def test_overrideAttr_without_value(self):
 
1628
        self.test_attr = 'original' # Define a test attribute
 
1629
        obj = self # Make 'obj' visible to the embedded test
 
1630
        class Test(tests.TestCase):
 
1631
 
 
1632
            def setUp(self):
 
1633
                tests.TestCase.setUp(self)
 
1634
                self.orig = self.overrideAttr(obj, 'test_attr')
 
1635
 
 
1636
            def test_value(self):
 
1637
                self.assertEqual('original', self.orig)
 
1638
                self.assertEqual('original', obj.test_attr)
 
1639
                obj.test_attr = 'modified'
 
1640
                self.assertEqual('modified', obj.test_attr)
 
1641
 
 
1642
        test = Test('test_value')
 
1643
        test.run(unittest.TestResult())
 
1644
        self.assertEqual('original', obj.test_attr)
 
1645
 
 
1646
    def test_overrideAttr_with_value(self):
 
1647
        self.test_attr = 'original' # Define a test attribute
 
1648
        obj = self # Make 'obj' visible to the embedded test
 
1649
        class Test(tests.TestCase):
 
1650
 
 
1651
            def setUp(self):
 
1652
                tests.TestCase.setUp(self)
 
1653
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
 
1654
 
 
1655
            def test_value(self):
 
1656
                self.assertEqual('original', self.orig)
 
1657
                self.assertEqual('modified', obj.test_attr)
 
1658
 
 
1659
        test = Test('test_value')
 
1660
        test.run(unittest.TestResult())
 
1661
        self.assertEqual('original', obj.test_attr)
 
1662
 
 
1663
    def test_recordCalls(self):
 
1664
        from bzrlib.tests import test_selftest
 
1665
        calls = self.recordCalls(
 
1666
            test_selftest, '_add_numbers')
 
1667
        self.assertEqual(test_selftest._add_numbers(2, 10),
 
1668
            12)
 
1669
        self.assertEquals(calls, [((2, 10), {})])
 
1670
 
 
1671
 
 
1672
def _add_numbers(a, b):
 
1673
    return a + b
 
1674
 
 
1675
 
 
1676
class _MissingFeature(features.Feature):
    """A feature whose probe always reports it as unavailable."""

    def _probe(self):
        # Never available.
        available = False
        return available

# The one instance that the example tests require.
missing_feature = _MissingFeature()
 
1680
 
 
1681
 
 
1682
def _get_test(name):
    """Build one example test, selected by method name.

    The example class is defined inside this function so that its tests
    don't auto-run as part of the test suite.

    :param name: Name of the ExampleTests method to instantiate.
    :return: An ExampleTests instance bound to that method.
    """

    class ExampleTests(tests.TestCase):
        # One method per outcome; each one calls mutter() with a distinctive
        # message before producing that outcome.

        def test_success(self):
            mutter('this test succeeds')

        def test_fail(self):
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_skip(self):
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_missing_feature(self):
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_xfail(self):
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            mutter('test with unexpected success')
            self.expectFailure('should_fail', lambda: None)

    example = ExampleTests(name)
    return example
 
1719
 
 
1720
 
 
1721
class TestTestCaseLogDetails(tests.TestCase):
    """Check which example-test outcomes carry a 'log' detail."""

    def _run_test(self, test_name):
        # Run the named example test into a fresh testtools result.
        outcome = testtools.TestResult()
        _get_test(test_name).run(outcome)
        return outcome

    def test_fail_has_log(self):
        outcome = self._run_test('test_fail')
        self.assertEqual(1, len(outcome.failures))
        failure_text = outcome.failures[0][1]
        self.assertContainsRe(
            failure_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertContainsRe(failure_text, 'this was a failing test')

    def test_error_has_log(self):
        outcome = self._run_test('test_error')
        self.assertEqual(1, len(outcome.errors))
        error_text = outcome.errors[0][1]
        self.assertContainsRe(
            error_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertContainsRe(error_text, 'this test errored')

    def test_skip_has_no_log(self):
        outcome = self._run_test('test_skip')
        self.assertEqual(['reason'], outcome.skip_reasons.keys())
        skipped = outcome.skip_reasons['reason']
        self.assertEqual(1, len(skipped))
        skipped_test = skipped[0]
        self.assertFalse('log' in skipped_test.getDetails())

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so the missing
        # feature is simply recorded as a skip.
        outcome = self._run_test('test_missing_feature')
        self.assertEqual([missing_feature], outcome.skip_reasons.keys())
        skipped = outcome.skip_reasons[missing_feature]
        self.assertEqual(1, len(skipped))
        skipped_test = skipped[0]
        self.assertFalse('log' in skipped_test.getDetails())

    def test_xfail_has_no_log(self):
        outcome = self._run_test('test_xfail')
        self.assertEqual(1, len(outcome.expectedFailures))
        xfail_text = outcome.expectedFailures[0][1]
        self.assertNotContainsRe(
            xfail_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(xfail_text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        outcome = self._run_test('test_unexpected_success')
        self.assertEqual(1, len(outcome.unexpectedSuccesses))
        # Inconsistency, unexpectedSuccesses is a list of tests,
        # expectedFailures is a list of reasons?
        succeeded_test = outcome.unexpectedSuccesses[0]
        self.assertTrue('log' in succeeded_test.getDetails())
 
1779
 
 
1780
 
 
1781
class TestTestCloning(tests.TestCase):
    """Tests that test cloning of TestCases (as used by multiply_tests)."""

    def test_cloned_testcase_does_not_share_details(self):
        """A TestCase cloned with clone_test does not share mutable attributes
        such as details or cleanups.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
        original = Test('test_foo')
        # Clone before running, so the detail is added to the original only
        # after the clone already exists.
        clone = tests.clone_test(original, original.id() + '(cloned)')
        original.run(unittest.TestResult())
        original_detail = original.getDetails()['foo']
        self.assertEqual('foo', original_detail.iter_bytes())
        self.assertEqual(None, clone.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Applying two levels of scenarios to a test preserves the attributes
        added by both scenarios.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                pass
        base_test = Test('test_foo')
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
        # Multiply by x first, then multiply the resulting suite by y.
        by_x = tests.multiply_tests(base_test, scenarios_x,
                                    unittest.TestSuite())
        by_x_and_y = tests.multiply_tests(by_x, scenarios_y,
                                          unittest.TestSuite())
        all_tests = list(tests.iter_suite_tests(by_x_and_y))
        self.assertLength(4, all_tests)
        all_xys = sorted([(t.x, t.y) for t in all_tests])
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1813
 
1637
1814
 
1638
1815
# NB: Don't delete this; it's not actually from 0.11!
1639
1816
@deprecated_function(deprecated_in((0, 11, 0)))
1767
1944
    def test_make_branch_and_tree_with_format(self):
1768
1945
        # we should be able to supply a format to make_branch_and_tree
1769
1946
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1770
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1771
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
 
1947
        self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
1772
1948
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1773
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1774
 
                              bzrlib.bzrdir.BzrDirFormat6)
1775
1949
 
1776
1950
    def test_make_branch_and_memory_tree(self):
1777
1951
        # we should be able to get a new branch and a mutable tree from
1783
1957
        # make_branch_and_tree has to use local branch and repositories
1784
1958
        # when the vfs transport and local disk are colocated, even if
1785
1959
        # a different transport is in use for url generation.
1786
 
        from bzrlib.transport.fakevfat import FakeVFATServer
1787
 
        self.transport_server = FakeVFATServer
 
1960
        self.transport_server = test_server.FakeVFATServer
1788
1961
        self.assertFalse(self.get_url('t1').startswith('file://'))
1789
1962
        tree = self.make_branch_and_tree('t1')
1790
1963
        base = tree.bzrdir.root_transport.base
1795
1968
                tree.branch.repository.bzrdir.root_transport)
1796
1969
 
1797
1970
 
1798
 
class SelfTestHelper:
 
1971
class SelfTestHelper(object):
1799
1972
 
1800
1973
    def run_selftest(self, **kwargs):
1801
1974
        """Run selftest returning its output."""
1855
2028
        self.assertLength(2, output.readlines())
1856
2029
 
1857
2030
    def test_lsprof_tests(self):
1858
 
        self.requireFeature(test_lsprof.LSProfFeature)
1859
 
        calls = []
 
2031
        self.requireFeature(features.lsprof_feature)
 
2032
        results = []
1860
2033
        class Test(object):
1861
2034
            def __call__(test, result):
1862
2035
                test.run(result)
1863
2036
            def run(test, result):
1864
 
                self.assertIsInstance(result, tests.ForwardingResult)
1865
 
                calls.append("called")
 
2037
                results.append(result)
1866
2038
            def countTestCases(self):
1867
2039
                return 1
1868
2040
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
1869
 
        self.assertLength(1, calls)
 
2041
        self.assertLength(1, results)
 
2042
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
1870
2043
 
1871
2044
    def test_random(self):
1872
2045
        # test randomising by listing a number of tests.
1927
2100
 
1928
2101
    def test_transport_sftp(self):
1929
2102
        self.requireFeature(features.paramiko)
1930
 
        self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
 
2103
        from bzrlib.tests import stub_sftp
 
2104
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
1931
2105
 
1932
2106
    def test_transport_memory(self):
1933
 
        self.check_transport_set(bzrlib.transport.memory.MemoryServer)
 
2107
        self.check_transport_set(memory.MemoryServer)
1934
2108
 
1935
2109
 
1936
2110
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
1951
2125
            load_list='missing file name', list_only=True)
1952
2126
 
1953
2127
 
 
2128
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check when 'log' appears in the subunit stream of the example tests."""

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run one example test through the subunit runner.

        :param test_name: Name of the example test passed to _get_test.
        :return: A (content, result) tuple: the stream's text and a
            testtools.TestResult filled by replaying the stream.
        """
        from subunit import ProtocolTestCase
        suite_factory = lambda: TestUtil.TestSuite([_get_test(test_name)])
        stream = self.run_selftest(
            runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=suite_factory)
        replayed = testtools.TestResult()
        ProtocolTestCase(stream).run(replayed)
        return stream.getvalue(), replayed

    def test_fail_has_log(self):
        content, result = self.run_subunit_stream('test_fail')
        self.assertEqual(1, len(result.failures))
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test will fail')

    def test_error_has_log(self):
        content, result = self.run_subunit_stream('test_error')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test errored')

    def test_skip_has_no_log(self):
        content, result = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test will be skipped')
        self.assertEqual(['reason'], result.skip_reasons.keys())
        skipped = result.skip_reasons['reason']
        self.assertEqual(1, len(skipped))
        test = skipped[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertFalse('log' in test.getDetails())

    def test_missing_feature_has_no_log(self):
        content, result = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'missing the feature')
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
        skipped = result.skip_reasons['_MissingFeature\n']
        self.assertEqual(1, len(skipped))
        test = skipped[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertFalse('log' in test.getDetails())

    def test_xfail_has_no_log(self):
        content, result = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'test with expected failure')
        self.assertEqual(1, len(result.expectedFailures))
        xfail_text = result.expectedFailures[0][1]
        self.assertNotContainsRe(
            xfail_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(xfail_text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        content, result = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'test with unexpected success')
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
        #                success, if a min version check is added remove this
        from subunit import TestProtocolClient as _Client
        if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
            self.expectFailure('subunit treats "unexpectedSuccess"'
                               ' as a plain success',
                self.assertEqual, 1, len(result.unexpectedSuccesses))
        self.assertEqual(1, len(result.unexpectedSuccesses))
        test = result.unexpectedSuccesses[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertTrue('log' in test.getDetails())

    def test_success_has_no_log(self):
        content, result = self.run_subunit_stream('test_success')
        self.assertEqual(1, result.testsRun)
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test succeeds')
 
2208
 
 
2209
 
1954
2210
class TestRunBzr(tests.TestCase):
1955
2211
 
1956
2212
    out = ''
2079
2335
        # stdout and stderr of the invoked run_bzr
2080
2336
        current_factory = bzrlib.ui.ui_factory
2081
2337
        self.run_bzr(['foo'])
2082
 
        self.failIf(current_factory is self.factory)
 
2338
        self.assertFalse(current_factory is self.factory)
2083
2339
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2084
2340
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2085
2341
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2242
2498
 
2243
2499
 
2244
2500
class TestStartBzrSubProcess(tests.TestCase):
 
2501
    """Stub test start_bzr_subprocess."""
2245
2502
 
2246
 
    def check_popen_state(self):
2247
 
        """Replace to make assertions when popen is called."""
 
2503
    def _subprocess_log_cleanup(self):
 
2504
        """Inhibits the base version as we don't produce a log file."""
2248
2505
 
2249
2506
    def _popen(self, *args, **kwargs):
2250
 
        """Record the command that is run, so that we can ensure it is correct"""
 
2507
        """Override the base version to record the command that is run.
 
2508
 
 
2509
        From there we can ensure it is correct without spawning a real process.
 
2510
        """
2251
2511
        self.check_popen_state()
2252
2512
        self._popen_args = args
2253
2513
        self._popen_kwargs = kwargs
2254
2514
        raise _DontSpawnProcess()
2255
2515
 
 
2516
    def check_popen_state(self):
 
2517
        """Replace to make assertions when popen is called."""
 
2518
 
2256
2519
    def test_run_bzr_subprocess_no_plugins(self):
2257
2520
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2258
2521
        command = self._popen_args[0]
2262
2525
 
2263
2526
    def test_allow_plugins(self):
2264
2527
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2265
 
            allow_plugins=True)
 
2528
                          allow_plugins=True)
2266
2529
        command = self._popen_args[0]
2267
2530
        self.assertEqual([], command[2:])
2268
2531
 
2269
2532
    def test_set_env(self):
2270
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2533
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2271
2534
        # set in the child
2272
2535
        def check_environment():
2273
2536
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2274
2537
        self.check_popen_state = check_environment
2275
2538
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2276
 
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2539
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
2277
2540
        # not set in the parent
2278
2541
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2279
2542
 
2280
2543
    def test_run_bzr_subprocess_env_del(self):
2281
2544
        """run_bzr_subprocess can remove environment variables too."""
2282
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2545
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2283
2546
        def check_environment():
2284
2547
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2285
2548
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2286
2549
        self.check_popen_state = check_environment
2287
2550
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2288
 
            env_changes={'EXISTANT_ENV_VAR':None})
 
2551
                          env_changes={'EXISTANT_ENV_VAR':None})
2289
2552
        # Still set in parent
2290
2553
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2291
2554
        del os.environ['EXISTANT_ENV_VAR']
2292
2555
 
2293
2556
    def test_env_del_missing(self):
2294
 
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
 
2557
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2295
2558
        def check_environment():
2296
2559
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2297
2560
        self.check_popen_state = check_environment
2298
2561
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2299
 
            env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2562
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
2300
2563
 
2301
2564
    def test_working_dir(self):
2302
2565
        """Test that we can specify the working dir for the child"""
2305
2568
        chdirs = []
2306
2569
        def chdir(path):
2307
2570
            chdirs.append(path)
2308
 
        os.chdir = chdir
2309
 
        try:
2310
 
            def getcwd():
2311
 
                return 'current'
2312
 
            osutils.getcwd = getcwd
2313
 
            try:
2314
 
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2315
 
                    working_dir='foo')
2316
 
            finally:
2317
 
                osutils.getcwd = orig_getcwd
2318
 
        finally:
2319
 
            os.chdir = orig_chdir
 
2571
        self.overrideAttr(os, 'chdir', chdir)
 
2572
        def getcwd():
 
2573
            return 'current'
 
2574
        self.overrideAttr(osutils, 'getcwd', getcwd)
 
2575
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2576
                          working_dir='foo')
2320
2577
        self.assertEqual(['foo', 'current'], chdirs)
2321
2578
 
 
2579
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2580
        self.get_source_path = lambda: ""
 
2581
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2582
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2583
 
2322
2584
 
2323
2585
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2324
2586
    """Tests that really need to do things with an external bzr."""
2337
2599
        self.assertEqual('bzr: interrupted\n', result[1])
2338
2600
 
2339
2601
 
2340
 
class TestFeature(tests.TestCase):
2341
 
 
2342
 
    def test_caching(self):
2343
 
        """Feature._probe is called by the feature at most once."""
2344
 
        class InstrumentedFeature(tests.Feature):
2345
 
            def __init__(self):
2346
 
                super(InstrumentedFeature, self).__init__()
2347
 
                self.calls = []
2348
 
            def _probe(self):
2349
 
                self.calls.append('_probe')
2350
 
                return False
2351
 
        feature = InstrumentedFeature()
2352
 
        feature.available()
2353
 
        self.assertEqual(['_probe'], feature.calls)
2354
 
        feature.available()
2355
 
        self.assertEqual(['_probe'], feature.calls)
2356
 
 
2357
 
    def test_named_str(self):
2358
 
        """Feature.__str__ should thunk to feature_name()."""
2359
 
        class NamedFeature(tests.Feature):
2360
 
            def feature_name(self):
2361
 
                return 'symlinks'
2362
 
        feature = NamedFeature()
2363
 
        self.assertEqual('symlinks', str(feature))
2364
 
 
2365
 
    def test_default_str(self):
2366
 
        """Feature.__str__ should default to __class__.__name__."""
2367
 
        class NamedFeature(tests.Feature):
2368
 
            pass
2369
 
        feature = NamedFeature()
2370
 
        self.assertEqual('NamedFeature', str(feature))
2371
 
 
2372
 
 
2373
 
class TestUnavailableFeature(tests.TestCase):
2374
 
 
2375
 
    def test_access_feature(self):
2376
 
        feature = tests.Feature()
2377
 
        exception = tests.UnavailableFeature(feature)
2378
 
        self.assertIs(feature, exception.args[0])
2379
 
 
2380
 
 
2381
 
simple_thunk_feature = tests._CompatabilityThunkFeature(
2382
 
    'bzrlib.tests', 'UnicodeFilename',
2383
 
    'bzrlib.tests.test_selftest.simple_thunk_feature',
2384
 
    deprecated_in((2,1,0)))
2385
 
 
2386
 
class Test_CompatibilityFeature(tests.TestCase):
2387
 
 
2388
 
    def test_does_thunk(self):
2389
 
        res = self.callDeprecated(
2390
 
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2391
 
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2392
 
            simple_thunk_feature.available)
2393
 
        self.assertEqual(tests.UnicodeFilename.available(), res)
2394
 
 
2395
 
        
2396
 
class TestModuleAvailableFeature(tests.TestCase):
2397
 
 
2398
 
    def test_available_module(self):
2399
 
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
2400
 
        self.assertEqual('bzrlib.tests', feature.module_name)
2401
 
        self.assertEqual('bzrlib.tests', str(feature))
2402
 
        self.assertTrue(feature.available())
2403
 
        self.assertIs(tests, feature.module)
2404
 
 
2405
 
    def test_unavailable_module(self):
2406
 
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2407
 
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2408
 
        self.assertFalse(feature.available())
2409
 
        self.assertIs(None, feature.module)
2410
 
 
2411
 
 
2412
2602
class TestSelftestFiltering(tests.TestCase):
2413
2603
 
2414
2604
    def setUp(self):
2565
2755
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2566
2756
 
2567
2757
 
2568
 
class TestCheckInventoryShape(tests.TestCaseWithTransport):
 
2758
class TestCheckTreeShape(tests.TestCaseWithTransport):
2569
2759
 
2570
 
    def test_check_inventory_shape(self):
 
2760
    def test_check_tree_shape(self):
2571
2761
        files = ['a', 'b/', 'b/c']
2572
2762
        tree = self.make_branch_and_tree('.')
2573
2763
        self.build_tree(files)
2574
2764
        tree.add(files)
2575
2765
        tree.lock_read()
2576
2766
        try:
2577
 
            self.check_inventory_shape(tree.inventory, files)
 
2767
            self.check_tree_shape(tree, files)
2578
2768
        finally:
2579
2769
            tree.unlock()
2580
2770
 
2599
2789
        # Running bzr in blackbox mode, normal/expected/user errors should be
2600
2790
        # caught in the regular way and turned into an error message plus exit
2601
2791
        # code.
2602
 
        transport_server = MemoryServer()
 
2792
        transport_server = memory.MemoryServer()
2603
2793
        transport_server.start_server()
2604
2794
        self.addCleanup(transport_server.stop_server)
2605
2795
        url = transport_server.get_url()
2751
2941
        # Test that a plausible list of modules to doctest is returned
2752
2942
        # by _test_suite_modules_to_doctest.
2753
2943
        test_list = tests._test_suite_modules_to_doctest()
 
2944
        if __doc__ is None:
 
2945
            # When docstrings are stripped, there are no modules to doctest
 
2946
            self.assertEqual([], test_list)
 
2947
            return
2754
2948
        self.assertSubset([
2755
2949
            'bzrlib.timestamp',
2756
2950
            ],
2763
2957
        # test doubles that supply a few sample tests to load, and check they
2764
2958
        # are loaded.
2765
2959
        calls = []
2766
 
        def _test_suite_testmod_names():
 
2960
        def testmod_names():
2767
2961
            calls.append("testmod_names")
2768
2962
            return [
2769
2963
                'bzrlib.tests.blackbox.test_branch',
2770
2964
                'bzrlib.tests.per_transport',
2771
2965
                'bzrlib.tests.test_selftest',
2772
2966
                ]
2773
 
        original_testmod_names = tests._test_suite_testmod_names
2774
 
        def _test_suite_modules_to_doctest():
 
2967
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
 
2968
        def doctests():
2775
2969
            calls.append("modules_to_doctest")
 
2970
            if __doc__ is None:
 
2971
                return []
2776
2972
            return ['bzrlib.timestamp']
2777
 
        orig_modules_to_doctest = tests._test_suite_modules_to_doctest
2778
 
        def restore_names():
2779
 
            tests._test_suite_testmod_names = original_testmod_names
2780
 
            tests._test_suite_modules_to_doctest = orig_modules_to_doctest
2781
 
        self.addCleanup(restore_names)
2782
 
        tests._test_suite_testmod_names = _test_suite_testmod_names
2783
 
        tests._test_suite_modules_to_doctest = _test_suite_modules_to_doctest
 
2973
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
2784
2974
        expected_test_list = [
2785
2975
            # testmod_names
2786
2976
            'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2787
2977
            ('bzrlib.tests.per_transport.TransportTests'
2788
2978
             '.test_abspath(LocalTransport,LocalURLServer)'),
2789
2979
            'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
2790
 
            # modules_to_doctest
2791
 
            'bzrlib.timestamp.format_highres_date',
2792
2980
            # plugins can't be tested that way since selftest may be run with
2793
2981
            # --no-plugins
2794
2982
            ]
 
2983
        if __doc__ is not None:
 
2984
            expected_test_list.extend([
 
2985
                # modules_to_doctest
 
2986
                'bzrlib.timestamp.format_highres_date',
 
2987
                ])
2795
2988
        suite = tests.test_suite()
2796
2989
        self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
2797
2990
            set(calls))
2909
3102
        tpr.register('bar', 'bBB.aAA.rRR')
2910
3103
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
2911
3104
        self.assertThat(self.get_log(),
2912
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3105
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3106
                           doctest.ELLIPSIS))
2913
3107
 
2914
3108
    def test_get_unknown_prefix(self):
2915
3109
        tpr = self._get_registry()
2935
3129
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2936
3130
 
2937
3131
 
 
3132
class TestThreadLeakDetection(tests.TestCase):
 
3133
    """Ensure when tests leak threads we detect and report it"""
 
3134
 
 
3135
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3136
        def __init__(self):
 
3137
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3138
            self.leaks = []
 
3139
        def _report_thread_leak(self, test, leaks, alive):
 
3140
            self.leaks.append((test, leaks))
 
3141
 
 
3142
    def test_testcase_without_addCleanups(self):
 
3143
        """Check old TestCase instances don't break with leak detection"""
 
3144
        class Test(unittest.TestCase):
 
3145
            def runTest(self):
 
3146
                pass
 
3147
        result = self.LeakRecordingResult()
 
3148
        test = Test()
 
3149
        result.startTestRun()
 
3150
        test.run(result)
 
3151
        result.stopTestRun()
 
3152
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3153
        self.assertEqual(result.leaks, [])
 
3154
        
 
3155
    def test_thread_leak(self):
 
3156
        """Ensure a thread that outlives the running of a test is reported
 
3157
 
 
3158
        Uses a thread that blocks on an event, and is started by the inner
 
3159
        test case. As the thread outlives the inner case's run, it should be
 
3160
        detected as a leak, but the event is then set so that the thread can
 
3161
        be safely joined in cleanup so it's not leaked for real.
 
3162
        """
 
3163
        event = threading.Event()
 
3164
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3165
        class Test(tests.TestCase):
 
3166
            def test_leak(self):
 
3167
                thread.start()
 
3168
        result = self.LeakRecordingResult()
 
3169
        test = Test("test_leak")
 
3170
        self.addCleanup(thread.join)
 
3171
        self.addCleanup(event.set)
 
3172
        result.startTestRun()
 
3173
        test.run(result)
 
3174
        result.stopTestRun()
 
3175
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3176
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3177
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3178
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3179
 
 
3180
    def test_multiple_leaks(self):
 
3181
        """Check multiple leaks are blamed on the test cases at fault
 
3182
 
 
3183
        Same concept as the previous test, but has one inner test method that
 
3184
        leaks two threads, and one that doesn't leak at all.
 
3185
        """
 
3186
        event = threading.Event()
 
3187
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3188
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3189
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3190
        class Test(tests.TestCase):
 
3191
            def test_first_leak(self):
 
3192
                thread_b.start()
 
3193
            def test_second_no_leak(self):
 
3194
                pass
 
3195
            def test_third_leak(self):
 
3196
                thread_c.start()
 
3197
                thread_a.start()
 
3198
        result = self.LeakRecordingResult()
 
3199
        first_test = Test("test_first_leak")
 
3200
        third_test = Test("test_third_leak")
 
3201
        self.addCleanup(thread_a.join)
 
3202
        self.addCleanup(thread_b.join)
 
3203
        self.addCleanup(thread_c.join)
 
3204
        self.addCleanup(event.set)
 
3205
        result.startTestRun()
 
3206
        unittest.TestSuite(
 
3207
            [first_test, Test("test_second_no_leak"), third_test]
 
3208
            ).run(result)
 
3209
        result.stopTestRun()
 
3210
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3211
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3212
        self.assertEqual(result.leaks, [
 
3213
            (first_test, set([thread_b])),
 
3214
            (third_test, set([thread_a, thread_c]))])
 
3215
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3216
 
 
3217
 
 
3218
class TestPostMortemDebugging(tests.TestCase):
 
3219
    """Check post mortem debugging works when tests fail or error"""
 
3220
 
 
3221
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3222
        def __init__(self):
 
3223
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3224
            self.postcode = None
 
3225
        def _post_mortem(self, tb=None):
 
3226
            """Record the code object at the end of the current traceback"""
 
3227
            tb = tb or sys.exc_info()[2]
 
3228
            if tb is not None:
 
3229
                next = tb.tb_next
 
3230
                while next is not None:
 
3231
                    tb = next
 
3232
                    next = next.tb_next
 
3233
                self.postcode = tb.tb_frame.f_code
 
3234
        def report_error(self, test, err):
 
3235
            pass
 
3236
        def report_failure(self, test, err):
 
3237
            pass
 
3238
 
 
3239
    def test_location_unittest_error(self):
 
3240
        """Needs right post mortem traceback with erroring unittest case"""
 
3241
        class Test(unittest.TestCase):
 
3242
            def runTest(self):
 
3243
                raise RuntimeError
 
3244
        result = self.TracebackRecordingResult()
 
3245
        Test().run(result)
 
3246
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3247
 
 
3248
    def test_location_unittest_failure(self):
 
3249
        """Needs right post mortem traceback with failing unittest case"""
 
3250
        class Test(unittest.TestCase):
 
3251
            def runTest(self):
 
3252
                raise self.failureException
 
3253
        result = self.TracebackRecordingResult()
 
3254
        Test().run(result)
 
3255
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3256
 
 
3257
    def test_location_bt_error(self):
 
3258
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3259
        class Test(tests.TestCase):
 
3260
            def test_error(self):
 
3261
                raise RuntimeError
 
3262
        result = self.TracebackRecordingResult()
 
3263
        Test("test_error").run(result)
 
3264
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3265
 
 
3266
    def test_location_bt_failure(self):
 
3267
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3268
        class Test(tests.TestCase):
 
3269
            def test_failure(self):
 
3270
                raise self.failureException
 
3271
        result = self.TracebackRecordingResult()
 
3272
        Test("test_failure").run(result)
 
3273
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3274
 
 
3275
    def test_env_var_triggers_post_mortem(self):
 
3276
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3277
        import pdb
 
3278
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3279
        post_mortem_calls = []
 
3280
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3281
        self.overrideEnv('BZR_TEST_PDB', None)
 
3282
        result._post_mortem(1)
 
3283
        self.overrideEnv('BZR_TEST_PDB', 'on')
 
3284
        result._post_mortem(2)
 
3285
        self.assertEqual([2], post_mortem_calls)
 
3286
 
 
3287
 
2938
3288
class TestRunSuite(tests.TestCase):
2939
3289
 
2940
3290
    def test_runner_class(self):
2951
3301
                                                self.verbosity)
2952
3302
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2953
3303
        self.assertLength(1, calls)
 
3304
 
 
3305
 
 
3306
class _Selftest(object):
 
3307
    """Mixin for tests needing full selftest output"""
 
3308
 
 
3309
    def _inject_stream_into_subunit(self, stream):
 
3310
        """To be overridden by subclasses that run tests out of process"""
 
3311
 
 
3312
    def _run_selftest(self, **kwargs):
 
3313
        sio = StringIO()
 
3314
        self._inject_stream_into_subunit(sio)
 
3315
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
 
3316
        return sio.getvalue()
 
3317
 
 
3318
 
 
3319
class _ForkedSelftest(_Selftest):
 
3320
    """Mixin for tests needing full selftest output with forked children"""
 
3321
 
 
3322
    _test_needs_features = [features.subunit]
 
3323
 
 
3324
    def _inject_stream_into_subunit(self, stream):
 
3325
        """Monkey-patch subunit so the extra output goes to stream not stdout
 
3326
 
 
3327
        Some APIs need rewriting so this kind of bogus hackery can be replaced
 
3328
        by passing the stream param from run_tests down into ProtocolTestCase.
 
3329
        """
 
3330
        from subunit import ProtocolTestCase
 
3331
        _original_init = ProtocolTestCase.__init__
 
3332
        def _init_with_passthrough(self, *args, **kwargs):
 
3333
            _original_init(self, *args, **kwargs)
 
3334
            self._passthrough = stream
 
3335
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
 
3336
 
 
3337
    def _run_selftest(self, **kwargs):
 
3338
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
 
3339
        if getattr(os, "fork", None) is None:
 
3340
            raise tests.TestNotApplicable("Platform doesn't support forking")
 
3341
        # Make sure the fork code is actually invoked by claiming two cores
 
3342
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
 
3343
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
 
3344
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3345
 
 
3346
 
 
3347
class TestParallelFork(_ForkedSelftest, tests.TestCase):
 
3348
    """Check operation of --parallel=fork selftest option"""
 
3349
 
 
3350
    def test_error_in_child_during_fork(self):
 
3351
        """Error in a forked child during test setup should get reported"""
 
3352
        class Test(tests.TestCase):
 
3353
            def testMethod(self):
 
3354
                pass
 
3355
        # We don't care what, just break something that a child will run
 
3356
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
 
3357
        out = self._run_selftest(test_suite_factory=Test)
 
3358
        # Lines from the tracebacks of the two child processes may be mixed
 
3359
        # together due to the way subunit parses and forwards the streams,
 
3360
        # so permit extra lines between each part of the error output.
 
3361
        self.assertContainsRe(out,
 
3362
            "Traceback.*:\n"
 
3363
            "(?:.*\n)*"
 
3364
            ".+ in fork_for_tests\n"
 
3365
            "(?:.*\n)*"
 
3366
            "\s*workaround_zealous_crypto_random\(\)\n"
 
3367
            "(?:.*\n)*"
 
3368
            "TypeError:")
 
3369
 
 
3370
 
 
3371
class TestUncollectedWarnings(_Selftest, tests.TestCase):
 
3372
    """Check a test case still alive after being run emits a warning"""
 
3373
 
 
3374
    class Test(tests.TestCase):
 
3375
        def test_pass(self):
 
3376
            pass
 
3377
        def test_self_ref(self):
 
3378
            self.also_self = self.test_self_ref
 
3379
        def test_skip(self):
 
3380
            self.skip("Don't need")
 
3381
 
 
3382
    def _get_suite(self):
 
3383
        return TestUtil.TestSuite([
 
3384
            self.Test("test_pass"),
 
3385
            self.Test("test_self_ref"),
 
3386
            self.Test("test_skip"),
 
3387
            ])
 
3388
 
 
3389
    def _run_selftest_with_suite(self, **kwargs):
 
3390
        old_flags = tests.selftest_debug_flags
 
3391
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
 
3392
        gc_on = gc.isenabled()
 
3393
        if gc_on:
 
3394
            gc.disable()
 
3395
        try:
 
3396
            output = self._run_selftest(test_suite_factory=self._get_suite,
 
3397
                **kwargs)
 
3398
        finally:
 
3399
            if gc_on:
 
3400
                gc.enable()
 
3401
            tests.selftest_debug_flags = old_flags
 
3402
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
 
3403
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
 
3404
        return output
 
3405
 
 
3406
    def test_testsuite(self):
 
3407
        self._run_selftest_with_suite()
 
3408
 
 
3409
    def test_pattern(self):
 
3410
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
 
3411
        self.assertNotContainsRe(out, "test_skip")
 
3412
 
 
3413
    def test_exclude_pattern(self):
 
3414
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
 
3415
        self.assertNotContainsRe(out, "test_skip")
 
3416
 
 
3417
    def test_random_seed(self):
 
3418
        self._run_selftest_with_suite(random_seed="now")
 
3419
 
 
3420
    def test_matching_tests_first(self):
 
3421
        self._run_selftest_with_suite(matching_tests_first=True,
 
3422
            pattern="test_self_ref$")
 
3423
 
 
3424
    def test_starting_with_and_exclude(self):
 
3425
        out = self._run_selftest_with_suite(starting_with=["bt."],
 
3426
            exclude_pattern="test_skip$")
 
3427
        self.assertNotContainsRe(out, "test_skip")
 
3428
 
 
3429
    def test_additonal_decorator(self):
 
3430
        out = self._run_selftest_with_suite(
 
3431
            suite_decorators=[tests.TestDecorator])
 
3432
 
 
3433
 
 
3434
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
 
3435
    """Check warnings from tests staying alive are emitted with subunit"""
 
3436
 
 
3437
    _test_needs_features = [features.subunit]
 
3438
 
 
3439
    def _run_selftest_with_suite(self, **kwargs):
 
3440
        return TestUncollectedWarnings._run_selftest_with_suite(self,
 
3441
            runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3442
 
 
3443
 
 
3444
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
 
3445
    """Check warnings from tests staying alive are emitted when forking"""
 
3446
 
 
3447
 
 
3448
class TestEnvironHandling(tests.TestCase):
 
3449
 
 
3450
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
 
3451
        self.assertFalse('MYVAR' in os.environ)
 
3452
        self.overrideEnv('MYVAR', '42')
 
3453
        # We use an embedded test to make sure we fix the _captureVar bug
 
3454
        class Test(tests.TestCase):
 
3455
            def test_me(self):
 
3456
                # The first call saves the 42 value
 
3457
                self.overrideEnv('MYVAR', None)
 
3458
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3459
                # Make sure we can call it twice
 
3460
                self.overrideEnv('MYVAR', None)
 
3461
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3462
        output = StringIO()
 
3463
        result = tests.TextTestResult(output, 0, 1)
 
3464
        Test('test_me').run(result)
 
3465
        if not result.wasStrictlySuccessful():
 
3466
            self.fail(output.getvalue())
 
3467
        # We get our value back
 
3468
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3469
 
 
3470
 
 
3471
class TestIsolatedEnv(tests.TestCase):
 
3472
    """Test isolating tests from os.environ.
 
3473
 
 
3474
    Since we use tests that are already isolated from os.environ a bit of care
 
3475
    should be taken when designing the tests to avoid bootstrap side-effects.
 
3476
    The tests start with an already clean os.environ which allows doing valid
 
3477
    assertions about which variables are present or not and design tests around
 
3478
    these assertions.
 
3479
    """
 
3480
 
 
3481
    class ScratchMonkey(tests.TestCase):
 
3482
 
 
3483
        def test_me(self):
 
3484
            pass
 
3485
 
 
3486
    def test_basics(self):
 
3487
        # Make sure we know the definition of BZR_HOME: not part of os.environ
 
3488
        # for tests.TestCase.
 
3489
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
 
3490
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
 
3491
        # Being part of isolated_environ, BZR_HOME should not appear here
 
3492
        self.assertFalse('BZR_HOME' in os.environ)
 
3493
        # Make sure we know the definition of LINES: part of os.environ for
 
3494
        # tests.TestCase
 
3495
        self.assertTrue('LINES' in tests.isolated_environ)
 
3496
        self.assertEquals('25', tests.isolated_environ['LINES'])
 
3497
        self.assertEquals('25', os.environ['LINES'])
 
3498
 
 
3499
    def test_injecting_unknown_variable(self):
 
3500
        # BZR_HOME is known to be absent from os.environ
 
3501
        test = self.ScratchMonkey('test_me')
 
3502
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
 
3503
        self.assertEquals('foo', os.environ['BZR_HOME'])
 
3504
        tests.restore_os_environ(test)
 
3505
        self.assertFalse('BZR_HOME' in os.environ)
 
3506
 
 
3507
    def test_injecting_known_variable(self):
 
3508
        test = self.ScratchMonkey('test_me')
 
3509
        # LINES is known to be present in os.environ
 
3510
        tests.override_os_environ(test, {'LINES': '42'})
 
3511
        self.assertEquals('42', os.environ['LINES'])
 
3512
        tests.restore_os_environ(test)
 
3513
        self.assertEquals('25', os.environ['LINES'])
 
3514
 
 
3515
    def test_deleting_variable(self):
 
3516
        test = self.ScratchMonkey('test_me')
 
3517
        # LINES is known to be present in os.environ
 
3518
        tests.override_os_environ(test, {'LINES': None})
 
3519
        self.assertTrue('LINES' not in os.environ)
 
3520
        tests.restore_os_environ(test)
 
3521
        self.assertEquals('25', os.environ['LINES'])
 
3522
 
 
3523
 
 
3524
class TestDocTestSuiteIsolation(tests.TestCase):
 
3525
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
 
3526
 
 
3527
    Since tests.TestCase already provides isolation from os.environ, we use
 
3528
    the clean environment as a base for testing. To precisely capture the
 
3529
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
 
3530
    compare against.
 
3531
 
 
3532
    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
 
3533
    not `os.environ` so each test overrides it to suit its needs.
 
3534
 
 
3535
    """
 
3536
 
 
3537
    def get_doctest_suite_for_string(self, klass, string):
 
3538
        class Finder(doctest.DocTestFinder):
 
3539
 
 
3540
            def find(*args, **kwargs):
 
3541
                test = doctest.DocTestParser().get_doctest(
 
3542
                    string, {}, 'foo', 'foo.py', 0)
 
3543
                return [test]
 
3544
 
 
3545
        suite = klass(test_finder=Finder())
 
3546
        return suite
 
3547
 
 
3548
    def run_doctest_suite_for_string(self, klass, string):
 
3549
        suite = self.get_doctest_suite_for_string(klass, string)
 
3550
        output = StringIO()
 
3551
        result = tests.TextTestResult(output, 0, 1)
 
3552
        suite.run(result)
 
3553
        return result, output
 
3554
 
 
3555
    def assertDocTestStringSucceds(self, klass, string):
 
3556
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3557
        if not result.wasStrictlySuccessful():
 
3558
            self.fail(output.getvalue())
 
3559
 
 
3560
    def assertDocTestStringFails(self, klass, string):
 
3561
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3562
        if result.wasStrictlySuccessful():
 
3563
            self.fail(output.getvalue())
 
3564
 
 
3565
    def test_injected_variable(self):
 
3566
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
 
3567
        test = """
 
3568
            >>> import os
 
3569
            >>> os.environ['LINES']
 
3570
            '42'
 
3571
            """
 
3572
        # doctest.DocTestSuite fails as it sees '25'
 
3573
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3574
        # tests.IsolatedDocTestSuite sees '42'
 
3575
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3576
 
 
3577
    def test_deleted_variable(self):
 
3578
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
 
3579
        test = """
 
3580
            >>> import os
 
3581
            >>> os.environ.get('LINES')
 
3582
            """
 
3583
        # doctest.DocTestSuite fails as it sees '25'
 
3584
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3585
        # tests.IsolatedDocTestSuite sees None
 
3586
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3587
 
 
3588
 
 
3589
class TestSelftestExcludePatterns(tests.TestCase):
    """Exercise ``-x`` exclusions through ``bzr selftest --list``."""

    def setUp(self):
        super(TestSelftestExcludePatterns, self).setUp()
        # Replace the suite factory so that only our three tests get listed
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """Return a suite made of three do-nothing tests: a, b and c.

        ``keep_only`` and ``starting_with`` are accepted but not used.
        """
        class Test(tests.TestCase):

            def id(self):
                # The bare method name is enough, skip the class path
                return self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass

        return TestUtil.TestSuite([Test(name) for name in ('a', 'b', 'c')])

    def assertTestList(self, expected, *selftest_args):
        """Check that ``selftest --list`` prints exactly ``expected``.

        :param expected: The list of expected output lines.
        :param selftest_args: Extra command-line arguments for ``selftest``.
        """
        # setUp installed our own suite factory, so this runs at the command
        # level without loading the whole test suite.
        command = ('selftest', '--list') + selftest_args
        listing, _ = self.run_bzr(command)
        self.assertEquals(expected, listing.splitlines())

    def test_full_list(self):
        self.assertTestList(['a', 'b', 'c'])

    def test_single_exclude(self):
        self.assertTestList(['b', 'c'], '-x', 'a')

    def test_mutiple_excludes(self):
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
 
3624
 
 
3625
 
 
3626
class TestCounterHooks(tests.TestCase, SelfTestHelper):
 
3627
 
 
3628
    _test_needs_features = [features.subunit]
 
3629
 
 
3630
    def setUp(self):
 
3631
        super(TestCounterHooks, self).setUp()
 
3632
        class Test(tests.TestCase):
 
3633
 
 
3634
            def setUp(self):
 
3635
                super(Test, self).setUp()
 
3636
                self.hooks = hooks.Hooks()
 
3637
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
 
3638
                self.install_counter_hook(self.hooks, 'myhook')
 
3639
 
 
3640
            def no_hook(self):
 
3641
                pass
 
3642
 
 
3643
            def run_hook_once(self):
 
3644
                for hook in self.hooks['myhook']:
 
3645
                    hook(self)
 
3646
 
 
3647
        self.test_class = Test
 
3648
 
 
3649
    def assertHookCalls(self, expected_calls, test_name):
 
3650
        test = self.test_class(test_name)
 
3651
        result = unittest.TestResult()
 
3652
        test.run(result)
 
3653
        self.assertTrue(hasattr(test, '_counters'))
 
3654
        self.assertTrue(test._counters.has_key('myhook'))
 
3655
        self.assertEquals(expected_calls, test._counters['myhook'])
 
3656
 
 
3657
    def test_no_hook(self):
 
3658
        self.assertHookCalls(0, 'no_hook')
 
3659
 
 
3660
    def test_run_hook_once(self):
 
3661
        tt = features.testtools
 
3662
        if tt.module.__version__ < (0, 9, 8):
 
3663
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
 
3664
        self.assertHookCalls(1, 'run_hook_once')