~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-10-13 06:08:53 UTC
  • mfrom: (4737.1.1 merge-2.0-into-devel)
  • Revision ID: pqm@pqm.ubuntu.com-20091013060853-erk2aaj80fnkrv25
(andrew) Merge lp:bzr/2.0 into lp:bzr, including fixes for #322807,
        #389413, #402623 and documentation improvements.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
import gc
21
 
import doctest
22
20
import os
23
21
import signal
24
22
import sys
25
 
import threading
26
23
import time
27
24
import unittest
28
25
import warnings
29
26
 
30
 
from testtools import (
31
 
    ExtendedToOriginalDecorator,
32
 
    MultiTestResult,
33
 
    )
34
 
from testtools.content import Content
35
 
from testtools.content_type import ContentType
36
 
from testtools.matchers import (
37
 
    DocTestMatches,
38
 
    Equals,
39
 
    )
40
 
import testtools.testresult.doubles
41
 
 
42
27
import bzrlib
43
28
from bzrlib import (
44
29
    branchbuilder,
45
30
    bzrdir,
46
 
    controldir,
 
31
    debug,
47
32
    errors,
48
 
    hooks,
49
33
    lockdir,
50
34
    memorytree,
51
35
    osutils,
 
36
    progress,
52
37
    remote,
53
38
    repository,
54
39
    symbol_versioning,
55
40
    tests,
56
41
    transport,
57
42
    workingtree,
58
 
    workingtree_3,
59
 
    workingtree_4,
60
43
    )
61
44
from bzrlib.repofmt import (
62
45
    groupcompress_repo,
 
46
    pack_repo,
 
47
    weaverepo,
63
48
    )
64
49
from bzrlib.symbol_versioning import (
65
50
    deprecated_function,
67
52
    deprecated_method,
68
53
    )
69
54
from bzrlib.tests import (
70
 
    features,
 
55
    SubUnitFeature,
71
56
    test_lsprof,
72
 
    test_server,
 
57
    test_sftp_transport,
73
58
    TestUtil,
74
59
    )
75
 
from bzrlib.trace import note, mutter
76
 
from bzrlib.transport import memory
 
60
from bzrlib.trace import note
 
61
from bzrlib.transport.memory import MemoryServer, MemoryTransport
 
62
from bzrlib.version import _get_bzr_source_tree
77
63
 
78
64
 
79
65
def _test_ids(test_suite):
81
67
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
82
68
 
83
69
 
 
70
class SelftestTests(tests.TestCase):
 
71
 
 
72
    def test_import_tests(self):
 
73
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
 
74
        self.assertEqual(mod.SelftestTests, SelftestTests)
 
75
 
 
76
    def test_import_test_failure(self):
 
77
        self.assertRaises(ImportError,
 
78
                          TestUtil._load_module_by_name,
 
79
                          'bzrlib.no-name-yet')
 
80
 
84
81
class MetaTestLog(tests.TestCase):
85
82
 
86
83
    def test_logging(self):
87
84
        """Test logs are captured when a test fails."""
88
85
        self.log('a test message')
89
 
        details = self.getDetails()
90
 
        log = details['log']
91
 
        self.assertThat(log.content_type, Equals(ContentType(
92
 
            "text", "plain", {"charset": "utf8"})))
93
 
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
94
 
        self.assertThat(self.get_log(),
95
 
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
86
        self._log_file.flush()
 
87
        self.assertContainsRe(self._get_log(keep_log_file=True),
 
88
                              'a test message\n')
 
89
 
 
90
 
 
91
class TestUnicodeFilename(tests.TestCase):
 
92
 
 
93
    def test_probe_passes(self):
 
94
        """UnicodeFilename._probe passes."""
 
95
        # We can't test much more than that because the behaviour depends
 
96
        # on the platform.
 
97
        tests.UnicodeFilename._probe()
96
98
 
97
99
 
98
100
class TestTreeShape(tests.TestCaseInTempDir):
99
101
 
100
102
    def test_unicode_paths(self):
101
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
103
        self.requireFeature(tests.UnicodeFilename)
102
104
 
103
105
        filename = u'hell\u00d8'
104
106
        self.build_tree_contents([(filename, 'contents of hello')])
105
 
        self.assertPathExists(filename)
106
 
 
107
 
 
108
 
class TestClassesAvailable(tests.TestCase):
109
 
    """As a convenience we expose Test* classes from bzrlib.tests"""
110
 
 
111
 
    def test_test_case(self):
112
 
        from bzrlib.tests import TestCase
113
 
 
114
 
    def test_test_loader(self):
115
 
        from bzrlib.tests import TestLoader
116
 
 
117
 
    def test_test_suite(self):
118
 
        from bzrlib.tests import TestSuite
 
107
        self.failUnlessExists(filename)
119
108
 
120
109
 
121
110
class TestTransportScenarios(tests.TestCase):
204
193
    def test_scenarios(self):
205
194
        # check that constructor parameters are passed through to the adapted
206
195
        # test.
207
 
        from bzrlib.tests.per_controldir import make_scenarios
 
196
        from bzrlib.tests.per_bzrdir import make_scenarios
208
197
        vfs_factory = "v"
209
198
        server1 = "a"
210
199
        server2 = "b"
308
297
        from bzrlib.tests.per_interrepository import make_scenarios
309
298
        server1 = "a"
310
299
        server2 = "b"
311
 
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
 
300
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
312
301
        scenarios = make_scenarios(server1, server2, formats)
313
302
        self.assertEqual([
314
303
            ('C0,str,str',
315
304
             {'repository_format': 'C1',
316
305
              'repository_format_to': 'C2',
317
306
              'transport_readonly_server': 'b',
318
 
              'transport_server': 'a',
319
 
              'extra_setup': 'C3'}),
 
307
              'transport_server': 'a'}),
320
308
            ('D0,str,str',
321
309
             {'repository_format': 'D1',
322
310
              'repository_format_to': 'D2',
323
311
              'transport_readonly_server': 'b',
324
 
              'transport_server': 'a',
325
 
              'extra_setup': 'D3'})],
 
312
              'transport_server': 'a'})],
326
313
            scenarios)
327
314
 
328
315
 
334
321
        from bzrlib.tests.per_workingtree import make_scenarios
335
322
        server1 = "a"
336
323
        server2 = "b"
337
 
        formats = [workingtree_4.WorkingTreeFormat4(),
338
 
                   workingtree_3.WorkingTreeFormat3(),]
 
324
        formats = [workingtree.WorkingTreeFormat2(),
 
325
                   workingtree.WorkingTreeFormat3(),]
339
326
        scenarios = make_scenarios(server1, server2, formats)
340
327
        self.assertEqual([
341
 
            ('WorkingTreeFormat4',
 
328
            ('WorkingTreeFormat2',
342
329
             {'bzrdir_format': formats[0]._matchingbzrdir,
343
330
              'transport_readonly_server': 'b',
344
331
              'transport_server': 'a',
371
358
            )
372
359
        server1 = "a"
373
360
        server2 = "b"
374
 
        formats = [workingtree_4.WorkingTreeFormat4(),
375
 
                   workingtree_3.WorkingTreeFormat3(),]
 
361
        formats = [workingtree.WorkingTreeFormat2(),
 
362
                   workingtree.WorkingTreeFormat3(),]
376
363
        scenarios = make_scenarios(server1, server2, formats)
377
364
        self.assertEqual(7, len(scenarios))
378
 
        default_wt_format = workingtree.format_registry.get_default()
379
 
        wt4_format = workingtree_4.WorkingTreeFormat4()
380
 
        wt5_format = workingtree_4.WorkingTreeFormat5()
 
365
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
 
366
        wt4_format = workingtree.WorkingTreeFormat4()
 
367
        wt5_format = workingtree.WorkingTreeFormat5()
381
368
        expected_scenarios = [
382
 
            ('WorkingTreeFormat4',
 
369
            ('WorkingTreeFormat2',
383
370
             {'bzrdir_format': formats[0]._matchingbzrdir,
384
371
              'transport_readonly_server': 'b',
385
372
              'transport_server': 'a',
445
432
        # ones to add.
446
433
        from bzrlib.tests.per_tree import (
447
434
            return_parameter,
 
435
            revision_tree_from_workingtree
448
436
            )
449
437
        from bzrlib.tests.per_intertree import (
450
438
            make_scenarios,
451
439
            )
452
 
        from bzrlib.workingtree_3 import WorkingTreeFormat3
453
 
        from bzrlib.workingtree_4 import WorkingTreeFormat4
 
440
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
454
441
        input_test = TestInterTreeScenarios(
455
442
            "test_scenarios")
456
443
        server1 = "a"
457
444
        server2 = "b"
458
 
        format1 = WorkingTreeFormat4()
 
445
        format1 = WorkingTreeFormat2()
459
446
        format2 = WorkingTreeFormat3()
460
447
        formats = [("1", str, format1, format2, "converter1"),
461
448
            ("2", int, format2, format1, "converter2")]
503
490
        self.assertEqualStat(real, fake)
504
491
 
505
492
    def test_assertEqualStat_notequal(self):
506
 
        self.build_tree(["foo", "longname"])
 
493
        self.build_tree(["foo", "bar"])
507
494
        self.assertRaises(AssertionError, self.assertEqualStat,
508
 
            os.lstat("foo"), os.lstat("longname"))
509
 
 
510
 
    def test_failUnlessExists(self):
511
 
        """Deprecated failUnlessExists and failIfExists"""
512
 
        self.applyDeprecated(
513
 
            deprecated_in((2, 4)),
514
 
            self.failUnlessExists, '.')
515
 
        self.build_tree(['foo/', 'foo/bar'])
516
 
        self.applyDeprecated(
517
 
            deprecated_in((2, 4)),
518
 
            self.failUnlessExists, 'foo/bar')
519
 
        self.applyDeprecated(
520
 
            deprecated_in((2, 4)),
521
 
            self.failIfExists, 'foo/foo')
522
 
 
523
 
    def test_assertPathExists(self):
524
 
        self.assertPathExists('.')
525
 
        self.build_tree(['foo/', 'foo/bar'])
526
 
        self.assertPathExists('foo/bar')
527
 
        self.assertPathDoesNotExist('foo/foo')
 
495
            os.lstat("foo"), os.lstat("bar"))
528
496
 
529
497
 
530
498
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
548
516
        cwd = osutils.getcwd()
549
517
        self.assertIsSameRealPath(self.test_dir, cwd)
550
518
 
551
 
    def test_BZR_HOME_and_HOME_are_bytestrings(self):
552
 
        """The $BZR_HOME and $HOME environment variables should not be unicode.
553
 
 
554
 
        See https://bugs.launchpad.net/bzr/+bug/464174
555
 
        """
556
 
        self.assertIsInstance(os.environ['BZR_HOME'], str)
557
 
        self.assertIsInstance(os.environ['HOME'], str)
558
 
 
559
519
    def test_make_branch_and_memory_tree(self):
560
520
        """In TestCaseWithMemoryTransport we should not make the branch on disk.
561
521
 
565
525
        tree = self.make_branch_and_memory_tree('dir')
566
526
        # Guard against regression into MemoryTransport leaking
567
527
        # files to disk instead of keeping them in memory.
568
 
        self.assertFalse(osutils.lexists('dir'))
 
528
        self.failIf(osutils.lexists('dir'))
569
529
        self.assertIsInstance(tree, memorytree.MemoryTree)
570
530
 
571
531
    def test_make_branch_and_memory_tree_with_format(self):
572
532
        """make_branch_and_memory_tree should accept a format option."""
573
533
        format = bzrdir.BzrDirMetaFormat1()
574
 
        format.repository_format = repository.format_registry.get_default()
 
534
        format.repository_format = weaverepo.RepositoryFormat7()
575
535
        tree = self.make_branch_and_memory_tree('dir', format=format)
576
536
        # Guard against regression into MemoryTransport leaking
577
537
        # files to disk instead of keeping them in memory.
578
 
        self.assertFalse(osutils.lexists('dir'))
 
538
        self.failIf(osutils.lexists('dir'))
579
539
        self.assertIsInstance(tree, memorytree.MemoryTree)
580
540
        self.assertEqual(format.repository_format.__class__,
581
541
            tree.branch.repository._format.__class__)
585
545
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
586
546
        # Guard against regression into MemoryTransport leaking
587
547
        # files to disk instead of keeping them in memory.
588
 
        self.assertFalse(osutils.lexists('dir'))
 
548
        self.failIf(osutils.lexists('dir'))
589
549
 
590
550
    def test_make_branch_builder_with_format(self):
591
551
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
592
552
        # that the format objects are used.
593
553
        format = bzrdir.BzrDirMetaFormat1()
594
 
        repo_format = repository.format_registry.get_default()
 
554
        repo_format = weaverepo.RepositoryFormat7()
595
555
        format.repository_format = repo_format
596
556
        builder = self.make_branch_builder('dir', format=format)
597
557
        the_branch = builder.get_branch()
598
558
        # Guard against regression into MemoryTransport leaking
599
559
        # files to disk instead of keeping them in memory.
600
 
        self.assertFalse(osutils.lexists('dir'))
 
560
        self.failIf(osutils.lexists('dir'))
601
561
        self.assertEqual(format.repository_format.__class__,
602
562
                         the_branch.repository._format.__class__)
603
563
        self.assertEqual(repo_format.get_format_string(),
609
569
        the_branch = builder.get_branch()
610
570
        # Guard against regression into MemoryTransport leaking
611
571
        # files to disk instead of keeping them in memory.
612
 
        self.assertFalse(osutils.lexists('dir'))
613
 
        dir_format = controldir.format_registry.make_bzrdir('knit')
 
572
        self.failIf(osutils.lexists('dir'))
 
573
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
614
574
        self.assertEqual(dir_format.repository_format.__class__,
615
575
                         the_branch.repository._format.__class__)
616
576
        self.assertEqual('Bazaar-NG Knit Repository Format 1',
620
580
    def test_dangling_locks_cause_failures(self):
621
581
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
622
582
            def test_function(self):
623
 
                t = self.get_transport_from_path('.')
 
583
                t = self.get_transport('.')
624
584
                l = lockdir.LockDir(t, 'lock')
625
585
                l.create()
626
586
                l.attempt_lock()
627
587
        test = TestDanglingLock('test_function')
628
588
        result = test.run()
629
 
        total_failures = result.errors + result.failures
630
589
        if self._lock_check_thorough:
631
 
            self.assertEqual(1, len(total_failures))
 
590
            self.assertEqual(1, len(result.errors))
632
591
        else:
633
592
            # When _lock_check_thorough is disabled, then we don't trigger a
634
593
            # failure
635
 
            self.assertEqual(0, len(total_failures))
 
594
            self.assertEqual(0, len(result.errors))
636
595
 
637
596
 
638
597
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
639
598
    """Tests for the convenience functions TestCaseWithTransport introduces."""
640
599
 
641
600
    def test_get_readonly_url_none(self):
 
601
        from bzrlib.transport import get_transport
 
602
        from bzrlib.transport.memory import MemoryServer
642
603
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
643
 
        self.vfs_transport_factory = memory.MemoryServer
 
604
        self.vfs_transport_factory = MemoryServer
644
605
        self.transport_readonly_server = None
645
606
        # calling get_readonly_transport() constructs a decorator on the url
646
607
        # for the server
647
608
        url = self.get_readonly_url()
648
609
        url2 = self.get_readonly_url('foo/bar')
649
 
        t = transport.get_transport_from_url(url)
650
 
        t2 = transport.get_transport_from_url(url2)
651
 
        self.assertIsInstance(t, ReadonlyTransportDecorator)
652
 
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
 
610
        t = get_transport(url)
 
611
        t2 = get_transport(url2)
 
612
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
 
613
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
653
614
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
615
 
655
616
    def test_get_readonly_url_http(self):
656
617
        from bzrlib.tests.http_server import HttpServer
 
618
        from bzrlib.transport import get_transport
 
619
        from bzrlib.transport.local import LocalURLServer
657
620
        from bzrlib.transport.http import HttpTransportBase
658
 
        self.transport_server = test_server.LocalURLServer
 
621
        self.transport_server = LocalURLServer
659
622
        self.transport_readonly_server = HttpServer
660
623
        # calling get_readonly_transport() gives us a HTTP server instance.
661
624
        url = self.get_readonly_url()
662
625
        url2 = self.get_readonly_url('foo/bar')
663
626
        # the transport returned may be any HttpTransportBase subclass
664
 
        t = transport.get_transport_from_url(url)
665
 
        t2 = transport.get_transport_from_url(url2)
666
 
        self.assertIsInstance(t, HttpTransportBase)
667
 
        self.assertIsInstance(t2, HttpTransportBase)
 
627
        t = get_transport(url)
 
628
        t2 = get_transport(url2)
 
629
        self.failUnless(isinstance(t, HttpTransportBase))
 
630
        self.failUnless(isinstance(t2, HttpTransportBase))
668
631
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
669
632
 
670
633
    def test_is_directory(self):
678
641
    def test_make_branch_builder(self):
679
642
        builder = self.make_branch_builder('dir')
680
643
        rev_id = builder.build_commit()
681
 
        self.assertPathExists('dir')
682
 
        a_dir = controldir.ControlDir.open('dir')
 
644
        self.failUnlessExists('dir')
 
645
        a_dir = bzrdir.BzrDir.open('dir')
683
646
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
684
647
        a_branch = a_dir.open_branch()
685
648
        builder_branch = builder.get_branch()
692
655
 
693
656
    def setUp(self):
694
657
        super(TestTestCaseTransports, self).setUp()
695
 
        self.vfs_transport_factory = memory.MemoryServer
 
658
        self.vfs_transport_factory = MemoryServer
696
659
 
697
660
    def test_make_bzrdir_preserves_transport(self):
698
661
        t = self.get_transport()
699
662
        result_bzrdir = self.make_bzrdir('subdir')
700
663
        self.assertIsInstance(result_bzrdir.transport,
701
 
                              memory.MemoryTransport)
 
664
                              MemoryTransport)
702
665
        # should not be on disk, should only be in memory
703
 
        self.assertPathDoesNotExist('subdir')
 
666
        self.failIfExists('subdir')
704
667
 
705
668
 
706
669
class TestChrootedTest(tests.ChrootedTestCase):
707
670
 
708
671
    def test_root_is_root(self):
709
 
        t = transport.get_transport_from_url(self.get_readonly_url())
 
672
        from bzrlib.transport import get_transport
 
673
        t = get_transport(self.get_readonly_url())
710
674
        url = t.base
711
675
        self.assertEqual(url, t.clone('..').base)
712
676
 
714
678
class TestProfileResult(tests.TestCase):
715
679
 
716
680
    def test_profiles_tests(self):
717
 
        self.requireFeature(features.lsprof_feature)
718
 
        terminal = testtools.testresult.doubles.ExtendedTestResult()
 
681
        self.requireFeature(test_lsprof.LSProfFeature)
 
682
        terminal = unittest.TestResult()
719
683
        result = tests.ProfileResult(terminal)
720
684
        class Sample(tests.TestCase):
721
685
            def a(self):
723
687
            def sample_function(self):
724
688
                pass
725
689
        test = Sample("a")
 
690
        test.attrs_to_keep = test.attrs_to_keep + ('_benchcalls',)
726
691
        test.run(result)
727
 
        case = terminal._events[0][1]
728
 
        self.assertLength(1, case._benchcalls)
 
692
        self.assertLength(1, test._benchcalls)
729
693
        # We must be able to unpack it as the test reporting code wants
730
 
        (_, _, _), stats = case._benchcalls[0]
 
694
        (_, _, _), stats = test._benchcalls[0]
731
695
        self.assertTrue(callable(stats.pprint))
732
696
 
733
697
 
738
702
                descriptions=0,
739
703
                verbosity=1,
740
704
                )
741
 
        capture = testtools.testresult.doubles.ExtendedTestResult()
742
 
        test_case.run(MultiTestResult(result, capture))
743
 
        run_case = capture._events[0][1]
744
 
        timed_string = result._testTimeString(run_case)
 
705
        test_case.run(result)
 
706
        timed_string = result._testTimeString(test_case)
745
707
        self.assertContainsRe(timed_string, expected_re)
746
708
 
747
709
    def test_test_reporting(self):
765
727
        self.check_timing(ShortDelayTestCase('test_short_delay'),
766
728
                          r"^ +[0-9]+ms$")
767
729
 
 
730
    def _patch_get_bzr_source_tree(self):
 
731
        # Reading from the actual source tree breaks isolation, but we don't
 
732
        # want to assume that thats *all* that would happen.
 
733
        def _get_bzr_source_tree():
 
734
            return None
 
735
        orig_get_bzr_source_tree = bzrlib.version._get_bzr_source_tree
 
736
        bzrlib.version._get_bzr_source_tree = _get_bzr_source_tree
 
737
        def restore():
 
738
            bzrlib.version._get_bzr_source_tree = orig_get_bzr_source_tree
 
739
        self.addCleanup(restore)
 
740
 
 
741
    def test_assigned_benchmark_file_stores_date(self):
 
742
        self._patch_get_bzr_source_tree()
 
743
        output = StringIO()
 
744
        result = bzrlib.tests.TextTestResult(self._log_file,
 
745
                                        descriptions=0,
 
746
                                        verbosity=1,
 
747
                                        bench_history=output
 
748
                                        )
 
749
        output_string = output.getvalue()
 
750
        # if you are wondering about the regexp please read the comment in
 
751
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
 
752
        # XXX: what comment?  -- Andrew Bennetts
 
753
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
754
 
 
755
    def test_benchhistory_records_test_times(self):
 
756
        self._patch_get_bzr_source_tree()
 
757
        result_stream = StringIO()
 
758
        result = bzrlib.tests.TextTestResult(
 
759
            self._log_file,
 
760
            descriptions=0,
 
761
            verbosity=1,
 
762
            bench_history=result_stream
 
763
            )
 
764
 
 
765
        # we want profile a call and check that its test duration is recorded
 
766
        # make a new test instance that when run will generate a benchmark
 
767
        example_test_case = TestTestResult("_time_hello_world_encoding")
 
768
        # execute the test, which should succeed and record times
 
769
        example_test_case.run(result)
 
770
        lines = result_stream.getvalue().splitlines()
 
771
        self.assertEqual(2, len(lines))
 
772
        self.assertContainsRe(lines[1],
 
773
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
 
774
            "._time_hello_world_encoding")
 
775
 
768
776
    def _time_hello_world_encoding(self):
769
777
        """Profile two sleep calls
770
778
 
775
783
 
776
784
    def test_lsprofiling(self):
777
785
        """Verbose test result prints lsprof statistics from test cases."""
778
 
        self.requireFeature(features.lsprof_feature)
 
786
        self.requireFeature(test_lsprof.LSProfFeature)
779
787
        result_stream = StringIO()
780
788
        result = bzrlib.tests.VerboseTestResult(
781
 
            result_stream,
 
789
            unittest._WritelnDecorator(result_stream),
782
790
            descriptions=0,
783
791
            verbosity=2,
784
792
            )
810
818
        self.assertContainsRe(output,
811
819
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
812
820
 
813
 
    def test_uses_time_from_testtools(self):
814
 
        """Test case timings in verbose results should use testtools times"""
815
 
        import datetime
816
 
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
817
 
            def startTest(self, test):
818
 
                self.time(datetime.datetime.utcfromtimestamp(1.145))
819
 
                super(TimeAddedVerboseTestResult, self).startTest(test)
820
 
            def addSuccess(self, test):
821
 
                self.time(datetime.datetime.utcfromtimestamp(51.147))
822
 
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
823
 
            def report_tests_starting(self): pass
824
 
        sio = StringIO()
825
 
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
826
 
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
827
 
 
828
821
    def test_known_failure(self):
829
 
        """Using knownFailure should trigger several result actions."""
 
822
        """A KnownFailure being raised should trigger several result actions."""
830
823
        class InstrumentedTestResult(tests.ExtendedTestResult):
831
824
            def stopTestRun(self): pass
832
 
            def report_tests_starting(self): pass
833
 
            def report_known_failure(self, test, err=None, details=None):
834
 
                self._call = test, 'known failure'
 
825
            def startTests(self): pass
 
826
            def report_test_start(self, test): pass
 
827
            def report_known_failure(self, test, err):
 
828
                self._call = test, err
835
829
        result = InstrumentedTestResult(None, None, None, None)
836
 
        class Test(tests.TestCase):
837
 
            def test_function(self):
838
 
                self.knownFailure('failed!')
839
 
        test = Test("test_function")
 
830
        def test_function():
 
831
            raise tests.KnownFailure('failed!')
 
832
        test = unittest.FunctionTestCase(test_function)
840
833
        test.run(result)
841
834
        # it should invoke 'report_known_failure'.
842
835
        self.assertEqual(2, len(result._call))
843
 
        self.assertEqual(test.id(), result._call[0].id())
844
 
        self.assertEqual('known failure', result._call[1])
 
836
        self.assertEqual(test, result._call[0])
 
837
        self.assertEqual(tests.KnownFailure, result._call[1][0])
 
838
        self.assertIsInstance(result._call[1][1], tests.KnownFailure)
845
839
        # we dont introspec the traceback, if the rest is ok, it would be
846
840
        # exceptional for it not to be.
847
841
        # it should update the known_failure_count on the object.
853
847
        # verbose test output formatting
854
848
        result_stream = StringIO()
855
849
        result = bzrlib.tests.VerboseTestResult(
856
 
            result_stream,
 
850
            unittest._WritelnDecorator(result_stream),
857
851
            descriptions=0,
858
852
            verbosity=2,
859
853
            )
860
 
        _get_test("test_xfail").run(result)
861
 
        self.assertContainsRe(result_stream.getvalue(),
862
 
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
863
 
            "\\s*(?:Text attachment: )?reason"
864
 
            "(?:\n-+\n|: {{{)"
865
 
            "this_fails"
866
 
            "(?:\n-+\n|}}}\n)")
 
854
        test = self.get_passing_test()
 
855
        result.startTest(test)
 
856
        prefix = len(result_stream.getvalue())
 
857
        # the err parameter has the shape:
 
858
        # (class, exception object, traceback)
 
859
        # KnownFailures dont get their tracebacks shown though, so we
 
860
        # can skip that.
 
861
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
862
        result.report_known_failure(test, err)
 
863
        output = result_stream.getvalue()[prefix:]
 
864
        lines = output.splitlines()
 
865
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
866
        self.assertEqual(lines[1], '    foo')
 
867
        self.assertEqual(2, len(lines))
867
868
 
868
869
    def get_passing_test(self):
869
870
        """Return a test object that can't be run usefully."""
875
876
        """Test the behaviour of invoking addNotSupported."""
876
877
        class InstrumentedTestResult(tests.ExtendedTestResult):
877
878
            def stopTestRun(self): pass
878
 
            def report_tests_starting(self): pass
 
879
            def startTests(self): pass
 
880
            def report_test_start(self, test): pass
879
881
            def report_unsupported(self, test, feature):
880
882
                self._call = test, feature
881
883
        result = InstrumentedTestResult(None, None, None, None)
882
884
        test = SampleTestCase('_test_pass')
883
 
        feature = features.Feature()
 
885
        feature = tests.Feature()
884
886
        result.startTest(test)
885
887
        result.addNotSupported(test, feature)
886
888
        # it should invoke 'report_unsupported'.
900
902
        # verbose test output formatting
901
903
        result_stream = StringIO()
902
904
        result = bzrlib.tests.VerboseTestResult(
903
 
            result_stream,
 
905
            unittest._WritelnDecorator(result_stream),
904
906
            descriptions=0,
905
907
            verbosity=2,
906
908
            )
907
909
        test = self.get_passing_test()
908
 
        feature = features.Feature()
 
910
        feature = tests.Feature()
909
911
        result.startTest(test)
910
912
        prefix = len(result_stream.getvalue())
911
913
        result.report_unsupported(test, feature)
912
914
        output = result_stream.getvalue()[prefix:]
913
915
        lines = output.splitlines()
914
 
        # We don't check for the final '0ms' since it may fail on slow hosts
915
 
        self.assertStartsWith(lines[0], 'NODEP')
916
 
        self.assertEqual(lines[1],
917
 
                         "    The feature 'Feature' is not available.")
 
916
        self.assertEqual(lines, ['NODEP        0ms',
 
917
                                 "    The feature 'Feature' is not available."])
918
918
 
919
919
    def test_unavailable_exception(self):
920
920
        """An UnavailableFeature being raised should invoke addNotSupported."""
921
921
        class InstrumentedTestResult(tests.ExtendedTestResult):
922
922
            def stopTestRun(self): pass
923
 
            def report_tests_starting(self): pass
 
923
            def startTests(self): pass
 
924
            def report_test_start(self, test): pass
924
925
            def addNotSupported(self, test, feature):
925
926
                self._call = test, feature
926
927
        result = InstrumentedTestResult(None, None, None, None)
927
 
        feature = features.Feature()
928
 
        class Test(tests.TestCase):
929
 
            def test_function(self):
930
 
                raise tests.UnavailableFeature(feature)
931
 
        test = Test("test_function")
 
928
        feature = tests.Feature()
 
929
        def test_function():
 
930
            raise tests.UnavailableFeature(feature)
 
931
        test = unittest.FunctionTestCase(test_function)
932
932
        test.run(result)
933
933
        # it should invoke 'addNotSupported'.
934
934
        self.assertEqual(2, len(result._call))
935
 
        self.assertEqual(test.id(), result._call[0].id())
 
935
        self.assertEqual(test, result._call[0])
936
936
        self.assertEqual(feature, result._call[1])
937
937
        # and not count as an error
938
938
        self.assertEqual(0, result.error_count)
949
949
    def test_strict_with_known_failure(self):
950
950
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
951
951
                                             verbosity=1)
952
 
        test = _get_test("test_xfail")
953
 
        test.run(result)
 
952
        test = self.get_passing_test()
 
953
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
954
        result._addKnownFailure(test, err)
954
955
        self.assertFalse(result.wasStrictlySuccessful())
955
956
        self.assertEqual(None, result._extractBenchmarkTime(test))
956
957
 
967
968
        class InstrumentedTestResult(tests.ExtendedTestResult):
968
969
            calls = 0
969
970
            def startTests(self): self.calls += 1
 
971
            def report_test_start(self, test): pass
970
972
        result = InstrumentedTestResult(None, None, None, None)
971
973
        def test_function():
972
974
            pass
974
976
        test.run(result)
975
977
        self.assertEquals(1, result.calls)
976
978
 
977
 
    def test_startTests_only_once(self):
978
 
        """With multiple tests startTests should still only be called once"""
979
 
        class InstrumentedTestResult(tests.ExtendedTestResult):
980
 
            calls = 0
981
 
            def startTests(self): self.calls += 1
982
 
        result = InstrumentedTestResult(None, None, None, None)
983
 
        suite = unittest.TestSuite([
984
 
            unittest.FunctionTestCase(lambda: None),
985
 
            unittest.FunctionTestCase(lambda: None)])
986
 
        suite.run(result)
987
 
        self.assertEquals(1, result.calls)
988
 
        self.assertEquals(2, result.count)
 
979
 
 
980
class TestUnicodeFilenameFeature(tests.TestCase):
 
981
 
 
982
    def test_probe_passes(self):
 
983
        """UnicodeFilenameFeature._probe passes."""
 
984
        # We can't test much more than that because the behaviour depends
 
985
        # on the platform.
 
986
        tests.UnicodeFilenameFeature._probe()
989
987
 
990
988
 
991
989
class TestRunner(tests.TestCase):
1004
1002
        because of our use of global state.
1005
1003
        """
1006
1004
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1005
        old_leak = tests.TestCase._first_thread_leaker_id
1007
1006
        try:
1008
1007
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1008
            tests.TestCase._first_thread_leaker_id = None
1009
1009
            return testrunner.run(test)
1010
1010
        finally:
1011
1011
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1012
            tests.TestCase._first_thread_leaker_id = old_leak
1012
1013
 
1013
1014
    def test_known_failure_failed_run(self):
1014
1015
        # run a test that generates a known failure which should be printed in
1015
1016
        # the final output when real failures occur.
1016
 
        class Test(tests.TestCase):
1017
 
            def known_failure_test(self):
1018
 
                self.expectFailure('failed', self.assertTrue, False)
 
1017
        def known_failure_test():
 
1018
            raise tests.KnownFailure('failed')
1019
1019
        test = unittest.TestSuite()
1020
 
        test.addTest(Test("known_failure_test"))
 
1020
        test.addTest(unittest.FunctionTestCase(known_failure_test))
1021
1021
        def failing_test():
1022
1022
            raise AssertionError('foo')
1023
1023
        test.addTest(unittest.FunctionTestCase(failing_test))
1026
1026
        result = self.run_test_runner(runner, test)
1027
1027
        lines = stream.getvalue().splitlines()
1028
1028
        self.assertContainsRe(stream.getvalue(),
1029
 
            '(?sm)^bzr selftest.*$'
 
1029
            '(?sm)^testing.*$'
1030
1030
            '.*'
1031
1031
            '^======================================================================\n'
1032
 
            '^FAIL: failing_test\n'
 
1032
            '^FAIL: unittest.FunctionTestCase \\(failing_test\\)\n'
1033
1033
            '^----------------------------------------------------------------------\n'
1034
1034
            'Traceback \\(most recent call last\\):\n'
1035
1035
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1041
1041
            )
1042
1042
 
1043
1043
    def test_known_failure_ok_run(self):
1044
 
        # run a test that generates a known failure which should be printed in
1045
 
        # the final output.
1046
 
        class Test(tests.TestCase):
1047
 
            def known_failure_test(self):
1048
 
                self.knownFailure("Never works...")
1049
 
        test = Test("known_failure_test")
 
1044
        # run a test that generates a known failure which should be printed in the final output.
 
1045
        def known_failure_test():
 
1046
            raise tests.KnownFailure('failed')
 
1047
        test = unittest.FunctionTestCase(known_failure_test)
1050
1048
        stream = StringIO()
1051
1049
        runner = tests.TextTestRunner(stream=stream)
1052
1050
        result = self.run_test_runner(runner, test)
1057
1055
            '\n'
1058
1056
            'OK \\(known_failures=1\\)\n')
1059
1057
 
1060
 
    def test_unexpected_success_bad(self):
1061
 
        class Test(tests.TestCase):
1062
 
            def test_truth(self):
1063
 
                self.expectFailure("No absolute truth", self.assertTrue, True)
1064
 
        runner = tests.TextTestRunner(stream=StringIO())
1065
 
        result = self.run_test_runner(runner, Test("test_truth"))
1066
 
        self.assertContainsRe(runner.stream.getvalue(),
1067
 
            "=+\n"
1068
 
            "FAIL: \\S+\.test_truth\n"
1069
 
            "-+\n"
1070
 
            "(?:.*\n)*"
1071
 
            "\\s*(?:Text attachment: )?reason"
1072
 
            "(?:\n-+\n|: {{{)"
1073
 
            "No absolute truth"
1074
 
            "(?:\n-+\n|}}}\n)"
1075
 
            "(?:.*\n)*"
1076
 
            "-+\n"
1077
 
            "Ran 1 test in .*\n"
1078
 
            "\n"
1079
 
            "FAILED \\(failures=1\\)\n\\Z")
1080
 
 
1081
1058
    def test_result_decorator(self):
1082
1059
        # decorate results
1083
1060
        calls = []
1084
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1061
        class LoggingDecorator(tests.ForwardingResult):
1085
1062
            def startTest(self, test):
1086
 
                ExtendedToOriginalDecorator.startTest(self, test)
 
1063
                tests.ForwardingResult.startTest(self, test)
1087
1064
                calls.append('start')
1088
1065
        test = unittest.FunctionTestCase(lambda:None)
1089
1066
        stream = StringIO()
1150
1127
 
1151
1128
    def test_not_applicable(self):
1152
1129
        # run a test that is skipped because it's not applicable
1153
 
        class Test(tests.TestCase):
1154
 
            def not_applicable_test(self):
1155
 
                raise tests.TestNotApplicable('this test never runs')
 
1130
        def not_applicable_test():
 
1131
            raise tests.TestNotApplicable('this test never runs')
1156
1132
        out = StringIO()
1157
1133
        runner = tests.TextTestRunner(stream=out, verbosity=2)
1158
 
        test = Test("not_applicable_test")
 
1134
        test = unittest.FunctionTestCase(not_applicable_test)
1159
1135
        result = self.run_test_runner(runner, test)
1160
1136
        self._log_file.write(out.getvalue())
1161
1137
        self.assertTrue(result.wasSuccessful())
1167
1143
 
1168
1144
    def test_unsupported_features_listed(self):
1169
1145
        """When unsupported features are encountered they are detailed."""
1170
 
        class Feature1(features.Feature):
 
1146
        class Feature1(tests.Feature):
1171
1147
            def _probe(self): return False
1172
 
        class Feature2(features.Feature):
 
1148
        class Feature2(tests.Feature):
1173
1149
            def _probe(self): return False
1174
1150
        # create sample tests
1175
1151
        test1 = SampleTestCase('_test_pass')
1190
1166
            ],
1191
1167
            lines[-3:])
1192
1168
 
1193
 
    def test_verbose_test_count(self):
1194
 
        """A verbose test run reports the right test count at the start"""
1195
 
        suite = TestUtil.TestSuite([
1196
 
            unittest.FunctionTestCase(lambda:None),
1197
 
            unittest.FunctionTestCase(lambda:None)])
1198
 
        self.assertEqual(suite.countTestCases(), 2)
1199
 
        stream = StringIO()
1200
 
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
1201
 
        # Need to use the CountingDecorator as that's what sets num_tests
1202
 
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1203
 
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1169
    def _patch_get_bzr_source_tree(self):
 
1170
        # Reading from the actual source tree breaks isolation, but we don't
 
1171
        # want to assume that thats *all* that would happen.
 
1172
        self._get_source_tree_calls = []
 
1173
        def _get_bzr_source_tree():
 
1174
            self._get_source_tree_calls.append("called")
 
1175
            return None
 
1176
        orig_get_bzr_source_tree = bzrlib.version._get_bzr_source_tree
 
1177
        bzrlib.version._get_bzr_source_tree = _get_bzr_source_tree
 
1178
        def restore():
 
1179
            bzrlib.version._get_bzr_source_tree = orig_get_bzr_source_tree
 
1180
        self.addCleanup(restore)
 
1181
 
 
1182
    def test_bench_history(self):
 
1183
        # tests that the running the benchmark passes bench_history into
 
1184
        # the test result object. We can tell that happens if
 
1185
        # _get_bzr_source_tree is called.
 
1186
        self._patch_get_bzr_source_tree()
 
1187
        test = TestRunner('dummy_test')
 
1188
        output = StringIO()
 
1189
        runner = tests.TextTestRunner(stream=self._log_file,
 
1190
                                      bench_history=output)
 
1191
        result = self.run_test_runner(runner, test)
 
1192
        output_string = output.getvalue()
 
1193
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
1194
        self.assertLength(1, self._get_source_tree_calls)
 
1195
 
 
1196
    def assertLogDeleted(self, test):
 
1197
        log = test._get_log()
 
1198
        self.assertEqual("DELETED log file to reduce memory footprint", log)
 
1199
        self.assertEqual('', test._log_contents)
 
1200
        self.assertIs(None, test._log_file_name)
 
1201
 
 
1202
    def test_success_log_deleted(self):
 
1203
        """Successful tests have their log deleted"""
 
1204
 
 
1205
        class LogTester(tests.TestCase):
 
1206
 
 
1207
            def test_success(self):
 
1208
                self.log('this will be removed\n')
 
1209
 
 
1210
        sio = StringIO()
 
1211
        runner = tests.TextTestRunner(stream=sio)
 
1212
        test = LogTester('test_success')
 
1213
        result = self.run_test_runner(runner, test)
 
1214
 
 
1215
        self.assertLogDeleted(test)
 
1216
 
 
1217
    def test_skipped_log_deleted(self):
 
1218
        """Skipped tests have their log deleted"""
 
1219
 
 
1220
        class LogTester(tests.TestCase):
 
1221
 
 
1222
            def test_skipped(self):
 
1223
                self.log('this will be removed\n')
 
1224
                raise tests.TestSkipped()
 
1225
 
 
1226
        sio = StringIO()
 
1227
        runner = tests.TextTestRunner(stream=sio)
 
1228
        test = LogTester('test_skipped')
 
1229
        result = self.run_test_runner(runner, test)
 
1230
 
 
1231
        self.assertLogDeleted(test)
 
1232
 
 
1233
    def test_not_aplicable_log_deleted(self):
 
1234
        """Not applicable tests have their log deleted"""
 
1235
 
 
1236
        class LogTester(tests.TestCase):
 
1237
 
 
1238
            def test_not_applicable(self):
 
1239
                self.log('this will be removed\n')
 
1240
                raise tests.TestNotApplicable()
 
1241
 
 
1242
        sio = StringIO()
 
1243
        runner = tests.TextTestRunner(stream=sio)
 
1244
        test = LogTester('test_not_applicable')
 
1245
        result = self.run_test_runner(runner, test)
 
1246
 
 
1247
        self.assertLogDeleted(test)
 
1248
 
 
1249
    def test_known_failure_log_deleted(self):
 
1250
        """Know failure tests have their log deleted"""
 
1251
 
 
1252
        class LogTester(tests.TestCase):
 
1253
 
 
1254
            def test_known_failure(self):
 
1255
                self.log('this will be removed\n')
 
1256
                raise tests.KnownFailure()
 
1257
 
 
1258
        sio = StringIO()
 
1259
        runner = tests.TextTestRunner(stream=sio)
 
1260
        test = LogTester('test_known_failure')
 
1261
        result = self.run_test_runner(runner, test)
 
1262
 
 
1263
        self.assertLogDeleted(test)
 
1264
 
 
1265
    def test_fail_log_kept(self):
 
1266
        """Failed tests have their log kept"""
 
1267
 
 
1268
        class LogTester(tests.TestCase):
 
1269
 
 
1270
            def test_fail(self):
 
1271
                self.log('this will be kept\n')
 
1272
                self.fail('this test fails')
 
1273
 
 
1274
        sio = StringIO()
 
1275
        runner = tests.TextTestRunner(stream=sio)
 
1276
        test = LogTester('test_fail')
 
1277
        result = self.run_test_runner(runner, test)
 
1278
 
 
1279
        text = sio.getvalue()
 
1280
        self.assertContainsRe(text, 'this will be kept')
 
1281
        self.assertContainsRe(text, 'this test fails')
 
1282
 
 
1283
        log = test._get_log()
 
1284
        self.assertContainsRe(log, 'this will be kept')
 
1285
        self.assertEqual(log, test._log_contents)
 
1286
 
 
1287
    def test_error_log_kept(self):
 
1288
        """Tests with errors have their log kept"""
 
1289
 
 
1290
        class LogTester(tests.TestCase):
 
1291
 
 
1292
            def test_error(self):
 
1293
                self.log('this will be kept\n')
 
1294
                raise ValueError('random exception raised')
 
1295
 
 
1296
        sio = StringIO()
 
1297
        runner = tests.TextTestRunner(stream=sio)
 
1298
        test = LogTester('test_error')
 
1299
        result = self.run_test_runner(runner, test)
 
1300
 
 
1301
        text = sio.getvalue()
 
1302
        self.assertContainsRe(text, 'this will be kept')
 
1303
        self.assertContainsRe(text, 'random exception raised')
 
1304
 
 
1305
        log = test._get_log()
 
1306
        self.assertContainsRe(log, 'this will be kept')
 
1307
        self.assertEqual(log, test._log_contents)
1204
1308
 
1205
1309
    def test_startTestRun(self):
1206
1310
        """run should call result.startTestRun()"""
1207
1311
        calls = []
1208
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1312
        class LoggingDecorator(tests.ForwardingResult):
1209
1313
            def startTestRun(self):
1210
 
                ExtendedToOriginalDecorator.startTestRun(self)
 
1314
                tests.ForwardingResult.startTestRun(self)
1211
1315
                calls.append('startTestRun')
1212
1316
        test = unittest.FunctionTestCase(lambda:None)
1213
1317
        stream = StringIO()
1219
1323
    def test_stopTestRun(self):
1220
1324
        """run should call result.stopTestRun()"""
1221
1325
        calls = []
1222
 
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1326
        class LoggingDecorator(tests.ForwardingResult):
1223
1327
            def stopTestRun(self):
1224
 
                ExtendedToOriginalDecorator.stopTestRun(self)
 
1328
                tests.ForwardingResult.stopTestRun(self)
1225
1329
                calls.append('stopTestRun')
1226
1330
        test = unittest.FunctionTestCase(lambda:None)
1227
1331
        stream = StringIO()
1230
1334
        result = self.run_test_runner(runner, test)
1231
1335
        self.assertLength(1, calls)
1232
1336
 
1233
 
    def test_unicode_test_output_on_ascii_stream(self):
1234
 
        """Showing results should always succeed even on an ascii console"""
1235
 
        class FailureWithUnicode(tests.TestCase):
1236
 
            def test_log_unicode(self):
1237
 
                self.log(u"\u2606")
1238
 
                self.fail("Now print that log!")
1239
 
        out = StringIO()
1240
 
        self.overrideAttr(osutils, "get_terminal_encoding",
1241
 
            lambda trace=False: "ascii")
1242
 
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1243
 
            FailureWithUnicode("test_log_unicode"))
1244
 
        self.assertContainsRe(out.getvalue(),
1245
 
            "(?:Text attachment: )?log"
1246
 
            "(?:\n-+\n|: {{{)"
1247
 
            "\d+\.\d+  \\\\u2606"
1248
 
            "(?:\n-+\n|}}}\n)")
1249
 
 
1250
1337
 
1251
1338
class SampleTestCase(tests.TestCase):
1252
1339
 
1317
1404
        self.assertEqual(flags, bzrlib.debug.debug_flags)
1318
1405
 
1319
1406
    def change_selftest_debug_flags(self, new_flags):
1320
 
        self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags))
 
1407
        orig_selftest_flags = tests.selftest_debug_flags
 
1408
        self.addCleanup(self._restore_selftest_debug_flags, orig_selftest_flags)
 
1409
        tests.selftest_debug_flags = set(new_flags)
 
1410
 
 
1411
    def _restore_selftest_debug_flags(self, flags):
 
1412
        tests.selftest_debug_flags = flags
1321
1413
 
1322
1414
    def test_allow_debug_flag(self):
1323
1415
        """The -Eallow_debug flag prevents bzrlib.debug.debug_flags from being
1383
1475
        self.assertEqual(set(['original-state']), bzrlib.debug.debug_flags)
1384
1476
 
1385
1477
    def make_test_result(self):
1386
 
        """Get a test result that writes to the test log file."""
1387
1478
        return tests.TextTestResult(self._log_file, descriptions=0, verbosity=1)
1388
1479
 
1389
1480
    def inner_test(self):
1397
1488
        result = self.make_test_result()
1398
1489
        self.inner_test.run(result)
1399
1490
        note("outer finish")
1400
 
        self.addCleanup(osutils.delete_any, self._log_file_name)
1401
1491
 
1402
1492
    def test_trace_nesting(self):
1403
1493
        # this tests that each test case nests its trace facility correctly.
1415
1505
        outer_test = TestTestCase("outer_child")
1416
1506
        result = self.make_test_result()
1417
1507
        outer_test.run(result)
 
1508
        self.addCleanup(osutils.delete_any, outer_test._log_file_name)
1418
1509
        self.assertEqual(original_trace, bzrlib.trace._trace_file)
1419
1510
 
1420
1511
    def method_that_times_a_bit_twice(self):
1427
1518
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1428
1519
        output_stream = StringIO()
1429
1520
        result = bzrlib.tests.VerboseTestResult(
1430
 
            output_stream,
 
1521
            unittest._WritelnDecorator(output_stream),
1431
1522
            descriptions=0,
1432
1523
            verbosity=2)
1433
1524
        sample_test.run(result)
1440
1531
        # Note this test won't fail with hooks that the core library doesn't
1441
1532
        # use - but it trigger with a plugin that adds hooks, so its still a
1442
1533
        # useful warning in that case.
1443
 
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
1444
 
        self.assertEqual(
1445
 
            bzrlib.smart.server.SmartServerHooks(),
 
1534
        self.assertEqual(bzrlib.branch.BranchHooks(),
 
1535
            bzrlib.branch.Branch.hooks)
 
1536
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
1446
1537
            bzrlib.smart.server.SmartTCPServer.hooks)
1447
 
        self.assertEqual(
1448
 
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
 
1538
        self.assertEqual(bzrlib.commands.CommandHooks(),
 
1539
            bzrlib.commands.Command.hooks)
1449
1540
 
1450
1541
    def test__gather_lsprof_in_benchmarks(self):
1451
1542
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1452
1543
 
1453
1544
        Each self.time() call is individually and separately profiled.
1454
1545
        """
1455
 
        self.requireFeature(features.lsprof_feature)
 
1546
        self.requireFeature(test_lsprof.LSProfFeature)
1456
1547
        # overrides the class member with an instance member so no cleanup
1457
1548
        # needed.
1458
1549
        self._gather_lsprof_in_benchmarks = True
1474
1565
        # permitted.
1475
1566
        # Manually set one up (TestCase doesn't and shouldn't provide magic
1476
1567
        # machinery)
1477
 
        transport_server = memory.MemoryServer()
1478
 
        transport_server.start_server()
1479
 
        self.addCleanup(transport_server.stop_server)
1480
 
        t = transport.get_transport_from_url(transport_server.get_url())
1481
 
        controldir.ControlDir.create(t.base)
 
1568
        transport_server = MemoryServer()
 
1569
        transport_server.setUp()
 
1570
        self.addCleanup(transport_server.tearDown)
 
1571
        t = transport.get_transport(transport_server.get_url())
 
1572
        bzrdir.BzrDir.create(t.base)
1482
1573
        self.assertRaises(errors.BzrError,
1483
 
            controldir.ControlDir.open_from_transport, t)
 
1574
            bzrdir.BzrDir.open_from_transport, t)
1484
1575
        # But if we declare this as safe, we can open the bzrdir.
1485
1576
        self.permit_url(t.base)
1486
1577
        self._bzr_selftest_roots.append(t.base)
1487
 
        controldir.ControlDir.open_from_transport(t)
 
1578
        bzrdir.BzrDir.open_from_transport(t)
1488
1579
 
1489
1580
    def test_requireFeature_available(self):
1490
1581
        """self.requireFeature(available) is a no-op."""
1491
 
        class Available(features.Feature):
 
1582
        class Available(tests.Feature):
1492
1583
            def _probe(self):return True
1493
1584
        feature = Available()
1494
1585
        self.requireFeature(feature)
1495
1586
 
1496
1587
    def test_requireFeature_unavailable(self):
1497
1588
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1498
 
        class Unavailable(features.Feature):
 
1589
        class Unavailable(tests.Feature):
1499
1590
            def _probe(self):return False
1500
1591
        feature = Unavailable()
1501
1592
        self.assertRaises(tests.UnavailableFeature,
1535
1626
        """Test disabled tests behaviour with support aware results."""
1536
1627
        test = SampleTestCase('_test_pass')
1537
1628
        class DisabledFeature(object):
1538
 
            def __eq__(self, other):
1539
 
                return isinstance(other, DisabledFeature)
1540
1629
            def available(self):
1541
1630
                return False
1542
1631
        the_feature = DisabledFeature()
1553
1642
                self.calls.append(('addNotSupported', test, feature))
1554
1643
        result = InstrumentedTestResult()
1555
1644
        test.run(result)
1556
 
        case = result.calls[0][1]
1557
1645
        self.assertEqual([
1558
 
            ('startTest', case),
1559
 
            ('addNotSupported', case, the_feature),
1560
 
            ('stopTest', case),
 
1646
            ('startTest', test),
 
1647
            ('addNotSupported', test, the_feature),
 
1648
            ('stopTest', test),
1561
1649
            ],
1562
1650
            result.calls)
1563
1651
 
1564
1652
    def test_start_server_registers_url(self):
1565
 
        transport_server = memory.MemoryServer()
 
1653
        transport_server = MemoryServer()
1566
1654
        # A little strict, but unlikely to be changed soon.
1567
1655
        self.assertEqual([], self._bzr_selftest_roots)
1568
1656
        self.start_server(transport_server)
1624
1712
        self.assertRaises(AssertionError,
1625
1713
            self.assertListRaises, _TestException, success_generator)
1626
1714
 
1627
 
    def test_overrideAttr_without_value(self):
1628
 
        self.test_attr = 'original' # Define a test attribute
1629
 
        obj = self # Make 'obj' visible to the embedded test
1630
 
        class Test(tests.TestCase):
1631
 
 
1632
 
            def setUp(self):
1633
 
                tests.TestCase.setUp(self)
1634
 
                self.orig = self.overrideAttr(obj, 'test_attr')
1635
 
 
1636
 
            def test_value(self):
1637
 
                self.assertEqual('original', self.orig)
1638
 
                self.assertEqual('original', obj.test_attr)
1639
 
                obj.test_attr = 'modified'
1640
 
                self.assertEqual('modified', obj.test_attr)
1641
 
 
1642
 
        test = Test('test_value')
1643
 
        test.run(unittest.TestResult())
1644
 
        self.assertEqual('original', obj.test_attr)
1645
 
 
1646
 
    def test_overrideAttr_with_value(self):
1647
 
        self.test_attr = 'original' # Define a test attribute
1648
 
        obj = self # Make 'obj' visible to the embedded test
1649
 
        class Test(tests.TestCase):
1650
 
 
1651
 
            def setUp(self):
1652
 
                tests.TestCase.setUp(self)
1653
 
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1654
 
 
1655
 
            def test_value(self):
1656
 
                self.assertEqual('original', self.orig)
1657
 
                self.assertEqual('modified', obj.test_attr)
1658
 
 
1659
 
        test = Test('test_value')
1660
 
        test.run(unittest.TestResult())
1661
 
        self.assertEqual('original', obj.test_attr)
1662
 
 
1663
 
    def test_recordCalls(self):
1664
 
        from bzrlib.tests import test_selftest
1665
 
        calls = self.recordCalls(
1666
 
            test_selftest, '_add_numbers')
1667
 
        self.assertEqual(test_selftest._add_numbers(2, 10),
1668
 
            12)
1669
 
        self.assertEquals(calls, [((2, 10), {})])
1670
 
 
1671
 
 
1672
 
def _add_numbers(a, b):
1673
 
    return a + b
1674
 
 
1675
 
 
1676
 
class _MissingFeature(features.Feature):
1677
 
    def _probe(self):
1678
 
        return False
1679
 
missing_feature = _MissingFeature()
1680
 
 
1681
 
 
1682
 
def _get_test(name):
1683
 
    """Get an instance of a specific example test.
1684
 
 
1685
 
    We protect this in a function so that they don't auto-run in the test
1686
 
    suite.
1687
 
    """
1688
 
 
1689
 
    class ExampleTests(tests.TestCase):
1690
 
 
1691
 
        def test_fail(self):
1692
 
            mutter('this was a failing test')
1693
 
            self.fail('this test will fail')
1694
 
 
1695
 
        def test_error(self):
1696
 
            mutter('this test errored')
1697
 
            raise RuntimeError('gotcha')
1698
 
 
1699
 
        def test_missing_feature(self):
1700
 
            mutter('missing the feature')
1701
 
            self.requireFeature(missing_feature)
1702
 
 
1703
 
        def test_skip(self):
1704
 
            mutter('this test will be skipped')
1705
 
            raise tests.TestSkipped('reason')
1706
 
 
1707
 
        def test_success(self):
1708
 
            mutter('this test succeeds')
1709
 
 
1710
 
        def test_xfail(self):
1711
 
            mutter('test with expected failure')
1712
 
            self.knownFailure('this_fails')
1713
 
 
1714
 
        def test_unexpected_success(self):
1715
 
            mutter('test with unexpected success')
1716
 
            self.expectFailure('should_fail', lambda: None)
1717
 
 
1718
 
    return ExampleTests(name)
1719
 
 
1720
 
 
1721
 
class TestTestCaseLogDetails(tests.TestCase):
1722
 
 
1723
 
    def _run_test(self, test_name):
1724
 
        test = _get_test(test_name)
1725
 
        result = testtools.TestResult()
1726
 
        test.run(result)
1727
 
        return result
1728
 
 
1729
 
    def test_fail_has_log(self):
1730
 
        result = self._run_test('test_fail')
1731
 
        self.assertEqual(1, len(result.failures))
1732
 
        result_content = result.failures[0][1]
1733
 
        self.assertContainsRe(result_content,
1734
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1735
 
        self.assertContainsRe(result_content, 'this was a failing test')
1736
 
 
1737
 
    def test_error_has_log(self):
1738
 
        result = self._run_test('test_error')
1739
 
        self.assertEqual(1, len(result.errors))
1740
 
        result_content = result.errors[0][1]
1741
 
        self.assertContainsRe(result_content,
1742
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1743
 
        self.assertContainsRe(result_content, 'this test errored')
1744
 
 
1745
 
    def test_skip_has_no_log(self):
1746
 
        result = self._run_test('test_skip')
1747
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
1748
 
        skips = result.skip_reasons['reason']
1749
 
        self.assertEqual(1, len(skips))
1750
 
        test = skips[0]
1751
 
        self.assertFalse('log' in test.getDetails())
1752
 
 
1753
 
    def test_missing_feature_has_no_log(self):
1754
 
        # testtools doesn't know about addNotSupported, so it just gets
1755
 
        # considered as a skip
1756
 
        result = self._run_test('test_missing_feature')
1757
 
        self.assertEqual([missing_feature], result.skip_reasons.keys())
1758
 
        skips = result.skip_reasons[missing_feature]
1759
 
        self.assertEqual(1, len(skips))
1760
 
        test = skips[0]
1761
 
        self.assertFalse('log' in test.getDetails())
1762
 
 
1763
 
    def test_xfail_has_no_log(self):
1764
 
        result = self._run_test('test_xfail')
1765
 
        self.assertEqual(1, len(result.expectedFailures))
1766
 
        result_content = result.expectedFailures[0][1]
1767
 
        self.assertNotContainsRe(result_content,
1768
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1769
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
1770
 
 
1771
 
    def test_unexpected_success_has_log(self):
1772
 
        result = self._run_test('test_unexpected_success')
1773
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
1774
 
        # Inconsistency, unexpectedSuccesses is a list of tests,
1775
 
        # expectedFailures is a list of reasons?
1776
 
        test = result.unexpectedSuccesses[0]
1777
 
        details = test.getDetails()
1778
 
        self.assertTrue('log' in details)
1779
 
 
1780
 
 
1781
 
class TestTestCloning(tests.TestCase):
1782
 
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
1783
 
 
1784
 
    def test_cloned_testcase_does_not_share_details(self):
1785
 
        """A TestCase cloned with clone_test does not share mutable attributes
1786
 
        such as details or cleanups.
1787
 
        """
1788
 
        class Test(tests.TestCase):
1789
 
            def test_foo(self):
1790
 
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1791
 
        orig_test = Test('test_foo')
1792
 
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1793
 
        orig_test.run(unittest.TestResult())
1794
 
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1795
 
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
1796
 
 
1797
 
    def test_double_apply_scenario_preserves_first_scenario(self):
1798
 
        """Applying two levels of scenarios to a test preserves the attributes
1799
 
        added by both scenarios.
1800
 
        """
1801
 
        class Test(tests.TestCase):
1802
 
            def test_foo(self):
1803
 
                pass
1804
 
        test = Test('test_foo')
1805
 
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1806
 
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1807
 
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1808
 
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1809
 
        all_tests = list(tests.iter_suite_tests(suite))
1810
 
        self.assertLength(4, all_tests)
1811
 
        all_xys = sorted((t.x, t.y) for t in all_tests)
1812
 
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1813
 
 
1814
1715
 
1815
1716
# NB: Don't delete this; it's not actually from 0.11!
1816
1717
@deprecated_function(deprecated_in((0, 11, 0)))
1944
1845
    def test_make_branch_and_tree_with_format(self):
1945
1846
        # we should be able to supply a format to make_branch_and_tree
1946
1847
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1947
 
        self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
 
1848
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
 
1849
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1948
1850
                              bzrlib.bzrdir.BzrDirMetaFormat1)
 
1851
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
 
1852
                              bzrlib.bzrdir.BzrDirFormat6)
1949
1853
 
1950
1854
    def test_make_branch_and_memory_tree(self):
1951
1855
        # we should be able to get a new branch and a mutable tree from
1957
1861
        # make_branch_and_tree has to use local branch and repositories
1958
1862
        # when the vfs transport and local disk are colocated, even if
1959
1863
        # a different transport is in use for url generation.
1960
 
        self.transport_server = test_server.FakeVFATServer
 
1864
        from bzrlib.transport.fakevfat import FakeVFATServer
 
1865
        self.transport_server = FakeVFATServer
1961
1866
        self.assertFalse(self.get_url('t1').startswith('file://'))
1962
1867
        tree = self.make_branch_and_tree('t1')
1963
1868
        base = tree.bzrdir.root_transport.base
1968
1873
                tree.branch.repository.bzrdir.root_transport)
1969
1874
 
1970
1875
 
1971
 
class SelfTestHelper(object):
 
1876
class SelfTestHelper:
1972
1877
 
1973
1878
    def run_selftest(self, **kwargs):
1974
1879
        """Run selftest returning its output."""
2028
1933
        self.assertLength(2, output.readlines())
2029
1934
 
2030
1935
    def test_lsprof_tests(self):
2031
 
        self.requireFeature(features.lsprof_feature)
2032
 
        results = []
 
1936
        self.requireFeature(test_lsprof.LSProfFeature)
 
1937
        calls = []
2033
1938
        class Test(object):
2034
1939
            def __call__(test, result):
2035
1940
                test.run(result)
2036
1941
            def run(test, result):
2037
 
                results.append(result)
 
1942
                self.assertIsInstance(result, tests.ForwardingResult)
 
1943
                calls.append("called")
2038
1944
            def countTestCases(self):
2039
1945
                return 1
2040
1946
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2041
 
        self.assertLength(1, results)
2042
 
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
 
1947
        self.assertLength(1, calls)
2043
1948
 
2044
1949
    def test_random(self):
2045
1950
        # test randomising by listing a number of tests.
2061
1966
        self.assertEqual(expected.getvalue(), repeated.getvalue())
2062
1967
 
2063
1968
    def test_runner_class(self):
2064
 
        self.requireFeature(features.subunit)
 
1969
        self.requireFeature(SubUnitFeature)
2065
1970
        from subunit import ProtocolTestCase
2066
1971
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2067
1972
            test_suite_factory=self.factory)
2099
2004
        self.assertEqual(transport_server, captured_transport[0])
2100
2005
 
2101
2006
    def test_transport_sftp(self):
2102
 
        self.requireFeature(features.paramiko)
2103
 
        from bzrlib.tests import stub_sftp
2104
 
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
 
2007
        try:
 
2008
            import bzrlib.transport.sftp
 
2009
        except errors.ParamikoNotPresent:
 
2010
            raise tests.TestSkipped("Paramiko not present")
 
2011
        self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
2105
2012
 
2106
2013
    def test_transport_memory(self):
2107
 
        self.check_transport_set(memory.MemoryServer)
 
2014
        self.check_transport_set(bzrlib.transport.memory.MemoryServer)
2108
2015
 
2109
2016
 
2110
2017
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
2125
2032
            load_list='missing file name', list_only=True)
2126
2033
 
2127
2034
 
2128
 
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2129
 
 
2130
 
    _test_needs_features = [features.subunit]
2131
 
 
2132
 
    def run_subunit_stream(self, test_name):
2133
 
        from subunit import ProtocolTestCase
2134
 
        def factory():
2135
 
            return TestUtil.TestSuite([_get_test(test_name)])
2136
 
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2137
 
            test_suite_factory=factory)
2138
 
        test = ProtocolTestCase(stream)
2139
 
        result = testtools.TestResult()
2140
 
        test.run(result)
2141
 
        content = stream.getvalue()
2142
 
        return content, result
2143
 
 
2144
 
    def test_fail_has_log(self):
2145
 
        content, result = self.run_subunit_stream('test_fail')
2146
 
        self.assertEqual(1, len(result.failures))
2147
 
        self.assertContainsRe(content, '(?m)^log$')
2148
 
        self.assertContainsRe(content, 'this test will fail')
2149
 
 
2150
 
    def test_error_has_log(self):
2151
 
        content, result = self.run_subunit_stream('test_error')
2152
 
        self.assertContainsRe(content, '(?m)^log$')
2153
 
        self.assertContainsRe(content, 'this test errored')
2154
 
 
2155
 
    def test_skip_has_no_log(self):
2156
 
        content, result = self.run_subunit_stream('test_skip')
2157
 
        self.assertNotContainsRe(content, '(?m)^log$')
2158
 
        self.assertNotContainsRe(content, 'this test will be skipped')
2159
 
        self.assertEqual(['reason'], result.skip_reasons.keys())
2160
 
        skips = result.skip_reasons['reason']
2161
 
        self.assertEqual(1, len(skips))
2162
 
        test = skips[0]
2163
 
        # RemotedTestCase doesn't preserve the "details"
2164
 
        ## self.assertFalse('log' in test.getDetails())
2165
 
 
2166
 
    def test_missing_feature_has_no_log(self):
2167
 
        content, result = self.run_subunit_stream('test_missing_feature')
2168
 
        self.assertNotContainsRe(content, '(?m)^log$')
2169
 
        self.assertNotContainsRe(content, 'missing the feature')
2170
 
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2171
 
        skips = result.skip_reasons['_MissingFeature\n']
2172
 
        self.assertEqual(1, len(skips))
2173
 
        test = skips[0]
2174
 
        # RemotedTestCase doesn't preserve the "details"
2175
 
        ## self.assertFalse('log' in test.getDetails())
2176
 
 
2177
 
    def test_xfail_has_no_log(self):
2178
 
        content, result = self.run_subunit_stream('test_xfail')
2179
 
        self.assertNotContainsRe(content, '(?m)^log$')
2180
 
        self.assertNotContainsRe(content, 'test with expected failure')
2181
 
        self.assertEqual(1, len(result.expectedFailures))
2182
 
        result_content = result.expectedFailures[0][1]
2183
 
        self.assertNotContainsRe(result_content,
2184
 
            '(?m)^(?:Text attachment: )?log(?:$|: )')
2185
 
        self.assertNotContainsRe(result_content, 'test with expected failure')
2186
 
 
2187
 
    def test_unexpected_success_has_log(self):
2188
 
        content, result = self.run_subunit_stream('test_unexpected_success')
2189
 
        self.assertContainsRe(content, '(?m)^log$')
2190
 
        self.assertContainsRe(content, 'test with unexpected success')
2191
 
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2192
 
        #                success, if a min version check is added remove this
2193
 
        from subunit import TestProtocolClient as _Client
2194
 
        if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
2195
 
            self.expectFailure('subunit treats "unexpectedSuccess"'
2196
 
                               ' as a plain success',
2197
 
                self.assertEqual, 1, len(result.unexpectedSuccesses))
2198
 
        self.assertEqual(1, len(result.unexpectedSuccesses))
2199
 
        test = result.unexpectedSuccesses[0]
2200
 
        # RemotedTestCase doesn't preserve the "details"
2201
 
        ## self.assertTrue('log' in test.getDetails())
2202
 
 
2203
 
    def test_success_has_no_log(self):
2204
 
        content, result = self.run_subunit_stream('test_success')
2205
 
        self.assertEqual(1, result.testsRun)
2206
 
        self.assertNotContainsRe(content, '(?m)^log$')
2207
 
        self.assertNotContainsRe(content, 'this test succeeds')
2208
 
 
2209
 
 
2210
2035
class TestRunBzr(tests.TestCase):
2211
2036
 
2212
2037
    out = ''
2335
2160
        # stdout and stderr of the invoked run_bzr
2336
2161
        current_factory = bzrlib.ui.ui_factory
2337
2162
        self.run_bzr(['foo'])
2338
 
        self.assertFalse(current_factory is self.factory)
 
2163
        self.failIf(current_factory is self.factory)
2339
2164
        self.assertNotEqual(sys.stdout, self.factory.stdout)
2340
2165
        self.assertNotEqual(sys.stderr, self.factory.stderr)
2341
2166
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
2498
2323
 
2499
2324
 
2500
2325
class TestStartBzrSubProcess(tests.TestCase):
2501
 
    """Stub test start_bzr_subprocess."""
2502
2326
 
2503
 
    def _subprocess_log_cleanup(self):
2504
 
        """Inhibits the base version as we don't produce a log file."""
 
2327
    def check_popen_state(self):
 
2328
        """Replace to make assertions when popen is called."""
2505
2329
 
2506
2330
    def _popen(self, *args, **kwargs):
2507
 
        """Override the base version to record the command that is run.
2508
 
 
2509
 
        From there we can ensure it is correct without spawning a real process.
2510
 
        """
 
2331
        """Record the command that is run, so that we can ensure it is correct"""
2511
2332
        self.check_popen_state()
2512
2333
        self._popen_args = args
2513
2334
        self._popen_kwargs = kwargs
2514
2335
        raise _DontSpawnProcess()
2515
2336
 
2516
 
    def check_popen_state(self):
2517
 
        """Replace to make assertions when popen is called."""
2518
 
 
2519
2337
    def test_run_bzr_subprocess_no_plugins(self):
2520
2338
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2521
2339
        command = self._popen_args[0]
2525
2343
 
2526
2344
    def test_allow_plugins(self):
2527
2345
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2528
 
                          allow_plugins=True)
 
2346
            allow_plugins=True)
2529
2347
        command = self._popen_args[0]
2530
2348
        self.assertEqual([], command[2:])
2531
2349
 
2532
2350
    def test_set_env(self):
2533
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2351
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2534
2352
        # set in the child
2535
2353
        def check_environment():
2536
2354
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2537
2355
        self.check_popen_state = check_environment
2538
2356
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2539
 
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2357
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
2540
2358
        # not set in theparent
2541
2359
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2542
2360
 
2543
2361
    def test_run_bzr_subprocess_env_del(self):
2544
2362
        """run_bzr_subprocess can remove environment variables too."""
2545
 
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2363
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
2546
2364
        def check_environment():
2547
2365
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2548
2366
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2549
2367
        self.check_popen_state = check_environment
2550
2368
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2551
 
                          env_changes={'EXISTANT_ENV_VAR':None})
 
2369
            env_changes={'EXISTANT_ENV_VAR':None})
2552
2370
        # Still set in parent
2553
2371
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2554
2372
        del os.environ['EXISTANT_ENV_VAR']
2555
2373
 
2556
2374
    def test_env_del_missing(self):
2557
 
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
 
2375
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2558
2376
        def check_environment():
2559
2377
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2560
2378
        self.check_popen_state = check_environment
2561
2379
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2562
 
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2380
            env_changes={'NON_EXISTANT_ENV_VAR':None})
2563
2381
 
2564
2382
    def test_working_dir(self):
2565
2383
        """Test that we can specify the working dir for the child"""
2568
2386
        chdirs = []
2569
2387
        def chdir(path):
2570
2388
            chdirs.append(path)
2571
 
        self.overrideAttr(os, 'chdir', chdir)
2572
 
        def getcwd():
2573
 
            return 'current'
2574
 
        self.overrideAttr(osutils, 'getcwd', getcwd)
2575
 
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2576
 
                          working_dir='foo')
 
2389
        os.chdir = chdir
 
2390
        try:
 
2391
            def getcwd():
 
2392
                return 'current'
 
2393
            osutils.getcwd = getcwd
 
2394
            try:
 
2395
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2396
                    working_dir='foo')
 
2397
            finally:
 
2398
                osutils.getcwd = orig_getcwd
 
2399
        finally:
 
2400
            os.chdir = orig_chdir
2577
2401
        self.assertEqual(['foo', 'current'], chdirs)
2578
2402
 
2579
 
    def test_get_bzr_path_with_cwd_bzrlib(self):
2580
 
        self.get_source_path = lambda: ""
2581
 
        self.overrideAttr(os.path, "isfile", lambda path: True)
2582
 
        self.assertEqual(self.get_bzr_path(), "bzr")
2583
 
 
2584
2403
 
2585
2404
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2586
2405
    """Tests that really need to do things with an external bzr."""
2599
2418
        self.assertEqual('bzr: interrupted\n', result[1])
2600
2419
 
2601
2420
 
 
2421
class TestKnownFailure(tests.TestCase):
 
2422
 
 
2423
    def test_known_failure(self):
 
2424
        """Check that KnownFailure is defined appropriately."""
 
2425
        # a KnownFailure is an assertion error for compatability with unaware
 
2426
        # runners.
 
2427
        self.assertIsInstance(tests.KnownFailure(""), AssertionError)
 
2428
 
 
2429
    def test_expect_failure(self):
 
2430
        try:
 
2431
            self.expectFailure("Doomed to failure", self.assertTrue, False)
 
2432
        except tests.KnownFailure, e:
 
2433
            self.assertEqual('Doomed to failure', e.args[0])
 
2434
        try:
 
2435
            self.expectFailure("Doomed to failure", self.assertTrue, True)
 
2436
        except AssertionError, e:
 
2437
            self.assertEqual('Unexpected success.  Should have failed:'
 
2438
                             ' Doomed to failure', e.args[0])
 
2439
        else:
 
2440
            self.fail('Assertion not raised')
 
2441
 
 
2442
 
 
2443
class TestFeature(tests.TestCase):
 
2444
 
 
2445
    def test_caching(self):
 
2446
        """Feature._probe is called by the feature at most once."""
 
2447
        class InstrumentedFeature(tests.Feature):
 
2448
            def __init__(self):
 
2449
                super(InstrumentedFeature, self).__init__()
 
2450
                self.calls = []
 
2451
            def _probe(self):
 
2452
                self.calls.append('_probe')
 
2453
                return False
 
2454
        feature = InstrumentedFeature()
 
2455
        feature.available()
 
2456
        self.assertEqual(['_probe'], feature.calls)
 
2457
        feature.available()
 
2458
        self.assertEqual(['_probe'], feature.calls)
 
2459
 
 
2460
    def test_named_str(self):
 
2461
        """Feature.__str__ should thunk to feature_name()."""
 
2462
        class NamedFeature(tests.Feature):
 
2463
            def feature_name(self):
 
2464
                return 'symlinks'
 
2465
        feature = NamedFeature()
 
2466
        self.assertEqual('symlinks', str(feature))
 
2467
 
 
2468
    def test_default_str(self):
 
2469
        """Feature.__str__ should default to __class__.__name__."""
 
2470
        class NamedFeature(tests.Feature):
 
2471
            pass
 
2472
        feature = NamedFeature()
 
2473
        self.assertEqual('NamedFeature', str(feature))
 
2474
 
 
2475
 
 
2476
class TestUnavailableFeature(tests.TestCase):
 
2477
 
 
2478
    def test_access_feature(self):
 
2479
        feature = tests.Feature()
 
2480
        exception = tests.UnavailableFeature(feature)
 
2481
        self.assertIs(feature, exception.args[0])
 
2482
 
 
2483
 
2602
2484
class TestSelftestFiltering(tests.TestCase):
2603
2485
 
2604
2486
    def setUp(self):
2755
2637
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2756
2638
 
2757
2639
 
2758
 
class TestCheckTreeShape(tests.TestCaseWithTransport):
 
2640
class TestCheckInventoryShape(tests.TestCaseWithTransport):
2759
2641
 
2760
 
    def test_check_tree_shape(self):
 
2642
    def test_check_inventory_shape(self):
2761
2643
        files = ['a', 'b/', 'b/c']
2762
2644
        tree = self.make_branch_and_tree('.')
2763
2645
        self.build_tree(files)
2764
2646
        tree.add(files)
2765
2647
        tree.lock_read()
2766
2648
        try:
2767
 
            self.check_tree_shape(tree, files)
 
2649
            self.check_inventory_shape(tree.inventory, files)
2768
2650
        finally:
2769
2651
            tree.unlock()
2770
2652
 
2783
2665
        # the test framework
2784
2666
        self.assertEquals('always fails', str(e))
2785
2667
        # check that there's no traceback in the test log
2786
 
        self.assertNotContainsRe(self.get_log(), r'Traceback')
 
2668
        self.assertNotContainsRe(self._get_log(keep_log_file=True),
 
2669
            r'Traceback')
2787
2670
 
2788
2671
    def test_run_bzr_user_error_caught(self):
2789
2672
        # Running bzr in blackbox mode, normal/expected/user errors should be
2790
2673
        # caught in the regular way and turned into an error message plus exit
2791
2674
        # code.
2792
 
        transport_server = memory.MemoryServer()
2793
 
        transport_server.start_server()
2794
 
        self.addCleanup(transport_server.stop_server)
 
2675
        transport_server = MemoryServer()
 
2676
        transport_server.setUp()
 
2677
        self.addCleanup(transport_server.tearDown)
2795
2678
        url = transport_server.get_url()
2796
2679
        self.permit_url(url)
2797
2680
        out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3)
2941
2824
        # Test that a plausible list of modules to doctest is returned
2942
2825
        # by _test_suite_modules_to_doctest.
2943
2826
        test_list = tests._test_suite_modules_to_doctest()
2944
 
        if __doc__ is None:
2945
 
            # When docstrings are stripped, there are no modules to doctest
2946
 
            self.assertEqual([], test_list)
2947
 
            return
2948
2827
        self.assertSubset([
2949
2828
            'bzrlib.timestamp',
2950
2829
            ],
2957
2836
        # test doubles that supply a few sample tests to load, and check they
2958
2837
        # are loaded.
2959
2838
        calls = []
2960
 
        def testmod_names():
 
2839
        def _test_suite_testmod_names():
2961
2840
            calls.append("testmod_names")
2962
2841
            return [
2963
2842
                'bzrlib.tests.blackbox.test_branch',
2964
2843
                'bzrlib.tests.per_transport',
2965
2844
                'bzrlib.tests.test_selftest',
2966
2845
                ]
2967
 
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
2968
 
        def doctests():
 
2846
        original_testmod_names = tests._test_suite_testmod_names
 
2847
        def _test_suite_modules_to_doctest():
2969
2848
            calls.append("modules_to_doctest")
2970
 
            if __doc__ is None:
2971
 
                return []
2972
2849
            return ['bzrlib.timestamp']
2973
 
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
 
2850
        orig_modules_to_doctest = tests._test_suite_modules_to_doctest
 
2851
        def restore_names():
 
2852
            tests._test_suite_testmod_names = original_testmod_names
 
2853
            tests._test_suite_modules_to_doctest = orig_modules_to_doctest
 
2854
        self.addCleanup(restore_names)
 
2855
        tests._test_suite_testmod_names = _test_suite_testmod_names
 
2856
        tests._test_suite_modules_to_doctest = _test_suite_modules_to_doctest
2974
2857
        expected_test_list = [
2975
2858
            # testmod_names
2976
2859
            'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2977
2860
            ('bzrlib.tests.per_transport.TransportTests'
2978
2861
             '.test_abspath(LocalTransport,LocalURLServer)'),
2979
2862
            'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
 
2863
            # modules_to_doctest
 
2864
            'bzrlib.timestamp.format_highres_date',
2980
2865
            # plugins can't be tested that way since selftest may be run with
2981
2866
            # --no-plugins
2982
2867
            ]
2983
 
        if __doc__ is not None:
2984
 
            expected_test_list.extend([
2985
 
                # modules_to_doctest
2986
 
                'bzrlib.timestamp.format_highres_date',
2987
 
                ])
2988
2868
        suite = tests.test_suite()
2989
2869
        self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
2990
2870
            set(calls))
3045
2925
    def test_load_tests(self):
3046
2926
        test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
3047
2927
        loader = self._create_loader(test_list)
 
2928
 
3048
2929
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3049
2930
        self.assertEquals(test_list, _test_ids(suite))
3050
2931
 
3051
2932
    def test_exclude_tests(self):
3052
2933
        test_list = ['bogus']
3053
2934
        loader = self._create_loader(test_list)
 
2935
 
3054
2936
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3055
2937
        self.assertEquals([], _test_ids(suite))
3056
2938
 
3101
2983
        tpr.register('bar', 'bbb.aaa.rrr')
3102
2984
        tpr.register('bar', 'bBB.aAA.rRR')
3103
2985
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3104
 
        self.assertThat(self.get_log(),
3105
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3106
 
                           doctest.ELLIPSIS))
 
2986
        self.assertContainsRe(self._get_log(keep_log_file=True),
 
2987
                              r'.*bar.*bbb.aaa.rrr.*bBB.aAA.rRR')
3107
2988
 
3108
2989
    def test_get_unknown_prefix(self):
3109
2990
        tpr = self._get_registry()
3129
3010
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3130
3011
 
3131
3012
 
3132
 
class TestThreadLeakDetection(tests.TestCase):
3133
 
    """Ensure when tests leak threads we detect and report it"""
3134
 
 
3135
 
    class LeakRecordingResult(tests.ExtendedTestResult):
3136
 
        def __init__(self):
3137
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3138
 
            self.leaks = []
3139
 
        def _report_thread_leak(self, test, leaks, alive):
3140
 
            self.leaks.append((test, leaks))
3141
 
 
3142
 
    def test_testcase_without_addCleanups(self):
3143
 
        """Check old TestCase instances don't break with leak detection"""
3144
 
        class Test(unittest.TestCase):
3145
 
            def runTest(self):
3146
 
                pass
3147
 
        result = self.LeakRecordingResult()
3148
 
        test = Test()
3149
 
        result.startTestRun()
3150
 
        test.run(result)
3151
 
        result.stopTestRun()
3152
 
        self.assertEqual(result._tests_leaking_threads_count, 0)
3153
 
        self.assertEqual(result.leaks, [])
3154
 
        
3155
 
    def test_thread_leak(self):
3156
 
        """Ensure a thread that outlives the running of a test is reported
3157
 
 
3158
 
        Uses a thread that blocks on an event, and is started by the inner
3159
 
        test case. As the thread outlives the inner case's run, it should be
3160
 
        detected as a leak, but the event is then set so that the thread can
3161
 
        be safely joined in cleanup so it's not leaked for real.
3162
 
        """
3163
 
        event = threading.Event()
3164
 
        thread = threading.Thread(name="Leaker", target=event.wait)
3165
 
        class Test(tests.TestCase):
3166
 
            def test_leak(self):
3167
 
                thread.start()
3168
 
        result = self.LeakRecordingResult()
3169
 
        test = Test("test_leak")
3170
 
        self.addCleanup(thread.join)
3171
 
        self.addCleanup(event.set)
3172
 
        result.startTestRun()
3173
 
        test.run(result)
3174
 
        result.stopTestRun()
3175
 
        self.assertEqual(result._tests_leaking_threads_count, 1)
3176
 
        self.assertEqual(result._first_thread_leaker_id, test.id())
3177
 
        self.assertEqual(result.leaks, [(test, set([thread]))])
3178
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3179
 
 
3180
 
    def test_multiple_leaks(self):
3181
 
        """Check multiple leaks are blamed on the test cases at fault
3182
 
 
3183
 
        Same concept as the previous test, but has one inner test method that
3184
 
        leaks two threads, and one that doesn't leak at all.
3185
 
        """
3186
 
        event = threading.Event()
3187
 
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
3188
 
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
3189
 
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
3190
 
        class Test(tests.TestCase):
3191
 
            def test_first_leak(self):
3192
 
                thread_b.start()
3193
 
            def test_second_no_leak(self):
3194
 
                pass
3195
 
            def test_third_leak(self):
3196
 
                thread_c.start()
3197
 
                thread_a.start()
3198
 
        result = self.LeakRecordingResult()
3199
 
        first_test = Test("test_first_leak")
3200
 
        third_test = Test("test_third_leak")
3201
 
        self.addCleanup(thread_a.join)
3202
 
        self.addCleanup(thread_b.join)
3203
 
        self.addCleanup(thread_c.join)
3204
 
        self.addCleanup(event.set)
3205
 
        result.startTestRun()
3206
 
        unittest.TestSuite(
3207
 
            [first_test, Test("test_second_no_leak"), third_test]
3208
 
            ).run(result)
3209
 
        result.stopTestRun()
3210
 
        self.assertEqual(result._tests_leaking_threads_count, 2)
3211
 
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
3212
 
        self.assertEqual(result.leaks, [
3213
 
            (first_test, set([thread_b])),
3214
 
            (third_test, set([thread_a, thread_c]))])
3215
 
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3216
 
 
3217
 
 
3218
 
class TestPostMortemDebugging(tests.TestCase):
3219
 
    """Check post mortem debugging works when tests fail or error"""
3220
 
 
3221
 
    class TracebackRecordingResult(tests.ExtendedTestResult):
3222
 
        def __init__(self):
3223
 
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3224
 
            self.postcode = None
3225
 
        def _post_mortem(self, tb=None):
3226
 
            """Record the code object at the end of the current traceback"""
3227
 
            tb = tb or sys.exc_info()[2]
3228
 
            if tb is not None:
3229
 
                next = tb.tb_next
3230
 
                while next is not None:
3231
 
                    tb = next
3232
 
                    next = next.tb_next
3233
 
                self.postcode = tb.tb_frame.f_code
3234
 
        def report_error(self, test, err):
3235
 
            pass
3236
 
        def report_failure(self, test, err):
3237
 
            pass
3238
 
 
3239
 
    def test_location_unittest_error(self):
3240
 
        """Needs right post mortem traceback with erroring unittest case"""
3241
 
        class Test(unittest.TestCase):
3242
 
            def runTest(self):
3243
 
                raise RuntimeError
3244
 
        result = self.TracebackRecordingResult()
3245
 
        Test().run(result)
3246
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3247
 
 
3248
 
    def test_location_unittest_failure(self):
3249
 
        """Needs right post mortem traceback with failing unittest case"""
3250
 
        class Test(unittest.TestCase):
3251
 
            def runTest(self):
3252
 
                raise self.failureException
3253
 
        result = self.TracebackRecordingResult()
3254
 
        Test().run(result)
3255
 
        self.assertEqual(result.postcode, Test.runTest.func_code)
3256
 
 
3257
 
    def test_location_bt_error(self):
3258
 
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
3259
 
        class Test(tests.TestCase):
3260
 
            def test_error(self):
3261
 
                raise RuntimeError
3262
 
        result = self.TracebackRecordingResult()
3263
 
        Test("test_error").run(result)
3264
 
        self.assertEqual(result.postcode, Test.test_error.func_code)
3265
 
 
3266
 
    def test_location_bt_failure(self):
3267
 
        """Needs right post mortem traceback with failing bzrlib.tests case"""
3268
 
        class Test(tests.TestCase):
3269
 
            def test_failure(self):
3270
 
                raise self.failureException
3271
 
        result = self.TracebackRecordingResult()
3272
 
        Test("test_failure").run(result)
3273
 
        self.assertEqual(result.postcode, Test.test_failure.func_code)
3274
 
 
3275
 
    def test_env_var_triggers_post_mortem(self):
3276
 
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3277
 
        import pdb
3278
 
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
3279
 
        post_mortem_calls = []
3280
 
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3281
 
        self.overrideEnv('BZR_TEST_PDB', None)
3282
 
        result._post_mortem(1)
3283
 
        self.overrideEnv('BZR_TEST_PDB', 'on')
3284
 
        result._post_mortem(2)
3285
 
        self.assertEqual([2], post_mortem_calls)
3286
 
 
3287
 
 
3288
3013
class TestRunSuite(tests.TestCase):
3289
3014
 
3290
3015
    def test_runner_class(self):
3301
3026
                                                self.verbosity)
3302
3027
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3303
3028
        self.assertLength(1, calls)
3304
 
 
3305
 
 
3306
 
class _Selftest(object):
3307
 
    """Mixin for tests needing full selftest output"""
3308
 
 
3309
 
    def _inject_stream_into_subunit(self, stream):
3310
 
        """To be overridden by subclasses that run tests out of process"""
3311
 
 
3312
 
    def _run_selftest(self, **kwargs):
3313
 
        sio = StringIO()
3314
 
        self._inject_stream_into_subunit(sio)
3315
 
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3316
 
        return sio.getvalue()
3317
 
 
3318
 
 
3319
 
class _ForkedSelftest(_Selftest):
3320
 
    """Mixin for tests needing full selftest output with forked children"""
3321
 
 
3322
 
    _test_needs_features = [features.subunit]
3323
 
 
3324
 
    def _inject_stream_into_subunit(self, stream):
3325
 
        """Monkey-patch subunit so the extra output goes to stream not stdout
3326
 
 
3327
 
        Some APIs need rewriting so this kind of bogus hackery can be replaced
3328
 
        by passing the stream param from run_tests down into ProtocolTestCase.
3329
 
        """
3330
 
        from subunit import ProtocolTestCase
3331
 
        _original_init = ProtocolTestCase.__init__
3332
 
        def _init_with_passthrough(self, *args, **kwargs):
3333
 
            _original_init(self, *args, **kwargs)
3334
 
            self._passthrough = stream
3335
 
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3336
 
 
3337
 
    def _run_selftest(self, **kwargs):
3338
 
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3339
 
        if getattr(os, "fork", None) is None:
3340
 
            raise tests.TestNotApplicable("Platform doesn't support forking")
3341
 
        # Make sure the fork code is actually invoked by claiming two cores
3342
 
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3343
 
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3344
 
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3345
 
 
3346
 
 
3347
 
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3348
 
    """Check operation of --parallel=fork selftest option"""
3349
 
 
3350
 
    def test_error_in_child_during_fork(self):
3351
 
        """Error in a forked child during test setup should get reported"""
3352
 
        class Test(tests.TestCase):
3353
 
            def testMethod(self):
3354
 
                pass
3355
 
        # We don't care what, just break something that a child will run
3356
 
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3357
 
        out = self._run_selftest(test_suite_factory=Test)
3358
 
        # Lines from the tracebacks of the two child processes may be mixed
3359
 
        # together due to the way subunit parses and forwards the streams,
3360
 
        # so permit extra lines between each part of the error output.
3361
 
        self.assertContainsRe(out,
3362
 
            "Traceback.*:\n"
3363
 
            "(?:.*\n)*"
3364
 
            ".+ in fork_for_tests\n"
3365
 
            "(?:.*\n)*"
3366
 
            "\s*workaround_zealous_crypto_random\(\)\n"
3367
 
            "(?:.*\n)*"
3368
 
            "TypeError:")
3369
 
 
3370
 
 
3371
 
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3372
 
    """Check a test case still alive after being run emits a warning"""
3373
 
 
3374
 
    class Test(tests.TestCase):
3375
 
        def test_pass(self):
3376
 
            pass
3377
 
        def test_self_ref(self):
3378
 
            self.also_self = self.test_self_ref
3379
 
        def test_skip(self):
3380
 
            self.skip("Don't need")
3381
 
 
3382
 
    def _get_suite(self):
3383
 
        return TestUtil.TestSuite([
3384
 
            self.Test("test_pass"),
3385
 
            self.Test("test_self_ref"),
3386
 
            self.Test("test_skip"),
3387
 
            ])
3388
 
 
3389
 
    def _run_selftest_with_suite(self, **kwargs):
3390
 
        old_flags = tests.selftest_debug_flags
3391
 
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3392
 
        gc_on = gc.isenabled()
3393
 
        if gc_on:
3394
 
            gc.disable()
3395
 
        try:
3396
 
            output = self._run_selftest(test_suite_factory=self._get_suite,
3397
 
                **kwargs)
3398
 
        finally:
3399
 
            if gc_on:
3400
 
                gc.enable()
3401
 
            tests.selftest_debug_flags = old_flags
3402
 
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3403
 
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3404
 
        return output
3405
 
 
3406
 
    def test_testsuite(self):
3407
 
        self._run_selftest_with_suite()
3408
 
 
3409
 
    def test_pattern(self):
3410
 
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3411
 
        self.assertNotContainsRe(out, "test_skip")
3412
 
 
3413
 
    def test_exclude_pattern(self):
3414
 
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3415
 
        self.assertNotContainsRe(out, "test_skip")
3416
 
 
3417
 
    def test_random_seed(self):
3418
 
        self._run_selftest_with_suite(random_seed="now")
3419
 
 
3420
 
    def test_matching_tests_first(self):
3421
 
        self._run_selftest_with_suite(matching_tests_first=True,
3422
 
            pattern="test_self_ref$")
3423
 
 
3424
 
    def test_starting_with_and_exclude(self):
3425
 
        out = self._run_selftest_with_suite(starting_with=["bt."],
3426
 
            exclude_pattern="test_skip$")
3427
 
        self.assertNotContainsRe(out, "test_skip")
3428
 
 
3429
 
    def test_additonal_decorator(self):
3430
 
        out = self._run_selftest_with_suite(
3431
 
            suite_decorators=[tests.TestDecorator])
3432
 
 
3433
 
 
3434
 
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3435
 
    """Check warnings from tests staying alive are emitted with subunit"""
3436
 
 
3437
 
    _test_needs_features = [features.subunit]
3438
 
 
3439
 
    def _run_selftest_with_suite(self, **kwargs):
3440
 
        return TestUncollectedWarnings._run_selftest_with_suite(self,
3441
 
            runner_class=tests.SubUnitBzrRunner, **kwargs)
3442
 
 
3443
 
 
3444
 
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3445
 
    """Check warnings from tests staying alive are emitted when forking"""
3446
 
 
3447
 
 
3448
 
class TestEnvironHandling(tests.TestCase):
3449
 
 
3450
 
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
3451
 
        self.assertFalse('MYVAR' in os.environ)
3452
 
        self.overrideEnv('MYVAR', '42')
3453
 
        # We use an embedded test to make sure we fix the _captureVar bug
3454
 
        class Test(tests.TestCase):
3455
 
            def test_me(self):
3456
 
                # The first call save the 42 value
3457
 
                self.overrideEnv('MYVAR', None)
3458
 
                self.assertEquals(None, os.environ.get('MYVAR'))
3459
 
                # Make sure we can call it twice
3460
 
                self.overrideEnv('MYVAR', None)
3461
 
                self.assertEquals(None, os.environ.get('MYVAR'))
3462
 
        output = StringIO()
3463
 
        result = tests.TextTestResult(output, 0, 1)
3464
 
        Test('test_me').run(result)
3465
 
        if not result.wasStrictlySuccessful():
3466
 
            self.fail(output.getvalue())
3467
 
        # We get our value back
3468
 
        self.assertEquals('42', os.environ.get('MYVAR'))
3469
 
 
3470
 
 
3471
 
class TestIsolatedEnv(tests.TestCase):
3472
 
    """Test isolating tests from os.environ.
3473
 
 
3474
 
    Since we use tests that are already isolated from os.environ a bit of care
3475
 
    should be taken when designing the tests to avoid bootstrap side-effects.
3476
 
    The tests start an already clean os.environ which allow doing valid
3477
 
    assertions about which variables are present or not and design tests around
3478
 
    these assertions.
3479
 
    """
3480
 
 
3481
 
    class ScratchMonkey(tests.TestCase):
3482
 
 
3483
 
        def test_me(self):
3484
 
            pass
3485
 
 
3486
 
    def test_basics(self):
3487
 
        # Make sure we know the definition of BZR_HOME: not part of os.environ
3488
 
        # for tests.TestCase.
3489
 
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
3490
 
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3491
 
        # Being part of isolated_environ, BZR_HOME should not appear here
3492
 
        self.assertFalse('BZR_HOME' in os.environ)
3493
 
        # Make sure we know the definition of LINES: part of os.environ for
3494
 
        # tests.TestCase
3495
 
        self.assertTrue('LINES' in tests.isolated_environ)
3496
 
        self.assertEquals('25', tests.isolated_environ['LINES'])
3497
 
        self.assertEquals('25', os.environ['LINES'])
3498
 
 
3499
 
    def test_injecting_unknown_variable(self):
3500
 
        # BZR_HOME is known to be absent from os.environ
3501
 
        test = self.ScratchMonkey('test_me')
3502
 
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3503
 
        self.assertEquals('foo', os.environ['BZR_HOME'])
3504
 
        tests.restore_os_environ(test)
3505
 
        self.assertFalse('BZR_HOME' in os.environ)
3506
 
 
3507
 
    def test_injecting_known_variable(self):
3508
 
        test = self.ScratchMonkey('test_me')
3509
 
        # LINES is known to be present in os.environ
3510
 
        tests.override_os_environ(test, {'LINES': '42'})
3511
 
        self.assertEquals('42', os.environ['LINES'])
3512
 
        tests.restore_os_environ(test)
3513
 
        self.assertEquals('25', os.environ['LINES'])
3514
 
 
3515
 
    def test_deleting_variable(self):
3516
 
        test = self.ScratchMonkey('test_me')
3517
 
        # LINES is known to be present in os.environ
3518
 
        tests.override_os_environ(test, {'LINES': None})
3519
 
        self.assertTrue('LINES' not in os.environ)
3520
 
        tests.restore_os_environ(test)
3521
 
        self.assertEquals('25', os.environ['LINES'])
3522
 
 
3523
 
 
3524
 
class TestDocTestSuiteIsolation(tests.TestCase):
3525
 
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3526
 
 
3527
 
    Since tests.TestCase alreay provides an isolation from os.environ, we use
3528
 
    the clean environment as a base for testing. To precisely capture the
3529
 
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3530
 
    compare against.
3531
 
 
3532
 
    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3533
 
    not `os.environ` so each test overrides it to suit its needs.
3534
 
 
3535
 
    """
3536
 
 
3537
 
    def get_doctest_suite_for_string(self, klass, string):
3538
 
        class Finder(doctest.DocTestFinder):
3539
 
 
3540
 
            def find(*args, **kwargs):
3541
 
                test = doctest.DocTestParser().get_doctest(
3542
 
                    string, {}, 'foo', 'foo.py', 0)
3543
 
                return [test]
3544
 
 
3545
 
        suite = klass(test_finder=Finder())
3546
 
        return suite
3547
 
 
3548
 
    def run_doctest_suite_for_string(self, klass, string):
3549
 
        suite = self.get_doctest_suite_for_string(klass, string)
3550
 
        output = StringIO()
3551
 
        result = tests.TextTestResult(output, 0, 1)
3552
 
        suite.run(result)
3553
 
        return result, output
3554
 
 
3555
 
    def assertDocTestStringSucceds(self, klass, string):
3556
 
        result, output = self.run_doctest_suite_for_string(klass, string)
3557
 
        if not result.wasStrictlySuccessful():
3558
 
            self.fail(output.getvalue())
3559
 
 
3560
 
    def assertDocTestStringFails(self, klass, string):
3561
 
        result, output = self.run_doctest_suite_for_string(klass, string)
3562
 
        if result.wasStrictlySuccessful():
3563
 
            self.fail(output.getvalue())
3564
 
 
3565
 
    def test_injected_variable(self):
3566
 
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3567
 
        test = """
3568
 
            >>> import os
3569
 
            >>> os.environ['LINES']
3570
 
            '42'
3571
 
            """
3572
 
        # doctest.DocTestSuite fails as it sees '25'
3573
 
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3574
 
        # tests.DocTestSuite sees '42'
3575
 
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3576
 
 
3577
 
    def test_deleted_variable(self):
3578
 
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3579
 
        test = """
3580
 
            >>> import os
3581
 
            >>> os.environ.get('LINES')
3582
 
            """
3583
 
        # doctest.DocTestSuite fails as it sees '25'
3584
 
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3585
 
        # tests.DocTestSuite sees None
3586
 
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3587
 
 
3588
 
 
3589
 
class TestSelftestExcludePatterns(tests.TestCase):
3590
 
 
3591
 
    def setUp(self):
3592
 
        super(TestSelftestExcludePatterns, self).setUp()
3593
 
        self.overrideAttr(tests, 'test_suite', self.suite_factory)
3594
 
 
3595
 
    def suite_factory(self, keep_only=None, starting_with=None):
3596
 
        """A test suite factory with only a few tests."""
3597
 
        class Test(tests.TestCase):
3598
 
            def id(self):
3599
 
                # We don't need the full class path
3600
 
                return self._testMethodName
3601
 
            def a(self):
3602
 
                pass
3603
 
            def b(self):
3604
 
                pass
3605
 
            def c(self):
3606
 
                pass
3607
 
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3608
 
 
3609
 
    def assertTestList(self, expected, *selftest_args):
3610
 
        # We rely on setUp installing the right test suite factory so we can
3611
 
        # test at the command level without loading the whole test suite
3612
 
        out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3613
 
        actual = out.splitlines()
3614
 
        self.assertEquals(expected, actual)
3615
 
 
3616
 
    def test_full_list(self):
3617
 
        self.assertTestList(['a', 'b', 'c'])
3618
 
 
3619
 
    def test_single_exclude(self):
3620
 
        self.assertTestList(['b', 'c'], '-x', 'a')
3621
 
 
3622
 
    def test_mutiple_excludes(self):
3623
 
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3624
 
 
3625
 
 
3626
 
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3627
 
 
3628
 
    _test_needs_features = [features.subunit]
3629
 
 
3630
 
    def setUp(self):
3631
 
        super(TestCounterHooks, self).setUp()
3632
 
        class Test(tests.TestCase):
3633
 
 
3634
 
            def setUp(self):
3635
 
                super(Test, self).setUp()
3636
 
                self.hooks = hooks.Hooks()
3637
 
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3638
 
                self.install_counter_hook(self.hooks, 'myhook')
3639
 
 
3640
 
            def no_hook(self):
3641
 
                pass
3642
 
 
3643
 
            def run_hook_once(self):
3644
 
                for hook in self.hooks['myhook']:
3645
 
                    hook(self)
3646
 
 
3647
 
        self.test_class = Test
3648
 
 
3649
 
    def assertHookCalls(self, expected_calls, test_name):
3650
 
        test = self.test_class(test_name)
3651
 
        result = unittest.TestResult()
3652
 
        test.run(result)
3653
 
        self.assertTrue(hasattr(test, '_counters'))
3654
 
        self.assertTrue(test._counters.has_key('myhook'))
3655
 
        self.assertEquals(expected_calls, test._counters['myhook'])
3656
 
 
3657
 
    def test_no_hook(self):
3658
 
        self.assertHookCalls(0, 'no_hook')
3659
 
 
3660
 
    def test_run_hook_once(self):
3661
 
        tt = features.testtools
3662
 
        if tt.module.__version__ < (0, 9, 8):
3663
 
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3664
 
        self.assertHookCalls(1, 'run_hook_once')