~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-09-29 22:03:03 UTC
  • mfrom: (5416.2.6 jam-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20100929220303-cr95h8iwtggco721
(mbp) Add 'break-lock --force'

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
import gc
21
 
import doctest
 
20
from doctest import ELLIPSIS
22
21
import os
23
22
import signal
24
23
import sys
27
26
import unittest
28
27
import warnings
29
28
 
30
 
from testtools import (
31
 
    ExtendedToOriginalDecorator,
32
 
    MultiTestResult,
33
 
    )
 
29
from testtools import MultiTestResult
34
30
from testtools.content import Content
35
31
from testtools.content_type import ContentType
36
32
from testtools.matchers import (
37
33
    DocTestMatches,
38
34
    Equals,
39
35
    )
40
 
import testtools.testresult.doubles
 
36
import testtools.tests.helpers
41
37
 
42
38
import bzrlib
43
39
from bzrlib import (
44
40
    branchbuilder,
45
41
    bzrdir,
46
 
    controldir,
 
42
    debug,
47
43
    errors,
48
 
    hooks,
49
44
    lockdir,
50
45
    memorytree,
51
46
    osutils,
55
50
    tests,
56
51
    transport,
57
52
    workingtree,
58
 
    workingtree_3,
59
 
    workingtree_4,
60
53
    )
61
54
from bzrlib.repofmt import (
62
55
    groupcompress_repo,
 
56
    pack_repo,
 
57
    weaverepo,
63
58
    )
64
59
from bzrlib.symbol_versioning import (
65
60
    deprecated_function,
70
65
    features,
71
66
    test_lsprof,
72
67
    test_server,
 
68
    test_sftp_transport,
73
69
    TestUtil,
74
70
    )
75
71
from bzrlib.trace import note, mutter
76
72
from bzrlib.transport import memory
 
73
from bzrlib.version import _get_bzr_source_tree
77
74
 
78
75
 
79
76
def _test_ids(test_suite):
81
78
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
82
79
 
83
80
 
 
81
class SelftestTests(tests.TestCase):
 
82
 
 
83
    def test_import_tests(self):
 
84
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
 
85
        self.assertEqual(mod.SelftestTests, SelftestTests)
 
86
 
 
87
    def test_import_test_failure(self):
 
88
        self.assertRaises(ImportError,
 
89
                          TestUtil._load_module_by_name,
 
90
                          'bzrlib.no-name-yet')
 
91
 
 
92
 
84
93
class MetaTestLog(tests.TestCase):
85
94
 
86
95
    def test_logging(self):
92
101
            "text", "plain", {"charset": "utf8"})))
93
102
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
94
103
        self.assertThat(self.get_log(),
95
 
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
104
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
105
 
 
106
 
 
107
class TestUnicodeFilename(tests.TestCase):
 
108
 
 
109
    def test_probe_passes(self):
 
110
        """UnicodeFilename._probe passes."""
 
111
        # We can't test much more than that because the behaviour depends
 
112
        # on the platform.
 
113
        tests.UnicodeFilename._probe()
96
114
 
97
115
 
98
116
class TestTreeShape(tests.TestCaseInTempDir):
99
117
 
100
118
    def test_unicode_paths(self):
101
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
119
        self.requireFeature(tests.UnicodeFilename)
102
120
 
103
121
        filename = u'hell\u00d8'
104
122
        self.build_tree_contents([(filename, 'contents of hello')])
105
 
        self.assertPathExists(filename)
 
123
        self.failUnlessExists(filename)
106
124
 
107
125
 
108
126
class TestClassesAvailable(tests.TestCase):
334
352
        from bzrlib.tests.per_workingtree import make_scenarios
335
353
        server1 = "a"
336
354
        server2 = "b"
337
 
        formats = [workingtree_4.WorkingTreeFormat4(),
338
 
                   workingtree_3.WorkingTreeFormat3(),
339
 
                   workingtree_4.WorkingTreeFormat6()]
340
 
        scenarios = make_scenarios(server1, server2, formats,
341
 
            remote_server='c', remote_readonly_server='d',
342
 
            remote_backing_server='e')
 
355
        formats = [workingtree.WorkingTreeFormat2(),
 
356
                   workingtree.WorkingTreeFormat3(),]
 
357
        scenarios = make_scenarios(server1, server2, formats)
343
358
        self.assertEqual([
344
 
            ('WorkingTreeFormat4',
 
359
            ('WorkingTreeFormat2',
345
360
             {'bzrdir_format': formats[0]._matchingbzrdir,
346
361
              'transport_readonly_server': 'b',
347
362
              'transport_server': 'a',
350
365
             {'bzrdir_format': formats[1]._matchingbzrdir,
351
366
              'transport_readonly_server': 'b',
352
367
              'transport_server': 'a',
353
 
              'workingtree_format': formats[1]}),
354
 
            ('WorkingTreeFormat6',
355
 
             {'bzrdir_format': formats[2]._matchingbzrdir,
356
 
              'transport_readonly_server': 'b',
357
 
              'transport_server': 'a',
358
 
              'workingtree_format': formats[2]}),
359
 
            ('WorkingTreeFormat6,remote',
360
 
             {'bzrdir_format': formats[2]._matchingbzrdir,
361
 
              'repo_is_remote': True,
362
 
              'transport_readonly_server': 'd',
363
 
              'transport_server': 'c',
364
 
              'vfs_transport_factory': 'e',
365
 
              'workingtree_format': formats[2]}),
366
 
            ], scenarios)
 
368
              'workingtree_format': formats[1]})],
 
369
            scenarios)
367
370
 
368
371
 
369
372
class TestTreeScenarios(tests.TestCase):
370
373
 
371
374
    def test_scenarios(self):
372
375
        # the tree implementation scenario generator is meant to setup one
373
 
        # instance for each working tree format, one additional instance
 
376
        # instance for each working tree format, and one additional instance
374
377
        # that will use the default wt format, but create a revision tree for
375
 
        # the tests, and one more that uses the default wt format as a
376
 
        # lightweight checkout of a remote repository.  This means that the wt
377
 
        # ones should have the workingtree_to_test_tree attribute set to
378
 
        # 'return_parameter' and the revision one set to
379
 
        # revision_tree_from_workingtree.
 
378
        # the tests.  this means that the wt ones should have the
 
379
        # workingtree_to_test_tree attribute set to 'return_parameter' and the
 
380
        # revision one set to revision_tree_from_workingtree.
380
381
 
381
382
        from bzrlib.tests.per_tree import (
382
383
            _dirstate_tree_from_workingtree,
388
389
            )
389
390
        server1 = "a"
390
391
        server2 = "b"
391
 
        smart_server = test_server.SmartTCPServer_for_testing
392
 
        smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
393
 
        mem_server = memory.MemoryServer
394
 
        formats = [workingtree_4.WorkingTreeFormat4(),
395
 
                   workingtree_3.WorkingTreeFormat3(),]
 
392
        formats = [workingtree.WorkingTreeFormat2(),
 
393
                   workingtree.WorkingTreeFormat3(),]
396
394
        scenarios = make_scenarios(server1, server2, formats)
397
 
        self.assertEqual(8, len(scenarios))
398
 
        default_wt_format = workingtree.format_registry.get_default()
399
 
        wt4_format = workingtree_4.WorkingTreeFormat4()
400
 
        wt5_format = workingtree_4.WorkingTreeFormat5()
401
 
        wt6_format = workingtree_4.WorkingTreeFormat6()
 
395
        self.assertEqual(7, len(scenarios))
 
396
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
 
397
        wt4_format = workingtree.WorkingTreeFormat4()
 
398
        wt5_format = workingtree.WorkingTreeFormat5()
402
399
        expected_scenarios = [
403
 
            ('WorkingTreeFormat4',
 
400
            ('WorkingTreeFormat2',
404
401
             {'bzrdir_format': formats[0]._matchingbzrdir,
405
402
              'transport_readonly_server': 'b',
406
403
              'transport_server': 'a',
414
411
              'workingtree_format': formats[1],
415
412
              '_workingtree_to_test_tree': return_parameter,
416
413
             }),
417
 
            ('WorkingTreeFormat6,remote',
418
 
             {'bzrdir_format': wt6_format._matchingbzrdir,
419
 
              'repo_is_remote': True,
420
 
              'transport_readonly_server': smart_readonly_server,
421
 
              'transport_server': smart_server,
422
 
              'vfs_transport_factory': mem_server,
423
 
              'workingtree_format': wt6_format,
424
 
              '_workingtree_to_test_tree': return_parameter,
425
 
             }),
426
414
            ('RevisionTree',
427
415
             {'_workingtree_to_test_tree': revision_tree_from_workingtree,
428
416
              'bzrdir_format': default_wt_format._matchingbzrdir,
475
463
        # ones to add.
476
464
        from bzrlib.tests.per_tree import (
477
465
            return_parameter,
 
466
            revision_tree_from_workingtree
478
467
            )
479
468
        from bzrlib.tests.per_intertree import (
480
469
            make_scenarios,
481
470
            )
482
 
        from bzrlib.workingtree_3 import WorkingTreeFormat3
483
 
        from bzrlib.workingtree_4 import WorkingTreeFormat4
 
471
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
484
472
        input_test = TestInterTreeScenarios(
485
473
            "test_scenarios")
486
474
        server1 = "a"
487
475
        server2 = "b"
488
 
        format1 = WorkingTreeFormat4()
 
476
        format1 = WorkingTreeFormat2()
489
477
        format2 = WorkingTreeFormat3()
490
478
        formats = [("1", str, format1, format2, "converter1"),
491
479
            ("2", int, format2, format1, "converter2")]
537
525
        self.assertRaises(AssertionError, self.assertEqualStat,
538
526
            os.lstat("foo"), os.lstat("longname"))
539
527
 
540
 
    def test_failUnlessExists(self):
541
 
        """Deprecated failUnlessExists and failIfExists"""
542
 
        self.applyDeprecated(
543
 
            deprecated_in((2, 4)),
544
 
            self.failUnlessExists, '.')
545
 
        self.build_tree(['foo/', 'foo/bar'])
546
 
        self.applyDeprecated(
547
 
            deprecated_in((2, 4)),
548
 
            self.failUnlessExists, 'foo/bar')
549
 
        self.applyDeprecated(
550
 
            deprecated_in((2, 4)),
551
 
            self.failIfExists, 'foo/foo')
552
 
 
553
 
    def test_assertPathExists(self):
554
 
        self.assertPathExists('.')
555
 
        self.build_tree(['foo/', 'foo/bar'])
556
 
        self.assertPathExists('foo/bar')
557
 
        self.assertPathDoesNotExist('foo/foo')
558
 
 
559
528
 
560
529
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
561
530
 
595
564
        tree = self.make_branch_and_memory_tree('dir')
596
565
        # Guard against regression into MemoryTransport leaking
597
566
        # files to disk instead of keeping them in memory.
598
 
        self.assertFalse(osutils.lexists('dir'))
 
567
        self.failIf(osutils.lexists('dir'))
599
568
        self.assertIsInstance(tree, memorytree.MemoryTree)
600
569
 
601
570
    def test_make_branch_and_memory_tree_with_format(self):
602
571
        """make_branch_and_memory_tree should accept a format option."""
603
572
        format = bzrdir.BzrDirMetaFormat1()
604
 
        format.repository_format = repository.format_registry.get_default()
 
573
        format.repository_format = weaverepo.RepositoryFormat7()
605
574
        tree = self.make_branch_and_memory_tree('dir', format=format)
606
575
        # Guard against regression into MemoryTransport leaking
607
576
        # files to disk instead of keeping them in memory.
608
 
        self.assertFalse(osutils.lexists('dir'))
 
577
        self.failIf(osutils.lexists('dir'))
609
578
        self.assertIsInstance(tree, memorytree.MemoryTree)
610
579
        self.assertEqual(format.repository_format.__class__,
611
580
            tree.branch.repository._format.__class__)
615
584
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
616
585
        # Guard against regression into MemoryTransport leaking
617
586
        # files to disk instead of keeping them in memory.
618
 
        self.assertFalse(osutils.lexists('dir'))
 
587
        self.failIf(osutils.lexists('dir'))
619
588
 
620
589
    def test_make_branch_builder_with_format(self):
621
590
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
622
591
        # that the format objects are used.
623
592
        format = bzrdir.BzrDirMetaFormat1()
624
 
        repo_format = repository.format_registry.get_default()
 
593
        repo_format = weaverepo.RepositoryFormat7()
625
594
        format.repository_format = repo_format
626
595
        builder = self.make_branch_builder('dir', format=format)
627
596
        the_branch = builder.get_branch()
628
597
        # Guard against regression into MemoryTransport leaking
629
598
        # files to disk instead of keeping them in memory.
630
 
        self.assertFalse(osutils.lexists('dir'))
 
599
        self.failIf(osutils.lexists('dir'))
631
600
        self.assertEqual(format.repository_format.__class__,
632
601
                         the_branch.repository._format.__class__)
633
602
        self.assertEqual(repo_format.get_format_string(),
639
608
        the_branch = builder.get_branch()
640
609
        # Guard against regression into MemoryTransport leaking
641
610
        # files to disk instead of keeping them in memory.
642
 
        self.assertFalse(osutils.lexists('dir'))
643
 
        dir_format = controldir.format_registry.make_bzrdir('knit')
 
611
        self.failIf(osutils.lexists('dir'))
 
612
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
644
613
        self.assertEqual(dir_format.repository_format.__class__,
645
614
                         the_branch.repository._format.__class__)
646
615
        self.assertEqual('Bazaar-NG Knit Repository Format 1',
650
619
    def test_dangling_locks_cause_failures(self):
651
620
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
652
621
            def test_function(self):
653
 
                t = self.get_transport_from_path('.')
 
622
                t = self.get_transport('.')
654
623
                l = lockdir.LockDir(t, 'lock')
655
624
                l.create()
656
625
                l.attempt_lock()
676
645
        # for the server
677
646
        url = self.get_readonly_url()
678
647
        url2 = self.get_readonly_url('foo/bar')
679
 
        t = transport.get_transport_from_url(url)
680
 
        t2 = transport.get_transport_from_url(url2)
681
 
        self.assertIsInstance(t, ReadonlyTransportDecorator)
682
 
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
 
648
        t = transport.get_transport(url)
 
649
        t2 = transport.get_transport(url2)
 
650
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
 
651
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
683
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
684
653
 
685
654
    def test_get_readonly_url_http(self):
691
660
        url = self.get_readonly_url()
692
661
        url2 = self.get_readonly_url('foo/bar')
693
662
        # the transport returned may be any HttpTransportBase subclass
694
 
        t = transport.get_transport_from_url(url)
695
 
        t2 = transport.get_transport_from_url(url2)
696
 
        self.assertIsInstance(t, HttpTransportBase)
697
 
        self.assertIsInstance(t2, HttpTransportBase)
 
663
        t = transport.get_transport(url)
 
664
        t2 = transport.get_transport(url2)
 
665
        self.failUnless(isinstance(t, HttpTransportBase))
 
666
        self.failUnless(isinstance(t2, HttpTransportBase))
698
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
699
668
 
700
669
    def test_is_directory(self):
708
677
    def test_make_branch_builder(self):
709
678
        builder = self.make_branch_builder('dir')
710
679
        rev_id = builder.build_commit()
711
 
        self.assertPathExists('dir')
712
 
        a_dir = controldir.ControlDir.open('dir')
 
680
        self.failUnlessExists('dir')
 
681
        a_dir = bzrdir.BzrDir.open('dir')
713
682
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
714
683
        a_branch = a_dir.open_branch()
715
684
        builder_branch = builder.get_branch()
730
699
        self.assertIsInstance(result_bzrdir.transport,
731
700
                              memory.MemoryTransport)
732
701
        # should not be on disk, should only be in memory
733
 
        self.assertPathDoesNotExist('subdir')
 
702
        self.failIfExists('subdir')
734
703
 
735
704
 
736
705
class TestChrootedTest(tests.ChrootedTestCase):
737
706
 
738
707
    def test_root_is_root(self):
739
 
        t = transport.get_transport_from_url(self.get_readonly_url())
 
708
        t = transport.get_transport(self.get_readonly_url())
740
709
        url = t.base
741
710
        self.assertEqual(url, t.clone('..').base)
742
711
 
744
713
class TestProfileResult(tests.TestCase):
745
714
 
746
715
    def test_profiles_tests(self):
747
 
        self.requireFeature(features.lsprof_feature)
748
 
        terminal = testtools.testresult.doubles.ExtendedTestResult()
 
716
        self.requireFeature(test_lsprof.LSProfFeature)
 
717
        terminal = testtools.tests.helpers.ExtendedTestResult()
749
718
        result = tests.ProfileResult(terminal)
750
719
        class Sample(tests.TestCase):
751
720
            def a(self):
768
737
                descriptions=0,
769
738
                verbosity=1,
770
739
                )
771
 
        capture = testtools.testresult.doubles.ExtendedTestResult()
 
740
        capture = testtools.tests.helpers.ExtendedTestResult()
772
741
        test_case.run(MultiTestResult(result, capture))
773
742
        run_case = capture._events[0][1]
774
743
        timed_string = result._testTimeString(run_case)
795
764
        self.check_timing(ShortDelayTestCase('test_short_delay'),
796
765
                          r"^ +[0-9]+ms$")
797
766
 
 
767
    def _patch_get_bzr_source_tree(self):
 
768
        # Reading from the actual source tree breaks isolation, but we don't
 
769
        # want to assume that that's *all* that would happen.
 
770
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
 
771
 
 
772
    def test_assigned_benchmark_file_stores_date(self):
 
773
        self._patch_get_bzr_source_tree()
 
774
        output = StringIO()
 
775
        result = bzrlib.tests.TextTestResult(self._log_file,
 
776
                                        descriptions=0,
 
777
                                        verbosity=1,
 
778
                                        bench_history=output
 
779
                                        )
 
780
        output_string = output.getvalue()
 
781
        # if you are wondering about the regexp please read the comment in
 
782
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
 
783
        # XXX: what comment?  -- Andrew Bennetts
 
784
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
785
 
 
786
    def test_benchhistory_records_test_times(self):
 
787
        self._patch_get_bzr_source_tree()
 
788
        result_stream = StringIO()
 
789
        result = bzrlib.tests.TextTestResult(
 
790
            self._log_file,
 
791
            descriptions=0,
 
792
            verbosity=1,
 
793
            bench_history=result_stream
 
794
            )
 
795
 
 
796
        # we want to profile a call and check that its test duration is recorded
 
797
        # make a new test instance that when run will generate a benchmark
 
798
        example_test_case = TestTestResult("_time_hello_world_encoding")
 
799
        # execute the test, which should succeed and record times
 
800
        example_test_case.run(result)
 
801
        lines = result_stream.getvalue().splitlines()
 
802
        self.assertEqual(2, len(lines))
 
803
        self.assertContainsRe(lines[1],
 
804
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
 
805
            "._time_hello_world_encoding")
 
806
 
798
807
    def _time_hello_world_encoding(self):
799
808
        """Profile two sleep calls
800
809
 
805
814
 
806
815
    def test_lsprofiling(self):
807
816
        """Verbose test result prints lsprof statistics from test cases."""
808
 
        self.requireFeature(features.lsprof_feature)
 
817
        self.requireFeature(test_lsprof.LSProfFeature)
809
818
        result_stream = StringIO()
810
819
        result = bzrlib.tests.VerboseTestResult(
811
820
            result_stream,
840
849
        self.assertContainsRe(output,
841
850
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
842
851
 
843
 
    def test_uses_time_from_testtools(self):
844
 
        """Test case timings in verbose results should use testtools times"""
845
 
        import datetime
846
 
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
847
 
            def startTest(self, test):
848
 
                self.time(datetime.datetime.utcfromtimestamp(1.145))
849
 
                super(TimeAddedVerboseTestResult, self).startTest(test)
850
 
            def addSuccess(self, test):
851
 
                self.time(datetime.datetime.utcfromtimestamp(51.147))
852
 
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
853
 
            def report_tests_starting(self): pass
854
 
        sio = StringIO()
855
 
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
856
 
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
857
 
 
858
852
    def test_known_failure(self):
859
 
        """Using knownFailure should trigger several result actions."""
 
853
        """A KnownFailure being raised should trigger several result actions."""
860
854
        class InstrumentedTestResult(tests.ExtendedTestResult):
861
855
            def stopTestRun(self): pass
862
856
            def report_tests_starting(self): pass
865
859
        result = InstrumentedTestResult(None, None, None, None)
866
860
        class Test(tests.TestCase):
867
861
            def test_function(self):
868
 
                self.knownFailure('failed!')
 
862
                raise tests.KnownFailure('failed!')
869
863
        test = Test("test_function")
870
864
        test.run(result)
871
865
        # it should invoke 'report_known_failure'.
887
881
            descriptions=0,
888
882
            verbosity=2,
889
883
            )
890
 
        _get_test("test_xfail").run(result)
891
 
        self.assertContainsRe(result_stream.getvalue(),
892
 
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
893
 
            "\\s*(?:Text attachment: )?reason"
894
 
            "(?:\n-+\n|: {{{)"
895
 
            "this_fails"
896
 
            "(?:\n-+\n|}}}\n)")
 
884
        test = self.get_passing_test()
 
885
        result.startTest(test)
 
886
        prefix = len(result_stream.getvalue())
 
887
        # the err parameter has the shape:
 
888
        # (class, exception object, traceback)
 
889
        # KnownFailures don't get their tracebacks shown though, so we
 
890
        # can skip that.
 
891
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
892
        result.report_known_failure(test, err)
 
893
        output = result_stream.getvalue()[prefix:]
 
894
        lines = output.splitlines()
 
895
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
896
        if sys.version_info > (2, 7):
 
897
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
898
                self.assertNotEqual, lines[1], '    ')
 
899
        self.assertEqual(lines[1], '    foo')
 
900
        self.assertEqual(2, len(lines))
897
901
 
898
902
    def get_passing_test(self):
899
903
        """Return a test object that can't be run usefully."""
910
914
                self._call = test, feature
911
915
        result = InstrumentedTestResult(None, None, None, None)
912
916
        test = SampleTestCase('_test_pass')
913
 
        feature = features.Feature()
 
917
        feature = tests.Feature()
914
918
        result.startTest(test)
915
919
        result.addNotSupported(test, feature)
916
920
        # it should invoke 'report_unsupported'.
935
939
            verbosity=2,
936
940
            )
937
941
        test = self.get_passing_test()
938
 
        feature = features.Feature()
 
942
        feature = tests.Feature()
939
943
        result.startTest(test)
940
944
        prefix = len(result_stream.getvalue())
941
945
        result.report_unsupported(test, feature)
954
958
            def addNotSupported(self, test, feature):
955
959
                self._call = test, feature
956
960
        result = InstrumentedTestResult(None, None, None, None)
957
 
        feature = features.Feature()
 
961
        feature = tests.Feature()
958
962
        class Test(tests.TestCase):
959
963
            def test_function(self):
960
964
                raise tests.UnavailableFeature(feature)
979
983
    def test_strict_with_known_failure(self):
980
984
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
981
985
                                             verbosity=1)
982
 
        test = _get_test("test_xfail")
983
 
        test.run(result)
 
986
        test = self.get_passing_test()
 
987
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
988
        result.addExpectedFailure(test, err)
984
989
        self.assertFalse(result.wasStrictlySuccessful())
985
990
        self.assertEqual(None, result._extractBenchmarkTime(test))
986
991
 
1018
1023
        self.assertEquals(2, result.count)
1019
1024
 
1020
1025
 
 
1026
class TestUnicodeFilenameFeature(tests.TestCase):
 
1027
 
 
1028
    def test_probe_passes(self):
 
1029
        """UnicodeFilenameFeature._probe passes."""
 
1030
        # We can't test much more than that because the behaviour depends
 
1031
        # on the platform.
 
1032
        tests.UnicodeFilenameFeature._probe()
 
1033
 
 
1034
 
1021
1035
class TestRunner(tests.TestCase):
1022
1036
 
1023
1037
    def dummy_test(self):
1049
1063
        test = unittest.TestSuite()
1050
1064
        test.addTest(Test("known_failure_test"))
1051
1065
        def failing_test():
1052
 
            raise AssertionError('foo')
 
1066
            self.fail('foo')
1053
1067
        test.addTest(unittest.FunctionTestCase(failing_test))
1054
1068
        stream = StringIO()
1055
1069
        runner = tests.TextTestRunner(stream=stream)
1063
1077
            '^----------------------------------------------------------------------\n'
1064
1078
            'Traceback \\(most recent call last\\):\n'
1065
1079
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1066
 
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1080
            '    self.fail\\(\'foo\'\\)\n'
1067
1081
            '.*'
1068
1082
            '^----------------------------------------------------------------------\n'
1069
1083
            '.*'
1075
1089
        # the final output.
1076
1090
        class Test(tests.TestCase):
1077
1091
            def known_failure_test(self):
1078
 
                self.knownFailure("Never works...")
 
1092
                self.expectFailure('failed', self.assertTrue, False)
1079
1093
        test = Test("known_failure_test")
1080
1094
        stream = StringIO()
1081
1095
        runner = tests.TextTestRunner(stream=stream)
1087
1101
            '\n'
1088
1102
            'OK \\(known_failures=1\\)\n')
1089
1103
 
1090
 
    def test_unexpected_success_bad(self):
1091
 
        class Test(tests.TestCase):
1092
 
            def test_truth(self):
1093
 
                self.expectFailure("No absolute truth", self.assertTrue, True)
1094
 
        runner = tests.TextTestRunner(stream=StringIO())
1095
 
        result = self.run_test_runner(runner, Test("test_truth"))
1096
 
        self.assertContainsRe(runner.stream.getvalue(),
1097
 
            "=+\n"
1098
 
            "FAIL: \\S+\.test_truth\n"
1099
 
            "-+\n"
1100
 
            "(?:.*\n)*"
1101
 
            "\\s*(?:Text attachment: )?reason"
1102
 
            "(?:\n-+\n|: {{{)"
1103
 
            "No absolute truth"
1104
 
            "(?:\n-+\n|}}}\n)"
1105
 
            "(?:.*\n)*"
1106
 
            "-+\n"
1107
 
            "Ran 1 test in .*\n"
1108
 
            "\n"
1109
 
            "FAILED \\(failures=1\\)\n\\Z")
1110
 
 
1111
1104
    def test_result_decorator(self):
1112
1105
        # decorate results
1113
1106
        calls = []
1114
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1107
        class LoggingDecorator(tests.ForwardingResult):
1115
1108
            def startTest(self, test):
1116
 
                ExtendedToOriginalDecorator.startTest(self, test)
 
1109
                tests.ForwardingResult.startTest(self, test)
1117
1110
                calls.append('start')
1118
1111
        test = unittest.FunctionTestCase(lambda:None)
1119
1112
        stream = StringIO()
1161
1154
        class SkippedTest(tests.TestCase):
1162
1155
 
1163
1156
            def setUp(self):
1164
 
                super(SkippedTest, self).setUp()
 
1157
                tests.TestCase.setUp(self)
1165
1158
                calls.append('setUp')
1166
1159
                self.addCleanup(self.cleanup)
1167
1160
 
1197
1190
 
1198
1191
    def test_unsupported_features_listed(self):
1199
1192
        """When unsupported features are encountered they are detailed."""
1200
 
        class Feature1(features.Feature):
 
1193
        class Feature1(tests.Feature):
1201
1194
            def _probe(self): return False
1202
 
        class Feature2(features.Feature):
 
1195
        class Feature2(tests.Feature):
1203
1196
            def _probe(self): return False
1204
1197
        # create sample tests
1205
1198
        test1 = SampleTestCase('_test_pass')
1220
1213
            ],
1221
1214
            lines[-3:])
1222
1215
 
 
1216
    def _patch_get_bzr_source_tree(self):
 
1217
        # Reading from the actual source tree breaks isolation, but we don't
 
1218
        # want to assume that that's *all* that would happen.
 
1219
        self._get_source_tree_calls = []
 
1220
        def new_get():
 
1221
            self._get_source_tree_calls.append("called")
 
1222
            return None
 
1223
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
 
1224
 
 
1225
    def test_bench_history(self):
 
1226
        # tests that running the benchmark passes bench_history into
 
1227
        # the test result object. We can tell that happens if
 
1228
        # _get_bzr_source_tree is called.
 
1229
        self._patch_get_bzr_source_tree()
 
1230
        test = TestRunner('dummy_test')
 
1231
        output = StringIO()
 
1232
        runner = tests.TextTestRunner(stream=self._log_file,
 
1233
                                      bench_history=output)
 
1234
        result = self.run_test_runner(runner, test)
 
1235
        output_string = output.getvalue()
 
1236
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
1237
        self.assertLength(1, self._get_source_tree_calls)
 
1238
 
1223
1239
    def test_verbose_test_count(self):
1224
1240
        """A verbose test run reports the right test count at the start"""
1225
1241
        suite = TestUtil.TestSuite([
1235
1251
    def test_startTestRun(self):
1236
1252
        """run should call result.startTestRun()"""
1237
1253
        calls = []
1238
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1254
        class LoggingDecorator(tests.ForwardingResult):
1239
1255
            def startTestRun(self):
1240
 
                ExtendedToOriginalDecorator.startTestRun(self)
 
1256
                tests.ForwardingResult.startTestRun(self)
1241
1257
                calls.append('startTestRun')
1242
1258
        test = unittest.FunctionTestCase(lambda:None)
1243
1259
        stream = StringIO()
1249
1265
    def test_stopTestRun(self):
1250
1266
        """run should call result.stopTestRun()"""
1251
1267
        calls = []
1252
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1268
        class LoggingDecorator(tests.ForwardingResult):
1253
1269
            def stopTestRun(self):
1254
 
                ExtendedToOriginalDecorator.stopTestRun(self)
 
1270
                tests.ForwardingResult.stopTestRun(self)
1255
1271
                calls.append('stopTestRun')
1256
1272
        test = unittest.FunctionTestCase(lambda:None)
1257
1273
        stream = StringIO()
1260
1276
        result = self.run_test_runner(runner, test)
1261
1277
        self.assertLength(1, calls)
1262
1278
 
1263
 
    def test_unicode_test_output_on_ascii_stream(self):
1264
 
        """Showing results should always succeed even on an ascii console"""
1265
 
        class FailureWithUnicode(tests.TestCase):
1266
 
            def test_log_unicode(self):
1267
 
                self.log(u"\u2606")
1268
 
                self.fail("Now print that log!")
1269
 
        out = StringIO()
1270
 
        self.overrideAttr(osutils, "get_terminal_encoding",
1271
 
            lambda trace=False: "ascii")
1272
 
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1273
 
            FailureWithUnicode("test_log_unicode"))
1274
 
        self.assertContainsRe(out.getvalue(),
1275
 
            "(?:Text attachment: )?log"
1276
 
            "(?:\n-+\n|: {{{)"
1277
 
            "\d+\.\d+  \\\\u2606"
1278
 
            "(?:\n-+\n|}}}\n)")
1279
 
 
1280
1279
 
1281
1280
class SampleTestCase(tests.TestCase):
1282
1281
 
1470
1469
        # Note this test won't fail with hooks that the core library doesn't
1471
1470
        # use - but it triggers with a plugin that adds hooks, so it's still a
1472
1471
        # useful warning in that case.
1473
 
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
1474
 
        self.assertEqual(
1475
 
            bzrlib.smart.server.SmartServerHooks(),
 
1472
        self.assertEqual(bzrlib.branch.BranchHooks(),
 
1473
            bzrlib.branch.Branch.hooks)
 
1474
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
1476
1475
            bzrlib.smart.server.SmartTCPServer.hooks)
1477
 
        self.assertEqual(
1478
 
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
 
1476
        self.assertEqual(bzrlib.commands.CommandHooks(),
 
1477
            bzrlib.commands.Command.hooks)
1479
1478
 
1480
1479
    def test__gather_lsprof_in_benchmarks(self):
1481
1480
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1482
1481
 
1483
1482
        Each self.time() call is individually and separately profiled.
1484
1483
        """
1485
 
        self.requireFeature(features.lsprof_feature)
 
1484
        self.requireFeature(test_lsprof.LSProfFeature)
1486
1485
        # overrides the class member with an instance member so no cleanup
1487
1486
        # needed.
1488
1487
        self._gather_lsprof_in_benchmarks = True
1507
1506
        transport_server = memory.MemoryServer()
1508
1507
        transport_server.start_server()
1509
1508
        self.addCleanup(transport_server.stop_server)
1510
 
        t = transport.get_transport_from_url(transport_server.get_url())
1511
 
        controldir.ControlDir.create(t.base)
 
1509
        t = transport.get_transport(transport_server.get_url())
 
1510
        bzrdir.BzrDir.create(t.base)
1512
1511
        self.assertRaises(errors.BzrError,
1513
 
            controldir.ControlDir.open_from_transport, t)
 
1512
            bzrdir.BzrDir.open_from_transport, t)
1514
1513
        # But if we declare this as safe, we can open the bzrdir.
1515
1514
        self.permit_url(t.base)
1516
1515
        self._bzr_selftest_roots.append(t.base)
1517
 
        controldir.ControlDir.open_from_transport(t)
 
1516
        bzrdir.BzrDir.open_from_transport(t)
1518
1517
 
1519
1518
    def test_requireFeature_available(self):
1520
1519
        """self.requireFeature(available) is a no-op."""
1521
 
        class Available(features.Feature):
 
1520
        class Available(tests.Feature):
1522
1521
            def _probe(self):return True
1523
1522
        feature = Available()
1524
1523
        self.requireFeature(feature)
1525
1524
 
1526
1525
    def test_requireFeature_unavailable(self):
1527
1526
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1528
 
        class Unavailable(features.Feature):
 
1527
        class Unavailable(tests.Feature):
1529
1528
            def _probe(self):return False
1530
1529
        feature = Unavailable()
1531
1530
        self.assertRaises(tests.UnavailableFeature,
1660
1659
        class Test(tests.TestCase):
1661
1660
 
1662
1661
            def setUp(self):
1663
 
                super(Test, self).setUp()
 
1662
                tests.TestCase.setUp(self)
1664
1663
                self.orig = self.overrideAttr(obj, 'test_attr')
1665
1664
 
1666
1665
            def test_value(self):
1679
1678
        class Test(tests.TestCase):
1680
1679
 
1681
1680
            def setUp(self):
1682
 
                super(Test, self).setUp()
 
1681
                tests.TestCase.setUp(self)
1683
1682
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1684
1683
 
1685
1684
            def test_value(self):
1690
1689
        test.run(unittest.TestResult())
1691
1690
        self.assertEqual('original', obj.test_attr)
1692
1691
 
1693
 
    def test_recordCalls(self):
1694
 
        from bzrlib.tests import test_selftest
1695
 
        calls = self.recordCalls(
1696
 
            test_selftest, '_add_numbers')
1697
 
        self.assertEqual(test_selftest._add_numbers(2, 10),
1698
 
            12)
1699
 
        self.assertEquals(calls, [((2, 10), {})])
1700
 
 
1701
 
 
1702
 
def _add_numbers(a, b):
1703
 
    return a + b
1704
 
 
1705
 
 
1706
 
class _MissingFeature(features.Feature):
 
1692
 
 
1693
class _MissingFeature(tests.Feature):
1707
1694
    def _probe(self):
1708
1695
        return False
1709
1696
missing_feature = _MissingFeature()
1760
1747
        result = self._run_test('test_fail')
1761
1748
        self.assertEqual(1, len(result.failures))
1762
1749
        result_content = result.failures[0][1]
1763
 
        self.assertContainsRe(result_content,
1764
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1750
        self.assertContainsRe(result_content, 'Text attachment: log')
1765
1751
        self.assertContainsRe(result_content, 'this was a failing test')
1766
1752
 
1767
1753
    def test_error_has_log(self):
1768
1754
        result = self._run_test('test_error')
1769
1755
        self.assertEqual(1, len(result.errors))
1770
1756
        result_content = result.errors[0][1]
1771
 
        self.assertContainsRe(result_content,
1772
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1757
        self.assertContainsRe(result_content, 'Text attachment: log')
1773
1758
        self.assertContainsRe(result_content, 'this test errored')
1774
1759
 
1775
1760
    def test_skip_has_no_log(self):
1794
1779
        result = self._run_test('test_xfail')
1795
1780
        self.assertEqual(1, len(result.expectedFailures))
1796
1781
        result_content = result.expectedFailures[0][1]
1797
 
        self.assertNotContainsRe(result_content,
1798
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1782
        self.assertNotContainsRe(result_content, 'Text attachment: log')
1799
1783
        self.assertNotContainsRe(result_content, 'test with expected failure')
1800
1784
 
1801
1785
    def test_unexpected_success_has_log(self):
1974
1958
    def test_make_branch_and_tree_with_format(self):
1975
1959
        # we should be able to supply a format to make_branch_and_tree
1976
1960
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1977
 
        self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
 
1961
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
 
1962
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1978
1963
                              bzrlib.bzrdir.BzrDirMetaFormat1)
 
1964
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
 
1965
                              bzrlib.bzrdir.BzrDirFormat6)
1979
1966
 
1980
1967
    def test_make_branch_and_memory_tree(self):
1981
1968
        # we should be able to get a new branch and a mutable tree from
2058
2045
        self.assertLength(2, output.readlines())
2059
2046
 
2060
2047
    def test_lsprof_tests(self):
2061
 
        self.requireFeature(features.lsprof_feature)
2062
 
        results = []
 
2048
        self.requireFeature(test_lsprof.LSProfFeature)
 
2049
        calls = []
2063
2050
        class Test(object):
2064
2051
            def __call__(test, result):
2065
2052
                test.run(result)
2066
2053
            def run(test, result):
2067
 
                results.append(result)
 
2054
                self.assertIsInstance(result, tests.ForwardingResult)
 
2055
                calls.append("called")
2068
2056
            def countTestCases(self):
2069
2057
                return 1
2070
2058
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2071
 
        self.assertLength(1, results)
2072
 
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
 
2059
        self.assertLength(1, calls)
2073
2060
 
2074
2061
    def test_random(self):
2075
2062
        # test randomising by listing a number of tests.
2210
2197
        self.assertNotContainsRe(content, 'test with expected failure')
2211
2198
        self.assertEqual(1, len(result.expectedFailures))
2212
2199
        result_content = result.expectedFailures[0][1]
2213
 
        self.assertNotContainsRe(result_content,
2214
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
 
2200
        self.assertNotContainsRe(result_content, 'Text attachment: log')
2215
2201
        self.assertNotContainsRe(result_content, 'test with expected failure')
2216
2202
 
2217
2203
    def test_unexpected_success_has_log(self):
2218
2204
        content, result = self.run_subunit_stream('test_unexpected_success')
2219
2205
        self.assertContainsRe(content, '(?m)^log$')
2220
2206
        self.assertContainsRe(content, 'test with unexpected success')
2221
 
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2222
 
        #                success, if a min version check is added remove this
2223
 
        from subunit import TestProtocolClient as _Client
2224
 
        if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
2225
 
            self.expectFailure('subunit treats "unexpectedSuccess"'
2226
 
                               ' as a plain success',
2227
 
                self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2207
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2208
                           ' as a plain success',
 
2209
            self.assertEqual, 1, len(result.unexpectedSuccesses))
2228
2210
        self.assertEqual(1, len(result.unexpectedSuccesses))
2229
2211
        test = result.unexpectedSuccesses[0]
2230
2212
        # RemotedTestCase doesn't preserve the "details"
2365
2347
        # stdout and stderr of the invoked run_bzr
2366
2348
        current_factory = bzrlib.ui.ui_factory
2367
2349
        self.run_bzr(['foo'])
2368
 
        self.assertFalse(current_factory is self.factory)
 
2350
        self.failIf(current_factory is self.factory)
2369
2351
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2370
2352
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2371
2353
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2412
2394
    """Base class for tests testing how we might run bzr."""
2413
2395
 
2414
2396
    def setUp(self):
2415
 
        super(TestWithFakedStartBzrSubprocess, self).setUp()
 
2397
        tests.TestCaseWithTransport.setUp(self)
2416
2398
        self.subprocess_calls = []
2417
2399
 
2418
2400
    def start_bzr_subprocess(self, process_args, env_changes=None,
2528
2510
 
2529
2511
 
2530
2512
class TestStartBzrSubProcess(tests.TestCase):
2531
 
    """Stub test start_bzr_subprocess."""
2532
2513
 
2533
 
    def _subprocess_log_cleanup(self):
2534
 
        """Inhibits the base version as we don't produce a log file."""
 
2514
    def check_popen_state(self):
 
2515
        """Replace to make assertions when popen is called."""
2535
2516
 
2536
2517
    def _popen(self, *args, **kwargs):
2537
 
        """Override the base version to record the command that is run.
2538
 
 
2539
 
        From there we can ensure it is correct without spawning a real process.
2540
 
        """
 
2518
        """Record the command that is run, so that we can ensure it is correct"""
2541
2519
        self.check_popen_state()
2542
2520
        self._popen_args = args
2543
2521
        self._popen_kwargs = kwargs
2544
2522
        raise _DontSpawnProcess()
2545
2523
 
2546
 
    def check_popen_state(self):
2547
 
        """Replace to make assertions when popen is called."""
2548
 
 
2549
2524
    def test_run_bzr_subprocess_no_plugins(self):
2550
2525
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2551
2526
        command = self._popen_args[0]
2555
2530
 
2556
2531
    def test_allow_plugins(self):
2557
2532
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2558
 
                          allow_plugins=True)
 
2533
            allow_plugins=True)
2559
2534
        command = self._popen_args[0]
2560
2535
        self.assertEqual([], command[2:])
2561
2536
 
2562
2537
    def test_set_env(self):
2563
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2538
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2564
2539
        # set in the child
2565
2540
        def check_environment():
2566
2541
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2567
2542
        self.check_popen_state = check_environment
2568
2543
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2569
 
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2544
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
2570
2545
        # not set in the parent
2571
2546
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2572
2547
 
2573
2548
    def test_run_bzr_subprocess_env_del(self):
2574
2549
        """run_bzr_subprocess can remove environment variables too."""
2575
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2550
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2576
2551
        def check_environment():
2577
2552
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2578
2553
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2579
2554
        self.check_popen_state = check_environment
2580
2555
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2581
 
                          env_changes={'EXISTANT_ENV_VAR':None})
 
2556
            env_changes={'EXISTANT_ENV_VAR':None})
2582
2557
        # Still set in parent
2583
2558
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2584
2559
        del os.environ['EXISTANT_ENV_VAR']
2585
2560
 
2586
2561
    def test_env_del_missing(self):
2587
 
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
 
2562
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2588
2563
        def check_environment():
2589
2564
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2590
2565
        self.check_popen_state = check_environment
2591
2566
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2592
 
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2567
            env_changes={'NON_EXISTANT_ENV_VAR':None})
2593
2568
 
2594
2569
    def test_working_dir(self):
2595
2570
        """Test that we can specify the working dir for the child"""
2598
2573
        chdirs = []
2599
2574
        def chdir(path):
2600
2575
            chdirs.append(path)
2601
 
        self.overrideAttr(os, 'chdir', chdir)
2602
 
        def getcwd():
2603
 
            return 'current'
2604
 
        self.overrideAttr(osutils, 'getcwd', getcwd)
2605
 
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2606
 
                          working_dir='foo')
 
2576
        os.chdir = chdir
 
2577
        try:
 
2578
            def getcwd():
 
2579
                return 'current'
 
2580
            osutils.getcwd = getcwd
 
2581
            try:
 
2582
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2583
                    working_dir='foo')
 
2584
            finally:
 
2585
                osutils.getcwd = orig_getcwd
 
2586
        finally:
 
2587
            os.chdir = orig_chdir
2607
2588
        self.assertEqual(['foo', 'current'], chdirs)
2608
2589
 
2609
2590
    def test_get_bzr_path_with_cwd_bzrlib(self):
2629
2610
        self.assertEqual('bzr: interrupted\n', result[1])
2630
2611
 
2631
2612
 
 
2613
class TestFeature(tests.TestCase):
 
2614
 
 
2615
    def test_caching(self):
 
2616
        """Feature._probe is called by the feature at most once."""
 
2617
        class InstrumentedFeature(tests.Feature):
 
2618
            def __init__(self):
 
2619
                super(InstrumentedFeature, self).__init__()
 
2620
                self.calls = []
 
2621
            def _probe(self):
 
2622
                self.calls.append('_probe')
 
2623
                return False
 
2624
        feature = InstrumentedFeature()
 
2625
        feature.available()
 
2626
        self.assertEqual(['_probe'], feature.calls)
 
2627
        feature.available()
 
2628
        self.assertEqual(['_probe'], feature.calls)
 
2629
 
 
2630
    def test_named_str(self):
 
2631
        """Feature.__str__ should thunk to feature_name()."""
 
2632
        class NamedFeature(tests.Feature):
 
2633
            def feature_name(self):
 
2634
                return 'symlinks'
 
2635
        feature = NamedFeature()
 
2636
        self.assertEqual('symlinks', str(feature))
 
2637
 
 
2638
    def test_default_str(self):
 
2639
        """Feature.__str__ should default to __class__.__name__."""
 
2640
        class NamedFeature(tests.Feature):
 
2641
            pass
 
2642
        feature = NamedFeature()
 
2643
        self.assertEqual('NamedFeature', str(feature))
 
2644
 
 
2645
 
 
2646
class TestUnavailableFeature(tests.TestCase):
 
2647
 
 
2648
    def test_access_feature(self):
 
2649
        feature = tests.Feature()
 
2650
        exception = tests.UnavailableFeature(feature)
 
2651
        self.assertIs(feature, exception.args[0])
 
2652
 
 
2653
 
 
2654
simple_thunk_feature = tests._CompatabilityThunkFeature(
 
2655
    deprecated_in((2, 1, 0)),
 
2656
    'bzrlib.tests.test_selftest',
 
2657
    'simple_thunk_feature','UnicodeFilename',
 
2658
    replacement_module='bzrlib.tests'
 
2659
    )
 
2660
 
 
2661
class Test_CompatibilityFeature(tests.TestCase):
 
2662
 
 
2663
    def test_does_thunk(self):
 
2664
        res = self.callDeprecated(
 
2665
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
 
2666
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
 
2667
            simple_thunk_feature.available)
 
2668
        self.assertEqual(tests.UnicodeFilename.available(), res)
 
2669
 
 
2670
 
 
2671
class TestModuleAvailableFeature(tests.TestCase):
 
2672
 
 
2673
    def test_available_module(self):
 
2674
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
 
2675
        self.assertEqual('bzrlib.tests', feature.module_name)
 
2676
        self.assertEqual('bzrlib.tests', str(feature))
 
2677
        self.assertTrue(feature.available())
 
2678
        self.assertIs(tests, feature.module)
 
2679
 
 
2680
    def test_unavailable_module(self):
 
2681
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
 
2682
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
 
2683
        self.assertFalse(feature.available())
 
2684
        self.assertIs(None, feature.module)
 
2685
 
 
2686
 
2632
2687
class TestSelftestFiltering(tests.TestCase):
2633
2688
 
2634
2689
    def setUp(self):
2635
 
        super(TestSelftestFiltering, self).setUp()
 
2690
        tests.TestCase.setUp(self)
2636
2691
        self.suite = TestUtil.TestSuite()
2637
2692
        self.loader = TestUtil.TestLoader()
2638
2693
        self.suite.addTest(self.loader.loadTestsFromModule(
2785
2840
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2786
2841
 
2787
2842
 
2788
 
class TestCheckTreeShape(tests.TestCaseWithTransport):
 
2843
class TestCheckInventoryShape(tests.TestCaseWithTransport):
2789
2844
 
2790
 
    def test_check_tree_shape(self):
 
2845
    def test_check_inventory_shape(self):
2791
2846
        files = ['a', 'b/', 'b/c']
2792
2847
        tree = self.make_branch_and_tree('.')
2793
2848
        self.build_tree(files)
2794
2849
        tree.add(files)
2795
2850
        tree.lock_read()
2796
2851
        try:
2797
 
            self.check_tree_shape(tree, files)
 
2852
            self.check_inventory_shape(tree.inventory, files)
2798
2853
        finally:
2799
2854
            tree.unlock()
2800
2855
 
3132
3187
        tpr.register('bar', 'bBB.aAA.rRR')
3133
3188
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3134
3189
        self.assertThat(self.get_log(),
3135
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3136
 
                           doctest.ELLIPSIS))
 
3190
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
3137
3191
 
3138
3192
    def test_get_unknown_prefix(self):
3139
3193
        tpr = self._get_registry()
3174
3228
        class Test(unittest.TestCase):
3175
3229
            def runTest(self):
3176
3230
                pass
 
3231
            addCleanup = None # for when on Python 2.7 with native addCleanup
3177
3232
        result = self.LeakRecordingResult()
3178
3233
        test = Test()
 
3234
        self.assertIs(getattr(test, "addCleanup", None), None)
3179
3235
        result.startTestRun()
3180
3236
        test.run(result)
3181
3237
        result.stopTestRun()
3245
3301
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3246
3302
 
3247
3303
 
3248
 
class TestPostMortemDebugging(tests.TestCase):
3249
 
    """Check post mortem debugging works when tests fail or error"""
3250
 
 
3251
 
    class TracebackRecordingResult(tests.ExtendedTestResult):
3252
 
        def __init__(self):
3253
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3254
 
            self.postcode = None
3255
 
        def _post_mortem(self, tb=None):
3256
 
            """Record the code object at the end of the current traceback"""
3257
 
            tb = tb or sys.exc_info()[2]
3258
 
            if tb is not None:
3259
 
                next = tb.tb_next
3260
 
                while next is not None:
3261
 
                    tb = next
3262
 
                    next = next.tb_next
3263
 
                self.postcode = tb.tb_frame.f_code
3264
 
        def report_error(self, test, err):
3265
 
            pass
3266
 
        def report_failure(self, test, err):
3267
 
            pass
3268
 
 
3269
 
    def test_location_unittest_error(self):
3270
 
        """Needs right post mortem traceback with erroring unittest case"""
3271
 
        class Test(unittest.TestCase):
3272
 
            def runTest(self):
3273
 
                raise RuntimeError
3274
 
        result = self.TracebackRecordingResult()
3275
 
        Test().run(result)
3276
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3277
 
 
3278
 
    def test_location_unittest_failure(self):
3279
 
        """Needs right post mortem traceback with failing unittest case"""
3280
 
        class Test(unittest.TestCase):
3281
 
            def runTest(self):
3282
 
                raise self.failureException
3283
 
        result = self.TracebackRecordingResult()
3284
 
        Test().run(result)
3285
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3286
 
 
3287
 
    def test_location_bt_error(self):
3288
 
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
3289
 
        class Test(tests.TestCase):
3290
 
            def test_error(self):
3291
 
                raise RuntimeError
3292
 
        result = self.TracebackRecordingResult()
3293
 
        Test("test_error").run(result)
3294
 
        self.assertEqual(result.postcode, Test.test_error.func_code)
3295
 
 
3296
 
    def test_location_bt_failure(self):
3297
 
        """Needs right post mortem traceback with failing bzrlib.tests case"""
3298
 
        class Test(tests.TestCase):
3299
 
            def test_failure(self):
3300
 
                raise self.failureException
3301
 
        result = self.TracebackRecordingResult()
3302
 
        Test("test_failure").run(result)
3303
 
        self.assertEqual(result.postcode, Test.test_failure.func_code)
3304
 
 
3305
 
    def test_env_var_triggers_post_mortem(self):
3306
 
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3307
 
        import pdb
3308
 
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
3309
 
        post_mortem_calls = []
3310
 
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3311
 
        self.overrideEnv('BZR_TEST_PDB', None)
3312
 
        result._post_mortem(1)
3313
 
        self.overrideEnv('BZR_TEST_PDB', 'on')
3314
 
        result._post_mortem(2)
3315
 
        self.assertEqual([2], post_mortem_calls)
3316
 
 
3317
 
 
3318
3304
class TestRunSuite(tests.TestCase):
3319
3305
 
3320
3306
    def test_runner_class(self):
3331
3317
                                                self.verbosity)
3332
3318
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3333
3319
        self.assertLength(1, calls)
3334
 
 
3335
 
 
3336
 
class _Selftest(object):
3337
 
    """Mixin for tests needing full selftest output"""
3338
 
 
3339
 
    def _inject_stream_into_subunit(self, stream):
3340
 
        """To be overridden by subclasses that run tests out of process"""
3341
 
 
3342
 
    def _run_selftest(self, **kwargs):
3343
 
        sio = StringIO()
3344
 
        self._inject_stream_into_subunit(sio)
3345
 
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3346
 
        return sio.getvalue()
3347
 
 
3348
 
 
3349
 
class _ForkedSelftest(_Selftest):
3350
 
    """Mixin for tests needing full selftest output with forked children"""
3351
 
 
3352
 
    _test_needs_features = [features.subunit]
3353
 
 
3354
 
    def _inject_stream_into_subunit(self, stream):
3355
 
        """Monkey-patch subunit so the extra output goes to stream not stdout
3356
 
 
3357
 
        Some APIs need rewriting so this kind of bogus hackery can be replaced
3358
 
        by passing the stream param from run_tests down into ProtocolTestCase.
3359
 
        """
3360
 
        from subunit import ProtocolTestCase
3361
 
        _original_init = ProtocolTestCase.__init__
3362
 
        def _init_with_passthrough(self, *args, **kwargs):
3363
 
            _original_init(self, *args, **kwargs)
3364
 
            self._passthrough = stream
3365
 
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3366
 
 
3367
 
    def _run_selftest(self, **kwargs):
3368
 
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3369
 
        if getattr(os, "fork", None) is None:
3370
 
            raise tests.TestNotApplicable("Platform doesn't support forking")
3371
 
        # Make sure the fork code is actually invoked by claiming two cores
3372
 
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3373
 
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3374
 
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3375
 
 
3376
 
 
3377
 
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3378
 
    """Check operation of --parallel=fork selftest option"""
3379
 
 
3380
 
    def test_error_in_child_during_fork(self):
3381
 
        """Error in a forked child during test setup should get reported"""
3382
 
        class Test(tests.TestCase):
3383
 
            def testMethod(self):
3384
 
                pass
3385
 
        # We don't care what, just break something that a child will run
3386
 
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3387
 
        out = self._run_selftest(test_suite_factory=Test)
3388
 
        # Lines from the tracebacks of the two child processes may be mixed
3389
 
        # together due to the way subunit parses and forwards the streams,
3390
 
        # so permit extra lines between each part of the error output.
3391
 
        self.assertContainsRe(out,
3392
 
            "Traceback.*:\n"
3393
 
            "(?:.*\n)*"
3394
 
            ".+ in fork_for_tests\n"
3395
 
            "(?:.*\n)*"
3396
 
            "\s*workaround_zealous_crypto_random\(\)\n"
3397
 
            "(?:.*\n)*"
3398
 
            "TypeError:")
3399
 
 
3400
 
 
3401
 
class TestUncollectedWarnings(_Selftest, tests.TestCase):
    """Check a test case still alive after being run emits a warning.

    Every test runs the small inner suite through ``self._run_selftest``
    with the "uncollected_cases" selftest debug flag switched on, then
    inspects the output for "Uncollected test case" messages.
    """

    class Test(tests.TestCase):
        """Inner test cases executed by the selftest run under test."""

        def test_pass(self):
            pass

        def test_self_ref(self):
            # The bound method references the instance, and the instance now
            # references the bound method: a reference cycle.
            self.also_self = self.test_self_ref

        def test_skip(self):
            self.skip("Don't need")

    def _get_suite(self):
        """Return a suite with one instance of each inner test."""
        return TestUtil.TestSuite([
            self.Test("test_pass"),
            self.Test("test_self_ref"),
            self.Test("test_skip"),
            ])

    def _run_selftest_with_suite(self, **kwargs):
        """Run the inner suite and check which cases are reported.

        The "uncollected_cases" flag is added to
        ``tests.selftest_debug_flags`` and the garbage collector is disabled
        (if it was enabled) for the duration of the run; both are restored
        afterwards, even when the run raises.

        :param kwargs: Extra keyword arguments forwarded to
            ``self._run_selftest``.
        :return: Whatever ``self._run_selftest`` returned, after checking
            that it reports ``test_self_ref`` but not ``test_pass`` as
            uncollected.
        """
        old_flags = tests.selftest_debug_flags
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
        # NOTE(review): presumably the collector is switched off so that the
        # cycle created by test_self_ref is not reclaimed during the run.
        gc_on = gc.isenabled()
        if gc_on:
            gc.disable()
        try:
            output = self._run_selftest(
                test_suite_factory=self._get_suite, **kwargs)
        finally:
            if gc_on:
                gc.enable()
            tests.selftest_debug_flags = old_flags
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
        return output

    def test_testsuite(self):
        self._run_selftest_with_suite()

    def test_pattern(self):
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
        self.assertNotContainsRe(out, "test_skip")

    def test_exclude_pattern(self):
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_random_seed(self):
        self._run_selftest_with_suite(random_seed="now")

    def test_matching_tests_first(self):
        self._run_selftest_with_suite(matching_tests_first=True,
            pattern="test_self_ref$")

    def test_starting_with_and_exclude(self):
        out = self._run_selftest_with_suite(starting_with=["bt."],
            exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_additonal_decorator(self):
        # The checks made by _run_selftest_with_suite are all that is needed
        # here, so the returned output is not kept.
        self._run_selftest_with_suite(
            suite_decorators=[tests.TestDecorator])
3462
 
 
3463
 
 
3464
 
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Repeat the uncollected test case checks with the subunit runner."""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Delegate to the parent helper, using ``tests.SubUnitBzrRunner``.

        :param kwargs: Extra keyword arguments forwarded unchanged.
        :return: The value returned by the parent implementation.
        """
        parent = super(TestUncollectedWarningsSubunit, self)
        return parent._run_selftest_with_suite(
            runner_class=tests.SubUnitBzrRunner, **kwargs)
3472
 
 
3473
 
 
3474
 
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Repeat the uncollected test case checks for forked selftest runs.

    Nothing is overridden here: all behaviour comes from the base classes.
    """
3476
 
 
3477
 
 
3478
 
class TestEnvironHandling(tests.TestCase):
    """Check that overrideEnv() hands variables back after a test ran."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        """Unsetting a variable twice in a test must not lose its value."""
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')

        # An embedded test is used to make sure the _captureVar bug is fixed
        class Test(tests.TestCase):
            def test_me(self):
                # First call: this is the one saving the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
                # Second call: calling it again must be possible
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))

        stream = StringIO()
        inner_result = tests.TextTestResult(stream, 0, 1)
        Test('test_me').run(inner_result)
        if not inner_result.wasStrictlySuccessful():
            self.fail(stream.getvalue())
        # Once the embedded test is done, our own value is visible again
        self.assertEquals('42', os.environ.get('MYVAR'))
3499
 
 
3500
 
 
3501
 
class TestIsolatedEnv(tests.TestCase):
    """Exercise the helpers isolating tests from os.environ.

    These tests are themselves already isolated from os.environ, so they
    have to be designed with some care to avoid bootstrap side-effects.
    Because they start from an already clean os.environ, valid assertions
    can be made about which variables are present or not, and the tests are
    designed around those assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Throw-away test handed to the override/restore helpers."""

        def test_me(self):
            pass

    def test_basics(self):
        isolated = tests.isolated_environ
        # Pin down the definition of BZR_HOME: it is not part of os.environ
        # for tests.TestCase.
        self.assertTrue('BZR_HOME' in isolated)
        self.assertEquals(None, isolated['BZR_HOME'])
        # As it is listed in isolated_environ, it should not show up here
        self.assertFalse('BZR_HOME' in os.environ)
        # Pin down the definition of LINES: it is part of os.environ for
        # tests.TestCase
        self.assertTrue('LINES' in isolated)
        self.assertEquals('25', isolated['LINES'])
        self.assertEquals('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BZR_HOME is known to be absent from os.environ
        monkey = self.ScratchMonkey('test_me')
        tests.override_os_environ(monkey, {'BZR_HOME': 'foo'})
        self.assertEquals('foo', os.environ['BZR_HOME'])
        tests.restore_os_environ(monkey)
        self.assertFalse('BZR_HOME' in os.environ)

    def test_injecting_known_variable(self):
        monkey = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(monkey, {'LINES': '42'})
        self.assertEquals('42', os.environ['LINES'])
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])

    def test_deleting_variable(self):
        monkey = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(monkey, {'LINES': None})
        self.assertFalse('LINES' in os.environ)
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])
3552
 
 
3553
 
 
3554
 
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.

    tests.TestCase already provides an isolation from os.environ, so that
    clean environment is the base for testing. doctest.DocTestSuite is used
    as the point of comparison, to capture precisely the isolation provided
    by tests.DocTestSuite.

    The point is to check that `tests.DocTestSuite` respects
    `tests.isolated_environ` rather than `os.environ`, so each test overrides
    the former to suit its needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a `klass` suite whose only doctest is parsed from `string`.

        :param klass: Suite factory called with a ``test_finder`` keyword.
        :param string: Text containing the doctest examples.
        :return: The object returned by ``klass``.
        """

        class Finder(doctest.DocTestFinder):
            """Finder ignoring its arguments and yielding just `string`."""

            def find(*args, **kwargs):
                parser = doctest.DocTestParser()
                return [parser.get_doctest(string, {}, 'foo', 'foo.py', 0)]

        return klass(test_finder=Finder())

    def run_doctest_suite_for_string(self, klass, string):
        """Run the suite built for `string`.

        :return: A (result, output) pair: the `tests.TextTestResult` used
            for the run and the StringIO stream it was created with.
        """
        stream = StringIO()
        text_result = tests.TextTestResult(stream, 0, 1)
        self.get_doctest_suite_for_string(klass, string).run(text_result)
        return text_result, stream

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the run output, unless the run is strictly ok."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if result.wasStrictlySuccessful():
            return
        self.fail(output.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail, showing the run output, if the run is strictly ok."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if not result.wasStrictlySuccessful():
            return
        self.fail(output.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        snippet = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # '25' is what doctest.DocTestSuite sees, so it fails
        self.assertDocTestStringFails(doctest.DocTestSuite, snippet)
        # '42' is what tests.DocTestSuite sees
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, snippet)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        snippet = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # '25' is what doctest.DocTestSuite sees, so it fails
        self.assertDocTestStringFails(doctest.DocTestSuite, snippet)
        # None is what tests.DocTestSuite sees
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, snippet)
3617
 
 
3618
 
 
3619
 
class TestSelftestExcludePatterns(tests.TestCase):
    """Check 'selftest --list' output when '-x' options are given."""

    def setUp(self):
        super(TestSelftestExcludePatterns, self).setUp()
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """A test suite factory with only a few tests."""

        class Test(tests.TestCase):

            def id(self):
                # The bare method name is enough, no full class path needed
                return self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass

        return TestUtil.TestSuite([Test(name) for name in ("a", "b", "c")])

    def assertTestList(self, expected, *selftest_args):
        """Check that 'selftest --list' prints exactly `expected`.

        :param expected: List of expected output lines.
        :param selftest_args: Extra command line arguments for selftest.
        """
        # setUp installed the right test suite factory, which allows testing
        # at the command level without loading the whole test suite
        command = ('selftest', '--list') + selftest_args
        stdout, stderr = self.run_bzr(command)
        self.assertEquals(expected, stdout.splitlines())

    def test_full_list(self):
        self.assertTestList(['a', 'b', 'c'])

    def test_single_exclude(self):
        self.assertTestList(['b', 'c'], '-x', 'a')

    def test_mutiple_excludes(self):
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3654
 
 
3655
 
 
3656
 
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3657
 
 
3658
 
    _test_needs_features = [features.subunit]
3659
 
 
3660
 
    def setUp(self):
3661
 
        super(TestCounterHooks, self).setUp()
3662
 
        class Test(tests.TestCase):
3663
 
 
3664
 
            def setUp(self):
3665
 
                super(Test, self).setUp()
3666
 
                self.hooks = hooks.Hooks()
3667
 
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3668
 
                self.install_counter_hook(self.hooks, 'myhook')
3669
 
 
3670
 
            def no_hook(self):
3671
 
                pass
3672
 
 
3673
 
            def run_hook_once(self):
3674
 
                for hook in self.hooks['myhook']:
3675
 
                    hook(self)
3676
 
 
3677
 
        self.test_class = Test
3678
 
 
3679
 
    def assertHookCalls(self, expected_calls, test_name):
3680
 
        test = self.test_class(test_name)
3681
 
        result = unittest.TestResult()
3682
 
        test.run(result)
3683
 
        self.assertTrue(hasattr(test, '_counters'))
3684
 
        self.assertTrue(test._counters.has_key('myhook'))
3685
 
        self.assertEquals(expected_calls, test._counters['myhook'])
3686
 
 
3687
 
    def test_no_hook(self):
3688
 
        self.assertHookCalls(0, 'no_hook')
3689
 
 
3690
 
    def test_run_hook_once(self):
3691
 
        tt = features.testtools
3692
 
        if tt.module.__version__ < (0, 9, 8):
3693
 
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3694
 
        self.assertHookCalls(1, 'run_hook_once')