~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Martin Pool
  • Date: 2009-09-14 01:48:28 UTC
  • mfrom: (4685 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4688.
  • Revision ID: mbp@sourcefrog.net-20090914014828-ydr9rlkdfq2sv57z
Merge news

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
from cStringIO import StringIO
20
20
import os
 
21
import signal
21
22
import sys
22
23
import time
23
24
import unittest
40
41
    workingtree,
41
42
    )
42
43
from bzrlib.repofmt import (
 
44
    groupcompress_repo,
43
45
    pack_repo,
44
46
    weaverepo,
45
47
    )
49
51
    deprecated_method,
50
52
    )
51
53
from bzrlib.tests import (
 
54
    SubUnitFeature,
52
55
    test_lsprof,
53
56
    test_sftp_transport,
54
57
    TestUtil,
124
127
        self.assertEqual(sample_permutation,
125
128
                         get_transport_test_permutations(MockModule()))
126
129
 
127
 
    def test_scenarios_invlude_all_modules(self):
 
130
    def test_scenarios_include_all_modules(self):
128
131
        # this checks that the scenario generator returns as many permutations
129
132
        # as there are in all the registered transport modules - we assume if
130
133
        # this matches it's probably doing the right thing especially in
215
218
        from bzrlib.tests.per_repository import formats_to_scenarios
216
219
        formats = [("(c)", remote.RemoteRepositoryFormat()),
217
220
                   ("(d)", repository.format_registry.get(
218
 
                        'Bazaar pack repository format 1 (needs bzr 0.92)\n'))]
 
221
                    'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
219
222
        no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
220
223
            None)
221
224
        vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
222
225
            vfs_transport_factory="vfs")
223
226
        # no_vfs generate scenarios without vfs_transport_factory
224
 
        self.assertEqual([
 
227
        expected = [
225
228
            ('RemoteRepositoryFormat(c)',
226
229
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
227
230
              'repository_format': remote.RemoteRepositoryFormat(),
228
231
              'transport_readonly_server': 'readonly',
229
232
              'transport_server': 'server'}),
230
 
            ('RepositoryFormatKnitPack1(d)',
 
233
            ('RepositoryFormat2a(d)',
231
234
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
232
 
              'repository_format': pack_repo.RepositoryFormatKnitPack1(),
 
235
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
233
236
              'transport_readonly_server': 'readonly',
234
 
              'transport_server': 'server'})],
235
 
            no_vfs_scenarios)
 
237
              'transport_server': 'server'})]
 
238
        self.assertEqual(expected, no_vfs_scenarios)
236
239
        self.assertEqual([
237
240
            ('RemoteRepositoryFormat(c)',
238
241
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
240
243
              'transport_readonly_server': 'readonly',
241
244
              'transport_server': 'server',
242
245
              'vfs_transport_factory': 'vfs'}),
243
 
            ('RepositoryFormatKnitPack1(d)',
 
246
            ('RepositoryFormat2a(d)',
244
247
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
245
 
              'repository_format': pack_repo.RepositoryFormatKnitPack1(),
 
248
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
246
249
              'transport_readonly_server': 'readonly',
247
250
              'transport_server': 'server',
248
251
              'vfs_transport_factory': 'vfs'})],
293
296
        from bzrlib.tests.per_interrepository import make_scenarios
294
297
        server1 = "a"
295
298
        server2 = "b"
296
 
        formats = [(str, "C1", "C2"), (int, "D1", "D2")]
 
299
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
297
300
        scenarios = make_scenarios(server1, server2, formats)
298
301
        self.assertEqual([
299
 
            ('str,str,str',
300
 
             {'interrepo_class': str,
301
 
              'repository_format': 'C1',
 
302
            ('C0,str,str',
 
303
             {'repository_format': 'C1',
302
304
              'repository_format_to': 'C2',
303
305
              'transport_readonly_server': 'b',
304
306
              'transport_server': 'a'}),
305
 
            ('int,str,str',
306
 
             {'interrepo_class': int,
307
 
              'repository_format': 'D1',
 
307
            ('D0,str,str',
 
308
             {'repository_format': 'D1',
308
309
              'repository_format_to': 'D2',
309
310
              'transport_readonly_server': 'b',
310
311
              'transport_server': 'a'})],
597
598
                l.attempt_lock()
598
599
        test = TestDanglingLock('test_function')
599
600
        result = test.run()
600
 
        self.assertEqual(1, len(result.errors))
 
601
        if self._lock_check_thorough:
 
602
            self.assertEqual(1, len(result.errors))
 
603
        else:
 
604
            # When _lock_check_thorough is disabled, then we don't trigger a
 
605
            # failure
 
606
            self.assertEqual(0, len(result.errors))
601
607
 
602
608
 
603
609
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
681
687
        self.assertEqual(url, t.clone('..').base)
682
688
 
683
689
 
684
 
class MockProgress(progress._BaseProgressBar):
685
 
    """Progress-bar standin that records calls.
686
 
 
687
 
    Useful for testing pb using code.
688
 
    """
689
 
 
690
 
    def __init__(self):
691
 
        progress._BaseProgressBar.__init__(self)
692
 
        self.calls = []
693
 
 
694
 
    def tick(self):
695
 
        self.calls.append(('tick',))
696
 
 
697
 
    def update(self, msg=None, current=None, total=None):
698
 
        self.calls.append(('update', msg, current, total))
699
 
 
700
 
    def clear(self):
701
 
        self.calls.append(('clear',))
702
 
 
703
 
    def note(self, msg, *args):
704
 
        self.calls.append(('note', msg, args))
 
690
class TestProfileResult(tests.TestCase):
 
691
 
 
692
    def test_profiles_tests(self):
 
693
        self.requireFeature(test_lsprof.LSProfFeature)
 
694
        terminal = unittest.TestResult()
 
695
        result = tests.ProfileResult(terminal)
 
696
        class Sample(tests.TestCase):
 
697
            def a(self):
 
698
                self.sample_function()
 
699
            def sample_function(self):
 
700
                pass
 
701
        test = Sample("a")
 
702
        test.attrs_to_keep = test.attrs_to_keep + ('_benchcalls',)
 
703
        test.run(result)
 
704
        self.assertLength(1, test._benchcalls)
 
705
        # We must be able to unpack it as the test reporting code wants
 
706
        (_, _, _), stats = test._benchcalls[0]
 
707
        self.assertTrue(callable(stats.pprint))
705
708
 
706
709
 
707
710
class TestTestResult(tests.TestCase):
817
820
    def test_known_failure(self):
818
821
        """A KnownFailure being raised should trigger several result actions."""
819
822
        class InstrumentedTestResult(tests.ExtendedTestResult):
820
 
            def done(self): pass
 
823
            def stopTestRun(self): pass
821
824
            def startTests(self): pass
822
825
            def report_test_start(self, test): pass
823
826
            def report_known_failure(self, test, err):
862
865
        self.assertEqual(lines[1], '    foo')
863
866
        self.assertEqual(2, len(lines))
864
867
 
865
 
    def test_text_report_known_failure(self):
866
 
        # text test output formatting
867
 
        pb = MockProgress()
868
 
        result = bzrlib.tests.TextTestResult(
869
 
            StringIO(),
870
 
            descriptions=0,
871
 
            verbosity=1,
872
 
            pb=pb,
873
 
            )
874
 
        test = self.get_passing_test()
875
 
        # this seeds the state to handle reporting the test.
876
 
        result.startTest(test)
877
 
        # the err parameter has the shape:
878
 
        # (class, exception object, traceback)
879
 
        # KnownFailures dont get their tracebacks shown though, so we
880
 
        # can skip that.
881
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
882
 
        result.report_known_failure(test, err)
883
 
        self.assertEqual(
884
 
            [
885
 
            ('update', '[1 in 0s] passing_test', None, None),
886
 
            ('note', 'XFAIL: %s\n%s\n', ('passing_test', err[1]))
887
 
            ],
888
 
            pb.calls)
889
 
        # known_failures should be printed in the summary, so if we run a test
890
 
        # after there are some known failures, the update prefix should match
891
 
        # this.
892
 
        result.known_failure_count = 3
893
 
        test.run(result)
894
 
        self.assertEqual(
895
 
            [
896
 
            ('update', '[2 in 0s] passing_test', None, None),
897
 
            ],
898
 
            pb.calls[2:])
899
 
 
900
868
    def get_passing_test(self):
901
869
        """Return a test object that can't be run usefully."""
902
870
        def passing_test():
906
874
    def test_add_not_supported(self):
907
875
        """Test the behaviour of invoking addNotSupported."""
908
876
        class InstrumentedTestResult(tests.ExtendedTestResult):
909
 
            def done(self): pass
 
877
            def stopTestRun(self): pass
910
878
            def startTests(self): pass
911
879
            def report_test_start(self, test): pass
912
880
            def report_unsupported(self, test, feature):
947
915
        self.assertEqual(lines, ['NODEP        0ms',
948
916
                                 "    The feature 'Feature' is not available."])
949
917
 
950
 
    def test_text_report_unsupported(self):
951
 
        # text test output formatting
952
 
        pb = MockProgress()
953
 
        result = bzrlib.tests.TextTestResult(
954
 
            StringIO(),
955
 
            descriptions=0,
956
 
            verbosity=1,
957
 
            pb=pb,
958
 
            )
959
 
        test = self.get_passing_test()
960
 
        feature = tests.Feature()
961
 
        # this seeds the state to handle reporting the test.
962
 
        result.startTest(test)
963
 
        result.report_unsupported(test, feature)
964
 
        # no output on unsupported features
965
 
        self.assertEqual(
966
 
            [('update', '[1 in 0s] passing_test', None, None)
967
 
            ],
968
 
            pb.calls)
969
 
        # the number of missing features should be printed in the progress
970
 
        # summary, so check for that.
971
 
        result.unsupported = {'foo':0, 'bar':0}
972
 
        test.run(result)
973
 
        self.assertEqual(
974
 
            [
975
 
            ('update', '[2 in 0s, 2 missing] passing_test', None, None),
976
 
            ],
977
 
            pb.calls[1:])
978
 
 
979
918
    def test_unavailable_exception(self):
980
919
        """An UnavailableFeature being raised should invoke addNotSupported."""
981
920
        class InstrumentedTestResult(tests.ExtendedTestResult):
982
 
            def done(self): pass
 
921
            def stopTestRun(self): pass
983
922
            def startTests(self): pass
984
923
            def report_test_start(self, test): pass
985
924
            def addNotSupported(self, test, feature):
1062
1001
        because of our use of global state.
1063
1002
        """
1064
1003
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1004
        old_leak = tests.TestCase._first_thread_leaker_id
1065
1005
        try:
1066
1006
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1007
            tests.TestCase._first_thread_leaker_id = None
1067
1008
            return testrunner.run(test)
1068
1009
        finally:
1069
1010
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1011
            tests.TestCase._first_thread_leaker_id = old_leak
1070
1012
 
1071
1013
    def test_known_failure_failed_run(self):
1072
1014
        # run a test that generates a known failure which should be printed in
1082
1024
        runner = tests.TextTestRunner(stream=stream)
1083
1025
        result = self.run_test_runner(runner, test)
1084
1026
        lines = stream.getvalue().splitlines()
1085
 
        self.assertEqual([
1086
 
            '',
1087
 
            '======================================================================',
1088
 
            'FAIL: unittest.FunctionTestCase (failing_test)',
1089
 
            '----------------------------------------------------------------------',
1090
 
            'Traceback (most recent call last):',
1091
 
            '    raise AssertionError(\'foo\')',
1092
 
            'AssertionError: foo',
1093
 
            '',
1094
 
            '----------------------------------------------------------------------',
1095
 
            '',
1096
 
            'FAILED (failures=1, known_failure_count=1)'],
1097
 
            lines[3:8] + lines[9:13] + lines[14:])
 
1027
        self.assertContainsRe(stream.getvalue(),
 
1028
            '(?sm)^testing.*$'
 
1029
            '.*'
 
1030
            '^======================================================================\n'
 
1031
            '^FAIL: unittest.FunctionTestCase \\(failing_test\\)\n'
 
1032
            '^----------------------------------------------------------------------\n'
 
1033
            'Traceback \\(most recent call last\\):\n'
 
1034
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
 
1035
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1036
            '.*'
 
1037
            '^----------------------------------------------------------------------\n'
 
1038
            '.*'
 
1039
            'FAILED \\(failures=1, known_failure_count=1\\)'
 
1040
            )
1098
1041
 
1099
1042
    def test_known_failure_ok_run(self):
1100
1043
        # run a test that generates a known failure which should be printed in the final output.
1111
1054
            '\n'
1112
1055
            'OK \\(known_failures=1\\)\n')
1113
1056
 
 
1057
    def test_result_decorator(self):
 
1058
        # decorate results
 
1059
        calls = []
 
1060
        class LoggingDecorator(tests.ForwardingResult):
 
1061
            def startTest(self, test):
 
1062
                tests.ForwardingResult.startTest(self, test)
 
1063
                calls.append('start')
 
1064
        test = unittest.FunctionTestCase(lambda:None)
 
1065
        stream = StringIO()
 
1066
        runner = tests.TextTestRunner(stream=stream,
 
1067
            result_decorators=[LoggingDecorator])
 
1068
        result = self.run_test_runner(runner, test)
 
1069
        self.assertLength(1, calls)
 
1070
 
1114
1071
    def test_skipped_test(self):
1115
1072
        # run a test that is skipped, and check the suite as a whole still
1116
1073
        # succeeds.
1183
1140
        self.assertContainsRe(out.getvalue(),
1184
1141
                r'(?m)^    this test never runs')
1185
1142
 
1186
 
    def test_not_applicable_demo(self):
1187
 
        # just so you can see it in the test output
1188
 
        raise tests.TestNotApplicable('this test is just a demonstation')
1189
 
 
1190
1143
    def test_unsupported_features_listed(self):
1191
1144
        """When unsupported features are encountered they are detailed."""
1192
1145
        class Feature1(tests.Feature):
1341
1294
        self.assertContainsRe(log, 'this will be kept')
1342
1295
        self.assertEqual(log, test._log_contents)
1343
1296
 
 
1297
    def test_startTestRun(self):
 
1298
        """run should call result.startTestRun()"""
 
1299
        calls = []
 
1300
        class LoggingDecorator(tests.ForwardingResult):
 
1301
            def startTestRun(self):
 
1302
                tests.ForwardingResult.startTestRun(self)
 
1303
                calls.append('startTestRun')
 
1304
        test = unittest.FunctionTestCase(lambda:None)
 
1305
        stream = StringIO()
 
1306
        runner = tests.TextTestRunner(stream=stream,
 
1307
            result_decorators=[LoggingDecorator])
 
1308
        result = self.run_test_runner(runner, test)
 
1309
        self.assertLength(1, calls)
 
1310
 
 
1311
    def test_stopTestRun(self):
 
1312
        """run should call result.stopTestRun()"""
 
1313
        calls = []
 
1314
        class LoggingDecorator(tests.ForwardingResult):
 
1315
            def stopTestRun(self):
 
1316
                tests.ForwardingResult.stopTestRun(self)
 
1317
                calls.append('stopTestRun')
 
1318
        test = unittest.FunctionTestCase(lambda:None)
 
1319
        stream = StringIO()
 
1320
        runner = tests.TextTestRunner(stream=stream,
 
1321
            result_decorators=[LoggingDecorator])
 
1322
        result = self.run_test_runner(runner, test)
 
1323
        self.assertLength(1, calls)
 
1324
 
1344
1325
 
1345
1326
class SampleTestCase(tests.TestCase):
1346
1327
 
1350
1331
class _TestException(Exception):
1351
1332
    pass
1352
1333
 
 
1334
 
1353
1335
class TestTestCase(tests.TestCase):
1354
1336
    """Tests that test the core bzrlib TestCase."""
1355
1337
 
1404
1386
        # we could set something and run a test that will check
1405
1387
        # it gets sanitised, but this is probably sufficient for now:
1406
1388
        # if someone runs the test with -Dsomething it will error.
1407
 
        self.assertEqual(set(), bzrlib.debug.debug_flags)
 
1389
        flags = set()
 
1390
        if self._lock_check_thorough:
 
1391
            flags.add('strict_locks')
 
1392
        self.assertEqual(flags, bzrlib.debug.debug_flags)
1408
1393
 
1409
1394
    def change_selftest_debug_flags(self, new_flags):
1410
1395
        orig_selftest_flags = tests.selftest_debug_flags
1425
1410
                self.flags = set(bzrlib.debug.debug_flags)
1426
1411
        test = TestThatRecordsFlags('test_foo')
1427
1412
        test.run(self.make_test_result())
1428
 
        self.assertEqual(set(['a-flag']), self.flags)
 
1413
        flags = set(['a-flag'])
 
1414
        if 'disable_lock_checks' not in tests.selftest_debug_flags:
 
1415
            flags.add('strict_locks')
 
1416
        self.assertEqual(flags, self.flags)
 
1417
 
 
1418
    def test_disable_lock_checks(self):
 
1419
        """The -Edisable_lock_checks flag disables thorough checks."""
 
1420
        class TestThatRecordsFlags(tests.TestCase):
 
1421
            def test_foo(nested_self):
 
1422
                self.flags = set(bzrlib.debug.debug_flags)
 
1423
                self.test_lock_check_thorough = nested_self._lock_check_thorough
 
1424
        self.change_selftest_debug_flags(set())
 
1425
        test = TestThatRecordsFlags('test_foo')
 
1426
        test.run(self.make_test_result())
 
1427
        # By default we do strict lock checking and thorough lock/unlock
 
1428
        # tracking.
 
1429
        self.assertTrue(self.test_lock_check_thorough)
 
1430
        self.assertEqual(set(['strict_locks']), self.flags)
 
1431
        # Now set the disable_lock_checks flag, and show that this changed.
 
1432
        self.change_selftest_debug_flags(set(['disable_lock_checks']))
 
1433
        test = TestThatRecordsFlags('test_foo')
 
1434
        test.run(self.make_test_result())
 
1435
        self.assertFalse(self.test_lock_check_thorough)
 
1436
        self.assertEqual(set(), self.flags)
 
1437
 
 
1438
    def test_this_fails_strict_lock_check(self):
 
1439
        class TestThatRecordsFlags(tests.TestCase):
 
1440
            def test_foo(nested_self):
 
1441
                self.flags1 = set(bzrlib.debug.debug_flags)
 
1442
                self.thisFailsStrictLockCheck()
 
1443
                self.flags2 = set(bzrlib.debug.debug_flags)
 
1444
        # Make sure lock checking is active
 
1445
        self.change_selftest_debug_flags(set())
 
1446
        test = TestThatRecordsFlags('test_foo')
 
1447
        test.run(self.make_test_result())
 
1448
        self.assertEqual(set(['strict_locks']), self.flags1)
 
1449
        self.assertEqual(set(), self.flags2)
1429
1450
 
1430
1451
    def test_debug_flags_restored(self):
1431
1452
        """The bzrlib debug flags should be restored to their original state
1472
1493
        outer_test = TestTestCase("outer_child")
1473
1494
        result = self.make_test_result()
1474
1495
        outer_test.run(result)
 
1496
        self.addCleanup(osutils.delete_any, outer_test._log_file_name)
1475
1497
        self.assertEqual(original_trace, bzrlib.trace._trace_file)
1476
1498
 
1477
1499
    def method_that_times_a_bit_twice(self):
1486
1508
        result = bzrlib.tests.VerboseTestResult(
1487
1509
            unittest._WritelnDecorator(output_stream),
1488
1510
            descriptions=0,
1489
 
            verbosity=2,
1490
 
            num_tests=sample_test.countTestCases())
 
1511
            verbosity=2)
1491
1512
        sample_test.run(result)
1492
1513
        self.assertContainsRe(
1493
1514
            output_stream.getvalue(),
1521
1542
        self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
1522
1543
        self.assertIsInstance(self._benchcalls[0][1], bzrlib.lsprof.Stats)
1523
1544
        self.assertIsInstance(self._benchcalls[1][1], bzrlib.lsprof.Stats)
 
1545
        del self._benchcalls[:]
1524
1546
 
1525
1547
    def test_knownFailure(self):
1526
1548
        """Self.knownFailure() should raise a KnownFailure exception."""
1701
1723
        self.assertEndsWith('foo', 'oo')
1702
1724
        self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')
1703
1725
 
 
1726
    def test_assertEqualDiff(self):
 
1727
        e = self.assertRaises(AssertionError,
 
1728
                              self.assertEqualDiff, '', '\n')
 
1729
        self.assertEquals(str(e),
 
1730
                          # Don't blink ! The '+' applies to the second string
 
1731
                          'first string is missing a final newline.\n+ \n')
 
1732
        e = self.assertRaises(AssertionError,
 
1733
                              self.assertEqualDiff, '\n', '')
 
1734
        self.assertEquals(str(e),
 
1735
                          # Don't blink ! The '-' applies to the second string
 
1736
                          'second string is missing a final newline.\n- \n')
 
1737
 
 
1738
 
 
1739
class TestDeprecations(tests.TestCase):
 
1740
 
1704
1741
    def test_applyDeprecated_not_deprecated(self):
1705
1742
        sample_object = ApplyDeprecatedHelper()
1706
1743
        # calling an undeprecated callable raises an assertion
1783
1820
        tree = self.make_branch_and_memory_tree('a')
1784
1821
        self.assertIsInstance(tree, bzrlib.memorytree.MemoryTree)
1785
1822
 
1786
 
 
1787
 
class TestSFTPMakeBranchAndTree(test_sftp_transport.TestCaseWithSFTPServer):
1788
 
 
1789
 
    def test_make_tree_for_sftp_branch(self):
1790
 
        """Transports backed by local directories create local trees."""
1791
 
 
 
1823
    def test_make_tree_for_local_vfs_backed_transport(self):
 
1824
        # make_branch_and_tree has to use local branch and repositories
 
1825
        # when the vfs transport and local disk are colocated, even if
 
1826
        # a different transport is in use for url generation.
 
1827
        from bzrlib.transport.fakevfat import FakeVFATServer
 
1828
        self.transport_server = FakeVFATServer
 
1829
        self.assertFalse(self.get_url('t1').startswith('file://'))
1792
1830
        tree = self.make_branch_and_tree('t1')
1793
1831
        base = tree.bzrdir.root_transport.base
1794
 
        self.failIf(base.startswith('sftp'),
1795
 
                'base %r is on sftp but should be local' % base)
 
1832
        self.assertStartsWith(base, 'file://')
1796
1833
        self.assertEquals(tree.bzrdir.root_transport,
1797
1834
                tree.branch.bzrdir.root_transport)
1798
1835
        self.assertEquals(tree.bzrdir.root_transport,
1799
1836
                tree.branch.repository.bzrdir.root_transport)
1800
1837
 
1801
1838
 
1802
 
class TestSelftest(tests.TestCase):
 
1839
class SelfTestHelper:
 
1840
 
 
1841
    def run_selftest(self, **kwargs):
 
1842
        """Run selftest returning its output."""
 
1843
        output = StringIO()
 
1844
        old_transport = bzrlib.tests.default_transport
 
1845
        old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
 
1846
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
 
1847
        try:
 
1848
            self.assertEqual(True, tests.selftest(stream=output, **kwargs))
 
1849
        finally:
 
1850
            bzrlib.tests.default_transport = old_transport
 
1851
            tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
 
1852
        output.seek(0)
 
1853
        return output
 
1854
 
 
1855
 
 
1856
class TestSelftest(tests.TestCase, SelfTestHelper):
1803
1857
    """Tests of bzrlib.tests.selftest."""
1804
1858
 
1805
1859
    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
1813
1867
            test_suite_factory=factory)
1814
1868
        self.assertEqual([True], factory_called)
1815
1869
 
 
1870
    def factory(self):
 
1871
        """A test suite factory."""
 
1872
        class Test(tests.TestCase):
 
1873
            def a(self):
 
1874
                pass
 
1875
            def b(self):
 
1876
                pass
 
1877
            def c(self):
 
1878
                pass
 
1879
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
 
1880
 
 
1881
    def test_list_only(self):
 
1882
        output = self.run_selftest(test_suite_factory=self.factory,
 
1883
            list_only=True)
 
1884
        self.assertEqual(3, len(output.readlines()))
 
1885
 
 
1886
    def test_list_only_filtered(self):
 
1887
        output = self.run_selftest(test_suite_factory=self.factory,
 
1888
            list_only=True, pattern="Test.b")
 
1889
        self.assertEndsWith(output.getvalue(), "Test.b\n")
 
1890
        self.assertLength(1, output.readlines())
 
1891
 
 
1892
    def test_list_only_excludes(self):
 
1893
        output = self.run_selftest(test_suite_factory=self.factory,
 
1894
            list_only=True, exclude_pattern="Test.b")
 
1895
        self.assertNotContainsRe("Test.b", output.getvalue())
 
1896
        self.assertLength(2, output.readlines())
 
1897
 
 
1898
    def test_lsprof_tests(self):
 
1899
        self.requireFeature(test_lsprof.LSProfFeature)
 
1900
        calls = []
 
1901
        class Test(object):
 
1902
            def __call__(test, result):
 
1903
                test.run(result)
 
1904
            def run(test, result):
 
1905
                self.assertIsInstance(result, tests.ForwardingResult)
 
1906
                calls.append("called")
 
1907
            def countTestCases(self):
 
1908
                return 1
 
1909
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
 
1910
        self.assertLength(1, calls)
 
1911
 
 
1912
    def test_random(self):
 
1913
        # test randomising by listing a number of tests.
 
1914
        output_123 = self.run_selftest(test_suite_factory=self.factory,
 
1915
            list_only=True, random_seed="123")
 
1916
        output_234 = self.run_selftest(test_suite_factory=self.factory,
 
1917
            list_only=True, random_seed="234")
 
1918
        self.assertNotEqual(output_123, output_234)
 
1919
        # "Randomizing test order..\n\n
 
1920
        self.assertLength(5, output_123.readlines())
 
1921
        self.assertLength(5, output_234.readlines())
 
1922
 
 
1923
    def test_random_reuse_is_same_order(self):
 
1924
        # test randomising by listing a number of tests.
 
1925
        expected = self.run_selftest(test_suite_factory=self.factory,
 
1926
            list_only=True, random_seed="123")
 
1927
        repeated = self.run_selftest(test_suite_factory=self.factory,
 
1928
            list_only=True, random_seed="123")
 
1929
        self.assertEqual(expected.getvalue(), repeated.getvalue())
 
1930
 
 
1931
    def test_runner_class(self):
 
1932
        self.requireFeature(SubUnitFeature)
 
1933
        from subunit import ProtocolTestCase
 
1934
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
1935
            test_suite_factory=self.factory)
 
1936
        test = ProtocolTestCase(stream)
 
1937
        result = unittest.TestResult()
 
1938
        test.run(result)
 
1939
        self.assertEqual(3, result.testsRun)
 
1940
 
 
1941
    def test_starting_with_single_argument(self):
 
1942
        output = self.run_selftest(test_suite_factory=self.factory,
 
1943
            starting_with=['bzrlib.tests.test_selftest.Test.a'],
 
1944
            list_only=True)
 
1945
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
 
1946
            output.getvalue())
 
1947
 
 
1948
    def test_starting_with_multiple_argument(self):
 
1949
        output = self.run_selftest(test_suite_factory=self.factory,
 
1950
            starting_with=['bzrlib.tests.test_selftest.Test.a',
 
1951
                'bzrlib.tests.test_selftest.Test.b'],
 
1952
            list_only=True)
 
1953
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
 
1954
            'bzrlib.tests.test_selftest.Test.b\n',
 
1955
            output.getvalue())
 
1956
 
 
1957
    def check_transport_set(self, transport_server):
 
1958
        captured_transport = []
 
1959
        def seen_transport(a_transport):
 
1960
            captured_transport.append(a_transport)
 
1961
        class Capture(tests.TestCase):
 
1962
            def a(self):
 
1963
                seen_transport(bzrlib.tests.default_transport)
 
1964
        def factory():
 
1965
            return TestUtil.TestSuite([Capture("a")])
 
1966
        self.run_selftest(transport=transport_server, test_suite_factory=factory)
 
1967
        self.assertEqual(transport_server, captured_transport[0])
 
1968
 
 
1969
    def test_transport_sftp(self):
 
1970
        try:
 
1971
            import bzrlib.transport.sftp
 
1972
        except errors.ParamikoNotPresent:
 
1973
            raise tests.TestSkipped("Paramiko not present")
 
1974
        self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
 
1975
 
 
1976
    def test_transport_memory(self):
 
1977
        self.check_transport_set(bzrlib.transport.memory.MemoryServer)
 
1978
 
 
1979
 
 
1980
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
 
1981
    # Does IO: reads test.list
 
1982
 
 
1983
    def test_load_list(self):
 
1984
        # Provide a list with one test - this test.
 
1985
        test_id_line = '%s\n' % self.id()
 
1986
        self.build_tree_contents([('test.list', test_id_line)])
 
1987
        # And generate a list of the tests in  the suite.
 
1988
        stream = self.run_selftest(load_list='test.list', list_only=True)
 
1989
        self.assertEqual(test_id_line, stream.getvalue())
 
1990
 
 
1991
    def test_load_unknown(self):
 
1992
        # Provide a list with one test - this test.
 
1993
        # And generate a list of the tests in  the suite.
 
1994
        err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
 
1995
            load_list='missing file name', list_only=True)
 
1996
 
 
1997
 
 
1998
class TestRunBzr(tests.TestCase):
    """Check which parameters run_bzr hands on to _run_bzr_core.

    _run_bzr_core is overridden below with a recorder, so nothing in this
    class actually runs bzr.
    """

    # Canned (stdout, stderr) returned by the recording _run_bzr_core.
    out = ''
    err = ''

    def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
                         working_dir=None):
        """Override _run_bzr_core to test how it is invoked by run_bzr.

        Attempts to run bzr from inside this class don't actually run it.

        How bzr is really invoked is tested in another location. Here we
        only need to test that run_bzr passes the right parameters to
        _run_bzr_core.

        :return: the canned (self.out, self.err) pair.
        """
        self.working_dir = working_dir
        self.stdin = stdin
        self.encoding = encoding
        self.retcode = retcode
        self.argv = list(argv)
        return self.out, self.err

    def test_run_bzr_error(self):
        """run_bzr_error forwards argv and retcode and returns the output."""
        self.out = "It sure does!\n"
        out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
        self.assertEqual(['rocks'], self.argv)
        self.assertEqual(34, self.retcode)
        self.assertEqual(out, 'It sure does!\n')

    def test_run_bzr_error_regexes(self):
        """run_bzr_error accepts stderr that matches the given regex."""
        self.out = ''
        self.err = "bzr: ERROR: foobarbaz is not versioned"
        expected_errors = ["bzr: ERROR: foobarbaz is not versioned"]
        out, err = self.run_bzr_error(expected_errors,
                                      ['file-id', 'foobarbaz'])

    def test_encoding(self):
        """Test that run_bzr passes encoding to _run_bzr_core"""
        split_argv = ['foo', 'bar']

        self.run_bzr('foo bar')
        self.assertEqual(None, self.encoding)
        self.assertEqual(split_argv, self.argv)

        self.run_bzr('foo bar', encoding='baz')
        self.assertEqual('baz', self.encoding)
        self.assertEqual(split_argv, self.argv)

    def test_retcode(self):
        """Test that run_bzr passes retcode to _run_bzr_core"""
        # The first case relies on the default, which is retcode == 0.
        cases = [
            ('foo bar', {}, 0),
            ('foo bar', {'retcode': 1}, 1),
            ('foo bar', {'retcode': None}, None),
            (['foo', 'bar'], {'retcode': 3}, 3),
            ]
        for command, options, expected_retcode in cases:
            self.run_bzr(command, **options)
            self.assertEqual(expected_retcode, self.retcode)
            self.assertEqual(['foo', 'bar'], self.argv)

    def test_stdin(self):
        """The stdin keyword reaches _run_bzr_core unchanged."""
        # run_bzr is a convenience function for _run_bzr_core, so it should
        # invoke the recording override defined in this class.
        for text in ('gam', 'zippy'):
            self.run_bzr('foo bar', stdin=text)
            self.assertEqual(text, self.stdin)
            self.assertEqual(['foo', 'bar'], self.argv)

    def test_working_dir(self):
        """Test that run_bzr passes working_dir to _run_bzr_core"""
        split_argv = ['foo', 'bar']

        self.run_bzr('foo bar')
        self.assertEqual(None, self.working_dir)
        self.assertEqual(split_argv, self.argv)

        self.run_bzr('foo bar', working_dir='baz')
        self.assertEqual('baz', self.working_dir)
        self.assertEqual(split_argv, self.argv)

    def test_reject_extra_keyword_arguments(self):
        """An unknown keyword argument to run_bzr raises TypeError."""
        unknown_keywords = {'error_regex': ['error message']}
        self.assertRaises(TypeError, self.run_bzr, "foo bar",
                          **unknown_keywords)
 
2090
 
 
2091
 
 
2092
class TestRunBzrCaptured(tests.TestCaseWithTransport):
    """Check the environment run_bzr sets up around apply_redirected.

    apply_redirected is overridden below to snapshot the stdin, UI factory
    and working directory in effect at the moment it is called.
    """
    # Does IO when testing the working_dir parameter.

    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                         a_callable=None, *args, **kwargs):
        """Record the redirected state instead of calling a_callable.

        Writes 'foo\\n' to stdout and 'bar\\n' to stderr so that tests can
        recognise the streams afterwards.

        :return: 0, standing in for a successful command.
        """
        self.stdin = stdin
        self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
        self.factory = bzrlib.ui.ui_factory
        self.working_dir = osutils.getcwd()
        stdout.write('foo\n')
        stderr.write('bar\n')
        return 0

    def test_stdin(self):
        """The stdin string arrives as a readable stream on the factory."""
        # test that the stdin keyword to _run_bzr_core is passed through to
        # apply_redirected as a StringIO. We do this by overriding
        # apply_redirected in this class, and then calling _run_bzr_core,
        # which calls apply_redirected.
        for text in ('gam', 'zippy'):
            self.run_bzr(['foo', 'bar'], stdin=text)
            self.assertEqual(text, self.stdin.read())
            self.assertTrue(self.stdin is self.factory_stdin)

    def test_ui_factory(self):
        """Each run_bzr call installs its own TestUIFactory."""
        # each invocation of self.run_bzr should get its
        # own UI factory, which is an instance of TestUIFactory,
        # with stdin, stdout and stderr attached to the stdin,
        # stdout and stderr of the invoked run_bzr
        current_factory = bzrlib.ui.ui_factory
        self.run_bzr(['foo'])
        # assertFalse rather than the deprecated failIf alias, matching the
        # other tests in this file.
        self.assertFalse(current_factory is self.factory)
        self.assertNotEqual(sys.stdout, self.factory.stdout)
        self.assertNotEqual(sys.stderr, self.factory.stderr)
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
        self.assertEqual('bar\n', self.factory.stderr.getvalue())
        self.assertIsInstance(self.factory, tests.TestUIFactory)

    def test_working_dir(self):
        """working_dir applies only for the duration of the command."""
        self.build_tree(['one/', 'two/'])
        cwd = osutils.getcwd()

        # Default is to work in the current directory
        self.run_bzr(['foo', 'bar'])
        self.assertEqual(cwd, self.working_dir)

        self.run_bzr(['foo', 'bar'], working_dir=None)
        self.assertEqual(cwd, self.working_dir)

        # The function should be run in the alternative directory
        # but afterwards the current working dir shouldn't be changed
        for subdir in ('one', 'two'):
            self.run_bzr(['foo', 'bar'], working_dir=subdir)
            self.assertNotEqual(cwd, self.working_dir)
            self.assertEndsWith(self.working_dir, subdir)
            self.assertEqual(cwd, osutils.getcwd())
 
2153
 
 
2154
 
 
2155
class StubProcess(object):
    """A stand-in process object for testing run_bzr_subprocess.

    It stores canned output, error text and a return code, and exposes them
    through ``communicate()`` and the ``returncode`` attribute.
    """

    def __init__(self, out="", err="", retcode=0):
        """Store the canned results.

        :param out: text handed back as the first item of communicate().
        :param err: text handed back as the second item of communicate().
        :param retcode: value exposed as the ``returncode`` attribute.
        """
        self.returncode = retcode
        self.err = err
        self.out = out

    def communicate(self):
        """Return the canned ``(out, err)`` pair."""
        canned_output = (self.out, self.err)
        return canned_output
 
2165
 
 
2166
 
 
2167
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
    """Base class for tests of how bzr would be run as a subprocess.

    start_bzr_subprocess is replaced by a recorder, so no process is started.
    Tests that go through it must set ``self.next_subprocess`` first.
    """

    def setUp(self):
        """Start each test with an empty record of subprocess calls."""
        tests.TestCaseWithTransport.setUp(self)
        self.subprocess_calls = []

    def start_bzr_subprocess(self, process_args, env_changes=None,
                             skip_if_plan_to_signal=False,
                             working_dir=None,
                             allow_plugins=False):
        """Capture what run_bzr_subprocess tries to do.

        The arguments are appended to ``self.subprocess_calls`` as a dict
        keyed by parameter name.

        :return: ``self.next_subprocess``.
        """
        recorded_call = dict(
            process_args=process_args,
            env_changes=env_changes,
            skip_if_plan_to_signal=skip_if_plan_to_signal,
            working_dir=working_dir,
            allow_plugins=allow_plugins)
        self.subprocess_calls.append(recorded_call)
        return self.next_subprocess
 
2184
 
 
2185
 
 
2186
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Tests for run_bzr_subprocess using the faked start_bzr_subprocess."""

    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
        """Run run_bzr_subprocess with args and kwargs using a stubbed process.

        The inherited stub start_bzr_subprocess returns whatever is held in
        self.next_subprocess. This assertion method installs ``process``
        there, makes the call, and then checks the arguments that
        run_bzr_subprocess generated for the most recent recorded call.

        :param expected_args: dict mapping keys of the recorded call (such as
            'process_args' or 'working_dir') to their expected values.
        :param process: object for the stub start_bzr_subprocess to return.
        :return: the result of run_bzr_subprocess.
        """
        self.next_subprocess = process
        try:
            return self.run_bzr_subprocess(*args, **kwargs)
        finally:
            # Reset and verify on both the success and the failure path. A
            # failing check here replaces any exception already propagating,
            # as the earlier duplicated except/else code did.
            self.next_subprocess = None
            for key, expected in expected_args.items():
                self.assertEqual(expected, self.subprocess_calls[-1][key])

    def test_run_bzr_subprocess(self):
        """run_bzr_subprocess forwards its arguments and checks the retcode."""
        self.assertRunBzrSubprocess({'process_args':['--version']},
            StubProcess(), '--version')
        self.assertRunBzrSubprocess({'process_args':['--version']},
            StubProcess(), ['--version'])
        # retcode=None disables retcode checking
        self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--version', retcode=None)
        result = self.assertRunBzrSubprocess({},
            StubProcess(out="is free software"), '--version')
        self.assertContainsRe(result[0], 'is free software')
        # Running a subcommand that is missing errors
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
            {'process_args':['--versionn']}, StubProcess(retcode=3),
            '--versionn')
        # Unless it is told to expect the error from the subprocess
        self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--versionn', retcode=3)
        # Or to ignore retcode checking
        result = self.assertRunBzrSubprocess({},
            StubProcess(err="unknown command", retcode=3), '--versionn',
            retcode=None)
        self.assertContainsRe(result[1], 'unknown command')

    def test_env_change_passes_through(self):
        """env_changes is recorded exactly as it was given."""
        self.assertRunBzrSubprocess(
            {'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
            StubProcess(), '',
            env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})

    def test_no_working_dir_passed_as_None(self):
        """Omitting working_dir is recorded as None."""
        self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')

    def test_no_working_dir_passed_through(self):
        """An explicit working_dir is recorded unchanged."""
        # NOTE(review): despite the "no_" in the name, this test does pass a
        # working_dir. The name is kept so the test id stays stable.
        self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
            working_dir='dir')

    def test_run_bzr_subprocess_no_plugins(self):
        """allow_plugins is recorded as False when not given."""
        self.assertRunBzrSubprocess({'allow_plugins': False},
            StubProcess(), '')

    def test_allow_plugins(self):
        """allow_plugins=True is recorded unchanged."""
        self.assertRunBzrSubprocess({'allow_plugins': True},
            StubProcess(), '', allow_plugins=True)
 
2254
 
 
2255
 
 
2256
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Tests for how finish_bzr_subprocess treats the process retcode."""

    def _unknown_command_process(self):
        """Return a stub process that failed with retcode 3."""
        return StubProcess(err="unknown command", retcode=3)

    def test_finish_bzr_subprocess_with_error(self):
        """finish_bzr_subprocess allows specification of the desired exit code.
        """
        result = self.finish_bzr_subprocess(self._unknown_command_process(),
                                            retcode=3)
        self.assertEqual('', result[0])
        self.assertContainsRe(result[1], 'unknown command')

    def test_finish_bzr_subprocess_ignoring_retcode(self):
        """finish_bzr_subprocess allows the exit code to be ignored."""
        result = self.finish_bzr_subprocess(self._unknown_command_process(),
                                            retcode=None)
        self.assertEqual('', result[0])
        self.assertContainsRe(result[1], 'unknown command')

    def test_finish_subprocess_with_unexpected_retcode(self):
        """finish_bzr_subprocess raises self.failureException if the retcode is
        not the expected one.
        """
        # No retcode is passed, so the stub's retcode of 3 is unexpected.
        self.assertRaises(self.failureException, self.finish_bzr_subprocess,
                          self._unknown_command_process())
 
2280
 
 
2281
 
 
2282
class _DontSpawnProcess(Exception):
 
2283
    """A simple exception which just allows us to skip unnecessary steps"""
 
2284
 
 
2285
 
 
2286
class TestStartBzrSubProcess(tests.TestCase):
    """Tests for start_bzr_subprocess that stop short of spawning a process.

    _popen is overridden below to record its arguments and then raise
    _DontSpawnProcess, so each test expects that exception from
    start_bzr_subprocess and inspects what was recorded.
    """

    def check_popen_state(self):
        """Replace to make assertions when popen is called."""

    def _popen(self, *args, **kwargs):
        """Record the command that is run, so that we can ensure it is correct"""
        # Run the per-test hook first, while any temporary state set up for
        # the child (environment, cwd) is still in effect.
        self.check_popen_state()
        self._popen_args = args
        self._popen_kwargs = kwargs
        raise _DontSpawnProcess()

    def test_run_bzr_subprocess_no_plugins(self):
        """The default command line is python, the bzr path, --no-plugins."""
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
        command = self._popen_args[0]
        self.assertEqual(sys.executable, command[0])
        self.assertEqual(self.get_bzr_path(), command[1])
        self.assertEqual(['--no-plugins'], command[2:])

    def test_allow_plugins(self):
        """allow_plugins=True leaves --no-plugins off the command line."""
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            allow_plugins=True)
        command = self._popen_args[0]
        self.assertEqual([], command[2:])

    def test_set_env(self):
        """A variable in env_changes is set for the child only."""
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        # set in the child
        def check_environment():
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
        # not set in the parent
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def test_run_bzr_subprocess_env_del(self):
        """run_bzr_subprocess can remove environment variables too."""
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        def check_environment():
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
        try:
            self.check_popen_state = check_environment
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess,
                [], env_changes={'EXISTANT_ENV_VAR':None})
            # Still set in parent
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        finally:
            # Always remove the variable, so a failing assertion above cannot
            # leak it into tests that run later in the same process.
            if 'EXISTANT_ENV_VAR' in os.environ:
                del os.environ['EXISTANT_ENV_VAR']

    def test_env_del_missing(self):
        """Asking to delete a variable that is not set is harmless."""
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
        def check_environment():
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'NON_EXISTANT_ENV_VAR':None})

    def test_working_dir(self):
        """Test that we can specify the working dir for the child"""
        chdirs = []
        def chdir(path):
            chdirs.append(path)
        def getcwd():
            return 'current'
        orig_getcwd = osutils.getcwd
        orig_chdir = os.chdir
        # Swap in the recorders. Both are restored in the finally clause, so
        # the real functions come back even if the assertion fails.
        os.chdir = chdir
        osutils.getcwd = getcwd
        try:
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                working_dir='foo')
        finally:
            osutils.getcwd = orig_getcwd
            os.chdir = orig_chdir
        # First into the requested dir, then back to the faked original cwd.
        self.assertEqual(['foo', 'current'], chdirs)
 
2363
 
 
2364
 
 
2365
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
    """Tests that really need to do things with an external bzr."""

    def test_start_and_stop_bzr_subprocess_send_signal(self):
        """A started bzr subprocess can be stopped by sending it SIGINT.

        The child runs 'wait-until-signalled' and is expected to print
        'running', then exit with retcode 3 and 'bzr: interrupted' on stderr
        once it is signalled.
        """
        child = self.start_bzr_subprocess(['wait-until-signalled'],
                                          skip_if_plan_to_signal=True)
        # Wait for the child to announce itself before signalling it.
        first_line = child.stdout.readline()
        self.assertEqual('running\n', first_line)
        result = self.finish_bzr_subprocess(child, send_signal=signal.SIGINT,
                                            retcode=3)
        self.assertEqual('', result[0])
        self.assertEqual('bzr: interrupted\n', result[1])
 
2379
 
1816
2380
 
1817
2381
class TestKnownFailure(tests.TestCase):
1818
2382
 
1883
2447
        tests.TestCase.setUp(self)
1884
2448
        self.suite = TestUtil.TestSuite()
1885
2449
        self.loader = TestUtil.TestLoader()
1886
 
        self.suite.addTest(self.loader.loadTestsFromModuleNames([
1887
 
            'bzrlib.tests.test_selftest']))
 
2450
        self.suite.addTest(self.loader.loadTestsFromModule(
 
2451
            sys.modules['bzrlib.tests.test_selftest']))
1888
2452
        self.all_names = _test_ids(self.suite)
1889
2453
 
1890
2454
    def test_condition_id_re(self):
2200
2764
 
2201
2765
class TestTestSuite(tests.TestCase):
2202
2766
 
 
2767
    def test__test_suite_testmod_names(self):
 
2768
        # Test that a plausible list of test module names are returned
 
2769
        # by _test_suite_testmod_names.
 
2770
        test_list = tests._test_suite_testmod_names()
 
2771
        self.assertSubset([
 
2772
            'bzrlib.tests.blackbox',
 
2773
            'bzrlib.tests.per_transport',
 
2774
            'bzrlib.tests.test_selftest',
 
2775
            ],
 
2776
            test_list)
 
2777
 
 
2778
    def test__test_suite_modules_to_doctest(self):
 
2779
        # Test that a plausible list of modules to doctest is returned
 
2780
        # by _test_suite_modules_to_doctest.
 
2781
        test_list = tests._test_suite_modules_to_doctest()
 
2782
        self.assertSubset([
 
2783
            'bzrlib.timestamp',
 
2784
            ],
 
2785
            test_list)
 
2786
 
2203
2787
    def test_test_suite(self):
2204
 
        # This test is slow, so we do a single test with one test in each
2205
 
        # category
2206
 
        test_list = [
 
2788
        # test_suite() loads the entire test suite to operate. To avoid this
 
2789
        # overhead, and yet still be confident that things are happening,
 
2790
        # we temporarily replace two functions used by test_suite with 
 
2791
        # test doubles that supply a few sample tests to load, and check they
 
2792
        # are loaded.
 
2793
        calls = []
 
2794
        def _test_suite_testmod_names():
 
2795
            calls.append("testmod_names")
 
2796
            return [
 
2797
                'bzrlib.tests.blackbox.test_branch',
 
2798
                'bzrlib.tests.per_transport',
 
2799
                'bzrlib.tests.test_selftest',
 
2800
                ]
 
2801
        original_testmod_names = tests._test_suite_testmod_names
 
2802
        def _test_suite_modules_to_doctest():
 
2803
            calls.append("modules_to_doctest")
 
2804
            return ['bzrlib.timestamp']
 
2805
        orig_modules_to_doctest = tests._test_suite_modules_to_doctest
 
2806
        def restore_names():
 
2807
            tests._test_suite_testmod_names = original_testmod_names
 
2808
            tests._test_suite_modules_to_doctest = orig_modules_to_doctest
 
2809
        self.addCleanup(restore_names)
 
2810
        tests._test_suite_testmod_names = _test_suite_testmod_names
 
2811
        tests._test_suite_modules_to_doctest = _test_suite_modules_to_doctest
 
2812
        expected_test_list = [
2207
2813
            # testmod_names
2208
2814
            'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2209
2815
            ('bzrlib.tests.per_transport.TransportTests'
2214
2820
            # plugins can't be tested that way since selftest may be run with
2215
2821
            # --no-plugins
2216
2822
            ]
2217
 
        suite = tests.test_suite(test_list)
2218
 
        self.assertEquals(test_list, _test_ids(suite))
 
2823
        suite = tests.test_suite()
 
2824
        self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
 
2825
            set(calls))
 
2826
        self.assertSubset(expected_test_list, _test_ids(suite))
2219
2827
 
2220
2828
    def test_test_suite_list_and_start(self):
 
2829
        # We cannot test this at the same time as the main load, because we want
 
2830
        # to know that starting_with == None works. So a second load is
 
2831
        # incurred - note that the starting_with parameter causes a partial load
 
2832
        # rather than a full load so this test should be pretty quick.
2221
2833
        test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
2222
2834
        suite = tests.test_suite(test_list,
2223
2835
                                 ['bzrlib.tests.test_selftest.TestTestSuite'])
2368
2980
                return tests.ExtendedTestResult(self.stream, self.descriptions,
2369
2981
                                                self.verbosity)
2370
2982
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2371
 
        self.assertEqual(calls, [suite])
2372
 
 
2373
 
    def test_done(self):
2374
 
        """run_suite should call result.done()"""
2375
 
        self.calls = 0
2376
 
        def one_more_call(): self.calls += 1
2377
 
        def test_function():
2378
 
            pass
2379
 
        test = unittest.FunctionTestCase(test_function)
2380
 
        class InstrumentedTestResult(tests.ExtendedTestResult):
2381
 
            def done(self): one_more_call()
2382
 
        class MyRunner(tests.TextTestRunner):
2383
 
            def run(self, test):
2384
 
                return InstrumentedTestResult(self.stream, self.descriptions,
2385
 
                                              self.verbosity)
2386
 
        tests.run_suite(test, runner_class=MyRunner, stream=StringIO())
2387
 
        self.assertEquals(1, self.calls)
 
2983
        self.assertLength(1, calls)