~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Martin Pool
  • Date: 2011-06-14 02:21:41 UTC
  • mto: This revision was merged to the branch mainline in revision 6001.
  • Revision ID: mbp@canonical.com-20110614022141-18hmm7s0iw3utcbj
Deprecate __contains__ on Tree and Inventory

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import doctest
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
32
37
    Equals,
33
38
    )
34
 
import testtools.tests.helpers
 
39
import testtools.testresult.doubles
35
40
 
36
41
import bzrlib
37
42
from bzrlib import (
38
43
    branchbuilder,
39
44
    bzrdir,
40
 
    debug,
41
45
    errors,
42
46
    lockdir,
43
47
    memorytree,
44
48
    osutils,
45
 
    progress,
46
49
    remote,
47
50
    repository,
48
51
    symbol_versioning,
49
52
    tests,
50
53
    transport,
51
54
    workingtree,
 
55
    workingtree_3,
 
56
    workingtree_4,
52
57
    )
53
58
from bzrlib.repofmt import (
54
59
    groupcompress_repo,
55
 
    pack_repo,
56
 
    weaverepo,
57
60
    )
58
61
from bzrlib.symbol_versioning import (
59
62
    deprecated_function,
64
67
    features,
65
68
    test_lsprof,
66
69
    test_server,
67
 
    test_sftp_transport,
68
70
    TestUtil,
69
71
    )
70
 
from bzrlib.trace import note
 
72
from bzrlib.trace import note, mutter
71
73
from bzrlib.transport import memory
72
 
from bzrlib.version import _get_bzr_source_tree
73
74
 
74
75
 
75
76
def _test_ids(test_suite):
77
78
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
79
 
79
80
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
81
class MetaTestLog(tests.TestCase):
93
82
 
94
83
    def test_logging(self):
100
89
            "text", "plain", {"charset": "utf8"})))
101
90
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
102
91
        self.assertThat(self.get_log(),
103
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
92
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
104
93
 
105
94
 
106
95
class TestUnicodeFilename(tests.TestCase):
119
108
 
120
109
        filename = u'hell\u00d8'
121
110
        self.build_tree_contents([(filename, 'contents of hello')])
122
 
        self.failUnlessExists(filename)
 
111
        self.assertPathExists(filename)
 
112
 
 
113
 
 
114
class TestClassesAvailable(tests.TestCase):
 
115
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
116
 
 
117
    def test_test_case(self):
 
118
        from bzrlib.tests import TestCase
 
119
 
 
120
    def test_test_loader(self):
 
121
        from bzrlib.tests import TestLoader
 
122
 
 
123
    def test_test_suite(self):
 
124
        from bzrlib.tests import TestSuite
123
125
 
124
126
 
125
127
class TestTransportScenarios(tests.TestCase):
208
210
    def test_scenarios(self):
209
211
        # check that constructor parameters are passed through to the adapted
210
212
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
213
        from bzrlib.tests.per_controldir import make_scenarios
212
214
        vfs_factory = "v"
213
215
        server1 = "a"
214
216
        server2 = "b"
312
314
        from bzrlib.tests.per_interrepository import make_scenarios
313
315
        server1 = "a"
314
316
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
317
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
318
        scenarios = make_scenarios(server1, server2, formats)
317
319
        self.assertEqual([
318
320
            ('C0,str,str',
319
321
             {'repository_format': 'C1',
320
322
              'repository_format_to': 'C2',
321
323
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
324
              'transport_server': 'a',
 
325
              'extra_setup': 'C3'}),
323
326
            ('D0,str,str',
324
327
             {'repository_format': 'D1',
325
328
              'repository_format_to': 'D2',
326
329
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
330
              'transport_server': 'a',
 
331
              'extra_setup': 'D3'})],
328
332
            scenarios)
329
333
 
330
334
 
336
340
        from bzrlib.tests.per_workingtree import make_scenarios
337
341
        server1 = "a"
338
342
        server2 = "b"
339
 
        formats = [workingtree.WorkingTreeFormat2(),
340
 
                   workingtree.WorkingTreeFormat3(),]
 
343
        formats = [workingtree_4.WorkingTreeFormat4(),
 
344
                   workingtree_3.WorkingTreeFormat3(),]
341
345
        scenarios = make_scenarios(server1, server2, formats)
342
346
        self.assertEqual([
343
 
            ('WorkingTreeFormat2',
 
347
            ('WorkingTreeFormat4',
344
348
             {'bzrdir_format': formats[0]._matchingbzrdir,
345
349
              'transport_readonly_server': 'b',
346
350
              'transport_server': 'a',
373
377
            )
374
378
        server1 = "a"
375
379
        server2 = "b"
376
 
        formats = [workingtree.WorkingTreeFormat2(),
377
 
                   workingtree.WorkingTreeFormat3(),]
 
380
        formats = [workingtree_4.WorkingTreeFormat4(),
 
381
                   workingtree_3.WorkingTreeFormat3(),]
378
382
        scenarios = make_scenarios(server1, server2, formats)
379
383
        self.assertEqual(7, len(scenarios))
380
 
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
381
 
        wt4_format = workingtree.WorkingTreeFormat4()
382
 
        wt5_format = workingtree.WorkingTreeFormat5()
 
384
        default_wt_format = workingtree.format_registry.get_default()
 
385
        wt4_format = workingtree_4.WorkingTreeFormat4()
 
386
        wt5_format = workingtree_4.WorkingTreeFormat5()
383
387
        expected_scenarios = [
384
 
            ('WorkingTreeFormat2',
 
388
            ('WorkingTreeFormat4',
385
389
             {'bzrdir_format': formats[0]._matchingbzrdir,
386
390
              'transport_readonly_server': 'b',
387
391
              'transport_server': 'a',
447
451
        # ones to add.
448
452
        from bzrlib.tests.per_tree import (
449
453
            return_parameter,
450
 
            revision_tree_from_workingtree
451
454
            )
452
455
        from bzrlib.tests.per_intertree import (
453
456
            make_scenarios,
454
457
            )
455
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
458
        from bzrlib.workingtree_3 import WorkingTreeFormat3
 
459
        from bzrlib.workingtree_4 import WorkingTreeFormat4
456
460
        input_test = TestInterTreeScenarios(
457
461
            "test_scenarios")
458
462
        server1 = "a"
459
463
        server2 = "b"
460
 
        format1 = WorkingTreeFormat2()
 
464
        format1 = WorkingTreeFormat4()
461
465
        format2 = WorkingTreeFormat3()
462
466
        formats = [("1", str, format1, format2, "converter1"),
463
467
            ("2", int, format2, format1, "converter2")]
509
513
        self.assertRaises(AssertionError, self.assertEqualStat,
510
514
            os.lstat("foo"), os.lstat("longname"))
511
515
 
 
516
    def test_failUnlessExists(self):
 
517
        """Deprecated failUnlessExists and failIfExists"""
 
518
        self.applyDeprecated(
 
519
            deprecated_in((2, 4)),
 
520
            self.failUnlessExists, '.')
 
521
        self.build_tree(['foo/', 'foo/bar'])
 
522
        self.applyDeprecated(
 
523
            deprecated_in((2, 4)),
 
524
            self.failUnlessExists, 'foo/bar')
 
525
        self.applyDeprecated(
 
526
            deprecated_in((2, 4)),
 
527
            self.failIfExists, 'foo/foo')
 
528
 
 
529
    def test_assertPathExists(self):
 
530
        self.assertPathExists('.')
 
531
        self.build_tree(['foo/', 'foo/bar'])
 
532
        self.assertPathExists('foo/bar')
 
533
        self.assertPathDoesNotExist('foo/foo')
 
534
 
512
535
 
513
536
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
514
537
 
548
571
        tree = self.make_branch_and_memory_tree('dir')
549
572
        # Guard against regression into MemoryTransport leaking
550
573
        # files to disk instead of keeping them in memory.
551
 
        self.failIf(osutils.lexists('dir'))
 
574
        self.assertFalse(osutils.lexists('dir'))
552
575
        self.assertIsInstance(tree, memorytree.MemoryTree)
553
576
 
554
577
    def test_make_branch_and_memory_tree_with_format(self):
555
578
        """make_branch_and_memory_tree should accept a format option."""
556
579
        format = bzrdir.BzrDirMetaFormat1()
557
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
580
        format.repository_format = repository.format_registry.get_default()
558
581
        tree = self.make_branch_and_memory_tree('dir', format=format)
559
582
        # Guard against regression into MemoryTransport leaking
560
583
        # files to disk instead of keeping them in memory.
561
 
        self.failIf(osutils.lexists('dir'))
 
584
        self.assertFalse(osutils.lexists('dir'))
562
585
        self.assertIsInstance(tree, memorytree.MemoryTree)
563
586
        self.assertEqual(format.repository_format.__class__,
564
587
            tree.branch.repository._format.__class__)
568
591
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
569
592
        # Guard against regression into MemoryTransport leaking
570
593
        # files to disk instead of keeping them in memory.
571
 
        self.failIf(osutils.lexists('dir'))
 
594
        self.assertFalse(osutils.lexists('dir'))
572
595
 
573
596
    def test_make_branch_builder_with_format(self):
574
597
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
575
598
        # that the format objects are used.
576
599
        format = bzrdir.BzrDirMetaFormat1()
577
 
        repo_format = weaverepo.RepositoryFormat7()
 
600
        repo_format = repository.format_registry.get_default()
578
601
        format.repository_format = repo_format
579
602
        builder = self.make_branch_builder('dir', format=format)
580
603
        the_branch = builder.get_branch()
581
604
        # Guard against regression into MemoryTransport leaking
582
605
        # files to disk instead of keeping them in memory.
583
 
        self.failIf(osutils.lexists('dir'))
 
606
        self.assertFalse(osutils.lexists('dir'))
584
607
        self.assertEqual(format.repository_format.__class__,
585
608
                         the_branch.repository._format.__class__)
586
609
        self.assertEqual(repo_format.get_format_string(),
592
615
        the_branch = builder.get_branch()
593
616
        # Guard against regression into MemoryTransport leaking
594
617
        # files to disk instead of keeping them in memory.
595
 
        self.failIf(osutils.lexists('dir'))
 
618
        self.assertFalse(osutils.lexists('dir'))
596
619
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
597
620
        self.assertEqual(dir_format.repository_format.__class__,
598
621
                         the_branch.repository._format.__class__)
609
632
                l.attempt_lock()
610
633
        test = TestDanglingLock('test_function')
611
634
        result = test.run()
 
635
        total_failures = result.errors + result.failures
612
636
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
637
            self.assertEqual(1, len(total_failures))
614
638
        else:
615
639
            # When _lock_check_thorough is disabled, then we don't trigger a
616
640
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
641
            self.assertEqual(0, len(total_failures))
618
642
 
619
643
 
620
644
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
645
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
646
 
623
647
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
648
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
649
        self.vfs_transport_factory = memory.MemoryServer
627
650
        self.transport_readonly_server = None
629
652
        # for the server
630
653
        url = self.get_readonly_url()
631
654
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
634
 
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
 
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
 
655
        t = transport.get_transport(url)
 
656
        t2 = transport.get_transport(url2)
 
657
        self.assertIsInstance(t, ReadonlyTransportDecorator)
 
658
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
659
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
660
 
638
661
    def test_get_readonly_url_http(self):
639
662
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
663
        from bzrlib.transport.http import HttpTransportBase
642
664
        self.transport_server = test_server.LocalURLServer
643
665
        self.transport_readonly_server = HttpServer
645
667
        url = self.get_readonly_url()
646
668
        url2 = self.get_readonly_url('foo/bar')
647
669
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
650
 
        self.failUnless(isinstance(t, HttpTransportBase))
651
 
        self.failUnless(isinstance(t2, HttpTransportBase))
 
670
        t = transport.get_transport(url)
 
671
        t2 = transport.get_transport(url2)
 
672
        self.assertIsInstance(t, HttpTransportBase)
 
673
        self.assertIsInstance(t2, HttpTransportBase)
652
674
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
653
675
 
654
676
    def test_is_directory(self):
662
684
    def test_make_branch_builder(self):
663
685
        builder = self.make_branch_builder('dir')
664
686
        rev_id = builder.build_commit()
665
 
        self.failUnlessExists('dir')
 
687
        self.assertPathExists('dir')
666
688
        a_dir = bzrdir.BzrDir.open('dir')
667
689
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
668
690
        a_branch = a_dir.open_branch()
684
706
        self.assertIsInstance(result_bzrdir.transport,
685
707
                              memory.MemoryTransport)
686
708
        # should not be on disk, should only be in memory
687
 
        self.failIfExists('subdir')
 
709
        self.assertPathDoesNotExist('subdir')
688
710
 
689
711
 
690
712
class TestChrootedTest(tests.ChrootedTestCase):
691
713
 
692
714
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
715
        t = transport.get_transport(self.get_readonly_url())
695
716
        url = t.base
696
717
        self.assertEqual(url, t.clone('..').base)
697
718
 
700
721
 
701
722
    def test_profiles_tests(self):
702
723
        self.requireFeature(test_lsprof.LSProfFeature)
703
 
        terminal = testtools.tests.helpers.ExtendedTestResult()
 
724
        terminal = testtools.testresult.doubles.ExtendedTestResult()
704
725
        result = tests.ProfileResult(terminal)
705
726
        class Sample(tests.TestCase):
706
727
            def a(self):
723
744
                descriptions=0,
724
745
                verbosity=1,
725
746
                )
726
 
        capture = testtools.tests.helpers.ExtendedTestResult()
 
747
        capture = testtools.testresult.doubles.ExtendedTestResult()
727
748
        test_case.run(MultiTestResult(result, capture))
728
749
        run_case = capture._events[0][1]
729
750
        timed_string = result._testTimeString(run_case)
750
771
        self.check_timing(ShortDelayTestCase('test_short_delay'),
751
772
                          r"^ +[0-9]+ms$")
752
773
 
753
 
    def _patch_get_bzr_source_tree(self):
754
 
        # Reading from the actual source tree breaks isolation, but we don't
755
 
        # want to assume that thats *all* that would happen.
756
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
757
 
 
758
 
    def test_assigned_benchmark_file_stores_date(self):
759
 
        self._patch_get_bzr_source_tree()
760
 
        output = StringIO()
761
 
        result = bzrlib.tests.TextTestResult(self._log_file,
762
 
                                        descriptions=0,
763
 
                                        verbosity=1,
764
 
                                        bench_history=output
765
 
                                        )
766
 
        output_string = output.getvalue()
767
 
        # if you are wondering about the regexp please read the comment in
768
 
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
769
 
        # XXX: what comment?  -- Andrew Bennetts
770
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
771
 
 
772
 
    def test_benchhistory_records_test_times(self):
773
 
        self._patch_get_bzr_source_tree()
774
 
        result_stream = StringIO()
775
 
        result = bzrlib.tests.TextTestResult(
776
 
            self._log_file,
777
 
            descriptions=0,
778
 
            verbosity=1,
779
 
            bench_history=result_stream
780
 
            )
781
 
 
782
 
        # we want profile a call and check that its test duration is recorded
783
 
        # make a new test instance that when run will generate a benchmark
784
 
        example_test_case = TestTestResult("_time_hello_world_encoding")
785
 
        # execute the test, which should succeed and record times
786
 
        example_test_case.run(result)
787
 
        lines = result_stream.getvalue().splitlines()
788
 
        self.assertEqual(2, len(lines))
789
 
        self.assertContainsRe(lines[1],
790
 
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
791
 
            "._time_hello_world_encoding")
792
 
 
793
774
    def _time_hello_world_encoding(self):
794
775
        """Profile two sleep calls
795
776
 
803
784
        self.requireFeature(test_lsprof.LSProfFeature)
804
785
        result_stream = StringIO()
805
786
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
787
            result_stream,
807
788
            descriptions=0,
808
789
            verbosity=2,
809
790
            )
835
816
        self.assertContainsRe(output,
836
817
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
837
818
 
 
819
    def test_uses_time_from_testtools(self):
 
820
        """Test case timings in verbose results should use testtools times"""
 
821
        import datetime
 
822
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
823
            def startTest(self, test):
 
824
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
825
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
826
            def addSuccess(self, test):
 
827
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
828
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
829
            def report_tests_starting(self): pass
 
830
        sio = StringIO()
 
831
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
832
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
833
 
838
834
    def test_known_failure(self):
839
835
        """A KnownFailure being raised should trigger several result actions."""
840
836
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
837
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
838
            def report_tests_starting(self): pass
844
839
            def report_known_failure(self, test, err=None, details=None):
845
840
                self._call = test, 'known failure'
846
841
        result = InstrumentedTestResult(None, None, None, None)
864
859
        # verbose test output formatting
865
860
        result_stream = StringIO()
866
861
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
862
            result_stream,
868
863
            descriptions=0,
869
864
            verbosity=2,
870
865
            )
880
875
        output = result_stream.getvalue()[prefix:]
881
876
        lines = output.splitlines()
882
877
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
878
        if sys.version_info > (2, 7):
 
879
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
880
                self.assertNotEqual, lines[1], '    ')
883
881
        self.assertEqual(lines[1], '    foo')
884
882
        self.assertEqual(2, len(lines))
885
883
 
893
891
        """Test the behaviour of invoking addNotSupported."""
894
892
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
893
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
894
            def report_tests_starting(self): pass
898
895
            def report_unsupported(self, test, feature):
899
896
                self._call = test, feature
900
897
        result = InstrumentedTestResult(None, None, None, None)
919
916
        # verbose test output formatting
920
917
        result_stream = StringIO()
921
918
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
919
            result_stream,
923
920
            descriptions=0,
924
921
            verbosity=2,
925
922
            )
939
936
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
937
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
938
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
939
            def report_tests_starting(self): pass
944
940
            def addNotSupported(self, test, feature):
945
941
                self._call = test, feature
946
942
        result = InstrumentedTestResult(None, None, None, None)
988
984
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
985
            calls = 0
990
986
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
987
        result = InstrumentedTestResult(None, None, None, None)
993
988
        def test_function():
994
989
            pass
996
991
        test.run(result)
997
992
        self.assertEquals(1, result.calls)
998
993
 
 
994
    def test_startTests_only_once(self):
 
995
        """With multiple tests startTests should still only be called once"""
 
996
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
997
            calls = 0
 
998
            def startTests(self): self.calls += 1
 
999
        result = InstrumentedTestResult(None, None, None, None)
 
1000
        suite = unittest.TestSuite([
 
1001
            unittest.FunctionTestCase(lambda: None),
 
1002
            unittest.FunctionTestCase(lambda: None)])
 
1003
        suite.run(result)
 
1004
        self.assertEquals(1, result.calls)
 
1005
        self.assertEquals(2, result.count)
 
1006
 
999
1007
 
1000
1008
class TestUnicodeFilenameFeature(tests.TestCase):
1001
1009
 
1022
1030
        because of our use of global state.
1023
1031
        """
1024
1032
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1033
        try:
1027
1034
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1035
            return testrunner.run(test)
1030
1036
        finally:
1031
1037
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1038
 
1034
1039
    def test_known_failure_failed_run(self):
1035
1040
        # run a test that generates a known failure which should be printed in
1040
1045
        test = unittest.TestSuite()
1041
1046
        test.addTest(Test("known_failure_test"))
1042
1047
        def failing_test():
1043
 
            self.fail('foo')
 
1048
            raise AssertionError('foo')
1044
1049
        test.addTest(unittest.FunctionTestCase(failing_test))
1045
1050
        stream = StringIO()
1046
1051
        runner = tests.TextTestRunner(stream=stream)
1054
1059
            '^----------------------------------------------------------------------\n'
1055
1060
            'Traceback \\(most recent call last\\):\n'
1056
1061
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1057
 
            '    self.fail\\(\'foo\'\\)\n'
 
1062
            '    raise AssertionError\\(\'foo\'\\)\n'
1058
1063
            '.*'
1059
1064
            '^----------------------------------------------------------------------\n'
1060
1065
            '.*'
1066
1071
        # the final output.
1067
1072
        class Test(tests.TestCase):
1068
1073
            def known_failure_test(self):
1069
 
                self.expectFailure('failed', self.assertTrue, False)
 
1074
                self.knownFailure("Never works...")
1070
1075
        test = Test("known_failure_test")
1071
1076
        stream = StringIO()
1072
1077
        runner = tests.TextTestRunner(stream=stream)
1078
1083
            '\n'
1079
1084
            'OK \\(known_failures=1\\)\n')
1080
1085
 
 
1086
    def test_unexpected_success_bad(self):
 
1087
        class Test(tests.TestCase):
 
1088
            def test_truth(self):
 
1089
                self.expectFailure("No absolute truth", self.assertTrue, True)
 
1090
        runner = tests.TextTestRunner(stream=StringIO())
 
1091
        result = self.run_test_runner(runner, Test("test_truth"))
 
1092
        self.assertContainsRe(runner.stream.getvalue(),
 
1093
            "=+\n"
 
1094
            "FAIL: \\S+\.test_truth\n"
 
1095
            "-+\n"
 
1096
            "(?:.*\n)*"
 
1097
            "No absolute truth\n"
 
1098
            "(?:.*\n)*"
 
1099
            "-+\n"
 
1100
            "Ran 1 test in .*\n"
 
1101
            "\n"
 
1102
            "FAILED \\(failures=1\\)\n\\Z")
 
1103
 
1081
1104
    def test_result_decorator(self):
1082
1105
        # decorate results
1083
1106
        calls = []
1084
 
        class LoggingDecorator(tests.ForwardingResult):
 
1107
        class LoggingDecorator(ExtendedToOriginalDecorator):
1085
1108
            def startTest(self, test):
1086
 
                tests.ForwardingResult.startTest(self, test)
 
1109
                ExtendedToOriginalDecorator.startTest(self, test)
1087
1110
                calls.append('start')
1088
1111
        test = unittest.FunctionTestCase(lambda:None)
1089
1112
        stream = StringIO()
1190
1213
            ],
1191
1214
            lines[-3:])
1192
1215
 
1193
 
    def _patch_get_bzr_source_tree(self):
1194
 
        # Reading from the actual source tree breaks isolation, but we don't
1195
 
        # want to assume that thats *all* that would happen.
1196
 
        self._get_source_tree_calls = []
1197
 
        def new_get():
1198
 
            self._get_source_tree_calls.append("called")
1199
 
            return None
1200
 
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
1201
 
 
1202
 
    def test_bench_history(self):
1203
 
        # tests that the running the benchmark passes bench_history into
1204
 
        # the test result object. We can tell that happens if
1205
 
        # _get_bzr_source_tree is called.
1206
 
        self._patch_get_bzr_source_tree()
1207
 
        test = TestRunner('dummy_test')
1208
 
        output = StringIO()
1209
 
        runner = tests.TextTestRunner(stream=self._log_file,
1210
 
                                      bench_history=output)
1211
 
        result = self.run_test_runner(runner, test)
1212
 
        output_string = output.getvalue()
1213
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
 
        self.assertLength(1, self._get_source_tree_calls)
 
1216
    def test_verbose_test_count(self):
 
1217
        """A verbose test run reports the right test count at the start"""
 
1218
        suite = TestUtil.TestSuite([
 
1219
            unittest.FunctionTestCase(lambda:None),
 
1220
            unittest.FunctionTestCase(lambda:None)])
 
1221
        self.assertEqual(suite.countTestCases(), 2)
 
1222
        stream = StringIO()
 
1223
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1224
        # Need to use the CountingDecorator as that's what sets num_tests
 
1225
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1226
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1215
1227
 
1216
1228
    def test_startTestRun(self):
1217
1229
        """run should call result.startTestRun()"""
1218
1230
        calls = []
1219
 
        class LoggingDecorator(tests.ForwardingResult):
 
1231
        class LoggingDecorator(ExtendedToOriginalDecorator):
1220
1232
            def startTestRun(self):
1221
 
                tests.ForwardingResult.startTestRun(self)
 
1233
                ExtendedToOriginalDecorator.startTestRun(self)
1222
1234
                calls.append('startTestRun')
1223
1235
        test = unittest.FunctionTestCase(lambda:None)
1224
1236
        stream = StringIO()
1230
1242
    def test_stopTestRun(self):
1231
1243
        """run should call result.stopTestRun()"""
1232
1244
        calls = []
1233
 
        class LoggingDecorator(tests.ForwardingResult):
 
1245
        class LoggingDecorator(ExtendedToOriginalDecorator):
1234
1246
            def stopTestRun(self):
1235
 
                tests.ForwardingResult.stopTestRun(self)
 
1247
                ExtendedToOriginalDecorator.stopTestRun(self)
1236
1248
                calls.append('stopTestRun')
1237
1249
        test = unittest.FunctionTestCase(lambda:None)
1238
1250
        stream = StringIO()
1241
1253
        result = self.run_test_runner(runner, test)
1242
1254
        self.assertLength(1, calls)
1243
1255
 
 
1256
    def test_unicode_test_output_on_ascii_stream(self):
 
1257
        """Showing results should always succeed even on an ascii console"""
 
1258
        class FailureWithUnicode(tests.TestCase):
 
1259
            def test_log_unicode(self):
 
1260
                self.log(u"\u2606")
 
1261
                self.fail("Now print that log!")
 
1262
        out = StringIO()
 
1263
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1264
            lambda trace=False: "ascii")
 
1265
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1266
            FailureWithUnicode("test_log_unicode"))
 
1267
        self.assertContainsRe(out.getvalue(),
 
1268
            "Text attachment: log\n"
 
1269
            "-+\n"
 
1270
            "\d+\.\d+  \\\\u2606\n"
 
1271
            "-+\n")
 
1272
 
1244
1273
 
1245
1274
class SampleTestCase(tests.TestCase):
1246
1275
 
1421
1450
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1451
        output_stream = StringIO()
1423
1452
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1453
            output_stream,
1425
1454
            descriptions=0,
1426
1455
            verbosity=2)
1427
1456
        sample_test.run(result)
1434
1463
        # Note this test won't fail with hooks that the core library doesn't
1435
1464
        # use - but it triggers with a plugin that adds hooks, so it's still a
1436
1465
        # useful warning in that case.
1437
 
        self.assertEqual(bzrlib.branch.BranchHooks(),
1438
 
            bzrlib.branch.Branch.hooks)
1439
 
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1466
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1467
        self.assertEqual(
 
1468
            bzrlib.smart.server.SmartServerHooks(),
1440
1469
            bzrlib.smart.server.SmartTCPServer.hooks)
1441
 
        self.assertEqual(bzrlib.commands.CommandHooks(),
1442
 
            bzrlib.commands.Command.hooks)
 
1470
        self.assertEqual(
 
1471
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1443
1472
 
1444
1473
    def test__gather_lsprof_in_benchmarks(self):
1445
1474
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1655
1684
        self.assertEqual('original', obj.test_attr)
1656
1685
 
1657
1686
 
 
1687
class _MissingFeature(tests.Feature):
    """A feature whose probe always answers that it is unavailable."""

    def _probe(self):
        # Never available.
        return False


# Module-level instance of the always-missing feature.
missing_feature = _MissingFeature()
 
1691
 
 
1692
 
 
1693
def _get_test(name):
    """Build one example test, selected by method name.

    The example TestCase class is defined inside this function so that the
    examples do not auto-run as part of the test suite.

    :param name: Name of the ExampleTests method to instantiate.
    :return: An ExampleTests instance bound to that method.
    """

    class ExampleTests(tests.TestCase):

        def test_success(self):
            # Logs a line and passes.
            mutter('this test succeeds')

        def test_fail(self):
            # Logs a line, then fails.
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            # Logs a line, then raises a non-assertion exception.
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_skip(self):
            # Logs a line, then skips with the reason 'reason'.
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_missing_feature(self):
            # Logs a line, then requires the always-missing feature.
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_xfail(self):
            # Logs a line, then reports a known failure.
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            # Logs a line, then expects a failure from a callable that
            # does nothing.
            mutter('test with unexpected success')
            self.expectFailure('should_fail', lambda: None)

    example = ExampleTests(name)
    return example
 
1730
 
 
1731
 
 
1732
class TestTestCaseLogDetails(tests.TestCase):
    """Check which outcomes of the example tests carry a 'log' detail.

    Each test runs one example from _get_test() against a plain
    testtools.TestResult and inspects what that result recorded.
    """

    def _run_test(self, test_name):
        """Run the named example test; return the result that collected it."""
        collected = testtools.TestResult()
        _get_test(test_name).run(collected)
        return collected

    def _check_problem_has_log(self, problems, logged_text):
        """Assert there is one (test, text) entry whose text shows the log."""
        self.assertEqual(1, len(problems))
        text = problems[0][1]
        self.assertContainsRe(text, 'Text attachment: log')
        self.assertContainsRe(text, logged_text)

    def _check_single_skip_without_log(self, result, reason):
        """Assert `reason` is the only skip reason, for one log-free test."""
        self.assertEqual([reason], result.skip_reasons.keys())
        skipped = result.skip_reasons[reason]
        self.assertEqual(1, len(skipped))
        skipped_test = skipped[0]
        self.assertFalse('log' in skipped_test.getDetails())

    def test_fail_has_log(self):
        outcome = self._run_test('test_fail')
        self._check_problem_has_log(
            outcome.failures, 'this was a failing test')

    def test_error_has_log(self):
        outcome = self._run_test('test_error')
        self._check_problem_has_log(outcome.errors, 'this test errored')

    def test_skip_has_no_log(self):
        outcome = self._run_test('test_skip')
        self._check_single_skip_without_log(outcome, 'reason')

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so a missing
        # feature is simply recorded as a skip.
        outcome = self._run_test('test_missing_feature')
        self._check_single_skip_without_log(outcome, missing_feature)

    def test_xfail_has_no_log(self):
        outcome = self._run_test('test_xfail')
        expected = outcome.expectedFailures
        self.assertEqual(1, len(expected))
        text = expected[0][1]
        self.assertNotContainsRe(text, 'Text attachment: log')
        self.assertNotContainsRe(text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        outcome = self._run_test('test_unexpected_success')
        surprises = outcome.unexpectedSuccesses
        self.assertEqual(1, len(surprises))
        # Inconsistency: unexpectedSuccesses holds the tests themselves,
        # whereas expectedFailures holds (test, text) pairs.
        details = surprises[0].getDetails()
        self.assertTrue('log' in details)
 
1787
 
 
1788
 
 
1789
class TestTestCloning(tests.TestCase):
    """Tests for cloning TestCase instances (as multiply_tests does)."""

    def test_cloned_testcase_does_not_share_details(self):
        """Details added while running the original do not reach its clone.

        The clone is made with clone_test before the original runs, so
        mutable state such as details must not be shared between the two.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
        original = Test('test_foo')
        clone_id = original.id() + '(cloned)'
        clone = tests.clone_test(original, clone_id)
        original.run(unittest.TestResult())
        original_details = original.getDetails()
        self.assertEqual('foo', original_details['foo'].iter_bytes())
        self.assertEqual(None, clone.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Multiplying a suite twice keeps the attributes of both scenarios."""
        class Test(tests.TestCase):
            def test_foo(self):
                pass
        x_scenarios = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
        y_scenarios = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
        by_x = tests.multiply_tests(
            Test('test_foo'), x_scenarios, unittest.TestSuite())
        by_x_and_y = tests.multiply_tests(
            by_x, y_scenarios, unittest.TestSuite())
        expanded = list(tests.iter_suite_tests(by_x_and_y))
        self.assertLength(4, expanded)
        # Every (x, y) combination should be present exactly once.
        seen = [(t.x, t.y) for t in expanded]
        seen.sort()
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], seen)
 
1821
 
 
1822
 
1658
1823
# NB: Don't delete this; it's not actually from 0.11!
1659
1824
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1825
def sample_deprecated_function():
1787
1952
    def test_make_branch_and_tree_with_format(self):
1788
1953
        # we should be able to supply a format to make_branch_and_tree
1789
1954
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1790
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1791
1955
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1792
1956
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1793
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1794
 
                              bzrlib.bzrdir.BzrDirFormat6)
1795
1957
 
1796
1958
    def test_make_branch_and_memory_tree(self):
1797
1959
        # we should be able to get a new branch and a mutable tree from
1814
1976
                tree.branch.repository.bzrdir.root_transport)
1815
1977
 
1816
1978
 
1817
 
class SelfTestHelper:
 
1979
class SelfTestHelper(object):
1818
1980
 
1819
1981
    def run_selftest(self, **kwargs):
1820
1982
        """Run selftest returning its output."""
1875
2037
 
1876
2038
    def test_lsprof_tests(self):
1877
2039
        self.requireFeature(test_lsprof.LSProfFeature)
1878
 
        calls = []
 
2040
        results = []
1879
2041
        class Test(object):
1880
2042
            def __call__(test, result):
1881
2043
                test.run(result)
1882
2044
            def run(test, result):
1883
 
                self.assertIsInstance(result, tests.ForwardingResult)
1884
 
                calls.append("called")
 
2045
                results.append(result)
1885
2046
            def countTestCases(self):
1886
2047
                return 1
1887
2048
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
1888
 
        self.assertLength(1, calls)
 
2049
        self.assertLength(1, results)
 
2050
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
1889
2051
 
1890
2052
    def test_random(self):
1891
2053
        # test randomising by listing a number of tests.
1971
2133
            load_list='missing file name', list_only=True)
1972
2134
 
1973
2135
 
 
2136
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check which outcomes put the test log into the subunit stream."""

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run one example test through selftest with the subunit runner.

        :param test_name: Name of the example test method to run.
        :return: A (content, result) pair: the text of the stream returned
            by run_selftest, and a testtools.TestResult populated by
            replaying that stream through subunit's ProtocolTestCase.
        """
        from subunit import ProtocolTestCase
        make_suite = lambda: TestUtil.TestSuite([_get_test(test_name)])
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=make_suite)
        replayed = testtools.TestResult()
        ProtocolTestCase(stream).run(replayed)
        return stream.getvalue(), replayed

    def _check_log_absent(self, content, logged_text):
        """Assert neither the 'log' marker nor the logged text is present."""
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, logged_text)

    def _check_log_present(self, content, expected_text):
        """Assert the 'log' marker and the expected text are both present."""
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, expected_text)

    def test_fail_has_log(self):
        content, result = self.run_subunit_stream('test_fail')
        self.assertEqual(1, len(result.failures))
        self._check_log_present(content, 'this test will fail')

    def test_error_has_log(self):
        content, result = self.run_subunit_stream('test_error')
        self._check_log_present(content, 'this test errored')

    def test_skip_has_no_log(self):
        content, result = self.run_subunit_stream('test_skip')
        self._check_log_absent(content, 'this test will be skipped')
        self.assertEqual(['reason'], result.skip_reasons.keys())
        skipped = result.skip_reasons['reason']
        self.assertEqual(1, len(skipped))
        test = skipped[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertFalse('log' in test.getDetails())

    def test_missing_feature_has_no_log(self):
        content, result = self.run_subunit_stream('test_missing_feature')
        self._check_log_absent(content, 'missing the feature')
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
        skipped = result.skip_reasons['_MissingFeature\n']
        self.assertEqual(1, len(skipped))
        test = skipped[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertFalse('log' in test.getDetails())

    def test_xfail_has_no_log(self):
        content, result = self.run_subunit_stream('test_xfail')
        self._check_log_absent(content, 'test with expected failure')
        expected = result.expectedFailures
        self.assertEqual(1, len(expected))
        text = expected[0][1]
        self.assertNotContainsRe(text, 'Text attachment: log')
        self.assertNotContainsRe(text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        content, result = self.run_subunit_stream('test_unexpected_success')
        self._check_log_present(content, 'test with unexpected success')
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
        #                success, if a min version check is added remove this
        from subunit import TestProtocolClient as _Client
        if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
            self.expectFailure('subunit treats "unexpectedSuccess"'
                               ' as a plain success',
                self.assertEqual, 1, len(result.unexpectedSuccesses))
        self.assertEqual(1, len(result.unexpectedSuccesses))
        test = result.unexpectedSuccesses[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertTrue('log' in test.getDetails())

    def test_success_has_no_log(self):
        content, result = self.run_subunit_stream('test_success')
        self.assertEqual(1, result.testsRun)
        self._check_log_absent(content, 'this test succeeds')
 
2215
 
 
2216
 
1974
2217
class TestRunBzr(tests.TestCase):
1975
2218
 
1976
2219
    out = ''
2099
2342
        # stdout and stderr of the invoked run_bzr
2100
2343
        current_factory = bzrlib.ui.ui_factory
2101
2344
        self.run_bzr(['foo'])
2102
 
        self.failIf(current_factory is self.factory)
 
2345
        self.assertFalse(current_factory is self.factory)
2103
2346
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2104
2347
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2105
2348
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2287
2530
        self.assertEqual([], command[2:])
2288
2531
 
2289
2532
    def test_set_env(self):
2290
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2533
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2291
2534
        # set in the child
2292
2535
        def check_environment():
2293
2536
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2299
2542
 
2300
2543
    def test_run_bzr_subprocess_env_del(self):
2301
2544
        """run_bzr_subprocess can remove environment variables too."""
2302
 
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2545
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2303
2546
        def check_environment():
2304
2547
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2305
2548
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2311
2554
        del os.environ['EXISTANT_ENV_VAR']
2312
2555
 
2313
2556
    def test_env_del_missing(self):
2314
 
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
 
2557
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2315
2558
        def check_environment():
2316
2559
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2317
2560
        self.check_popen_state = check_environment
2339
2582
            os.chdir = orig_chdir
2340
2583
        self.assertEqual(['foo', 'current'], chdirs)
2341
2584
 
 
2585
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2586
        self.get_source_path = lambda: ""
 
2587
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2588
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2589
 
2342
2590
 
2343
2591
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2592
    """Tests that really need to do things with an external bzr."""
2587
2835
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2588
2836
 
2589
2837
 
2590
 
class TestCheckInventoryShape(tests.TestCaseWithTransport):
 
2838
class TestCheckTreeShape(tests.TestCaseWithTransport):
2591
2839
 
2592
 
    def test_check_inventory_shape(self):
 
2840
    def test_check_tree_shape(self):
2593
2841
        files = ['a', 'b/', 'b/c']
2594
2842
        tree = self.make_branch_and_tree('.')
2595
2843
        self.build_tree(files)
2596
2844
        tree.add(files)
2597
2845
        tree.lock_read()
2598
2846
        try:
2599
 
            self.check_inventory_shape(tree.inventory, files)
 
2847
            self.check_tree_shape(tree, files)
2600
2848
        finally:
2601
2849
            tree.unlock()
2602
2850
 
2934
3182
        tpr.register('bar', 'bBB.aAA.rRR')
2935
3183
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
2936
3184
        self.assertThat(self.get_log(),
2937
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3185
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3186
                           doctest.ELLIPSIS))
2938
3187
 
2939
3188
    def test_get_unknown_prefix(self):
2940
3189
        tpr = self._get_registry()
2960
3209
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3210
 
2962
3211
 
 
3212
class TestThreadLeakDetection(tests.TestCase):
    """Threads leaked by tests must be detected and reported."""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Result that records each reported leak as a (test, leaks) pair."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            # `alive` is accepted but deliberately not recorded.
            self.leaks.append((test, leaks))

    def test_testcase_without_addCleanups(self):
        """Plain unittest.TestCase instances work with leak detection."""
        class Test(unittest.TestCase):
            def runTest(self):
                pass
        recorder = self.LeakRecordingResult()
        plain_test = Test()
        recorder.startTestRun()
        plain_test.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 0)
        self.assertEqual(recorder.leaks, [])

    def test_thread_leak(self):
        """A thread that outlives the test that started it is reported.

        The inner test case starts a thread that blocks on an event. Since
        the thread is still running once the inner case has finished, it
        should be detected as a leak. The event is set during cleanup so the
        thread can then be joined and is not leaked for real.
        """
        event = threading.Event()
        thread = threading.Thread(name="Leaker", target=event.wait)
        class Test(tests.TestCase):
            def test_leak(self):
                thread.start()
        recorder = self.LeakRecordingResult()
        leaker = Test("test_leak")
        # Cleanups are registered in this order on purpose: the join first,
        # the event release second.
        self.addCleanup(thread.join)
        self.addCleanup(event.set)
        recorder.startTestRun()
        leaker.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 1)
        self.assertEqual(recorder._first_thread_leaker_id, leaker.id())
        self.assertEqual(recorder.leaks, [(leaker, set([thread]))])
        self.assertContainsString(
            recorder.stream.getvalue(), "leaking threads")

    def test_multiple_leaks(self):
        """Each leak is blamed on the test case that caused it.

        Like test_thread_leak, but with three inner test methods: one leaks
        a single thread, one leaks nothing, and one leaks two threads.
        """
        event = threading.Event()
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
        class Test(tests.TestCase):
            def test_first_leak(self):
                thread_b.start()
            def test_second_no_leak(self):
                pass
            def test_third_leak(self):
                thread_c.start()
                thread_a.start()
        recorder = self.LeakRecordingResult()
        first_test = Test("test_first_leak")
        third_test = Test("test_third_leak")
        # Same registration order as the single-leak test: all joins first,
        # the event release last.
        for leaked in (thread_a, thread_b, thread_c):
            self.addCleanup(leaked.join)
        self.addCleanup(event.set)
        recorder.startTestRun()
        suite = unittest.TestSuite(
            [first_test, Test("test_second_no_leak"), third_test])
        suite.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 2)
        self.assertEqual(recorder._first_thread_leaker_id, first_test.id())
        expected_leaks = [
            (first_test, set([thread_b])),
            (third_test, set([thread_a, thread_c])),
            ]
        self.assertEqual(recorder.leaks, expected_leaks)
        self.assertContainsString(
            recorder.stream.getvalue(), "leaking threads")
 
3296
 
 
3297
 
 
3298
class TestPostMortemDebugging(tests.TestCase):
    """Post mortem debugging must work when tests fail or error."""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result that records where a post mortem would have started."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Record the code object of the last frame in the traceback."""
            tb = tb or sys.exc_info()[2]
            if tb is None:
                return
            # Walk to the innermost traceback entry.
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            # Reporting is irrelevant here; stay silent.
            pass

        def report_failure(self, test, err):
            # Reporting is irrelevant here; stay silent.
            pass

    def test_location_unittest_error(self):
        """An erroring unittest case gives the right post mortem traceback."""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        self.assertEqual(recorder.postcode, Test.runTest.func_code)

    def test_location_unittest_failure(self):
        """A failing unittest case gives the right post mortem traceback."""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        self.assertEqual(recorder.postcode, Test.runTest.func_code)

    def test_location_bt_error(self):
        """An erroring bzrlib.tests case gives the right traceback."""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        recorder = self.TracebackRecordingResult()
        Test("test_error").run(recorder)
        self.assertEqual(recorder.postcode, Test.test_error.func_code)

    def test_location_bt_failure(self):
        """A failing bzrlib.tests case gives the right traceback."""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        recorder = self.TracebackRecordingResult()
        Test("test_failure").run(recorder)
        self.assertEqual(recorder.postcode, Test.test_failure.func_code)

    def test_env_var_triggers_post_mortem(self):
        """pdb.post_mortem is called iff BZR_TEST_PDB is set."""
        import pdb
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
        seen_tracebacks = []
        # Replace the real debugger entry point with a recorder.
        self.overrideAttr(pdb, "post_mortem", seen_tracebacks.append)
        self.overrideEnv('BZR_TEST_PDB', None)
        result._post_mortem(1)
        self.overrideEnv('BZR_TEST_PDB', 'on')
        result._post_mortem(2)
        # Only the call made while the variable was set got through.
        self.assertEqual([2], seen_tracebacks)
 
3366
 
 
3367
 
2963
3368
class TestRunSuite(tests.TestCase):
2964
3369
 
2965
3370
    def test_runner_class(self):
2976
3381
                                                self.verbosity)
2977
3382
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3383
        self.assertLength(1, calls)
 
3384
 
 
3385
 
 
3386
class TestEnvironHandling(tests.TestCase):
    """Check overrideEnv restores the original value of a variable."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')
        # An embedded test is used to make sure the _captureVar bug is fixed.
        class Test(tests.TestCase):
            def test_me(self):
                # The first call saves the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
                # Calling it a second time must work as well
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
        report = StringIO()
        inner_result = tests.TextTestResult(report, 0, 1)
        Test('test_me').run(inner_result)
        if not inner_result.wasStrictlySuccessful():
            self.fail(report.getvalue())
        # Once the embedded test is done, our own value is back.
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3407
 
 
3408
 
 
3409
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    These tests are themselves already isolated from os.environ, so they need
    to be designed with some care to avoid bootstrap side-effects. Each test
    starts from an already clean os.environ, which allows making valid
    assertions about which variables are present or absent and designing the
    tests around those assertions.
    """

    class ScratchMonkey(tests.TestCase):

        def test_me(self):
            pass

    def test_basics(self):
        isolated = tests.isolated_environ
        # BZR_HOME is defined as None in isolated_environ, so it is not part
        # of os.environ for tests.TestCase.
        self.assertTrue('BZR_HOME' in isolated)
        self.assertEquals(None, isolated['BZR_HOME'])
        self.assertFalse('BZR_HOME' in os.environ)
        # LINES has a value in isolated_environ, so it is part of os.environ
        # for tests.TestCase.
        self.assertTrue('LINES' in isolated)
        self.assertEquals('25', isolated['LINES'])
        self.assertEquals('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BZR_HOME is known to be absent from os.environ
        monkey = self.ScratchMonkey('test_me')
        tests.override_os_environ(monkey, {'BZR_HOME': 'foo'})
        self.assertEquals('foo', os.environ['BZR_HOME'])
        tests.restore_os_environ(monkey)
        self.assertFalse('BZR_HOME' in os.environ)

    def test_injecting_known_variable(self):
        # LINES is known to be present in os.environ
        monkey = self.ScratchMonkey('test_me')
        tests.override_os_environ(monkey, {'LINES': '42'})
        self.assertEquals('42', os.environ['LINES'])
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])

    def test_deleting_variable(self):
        # LINES is known to be present in os.environ
        monkey = self.ScratchMonkey('test_me')
        tests.override_os_environ(monkey, {'LINES': None})
        self.assertFalse('LINES' in os.environ)
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])
 
3460
 
 
3461
 
 
3462
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.

    Since tests.TestCase already provides isolation from os.environ, the
    clean environment is used as the base for testing. To capture precisely
    the isolation that tests.DocTestSuite adds, its behaviour is compared
    against doctest.DocTestSuite.

    We want to make sure `tests.DocTestSuite` respects
    `tests.isolated_environ`, not `os.environ`, so each test overrides
    `tests.isolated_environ` to suit its needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a doctest suite whose only test is parsed from `string`.

        :param klass: Suite factory, called with a test_finder keyword.
        :param string: The doctest source text.
        :return: Whatever `klass` returns.
        """
        class Finder(doctest.DocTestFinder):

            def find(*args, **kwargs):
                # Ignore what we are asked to search; always answer with
                # the single test parsed from the given string.
                parser = doctest.DocTestParser()
                return [parser.get_doctest(string, {}, 'foo', 'foo.py', 0)]

        return klass(test_finder=Finder())

    def run_doctest_suite_for_string(self, klass, string):
        """Run the suite built for `string`; return (result, output)."""
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        self.get_doctest_suite_for_string(klass, string).run(result)
        return result, output

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the run output, unless the doctest run succeeds."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if result.wasStrictlySuccessful():
            return
        self.fail(output.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail, showing the run output, if the doctest run succeeds."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if not result.wasStrictlySuccessful():
            return
        self.fail(output.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        doctest_text = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_text)
        # tests.DocTestSuite sees '42'
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_text)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        doctest_text = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_text)
        # tests.DocTestSuite sees None
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_text)
 
3525
 
 
3526
 
 
3527
class TestSelftestExcludePatterns(tests.TestCase):
    """Check which tests 'selftest --list' reports when given -x options."""

    def setUp(self):
        super(TestSelftestExcludePatterns, self).setUp()
        # Every test here goes through the three-test factory below instead
        # of whatever tests.test_suite normally is.
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """Build a stand-in suite of three do-nothing tests: a, b and c.

        `keep_only` and `starting_with` are accepted but not used.
        """
        class Test(tests.TestCase):

            def id(self):
                # The bare method name is enough here, the full class path is
                # not needed.
                return self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass

        method_names = ('a', 'b', 'c')
        return TestUtil.TestSuite([Test(name) for name in method_names])

    def assertTestList(self, expected, *selftest_args):
        """Assert 'selftest --list' plus `selftest_args` prints `expected`.

        :param expected: The list of output lines that should be produced.
        :param selftest_args: Extra command line arguments for selftest.
        """
        # setUp has installed the small suite factory, which lets us work at
        # the command level without loading the whole test suite.
        command = ('selftest', '--list') + selftest_args
        stdout, _stderr = self.run_bzr(command)
        self.assertEquals(expected, stdout.splitlines())

    def test_full_list(self):
        # Nothing excluded: all three tests are listed.
        everything = ['a', 'b', 'c']
        self.assertTestList(everything)

    def test_single_exclude(self):
        # A single -x removes only the matching test.
        remaining = ['b', 'c']
        self.assertTestList(remaining, '-x', 'a')

    def test_mutiple_excludes(self):
        # Several -x options can be given together.
        remaining = ['c']
        self.assertTestList(remaining, '-x', 'a', '-x', 'b')