~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

(vila) Make all transport put_bytes() raises TypeError when given unicode
 strings rather than bytes (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
 
20
import gc
20
21
import doctest
21
22
import os
22
23
import signal
42
43
from bzrlib import (
43
44
    branchbuilder,
44
45
    bzrdir,
 
46
    controldir,
45
47
    errors,
 
48
    hooks,
46
49
    lockdir,
47
50
    memorytree,
48
51
    osutils,
92
95
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
93
96
 
94
97
 
95
 
class TestUnicodeFilename(tests.TestCase):
96
 
 
97
 
    def test_probe_passes(self):
98
 
        """UnicodeFilename._probe passes."""
99
 
        # We can't test much more than that because the behaviour depends
100
 
        # on the platform.
101
 
        tests.UnicodeFilename._probe()
102
 
 
103
 
 
104
98
class TestTreeShape(tests.TestCaseInTempDir):
105
99
 
106
100
    def test_unicode_paths(self):
107
 
        self.requireFeature(tests.UnicodeFilename)
 
101
        self.requireFeature(features.UnicodeFilenameFeature)
108
102
 
109
103
        filename = u'hell\u00d8'
110
104
        self.build_tree_contents([(filename, 'contents of hello')])
341
335
        server1 = "a"
342
336
        server2 = "b"
343
337
        formats = [workingtree_4.WorkingTreeFormat4(),
344
 
                   workingtree_3.WorkingTreeFormat3(),]
345
 
        scenarios = make_scenarios(server1, server2, formats)
 
338
                   workingtree_3.WorkingTreeFormat3(),
 
339
                   workingtree_4.WorkingTreeFormat6()]
 
340
        scenarios = make_scenarios(server1, server2, formats,
 
341
            remote_server='c', remote_readonly_server='d',
 
342
            remote_backing_server='e')
346
343
        self.assertEqual([
347
344
            ('WorkingTreeFormat4',
348
345
             {'bzrdir_format': formats[0]._matchingbzrdir,
353
350
             {'bzrdir_format': formats[1]._matchingbzrdir,
354
351
              'transport_readonly_server': 'b',
355
352
              'transport_server': 'a',
356
 
              'workingtree_format': formats[1]})],
357
 
            scenarios)
 
353
              'workingtree_format': formats[1]}),
 
354
            ('WorkingTreeFormat6',
 
355
             {'bzrdir_format': formats[2]._matchingbzrdir,
 
356
              'transport_readonly_server': 'b',
 
357
              'transport_server': 'a',
 
358
              'workingtree_format': formats[2]}),
 
359
            ('WorkingTreeFormat6,remote',
 
360
             {'bzrdir_format': formats[2]._matchingbzrdir,
 
361
              'repo_is_remote': True,
 
362
              'transport_readonly_server': 'd',
 
363
              'transport_server': 'c',
 
364
              'vfs_transport_factory': 'e',
 
365
              'workingtree_format': formats[2]}),
 
366
            ], scenarios)
358
367
 
359
368
 
360
369
class TestTreeScenarios(tests.TestCase):
361
370
 
362
371
    def test_scenarios(self):
363
372
        # the tree implementation scenario generator is meant to setup one
364
 
        # instance for each working tree format, and one additional instance
 
373
        # instance for each working tree format, one additional instance
365
374
        # that will use the default wt format, but create a revision tree for
366
 
        # the tests.  this means that the wt ones should have the
367
 
        # workingtree_to_test_tree attribute set to 'return_parameter' and the
368
 
        # revision one set to revision_tree_from_workingtree.
 
375
        # the tests, and one more that uses the default wt format as a
 
376
        # lightweight checkout of a remote repository.  This means that the wt
 
377
        # ones should have the workingtree_to_test_tree attribute set to
 
378
        # 'return_parameter' and the revision one set to
 
379
        # revision_tree_from_workingtree.
369
380
 
370
381
        from bzrlib.tests.per_tree import (
371
382
            _dirstate_tree_from_workingtree,
377
388
            )
378
389
        server1 = "a"
379
390
        server2 = "b"
 
391
        smart_server = test_server.SmartTCPServer_for_testing
 
392
        smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
 
393
        mem_server = memory.MemoryServer
380
394
        formats = [workingtree_4.WorkingTreeFormat4(),
381
395
                   workingtree_3.WorkingTreeFormat3(),]
382
396
        scenarios = make_scenarios(server1, server2, formats)
383
 
        self.assertEqual(7, len(scenarios))
 
397
        self.assertEqual(8, len(scenarios))
384
398
        default_wt_format = workingtree.format_registry.get_default()
385
399
        wt4_format = workingtree_4.WorkingTreeFormat4()
386
400
        wt5_format = workingtree_4.WorkingTreeFormat5()
 
401
        wt6_format = workingtree_4.WorkingTreeFormat6()
387
402
        expected_scenarios = [
388
403
            ('WorkingTreeFormat4',
389
404
             {'bzrdir_format': formats[0]._matchingbzrdir,
399
414
              'workingtree_format': formats[1],
400
415
              '_workingtree_to_test_tree': return_parameter,
401
416
             }),
 
417
            ('WorkingTreeFormat6,remote',
 
418
             {'bzrdir_format': wt6_format._matchingbzrdir,
 
419
              'repo_is_remote': True,
 
420
              'transport_readonly_server': smart_readonly_server,
 
421
              'transport_server': smart_server,
 
422
              'vfs_transport_factory': mem_server,
 
423
              'workingtree_format': wt6_format,
 
424
              '_workingtree_to_test_tree': return_parameter,
 
425
             }),
402
426
            ('RevisionTree',
403
427
             {'_workingtree_to_test_tree': revision_tree_from_workingtree,
404
428
              'bzrdir_format': default_wt_format._matchingbzrdir,
616
640
        # Guard against regression into MemoryTransport leaking
617
641
        # files to disk instead of keeping them in memory.
618
642
        self.assertFalse(osutils.lexists('dir'))
619
 
        dir_format = bzrdir.format_registry.make_bzrdir('knit')
 
643
        dir_format = controldir.format_registry.make_bzrdir('knit')
620
644
        self.assertEqual(dir_format.repository_format.__class__,
621
645
                         the_branch.repository._format.__class__)
622
646
        self.assertEqual('Bazaar-NG Knit Repository Format 1',
626
650
    def test_dangling_locks_cause_failures(self):
627
651
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
628
652
            def test_function(self):
629
 
                t = self.get_transport('.')
 
653
                t = self.get_transport_from_path('.')
630
654
                l = lockdir.LockDir(t, 'lock')
631
655
                l.create()
632
656
                l.attempt_lock()
652
676
        # for the server
653
677
        url = self.get_readonly_url()
654
678
        url2 = self.get_readonly_url('foo/bar')
655
 
        t = transport.get_transport(url)
656
 
        t2 = transport.get_transport(url2)
 
679
        t = transport.get_transport_from_url(url)
 
680
        t2 = transport.get_transport_from_url(url2)
657
681
        self.assertIsInstance(t, ReadonlyTransportDecorator)
658
682
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
659
683
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
667
691
        url = self.get_readonly_url()
668
692
        url2 = self.get_readonly_url('foo/bar')
669
693
        # the transport returned may be any HttpTransportBase subclass
670
 
        t = transport.get_transport(url)
671
 
        t2 = transport.get_transport(url2)
 
694
        t = transport.get_transport_from_url(url)
 
695
        t2 = transport.get_transport_from_url(url2)
672
696
        self.assertIsInstance(t, HttpTransportBase)
673
697
        self.assertIsInstance(t2, HttpTransportBase)
674
698
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
685
709
        builder = self.make_branch_builder('dir')
686
710
        rev_id = builder.build_commit()
687
711
        self.assertPathExists('dir')
688
 
        a_dir = bzrdir.BzrDir.open('dir')
 
712
        a_dir = controldir.ControlDir.open('dir')
689
713
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
690
714
        a_branch = a_dir.open_branch()
691
715
        builder_branch = builder.get_branch()
712
736
class TestChrootedTest(tests.ChrootedTestCase):
713
737
 
714
738
    def test_root_is_root(self):
715
 
        t = transport.get_transport(self.get_readonly_url())
 
739
        t = transport.get_transport_from_url(self.get_readonly_url())
716
740
        url = t.base
717
741
        self.assertEqual(url, t.clone('..').base)
718
742
 
720
744
class TestProfileResult(tests.TestCase):
721
745
 
722
746
    def test_profiles_tests(self):
723
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
747
        self.requireFeature(features.lsprof_feature)
724
748
        terminal = testtools.testresult.doubles.ExtendedTestResult()
725
749
        result = tests.ProfileResult(terminal)
726
750
        class Sample(tests.TestCase):
781
805
 
782
806
    def test_lsprofiling(self):
783
807
        """Verbose test result prints lsprof statistics from test cases."""
784
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
808
        self.requireFeature(features.lsprof_feature)
785
809
        result_stream = StringIO()
786
810
        result = bzrlib.tests.VerboseTestResult(
787
811
            result_stream,
832
856
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
833
857
 
834
858
    def test_known_failure(self):
835
 
        """A KnownFailure being raised should trigger several result actions."""
 
859
        """Using knownFailure should trigger several result actions."""
836
860
        class InstrumentedTestResult(tests.ExtendedTestResult):
837
861
            def stopTestRun(self): pass
838
862
            def report_tests_starting(self): pass
841
865
        result = InstrumentedTestResult(None, None, None, None)
842
866
        class Test(tests.TestCase):
843
867
            def test_function(self):
844
 
                raise tests.KnownFailure('failed!')
 
868
                self.knownFailure('failed!')
845
869
        test = Test("test_function")
846
870
        test.run(result)
847
871
        # it should invoke 'report_known_failure'.
863
887
            descriptions=0,
864
888
            verbosity=2,
865
889
            )
866
 
        test = self.get_passing_test()
867
 
        result.startTest(test)
868
 
        prefix = len(result_stream.getvalue())
869
 
        # the err parameter has the shape:
870
 
        # (class, exception object, traceback)
871
 
        # KnownFailures dont get their tracebacks shown though, so we
872
 
        # can skip that.
873
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
874
 
        result.report_known_failure(test, err)
875
 
        output = result_stream.getvalue()[prefix:]
876
 
        lines = output.splitlines()
877
 
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
878
 
        if sys.version_info > (2, 7):
879
 
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
880
 
                self.assertNotEqual, lines[1], '    ')
881
 
        self.assertEqual(lines[1], '    foo')
882
 
        self.assertEqual(2, len(lines))
 
890
        _get_test("test_xfail").run(result)
 
891
        self.assertContainsRe(result_stream.getvalue(),
 
892
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
893
            "\\s*(?:Text attachment: )?reason"
 
894
            "(?:\n-+\n|: {{{)"
 
895
            "this_fails"
 
896
            "(?:\n-+\n|}}}\n)")
883
897
 
884
898
    def get_passing_test(self):
885
899
        """Return a test object that can't be run usefully."""
896
910
                self._call = test, feature
897
911
        result = InstrumentedTestResult(None, None, None, None)
898
912
        test = SampleTestCase('_test_pass')
899
 
        feature = tests.Feature()
 
913
        feature = features.Feature()
900
914
        result.startTest(test)
901
915
        result.addNotSupported(test, feature)
902
916
        # it should invoke 'report_unsupported'.
921
935
            verbosity=2,
922
936
            )
923
937
        test = self.get_passing_test()
924
 
        feature = tests.Feature()
 
938
        feature = features.Feature()
925
939
        result.startTest(test)
926
940
        prefix = len(result_stream.getvalue())
927
941
        result.report_unsupported(test, feature)
940
954
            def addNotSupported(self, test, feature):
941
955
                self._call = test, feature
942
956
        result = InstrumentedTestResult(None, None, None, None)
943
 
        feature = tests.Feature()
 
957
        feature = features.Feature()
944
958
        class Test(tests.TestCase):
945
959
            def test_function(self):
946
960
                raise tests.UnavailableFeature(feature)
965
979
    def test_strict_with_known_failure(self):
966
980
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
967
981
                                             verbosity=1)
968
 
        test = self.get_passing_test()
969
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
970
 
        result.addExpectedFailure(test, err)
 
982
        test = _get_test("test_xfail")
 
983
        test.run(result)
971
984
        self.assertFalse(result.wasStrictlySuccessful())
972
985
        self.assertEqual(None, result._extractBenchmarkTime(test))
973
986
 
1005
1018
        self.assertEquals(2, result.count)
1006
1019
 
1007
1020
 
1008
 
class TestUnicodeFilenameFeature(tests.TestCase):
1009
 
 
1010
 
    def test_probe_passes(self):
1011
 
        """UnicodeFilenameFeature._probe passes."""
1012
 
        # We can't test much more than that because the behaviour depends
1013
 
        # on the platform.
1014
 
        tests.UnicodeFilenameFeature._probe()
1015
 
 
1016
 
 
1017
1021
class TestRunner(tests.TestCase):
1018
1022
 
1019
1023
    def dummy_test(self):
1094
1098
            "FAIL: \\S+\.test_truth\n"
1095
1099
            "-+\n"
1096
1100
            "(?:.*\n)*"
1097
 
            "No absolute truth\n"
 
1101
            "\\s*(?:Text attachment: )?reason"
 
1102
            "(?:\n-+\n|: {{{)"
 
1103
            "No absolute truth"
 
1104
            "(?:\n-+\n|}}}\n)"
1098
1105
            "(?:.*\n)*"
1099
1106
            "-+\n"
1100
1107
            "Ran 1 test in .*\n"
1154
1161
        class SkippedTest(tests.TestCase):
1155
1162
 
1156
1163
            def setUp(self):
1157
 
                tests.TestCase.setUp(self)
 
1164
                super(SkippedTest, self).setUp()
1158
1165
                calls.append('setUp')
1159
1166
                self.addCleanup(self.cleanup)
1160
1167
 
1190
1197
 
1191
1198
    def test_unsupported_features_listed(self):
1192
1199
        """When unsupported features are encountered they are detailed."""
1193
 
        class Feature1(tests.Feature):
 
1200
        class Feature1(features.Feature):
1194
1201
            def _probe(self): return False
1195
 
        class Feature2(tests.Feature):
 
1202
        class Feature2(features.Feature):
1196
1203
            def _probe(self): return False
1197
1204
        # create sample tests
1198
1205
        test1 = SampleTestCase('_test_pass')
1265
1272
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1266
1273
            FailureWithUnicode("test_log_unicode"))
1267
1274
        self.assertContainsRe(out.getvalue(),
1268
 
            "Text attachment: log\n"
1269
 
            "-+\n"
1270
 
            "\d+\.\d+  \\\\u2606\n"
1271
 
            "-+\n")
 
1275
            "(?:Text attachment: )?log"
 
1276
            "(?:\n-+\n|: {{{)"
 
1277
            "\d+\.\d+  \\\\u2606"
 
1278
            "(?:\n-+\n|}}}\n)")
1272
1279
 
1273
1280
 
1274
1281
class SampleTestCase(tests.TestCase):
1475
1482
 
1476
1483
        Each self.time() call is individually and separately profiled.
1477
1484
        """
1478
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
1485
        self.requireFeature(features.lsprof_feature)
1479
1486
        # overrides the class member with an instance member so no cleanup
1480
1487
        # needed.
1481
1488
        self._gather_lsprof_in_benchmarks = True
1500
1507
        transport_server = memory.MemoryServer()
1501
1508
        transport_server.start_server()
1502
1509
        self.addCleanup(transport_server.stop_server)
1503
 
        t = transport.get_transport(transport_server.get_url())
1504
 
        bzrdir.BzrDir.create(t.base)
 
1510
        t = transport.get_transport_from_url(transport_server.get_url())
 
1511
        controldir.ControlDir.create(t.base)
1505
1512
        self.assertRaises(errors.BzrError,
1506
 
            bzrdir.BzrDir.open_from_transport, t)
 
1513
            controldir.ControlDir.open_from_transport, t)
1507
1514
        # But if we declare this as safe, we can open the bzrdir.
1508
1515
        self.permit_url(t.base)
1509
1516
        self._bzr_selftest_roots.append(t.base)
1510
 
        bzrdir.BzrDir.open_from_transport(t)
 
1517
        controldir.ControlDir.open_from_transport(t)
1511
1518
 
1512
1519
    def test_requireFeature_available(self):
1513
1520
        """self.requireFeature(available) is a no-op."""
1514
 
        class Available(tests.Feature):
 
1521
        class Available(features.Feature):
1515
1522
            def _probe(self):return True
1516
1523
        feature = Available()
1517
1524
        self.requireFeature(feature)
1518
1525
 
1519
1526
    def test_requireFeature_unavailable(self):
1520
1527
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1521
 
        class Unavailable(tests.Feature):
 
1528
        class Unavailable(features.Feature):
1522
1529
            def _probe(self):return False
1523
1530
        feature = Unavailable()
1524
1531
        self.assertRaises(tests.UnavailableFeature,
1647
1654
        self.assertRaises(AssertionError,
1648
1655
            self.assertListRaises, _TestException, success_generator)
1649
1656
 
 
1657
    def _run_successful_test(self, test):
 
1658
        result = testtools.TestResult()
 
1659
        test.run(result)
 
1660
        self.assertTrue(result.wasSuccessful())
 
1661
        return result
 
1662
 
1650
1663
    def test_overrideAttr_without_value(self):
1651
1664
        self.test_attr = 'original' # Define a test attribute
1652
1665
        obj = self # Make 'obj' visible to the embedded test
1653
1666
        class Test(tests.TestCase):
1654
1667
 
1655
1668
            def setUp(self):
1656
 
                tests.TestCase.setUp(self)
 
1669
                super(Test, self).setUp()
1657
1670
                self.orig = self.overrideAttr(obj, 'test_attr')
1658
1671
 
1659
1672
            def test_value(self):
1662
1675
                obj.test_attr = 'modified'
1663
1676
                self.assertEqual('modified', obj.test_attr)
1664
1677
 
1665
 
        test = Test('test_value')
1666
 
        test.run(unittest.TestResult())
 
1678
        self._run_successful_test(Test('test_value'))
1667
1679
        self.assertEqual('original', obj.test_attr)
1668
1680
 
1669
1681
    def test_overrideAttr_with_value(self):
1672
1684
        class Test(tests.TestCase):
1673
1685
 
1674
1686
            def setUp(self):
1675
 
                tests.TestCase.setUp(self)
 
1687
                super(Test, self).setUp()
1676
1688
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1677
1689
 
1678
1690
            def test_value(self):
1679
1691
                self.assertEqual('original', self.orig)
1680
1692
                self.assertEqual('modified', obj.test_attr)
1681
1693
 
1682
 
        test = Test('test_value')
1683
 
        test.run(unittest.TestResult())
 
1694
        self._run_successful_test(Test('test_value'))
1684
1695
        self.assertEqual('original', obj.test_attr)
1685
1696
 
1686
 
 
1687
 
class _MissingFeature(tests.Feature):
 
1697
    def test_overrideAttr_with_no_existing_value_and_value(self):
 
1698
        # Do not define the test_attribute
 
1699
        obj = self # Make 'obj' visible to the embedded test
 
1700
        class Test(tests.TestCase):
 
1701
 
 
1702
            def setUp(self):
 
1703
                tests.TestCase.setUp(self)
 
1704
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
 
1705
 
 
1706
            def test_value(self):
 
1707
                self.assertEqual(tests._unitialized_attr, self.orig)
 
1708
                self.assertEqual('modified', obj.test_attr)
 
1709
 
 
1710
        self._run_successful_test(Test('test_value'))
 
1711
        self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1712
 
 
1713
    def test_overrideAttr_with_no_existing_value_and_no_value(self):
 
1714
        # Do not define the test_attribute
 
1715
        obj = self # Make 'obj' visible to the embedded test
 
1716
        class Test(tests.TestCase):
 
1717
 
 
1718
            def setUp(self):
 
1719
                tests.TestCase.setUp(self)
 
1720
                self.orig = self.overrideAttr(obj, 'test_attr')
 
1721
 
 
1722
            def test_value(self):
 
1723
                self.assertEqual(tests._unitialized_attr, self.orig)
 
1724
                self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1725
 
 
1726
        self._run_successful_test(Test('test_value'))
 
1727
        self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1728
 
 
1729
    def test_recordCalls(self):
 
1730
        from bzrlib.tests import test_selftest
 
1731
        calls = self.recordCalls(
 
1732
            test_selftest, '_add_numbers')
 
1733
        self.assertEqual(test_selftest._add_numbers(2, 10),
 
1734
            12)
 
1735
        self.assertEquals(calls, [((2, 10), {})])
 
1736
 
 
1737
 
 
1738
def _add_numbers(a, b):
 
1739
    return a + b
 
1740
 
 
1741
 
 
1742
class _MissingFeature(features.Feature):
1688
1743
    def _probe(self):
1689
1744
        return False
1690
1745
missing_feature = _MissingFeature()
1741
1796
        result = self._run_test('test_fail')
1742
1797
        self.assertEqual(1, len(result.failures))
1743
1798
        result_content = result.failures[0][1]
1744
 
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1799
        self.assertContainsRe(result_content,
 
1800
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1745
1801
        self.assertContainsRe(result_content, 'this was a failing test')
1746
1802
 
1747
1803
    def test_error_has_log(self):
1748
1804
        result = self._run_test('test_error')
1749
1805
        self.assertEqual(1, len(result.errors))
1750
1806
        result_content = result.errors[0][1]
1751
 
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1807
        self.assertContainsRe(result_content,
 
1808
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1752
1809
        self.assertContainsRe(result_content, 'this test errored')
1753
1810
 
1754
1811
    def test_skip_has_no_log(self):
1773
1830
        result = self._run_test('test_xfail')
1774
1831
        self.assertEqual(1, len(result.expectedFailures))
1775
1832
        result_content = result.expectedFailures[0][1]
1776
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1833
        self.assertNotContainsRe(result_content,
 
1834
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1777
1835
        self.assertNotContainsRe(result_content, 'test with expected failure')
1778
1836
 
1779
1837
    def test_unexpected_success_has_log(self):
1952
2010
    def test_make_branch_and_tree_with_format(self):
1953
2011
        # we should be able to supply a format to make_branch_and_tree
1954
2012
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1955
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
 
2013
        self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
1956
2014
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1957
2015
 
1958
2016
    def test_make_branch_and_memory_tree(self):
2036
2094
        self.assertLength(2, output.readlines())
2037
2095
 
2038
2096
    def test_lsprof_tests(self):
2039
 
        self.requireFeature(test_lsprof.LSProfFeature)
 
2097
        self.requireFeature(features.lsprof_feature)
2040
2098
        results = []
2041
2099
        class Test(object):
2042
2100
            def __call__(test, result):
2188
2246
        self.assertNotContainsRe(content, 'test with expected failure')
2189
2247
        self.assertEqual(1, len(result.expectedFailures))
2190
2248
        result_content = result.expectedFailures[0][1]
2191
 
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2249
        self.assertNotContainsRe(result_content,
 
2250
            '(?m)^(?:Text attachment: )?log(?:$|: )')
2192
2251
        self.assertNotContainsRe(result_content, 'test with expected failure')
2193
2252
 
2194
2253
    def test_unexpected_success_has_log(self):
2389
2448
    """Base class for tests testing how we might run bzr."""
2390
2449
 
2391
2450
    def setUp(self):
2392
 
        tests.TestCaseWithTransport.setUp(self)
 
2451
        super(TestWithFakedStartBzrSubprocess, self).setUp()
2393
2452
        self.subprocess_calls = []
2394
2453
 
2395
2454
    def start_bzr_subprocess(self, process_args, env_changes=None,
2505
2564
 
2506
2565
 
2507
2566
class TestStartBzrSubProcess(tests.TestCase):
 
2567
    """Stub test start_bzr_subprocess."""
2508
2568
 
2509
 
    def check_popen_state(self):
2510
 
        """Replace to make assertions when popen is called."""
 
2569
    def _subprocess_log_cleanup(self):
 
2570
        """Inhibits the base version as we don't produce a log file."""
2511
2571
 
2512
2572
    def _popen(self, *args, **kwargs):
2513
 
        """Record the command that is run, so that we can ensure it is correct"""
 
2573
        """Override the base version to record the command that is run.
 
2574
 
 
2575
        From there we can ensure it is correct without spawning a real process.
 
2576
        """
2514
2577
        self.check_popen_state()
2515
2578
        self._popen_args = args
2516
2579
        self._popen_kwargs = kwargs
2517
2580
        raise _DontSpawnProcess()
2518
2581
 
 
2582
    def check_popen_state(self):
 
2583
        """Replace to make assertions when popen is called."""
 
2584
 
2519
2585
    def test_run_bzr_subprocess_no_plugins(self):
2520
2586
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2521
2587
        command = self._popen_args[0]
2525
2591
 
2526
2592
    def test_allow_plugins(self):
2527
2593
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2528
 
            allow_plugins=True)
 
2594
                          allow_plugins=True)
2529
2595
        command = self._popen_args[0]
2530
2596
        self.assertEqual([], command[2:])
2531
2597
 
2536
2602
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2537
2603
        self.check_popen_state = check_environment
2538
2604
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2539
 
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2605
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
2540
2606
        # not set in theparent
2541
2607
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2542
2608
 
2548
2614
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2549
2615
        self.check_popen_state = check_environment
2550
2616
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2551
 
            env_changes={'EXISTANT_ENV_VAR':None})
 
2617
                          env_changes={'EXISTANT_ENV_VAR':None})
2552
2618
        # Still set in parent
2553
2619
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2554
2620
        del os.environ['EXISTANT_ENV_VAR']
2559
2625
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2560
2626
        self.check_popen_state = check_environment
2561
2627
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2562
 
            env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2628
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
2563
2629
 
2564
2630
    def test_working_dir(self):
2565
2631
        """Test that we can specify the working dir for the child"""
2568
2634
        chdirs = []
2569
2635
        def chdir(path):
2570
2636
            chdirs.append(path)
2571
 
        os.chdir = chdir
2572
 
        try:
2573
 
            def getcwd():
2574
 
                return 'current'
2575
 
            osutils.getcwd = getcwd
2576
 
            try:
2577
 
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2578
 
                    working_dir='foo')
2579
 
            finally:
2580
 
                osutils.getcwd = orig_getcwd
2581
 
        finally:
2582
 
            os.chdir = orig_chdir
 
2637
        self.overrideAttr(os, 'chdir', chdir)
 
2638
        def getcwd():
 
2639
            return 'current'
 
2640
        self.overrideAttr(osutils, 'getcwd', getcwd)
 
2641
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2642
                          working_dir='foo')
2583
2643
        self.assertEqual(['foo', 'current'], chdirs)
2584
2644
 
2585
2645
    def test_get_bzr_path_with_cwd_bzrlib(self):
2605
2665
        self.assertEqual('bzr: interrupted\n', result[1])
2606
2666
 
2607
2667
 
2608
 
class TestFeature(tests.TestCase):
2609
 
 
2610
 
    def test_caching(self):
2611
 
        """Feature._probe is called by the feature at most once."""
2612
 
        class InstrumentedFeature(tests.Feature):
2613
 
            def __init__(self):
2614
 
                super(InstrumentedFeature, self).__init__()
2615
 
                self.calls = []
2616
 
            def _probe(self):
2617
 
                self.calls.append('_probe')
2618
 
                return False
2619
 
        feature = InstrumentedFeature()
2620
 
        feature.available()
2621
 
        self.assertEqual(['_probe'], feature.calls)
2622
 
        feature.available()
2623
 
        self.assertEqual(['_probe'], feature.calls)
2624
 
 
2625
 
    def test_named_str(self):
2626
 
        """Feature.__str__ should thunk to feature_name()."""
2627
 
        class NamedFeature(tests.Feature):
2628
 
            def feature_name(self):
2629
 
                return 'symlinks'
2630
 
        feature = NamedFeature()
2631
 
        self.assertEqual('symlinks', str(feature))
2632
 
 
2633
 
    def test_default_str(self):
2634
 
        """Feature.__str__ should default to __class__.__name__."""
2635
 
        class NamedFeature(tests.Feature):
2636
 
            pass
2637
 
        feature = NamedFeature()
2638
 
        self.assertEqual('NamedFeature', str(feature))
2639
 
 
2640
 
 
2641
 
class TestUnavailableFeature(tests.TestCase):
2642
 
 
2643
 
    def test_access_feature(self):
2644
 
        feature = tests.Feature()
2645
 
        exception = tests.UnavailableFeature(feature)
2646
 
        self.assertIs(feature, exception.args[0])
2647
 
 
2648
 
 
2649
 
simple_thunk_feature = tests._CompatabilityThunkFeature(
2650
 
    deprecated_in((2, 1, 0)),
2651
 
    'bzrlib.tests.test_selftest',
2652
 
    'simple_thunk_feature','UnicodeFilename',
2653
 
    replacement_module='bzrlib.tests'
2654
 
    )
2655
 
 
2656
 
class Test_CompatibilityFeature(tests.TestCase):
2657
 
 
2658
 
    def test_does_thunk(self):
2659
 
        res = self.callDeprecated(
2660
 
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2661
 
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2662
 
            simple_thunk_feature.available)
2663
 
        self.assertEqual(tests.UnicodeFilename.available(), res)
2664
 
 
2665
 
 
2666
 
class TestModuleAvailableFeature(tests.TestCase):
2667
 
 
2668
 
    def test_available_module(self):
2669
 
        feature = tests.ModuleAvailableFeature('bzrlib.tests')
2670
 
        self.assertEqual('bzrlib.tests', feature.module_name)
2671
 
        self.assertEqual('bzrlib.tests', str(feature))
2672
 
        self.assertTrue(feature.available())
2673
 
        self.assertIs(tests, feature.module)
2674
 
 
2675
 
    def test_unavailable_module(self):
2676
 
        feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2677
 
        self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2678
 
        self.assertFalse(feature.available())
2679
 
        self.assertIs(None, feature.module)
2680
 
 
2681
 
 
2682
2668
class TestSelftestFiltering(tests.TestCase):
2683
2669
 
2684
2670
    def setUp(self):
2685
 
        tests.TestCase.setUp(self)
 
2671
        super(TestSelftestFiltering, self).setUp()
2686
2672
        self.suite = TestUtil.TestSuite()
2687
2673
        self.loader = TestUtil.TestLoader()
2688
2674
        self.suite.addTest(self.loader.loadTestsFromModule(
3383
3369
        self.assertLength(1, calls)
3384
3370
 
3385
3371
 
 
3372
class _Selftest(object):
 
3373
    """Mixin for tests needing full selftest output"""
 
3374
 
 
3375
    def _inject_stream_into_subunit(self, stream):
 
3376
        """To be overridden by subclasses that run tests out of process"""
 
3377
 
 
3378
    def _run_selftest(self, **kwargs):
 
3379
        sio = StringIO()
 
3380
        self._inject_stream_into_subunit(sio)
 
3381
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
 
3382
        return sio.getvalue()
 
3383
 
 
3384
 
 
3385
class _ForkedSelftest(_Selftest):
 
3386
    """Mixin for tests needing full selftest output with forked children"""
 
3387
 
 
3388
    _test_needs_features = [features.subunit]
 
3389
 
 
3390
    def _inject_stream_into_subunit(self, stream):
 
3391
        """Monkey-patch subunit so the extra output goes to stream not stdout
 
3392
 
 
3393
        Some APIs need rewriting so this kind of bogus hackery can be replaced
 
3394
        by passing the stream param from run_tests down into ProtocolTestCase.
 
3395
        """
 
3396
        from subunit import ProtocolTestCase
 
3397
        _original_init = ProtocolTestCase.__init__
 
3398
        def _init_with_passthrough(self, *args, **kwargs):
 
3399
            _original_init(self, *args, **kwargs)
 
3400
            self._passthrough = stream
 
3401
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
 
3402
 
 
3403
    def _run_selftest(self, **kwargs):
 
3404
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
 
3405
        if getattr(os, "fork", None) is None:
 
3406
            raise tests.TestNotApplicable("Platform doesn't support forking")
 
3407
        # Make sure the fork code is actually invoked by claiming two cores
 
3408
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
 
3409
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
 
3410
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3411
 
 
3412
 
 
3413
class TestParallelFork(_ForkedSelftest, tests.TestCase):
 
3414
    """Check operation of --parallel=fork selftest option"""
 
3415
 
 
3416
    def test_error_in_child_during_fork(self):
 
3417
        """Error in a forked child during test setup should get reported"""
 
3418
        class Test(tests.TestCase):
 
3419
            def testMethod(self):
 
3420
                pass
 
3421
        # We don't care what, just break something that a child will run
 
3422
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
 
3423
        out = self._run_selftest(test_suite_factory=Test)
 
3424
        # Lines from the tracebacks of the two child processes may be mixed
 
3425
        # together due to the way subunit parses and forwards the streams,
 
3426
        # so permit extra lines between each part of the error output.
 
3427
        self.assertContainsRe(out,
 
3428
            "Traceback.*:\n"
 
3429
            "(?:.*\n)*"
 
3430
            ".+ in fork_for_tests\n"
 
3431
            "(?:.*\n)*"
 
3432
            "\s*workaround_zealous_crypto_random\(\)\n"
 
3433
            "(?:.*\n)*"
 
3434
            "TypeError:")
 
3435
 
 
3436
 
 
3437
class TestUncollectedWarnings(_Selftest, tests.TestCase):
 
3438
    """Check a test case still alive after being run emits a warning"""
 
3439
 
 
3440
    class Test(tests.TestCase):
 
3441
        def test_pass(self):
 
3442
            pass
 
3443
        def test_self_ref(self):
 
3444
            self.also_self = self.test_self_ref
 
3445
        def test_skip(self):
 
3446
            self.skip("Don't need")
 
3447
 
 
3448
    def _get_suite(self):
 
3449
        return TestUtil.TestSuite([
 
3450
            self.Test("test_pass"),
 
3451
            self.Test("test_self_ref"),
 
3452
            self.Test("test_skip"),
 
3453
            ])
 
3454
 
 
3455
    def _run_selftest_with_suite(self, **kwargs):
 
3456
        old_flags = tests.selftest_debug_flags
 
3457
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
 
3458
        gc_on = gc.isenabled()
 
3459
        if gc_on:
 
3460
            gc.disable()
 
3461
        try:
 
3462
            output = self._run_selftest(test_suite_factory=self._get_suite,
 
3463
                **kwargs)
 
3464
        finally:
 
3465
            if gc_on:
 
3466
                gc.enable()
 
3467
            tests.selftest_debug_flags = old_flags
 
3468
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
 
3469
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
 
3470
        return output
 
3471
 
 
3472
    def test_testsuite(self):
 
3473
        self._run_selftest_with_suite()
 
3474
 
 
3475
    def test_pattern(self):
 
3476
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
 
3477
        self.assertNotContainsRe(out, "test_skip")
 
3478
 
 
3479
    def test_exclude_pattern(self):
 
3480
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
 
3481
        self.assertNotContainsRe(out, "test_skip")
 
3482
 
 
3483
    def test_random_seed(self):
 
3484
        self._run_selftest_with_suite(random_seed="now")
 
3485
 
 
3486
    def test_matching_tests_first(self):
 
3487
        self._run_selftest_with_suite(matching_tests_first=True,
 
3488
            pattern="test_self_ref$")
 
3489
 
 
3490
    def test_starting_with_and_exclude(self):
 
3491
        out = self._run_selftest_with_suite(starting_with=["bt."],
 
3492
            exclude_pattern="test_skip$")
 
3493
        self.assertNotContainsRe(out, "test_skip")
 
3494
 
 
3495
    def test_additonal_decorator(self):
 
3496
        out = self._run_selftest_with_suite(
 
3497
            suite_decorators=[tests.TestDecorator])
 
3498
 
 
3499
 
 
3500
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
 
3501
    """Check warnings from tests staying alive are emitted with subunit"""
 
3502
 
 
3503
    _test_needs_features = [features.subunit]
 
3504
 
 
3505
    def _run_selftest_with_suite(self, **kwargs):
 
3506
        return TestUncollectedWarnings._run_selftest_with_suite(self,
 
3507
            runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3508
 
 
3509
 
 
3510
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
 
3511
    """Check warnings from tests staying alive are emitted when forking"""
 
3512
 
 
3513
 
3386
3514
class TestEnvironHandling(tests.TestCase):
3387
3515
 
3388
3516
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
3522
3650
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
3523
3651
        # tests.DocTestSuite sees None
3524
3652
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3653
 
 
3654
 
 
3655
class TestSelftestExcludePatterns(tests.TestCase):
 
3656
 
 
3657
    def setUp(self):
 
3658
        super(TestSelftestExcludePatterns, self).setUp()
 
3659
        self.overrideAttr(tests, 'test_suite', self.suite_factory)
 
3660
 
 
3661
    def suite_factory(self, keep_only=None, starting_with=None):
 
3662
        """A test suite factory with only a few tests."""
 
3663
        class Test(tests.TestCase):
 
3664
            def id(self):
 
3665
                # We don't need the full class path
 
3666
                return self._testMethodName
 
3667
            def a(self):
 
3668
                pass
 
3669
            def b(self):
 
3670
                pass
 
3671
            def c(self):
 
3672
                pass
 
3673
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
 
3674
 
 
3675
    def assertTestList(self, expected, *selftest_args):
 
3676
        # We rely on setUp installing the right test suite factory so we can
 
3677
        # test at the command level without loading the whole test suite
 
3678
        out, err = self.run_bzr(('selftest', '--list') + selftest_args)
 
3679
        actual = out.splitlines()
 
3680
        self.assertEquals(expected, actual)
 
3681
 
 
3682
    def test_full_list(self):
 
3683
        self.assertTestList(['a', 'b', 'c'])
 
3684
 
 
3685
    def test_single_exclude(self):
 
3686
        self.assertTestList(['b', 'c'], '-x', 'a')
 
3687
 
 
3688
    def test_mutiple_excludes(self):
 
3689
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
 
3690
 
 
3691
 
 
3692
class TestCounterHooks(tests.TestCase, SelfTestHelper):
 
3693
 
 
3694
    _test_needs_features = [features.subunit]
 
3695
 
 
3696
    def setUp(self):
 
3697
        super(TestCounterHooks, self).setUp()
 
3698
        class Test(tests.TestCase):
 
3699
 
 
3700
            def setUp(self):
 
3701
                super(Test, self).setUp()
 
3702
                self.hooks = hooks.Hooks()
 
3703
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
 
3704
                self.install_counter_hook(self.hooks, 'myhook')
 
3705
 
 
3706
            def no_hook(self):
 
3707
                pass
 
3708
 
 
3709
            def run_hook_once(self):
 
3710
                for hook in self.hooks['myhook']:
 
3711
                    hook(self)
 
3712
 
 
3713
        self.test_class = Test
 
3714
 
 
3715
    def assertHookCalls(self, expected_calls, test_name):
 
3716
        test = self.test_class(test_name)
 
3717
        result = unittest.TestResult()
 
3718
        test.run(result)
 
3719
        self.assertTrue(hasattr(test, '_counters'))
 
3720
        self.assertTrue(test._counters.has_key('myhook'))
 
3721
        self.assertEquals(expected_calls, test._counters['myhook'])
 
3722
 
 
3723
    def test_no_hook(self):
 
3724
        self.assertHookCalls(0, 'no_hook')
 
3725
 
 
3726
    def test_run_hook_once(self):
 
3727
        tt = features.testtools
 
3728
        if tt.module.__version__ < (0, 9, 8):
 
3729
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
 
3730
        self.assertHookCalls(1, 'run_hook_once')