~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: John Arbash Meinel
  • Date: 2010-01-13 16:23:07 UTC
  • mto: (4634.119.7 2.0)
  • mto: This revision was merged to the branch mainline in revision 4959.
  • Revision ID: john@arbash-meinel.com-20100113162307-0bs82td16gzih827
Update the MANIFEST.in file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
from cStringIO import StringIO
20
20
import os
 
21
import signal
21
22
import sys
22
23
import time
23
24
import unittest
40
41
    workingtree,
41
42
    )
42
43
from bzrlib.repofmt import (
 
44
    groupcompress_repo,
43
45
    pack_repo,
44
46
    weaverepo,
45
47
    )
49
51
    deprecated_method,
50
52
    )
51
53
from bzrlib.tests import (
 
54
    SubUnitFeature,
52
55
    test_lsprof,
53
56
    test_sftp_transport,
54
57
    TestUtil,
124
127
        self.assertEqual(sample_permutation,
125
128
                         get_transport_test_permutations(MockModule()))
126
129
 
127
 
    def test_scenarios_invlude_all_modules(self):
 
130
    def test_scenarios_include_all_modules(self):
128
131
        # this checks that the scenario generator returns as many permutations
129
132
        # as there are in all the registered transport modules - we assume if
130
133
        # this matches its probably doing the right thing especially in
215
218
        from bzrlib.tests.per_repository import formats_to_scenarios
216
219
        formats = [("(c)", remote.RemoteRepositoryFormat()),
217
220
                   ("(d)", repository.format_registry.get(
218
 
                        'Bazaar pack repository format 1 (needs bzr 0.92)\n'))]
 
221
                    'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
219
222
        no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
220
223
            None)
221
224
        vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
222
225
            vfs_transport_factory="vfs")
223
226
        # no_vfs generate scenarios without vfs_transport_factory
224
 
        self.assertEqual([
 
227
        expected = [
225
228
            ('RemoteRepositoryFormat(c)',
226
229
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
227
230
              'repository_format': remote.RemoteRepositoryFormat(),
228
231
              'transport_readonly_server': 'readonly',
229
232
              'transport_server': 'server'}),
230
 
            ('RepositoryFormatKnitPack1(d)',
 
233
            ('RepositoryFormat2a(d)',
231
234
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
232
 
              'repository_format': pack_repo.RepositoryFormatKnitPack1(),
 
235
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
233
236
              'transport_readonly_server': 'readonly',
234
 
              'transport_server': 'server'})],
235
 
            no_vfs_scenarios)
 
237
              'transport_server': 'server'})]
 
238
        self.assertEqual(expected, no_vfs_scenarios)
236
239
        self.assertEqual([
237
240
            ('RemoteRepositoryFormat(c)',
238
241
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
240
243
              'transport_readonly_server': 'readonly',
241
244
              'transport_server': 'server',
242
245
              'vfs_transport_factory': 'vfs'}),
243
 
            ('RepositoryFormatKnitPack1(d)',
 
246
            ('RepositoryFormat2a(d)',
244
247
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
245
 
              'repository_format': pack_repo.RepositoryFormatKnitPack1(),
 
248
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
246
249
              'transport_readonly_server': 'readonly',
247
250
              'transport_server': 'server',
248
251
              'vfs_transport_factory': 'vfs'})],
293
296
        from bzrlib.tests.per_interrepository import make_scenarios
294
297
        server1 = "a"
295
298
        server2 = "b"
296
 
        formats = [(str, "C1", "C2"), (int, "D1", "D2")]
 
299
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
297
300
        scenarios = make_scenarios(server1, server2, formats)
298
301
        self.assertEqual([
299
 
            ('str,str,str',
300
 
             {'interrepo_class': str,
301
 
              'repository_format': 'C1',
 
302
            ('C0,str,str',
 
303
             {'repository_format': 'C1',
302
304
              'repository_format_to': 'C2',
303
305
              'transport_readonly_server': 'b',
304
306
              'transport_server': 'a'}),
305
 
            ('int,str,str',
306
 
             {'interrepo_class': int,
307
 
              'repository_format': 'D1',
 
307
            ('D0,str,str',
 
308
             {'repository_format': 'D1',
308
309
              'repository_format_to': 'D2',
309
310
              'transport_readonly_server': 'b',
310
311
              'transport_server': 'a'})],
597
598
                l.attempt_lock()
598
599
        test = TestDanglingLock('test_function')
599
600
        result = test.run()
600
 
        self.assertEqual(1, len(result.errors))
 
601
        if self._lock_check_thorough:
 
602
            self.assertEqual(1, len(result.errors))
 
603
        else:
 
604
            # When _lock_check_thorough is disabled, then we don't trigger a
 
605
            # failure
 
606
            self.assertEqual(0, len(result.errors))
601
607
 
602
608
 
603
609
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
681
687
        self.assertEqual(url, t.clone('..').base)
682
688
 
683
689
 
684
 
class MockProgress(progress._BaseProgressBar):
685
 
    """Progress-bar standin that records calls.
686
 
 
687
 
    Useful for testing pb using code.
688
 
    """
689
 
 
690
 
    def __init__(self):
691
 
        progress._BaseProgressBar.__init__(self)
692
 
        self.calls = []
693
 
 
694
 
    def tick(self):
695
 
        self.calls.append(('tick',))
696
 
 
697
 
    def update(self, msg=None, current=None, total=None):
698
 
        self.calls.append(('update', msg, current, total))
699
 
 
700
 
    def clear(self):
701
 
        self.calls.append(('clear',))
702
 
 
703
 
    def note(self, msg, *args):
704
 
        self.calls.append(('note', msg, args))
705
 
 
706
 
 
707
690
class TestTestResult(tests.TestCase):
708
691
 
709
692
    def check_timing(self, test_case, expected_re):
862
845
        self.assertEqual(lines[1], '    foo')
863
846
        self.assertEqual(2, len(lines))
864
847
 
865
 
    def test_text_report_known_failure(self):
866
 
        # text test output formatting
867
 
        pb = MockProgress()
868
 
        result = bzrlib.tests.TextTestResult(
869
 
            StringIO(),
870
 
            descriptions=0,
871
 
            verbosity=1,
872
 
            pb=pb,
873
 
            )
874
 
        test = self.get_passing_test()
875
 
        # this seeds the state to handle reporting the test.
876
 
        result.startTest(test)
877
 
        # the err parameter has the shape:
878
 
        # (class, exception object, traceback)
879
 
        # KnownFailures dont get their tracebacks shown though, so we
880
 
        # can skip that.
881
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
882
 
        result.report_known_failure(test, err)
883
 
        self.assertEqual(
884
 
            [
885
 
            ('update', '[1 in 0s] passing_test', None, None),
886
 
            ('note', 'XFAIL: %s\n%s\n', ('passing_test', err[1]))
887
 
            ],
888
 
            pb.calls)
889
 
        # known_failures should be printed in the summary, so if we run a test
890
 
        # after there are some known failures, the update prefix should match
891
 
        # this.
892
 
        result.known_failure_count = 3
893
 
        test.run(result)
894
 
        self.assertEqual(
895
 
            [
896
 
            ('update', '[2 in 0s] passing_test', None, None),
897
 
            ],
898
 
            pb.calls[2:])
899
 
 
900
848
    def get_passing_test(self):
901
849
        """Return a test object that can't be run usefully."""
902
850
        def passing_test():
944
892
        result.report_unsupported(test, feature)
945
893
        output = result_stream.getvalue()[prefix:]
946
894
        lines = output.splitlines()
 
895
        # XXX: This is a timing dependent test. I've had it fail because it
 
896
        #      took 6ms to evaluate... :(
947
897
        self.assertEqual(lines, ['NODEP        0ms',
948
898
                                 "    The feature 'Feature' is not available."])
949
899
 
950
 
    def test_text_report_unsupported(self):
951
 
        # text test output formatting
952
 
        pb = MockProgress()
953
 
        result = bzrlib.tests.TextTestResult(
954
 
            StringIO(),
955
 
            descriptions=0,
956
 
            verbosity=1,
957
 
            pb=pb,
958
 
            )
959
 
        test = self.get_passing_test()
960
 
        feature = tests.Feature()
961
 
        # this seeds the state to handle reporting the test.
962
 
        result.startTest(test)
963
 
        result.report_unsupported(test, feature)
964
 
        # no output on unsupported features
965
 
        self.assertEqual(
966
 
            [('update', '[1 in 0s] passing_test', None, None)
967
 
            ],
968
 
            pb.calls)
969
 
        # the number of missing features should be printed in the progress
970
 
        # summary, so check for that.
971
 
        result.unsupported = {'foo':0, 'bar':0}
972
 
        test.run(result)
973
 
        self.assertEqual(
974
 
            [
975
 
            ('update', '[2 in 0s, 2 missing] passing_test', None, None),
976
 
            ],
977
 
            pb.calls[1:])
978
 
 
979
900
    def test_unavailable_exception(self):
980
901
        """An UnavailableFeature being raised should invoke addNotSupported."""
981
902
        class InstrumentedTestResult(tests.ExtendedTestResult):
1082
1003
        runner = tests.TextTestRunner(stream=stream)
1083
1004
        result = self.run_test_runner(runner, test)
1084
1005
        lines = stream.getvalue().splitlines()
1085
 
        self.assertEqual([
1086
 
            '',
1087
 
            '======================================================================',
1088
 
            'FAIL: unittest.FunctionTestCase (failing_test)',
1089
 
            '----------------------------------------------------------------------',
1090
 
            'Traceback (most recent call last):',
1091
 
            '    raise AssertionError(\'foo\')',
1092
 
            'AssertionError: foo',
1093
 
            '',
1094
 
            '----------------------------------------------------------------------',
1095
 
            '',
1096
 
            'FAILED (failures=1, known_failure_count=1)'],
1097
 
            lines[3:8] + lines[9:13] + lines[14:])
 
1006
        self.assertContainsRe(stream.getvalue(),
 
1007
            '(?sm)^testing.*$'
 
1008
            '.*'
 
1009
            '^======================================================================\n'
 
1010
            '^FAIL: unittest.FunctionTestCase \\(failing_test\\)\n'
 
1011
            '^----------------------------------------------------------------------\n'
 
1012
            'Traceback \\(most recent call last\\):\n'
 
1013
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
 
1014
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1015
            '.*'
 
1016
            '^----------------------------------------------------------------------\n'
 
1017
            '.*'
 
1018
            'FAILED \\(failures=1, known_failure_count=1\\)'
 
1019
            )
1098
1020
 
1099
1021
    def test_known_failure_ok_run(self):
1100
1022
        # run a test that generates a known failure which should be printed in the final output.
1350
1272
class _TestException(Exception):
1351
1273
    pass
1352
1274
 
 
1275
 
1353
1276
class TestTestCase(tests.TestCase):
1354
1277
    """Tests that test the core bzrlib TestCase."""
1355
1278
 
1404
1327
        # we could set something and run a test that will check
1405
1328
        # it gets santised, but this is probably sufficient for now:
1406
1329
        # if someone runs the test with -Dsomething it will error.
1407
 
        self.assertEqual(set(), bzrlib.debug.debug_flags)
 
1330
        flags = set()
 
1331
        if self._lock_check_thorough:
 
1332
            flags.add('strict_locks')
 
1333
        self.assertEqual(flags, bzrlib.debug.debug_flags)
1408
1334
 
1409
1335
    def change_selftest_debug_flags(self, new_flags):
1410
1336
        orig_selftest_flags = tests.selftest_debug_flags
1425
1351
                self.flags = set(bzrlib.debug.debug_flags)
1426
1352
        test = TestThatRecordsFlags('test_foo')
1427
1353
        test.run(self.make_test_result())
1428
 
        self.assertEqual(set(['a-flag']), self.flags)
 
1354
        flags = set(['a-flag'])
 
1355
        if 'disable_lock_checks' not in tests.selftest_debug_flags:
 
1356
            flags.add('strict_locks')
 
1357
        self.assertEqual(flags, self.flags)
 
1358
 
 
1359
    def test_disable_lock_checks(self):
 
1360
        """The -Edisable_lock_checks flag disables thorough checks."""
 
1361
        class TestThatRecordsFlags(tests.TestCase):
 
1362
            def test_foo(nested_self):
 
1363
                self.flags = set(bzrlib.debug.debug_flags)
 
1364
                self.test_lock_check_thorough = nested_self._lock_check_thorough
 
1365
        self.change_selftest_debug_flags(set())
 
1366
        test = TestThatRecordsFlags('test_foo')
 
1367
        test.run(self.make_test_result())
 
1368
        # By default we do strict lock checking and thorough lock/unlock
 
1369
        # tracking.
 
1370
        self.assertTrue(self.test_lock_check_thorough)
 
1371
        self.assertEqual(set(['strict_locks']), self.flags)
 
1372
        # Now set the disable_lock_checks flag, and show that this changed.
 
1373
        self.change_selftest_debug_flags(set(['disable_lock_checks']))
 
1374
        test = TestThatRecordsFlags('test_foo')
 
1375
        test.run(self.make_test_result())
 
1376
        self.assertFalse(self.test_lock_check_thorough)
 
1377
        self.assertEqual(set(), self.flags)
 
1378
 
 
1379
    def test_this_fails_strict_lock_check(self):
 
1380
        class TestThatRecordsFlags(tests.TestCase):
 
1381
            def test_foo(nested_self):
 
1382
                self.flags1 = set(bzrlib.debug.debug_flags)
 
1383
                self.thisFailsStrictLockCheck()
 
1384
                self.flags2 = set(bzrlib.debug.debug_flags)
 
1385
        # Make sure lock checking is active
 
1386
        self.change_selftest_debug_flags(set())
 
1387
        test = TestThatRecordsFlags('test_foo')
 
1388
        test.run(self.make_test_result())
 
1389
        self.assertEqual(set(['strict_locks']), self.flags1)
 
1390
        self.assertEqual(set(), self.flags2)
1429
1391
 
1430
1392
    def test_debug_flags_restored(self):
1431
1393
        """The bzrlib debug flags should be restored to their original state
1486
1448
        result = bzrlib.tests.VerboseTestResult(
1487
1449
            unittest._WritelnDecorator(output_stream),
1488
1450
            descriptions=0,
1489
 
            verbosity=2,
1490
 
            num_tests=sample_test.countTestCases())
 
1451
            verbosity=2)
1491
1452
        sample_test.run(result)
1492
1453
        self.assertContainsRe(
1493
1454
            output_stream.getvalue(),
1788
1749
 
1789
1750
    def test_make_tree_for_sftp_branch(self):
1790
1751
        """Transports backed by local directories create local trees."""
1791
 
 
 
1752
        # NB: This is arguably a bug in the definition of make_branch_and_tree.
1792
1753
        tree = self.make_branch_and_tree('t1')
1793
1754
        base = tree.bzrdir.root_transport.base
1794
1755
        self.failIf(base.startswith('sftp'),
1799
1760
                tree.branch.repository.bzrdir.root_transport)
1800
1761
 
1801
1762
 
1802
 
class TestSelftest(tests.TestCase):
 
1763
class SelfTestHelper:
 
1764
 
 
1765
    def run_selftest(self, **kwargs):
 
1766
        """Run selftest returning its output."""
 
1767
        output = StringIO()
 
1768
        old_transport = bzrlib.tests.default_transport
 
1769
        old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
 
1770
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
 
1771
        try:
 
1772
            self.assertEqual(True, tests.selftest(stream=output, **kwargs))
 
1773
        finally:
 
1774
            bzrlib.tests.default_transport = old_transport
 
1775
            tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
 
1776
        output.seek(0)
 
1777
        return output
 
1778
 
 
1779
 
 
1780
class TestSelftest(tests.TestCase, SelfTestHelper):
1803
1781
    """Tests of bzrlib.tests.selftest."""
1804
1782
 
1805
1783
    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
1813
1791
            test_suite_factory=factory)
1814
1792
        self.assertEqual([True], factory_called)
1815
1793
 
 
1794
    def factory(self):
 
1795
        """A test suite factory."""
 
1796
        class Test(tests.TestCase):
 
1797
            def a(self):
 
1798
                pass
 
1799
            def b(self):
 
1800
                pass
 
1801
            def c(self):
 
1802
                pass
 
1803
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
 
1804
 
 
1805
    def test_list_only(self):
 
1806
        output = self.run_selftest(test_suite_factory=self.factory,
 
1807
            list_only=True)
 
1808
        self.assertEqual(3, len(output.readlines()))
 
1809
 
 
1810
    def test_list_only_filtered(self):
 
1811
        output = self.run_selftest(test_suite_factory=self.factory,
 
1812
            list_only=True, pattern="Test.b")
 
1813
        self.assertEndsWith(output.getvalue(), "Test.b\n")
 
1814
        self.assertLength(1, output.readlines())
 
1815
 
 
1816
    def test_list_only_excludes(self):
 
1817
        output = self.run_selftest(test_suite_factory=self.factory,
 
1818
            list_only=True, exclude_pattern="Test.b")
 
1819
        self.assertNotContainsRe("Test.b", output.getvalue())
 
1820
        self.assertLength(2, output.readlines())
 
1821
 
 
1822
    def test_random(self):
 
1823
        # test randomising by listing a number of tests.
 
1824
        output_123 = self.run_selftest(test_suite_factory=self.factory,
 
1825
            list_only=True, random_seed="123")
 
1826
        output_234 = self.run_selftest(test_suite_factory=self.factory,
 
1827
            list_only=True, random_seed="234")
 
1828
        self.assertNotEqual(output_123, output_234)
 
1829
        # "Randominzing test order..\n\n
 
1830
        self.assertLength(5, output_123.readlines())
 
1831
        self.assertLength(5, output_234.readlines())
 
1832
 
 
1833
    def test_random_reuse_is_same_order(self):
 
1834
        # test randomising by listing a number of tests.
 
1835
        expected = self.run_selftest(test_suite_factory=self.factory,
 
1836
            list_only=True, random_seed="123")
 
1837
        repeated = self.run_selftest(test_suite_factory=self.factory,
 
1838
            list_only=True, random_seed="123")
 
1839
        self.assertEqual(expected.getvalue(), repeated.getvalue())
 
1840
 
 
1841
    def test_runner_class(self):
 
1842
        self.requireFeature(SubUnitFeature)
 
1843
        from subunit import ProtocolTestCase
 
1844
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
1845
            test_suite_factory=self.factory)
 
1846
        test = ProtocolTestCase(stream)
 
1847
        result = unittest.TestResult()
 
1848
        test.run(result)
 
1849
        self.assertEqual(3, result.testsRun)
 
1850
 
 
1851
    def test_starting_with_single_argument(self):
 
1852
        output = self.run_selftest(test_suite_factory=self.factory,
 
1853
            starting_with=['bzrlib.tests.test_selftest.Test.a'],
 
1854
            list_only=True)
 
1855
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
 
1856
            output.getvalue())
 
1857
 
 
1858
    def test_starting_with_multiple_argument(self):
 
1859
        output = self.run_selftest(test_suite_factory=self.factory,
 
1860
            starting_with=['bzrlib.tests.test_selftest.Test.a',
 
1861
                'bzrlib.tests.test_selftest.Test.b'],
 
1862
            list_only=True)
 
1863
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
 
1864
            'bzrlib.tests.test_selftest.Test.b\n',
 
1865
            output.getvalue())
 
1866
 
 
1867
    def check_transport_set(self, transport_server):
 
1868
        captured_transport = []
 
1869
        def seen_transport(a_transport):
 
1870
            captured_transport.append(a_transport)
 
1871
        class Capture(tests.TestCase):
 
1872
            def a(self):
 
1873
                seen_transport(bzrlib.tests.default_transport)
 
1874
        def factory():
 
1875
            return TestUtil.TestSuite([Capture("a")])
 
1876
        self.run_selftest(transport=transport_server, test_suite_factory=factory)
 
1877
        self.assertEqual(transport_server, captured_transport[0])
 
1878
 
 
1879
    def test_transport_sftp(self):
 
1880
        try:
 
1881
            import bzrlib.transport.sftp
 
1882
        except errors.ParamikoNotPresent:
 
1883
            raise tests.TestSkipped("Paramiko not present")
 
1884
        self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
 
1885
 
 
1886
    def test_transport_memory(self):
 
1887
        self.check_transport_set(bzrlib.transport.memory.MemoryServer)
 
1888
 
 
1889
 
 
1890
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
 
1891
    # Does IO: reads test.list
 
1892
 
 
1893
    def test_load_list(self):
 
1894
        # Provide a list with one test - this test.
 
1895
        test_id_line = '%s\n' % self.id()
 
1896
        self.build_tree_contents([('test.list', test_id_line)])
 
1897
        # And generate a list of the tests in  the suite.
 
1898
        stream = self.run_selftest(load_list='test.list', list_only=True)
 
1899
        self.assertEqual(test_id_line, stream.getvalue())
 
1900
 
 
1901
    def test_load_unknown(self):
 
1902
        # Provide a list with one test - this test.
 
1903
        # And generate a list of the tests in  the suite.
 
1904
        err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
 
1905
            load_list='missing file name', list_only=True)
 
1906
 
 
1907
 
 
1908
class TestRunBzr(tests.TestCase):
 
1909
 
 
1910
    out = ''
 
1911
    err = ''
 
1912
 
 
1913
    def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
 
1914
                         working_dir=None):
 
1915
        """Override _run_bzr_core to test how it is invoked by run_bzr.
 
1916
 
 
1917
        Attempts to run bzr from inside this class don't actually run it.
 
1918
 
 
1919
        We test how run_bzr actually invokes bzr in another location.
 
1920
        Here we only need to test that it is run_bzr passes the right
 
1921
        parameters to run_bzr.
 
1922
        """
 
1923
        self.argv = list(argv)
 
1924
        self.retcode = retcode
 
1925
        self.encoding = encoding
 
1926
        self.stdin = stdin
 
1927
        self.working_dir = working_dir
 
1928
        return self.out, self.err
 
1929
 
 
1930
    def test_run_bzr_error(self):
 
1931
        self.out = "It sure does!\n"
 
1932
        out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
 
1933
        self.assertEqual(['rocks'], self.argv)
 
1934
        self.assertEqual(34, self.retcode)
 
1935
        self.assertEqual(out, 'It sure does!\n')
 
1936
 
 
1937
    def test_run_bzr_error_regexes(self):
 
1938
        self.out = ''
 
1939
        self.err = "bzr: ERROR: foobarbaz is not versioned"
 
1940
        out, err = self.run_bzr_error(
 
1941
                ["bzr: ERROR: foobarbaz is not versioned"],
 
1942
                ['file-id', 'foobarbaz'])
 
1943
 
 
1944
    def test_encoding(self):
 
1945
        """Test that run_bzr passes encoding to _run_bzr_core"""
 
1946
        self.run_bzr('foo bar')
 
1947
        self.assertEqual(None, self.encoding)
 
1948
        self.assertEqual(['foo', 'bar'], self.argv)
 
1949
 
 
1950
        self.run_bzr('foo bar', encoding='baz')
 
1951
        self.assertEqual('baz', self.encoding)
 
1952
        self.assertEqual(['foo', 'bar'], self.argv)
 
1953
 
 
1954
    def test_retcode(self):
 
1955
        """Test that run_bzr passes retcode to _run_bzr_core"""
 
1956
        # Default is retcode == 0
 
1957
        self.run_bzr('foo bar')
 
1958
        self.assertEqual(0, self.retcode)
 
1959
        self.assertEqual(['foo', 'bar'], self.argv)
 
1960
 
 
1961
        self.run_bzr('foo bar', retcode=1)
 
1962
        self.assertEqual(1, self.retcode)
 
1963
        self.assertEqual(['foo', 'bar'], self.argv)
 
1964
 
 
1965
        self.run_bzr('foo bar', retcode=None)
 
1966
        self.assertEqual(None, self.retcode)
 
1967
        self.assertEqual(['foo', 'bar'], self.argv)
 
1968
 
 
1969
        self.run_bzr(['foo', 'bar'], retcode=3)
 
1970
        self.assertEqual(3, self.retcode)
 
1971
        self.assertEqual(['foo', 'bar'], self.argv)
 
1972
 
 
1973
    def test_stdin(self):
 
1974
        # test that the stdin keyword to run_bzr is passed through to
 
1975
        # _run_bzr_core as-is. We do this by overriding
 
1976
        # _run_bzr_core in this class, and then calling run_bzr,
 
1977
        # which is a convenience function for _run_bzr_core, so
 
1978
        # should invoke it.
 
1979
        self.run_bzr('foo bar', stdin='gam')
 
1980
        self.assertEqual('gam', self.stdin)
 
1981
        self.assertEqual(['foo', 'bar'], self.argv)
 
1982
 
 
1983
        self.run_bzr('foo bar', stdin='zippy')
 
1984
        self.assertEqual('zippy', self.stdin)
 
1985
        self.assertEqual(['foo', 'bar'], self.argv)
 
1986
 
 
1987
    def test_working_dir(self):
 
1988
        """Test that run_bzr passes working_dir to _run_bzr_core"""
 
1989
        self.run_bzr('foo bar')
 
1990
        self.assertEqual(None, self.working_dir)
 
1991
        self.assertEqual(['foo', 'bar'], self.argv)
 
1992
 
 
1993
        self.run_bzr('foo bar', working_dir='baz')
 
1994
        self.assertEqual('baz', self.working_dir)
 
1995
        self.assertEqual(['foo', 'bar'], self.argv)
 
1996
 
 
1997
    def test_reject_extra_keyword_arguments(self):
 
1998
        self.assertRaises(TypeError, self.run_bzr, "foo bar",
 
1999
                          error_regex=['error message'])
 
2000
 
 
2001
 
 
2002
class TestRunBzrCaptured(tests.TestCaseWithTransport):
 
2003
    # Does IO when testing the working_dir parameter.
 
2004
 
 
2005
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2006
                         a_callable=None, *args, **kwargs):
 
2007
        self.stdin = stdin
 
2008
        self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
 
2009
        self.factory = bzrlib.ui.ui_factory
 
2010
        self.working_dir = osutils.getcwd()
 
2011
        stdout.write('foo\n')
 
2012
        stderr.write('bar\n')
 
2013
        return 0
 
2014
 
 
2015
    def test_stdin(self):
 
2016
        # test that the stdin keyword to _run_bzr_core is passed through to
 
2017
        # apply_redirected as a StringIO. We do this by overriding
 
2018
        # apply_redirected in this class, and then calling _run_bzr_core,
 
2019
        # which calls apply_redirected.
 
2020
        self.run_bzr(['foo', 'bar'], stdin='gam')
 
2021
        self.assertEqual('gam', self.stdin.read())
 
2022
        self.assertTrue(self.stdin is self.factory_stdin)
 
2023
        self.run_bzr(['foo', 'bar'], stdin='zippy')
 
2024
        self.assertEqual('zippy', self.stdin.read())
 
2025
        self.assertTrue(self.stdin is self.factory_stdin)
 
2026
 
 
2027
    def test_ui_factory(self):
 
2028
        # each invocation of self.run_bzr should get its
 
2029
        # own UI factory, which is an instance of TestUIFactory,
 
2030
        # with stdin, stdout and stderr attached to the stdin,
 
2031
        # stdout and stderr of the invoked run_bzr
 
2032
        current_factory = bzrlib.ui.ui_factory
 
2033
        self.run_bzr(['foo'])
 
2034
        self.failIf(current_factory is self.factory)
 
2035
        self.assertNotEqual(sys.stdout, self.factory.stdout)
 
2036
        self.assertNotEqual(sys.stderr, self.factory.stderr)
 
2037
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
 
2038
        self.assertEqual('bar\n', self.factory.stderr.getvalue())
 
2039
        self.assertIsInstance(self.factory, tests.TestUIFactory)
 
2040
 
 
2041
    def test_working_dir(self):
 
2042
        self.build_tree(['one/', 'two/'])
 
2043
        cwd = osutils.getcwd()
 
2044
 
 
2045
        # Default is to work in the current directory
 
2046
        self.run_bzr(['foo', 'bar'])
 
2047
        self.assertEqual(cwd, self.working_dir)
 
2048
 
 
2049
        self.run_bzr(['foo', 'bar'], working_dir=None)
 
2050
        self.assertEqual(cwd, self.working_dir)
 
2051
 
 
2052
        # The function should be run in the alternative directory
 
2053
        # but afterwards the current working dir shouldn't be changed
 
2054
        self.run_bzr(['foo', 'bar'], working_dir='one')
 
2055
        self.assertNotEqual(cwd, self.working_dir)
 
2056
        self.assertEndsWith(self.working_dir, 'one')
 
2057
        self.assertEqual(cwd, osutils.getcwd())
 
2058
 
 
2059
        self.run_bzr(['foo', 'bar'], working_dir='two')
 
2060
        self.assertNotEqual(cwd, self.working_dir)
 
2061
        self.assertEndsWith(self.working_dir, 'two')
 
2062
        self.assertEqual(cwd, osutils.getcwd())
 
2063
 
 
2064
 
 
2065
class StubProcess(object):
 
2066
    """A stub process for testing run_bzr_subprocess."""
 
2067
    
 
2068
    def __init__(self, out="", err="", retcode=0):
 
2069
        self.out = out
 
2070
        self.err = err
 
2071
        self.returncode = retcode
 
2072
 
 
2073
    def communicate(self):
 
2074
        return self.out, self.err
 
2075
 
 
2076
 
 
2077
class TestRunBzrSubprocess(tests.TestCaseWithTransport):
 
2078
 
 
2079
    def setUp(self):
 
2080
        tests.TestCaseWithTransport.setUp(self)
 
2081
        self.subprocess_calls = []
 
2082
 
 
2083
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
2084
                             skip_if_plan_to_signal=False,
 
2085
                             working_dir=None,
 
2086
                             allow_plugins=False):
 
2087
        """capture what run_bzr_subprocess tries to do."""
 
2088
        self.subprocess_calls.append({'process_args':process_args,
 
2089
            'env_changes':env_changes,
 
2090
            'skip_if_plan_to_signal':skip_if_plan_to_signal,
 
2091
            'working_dir':working_dir, 'allow_plugins':allow_plugins})
 
2092
        return self.next_subprocess
 
2093
 
 
2094
    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
 
2095
        """Run run_bzr_subprocess with args and kwargs using a stubbed process.
 
2096
 
 
2097
        Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
 
2098
        that will return static results. This assertion method populates those
 
2099
        results and also checks the arguments run_bzr_subprocess generates.
 
2100
        """
 
2101
        self.next_subprocess = process
 
2102
        try:
 
2103
            result = self.run_bzr_subprocess(*args, **kwargs)
 
2104
        except:
 
2105
            self.next_subprocess = None
 
2106
            for key, expected in expected_args.iteritems():
 
2107
                self.assertEqual(expected, self.subprocess_calls[-1][key])
 
2108
            raise
 
2109
        else:
 
2110
            self.next_subprocess = None
 
2111
            for key, expected in expected_args.iteritems():
 
2112
                self.assertEqual(expected, self.subprocess_calls[-1][key])
 
2113
            return result
 
2114
 
 
2115
    def test_run_bzr_subprocess(self):
 
2116
        """The run_bzr_helper_external command behaves nicely."""
 
2117
        self.assertRunBzrSubprocess({'process_args':['--version']},
 
2118
            StubProcess(), '--version')
 
2119
        self.assertRunBzrSubprocess({'process_args':['--version']},
 
2120
            StubProcess(), ['--version'])
 
2121
        # retcode=None disables retcode checking
 
2122
        result = self.assertRunBzrSubprocess({},
 
2123
            StubProcess(retcode=3), '--version', retcode=None)
 
2124
        result = self.assertRunBzrSubprocess({},
 
2125
            StubProcess(out="is free software"), '--version')
 
2126
        self.assertContainsRe(result[0], 'is free software')
 
2127
        # Running a subcommand that is missing errors
 
2128
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
 
2129
            {'process_args':['--versionn']}, StubProcess(retcode=3),
 
2130
            '--versionn')
 
2131
        # Unless it is told to expect the error from the subprocess
 
2132
        result = self.assertRunBzrSubprocess({},
 
2133
            StubProcess(retcode=3), '--versionn', retcode=3)
 
2134
        # Or to ignore retcode checking
 
2135
        result = self.assertRunBzrSubprocess({},
 
2136
            StubProcess(err="unknown command", retcode=3), '--versionn',
 
2137
            retcode=None)
 
2138
        self.assertContainsRe(result[1], 'unknown command')
 
2139
 
 
2140
    def test_env_change_passes_through(self):
 
2141
        self.assertRunBzrSubprocess(
 
2142
            {'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
 
2143
            StubProcess(), '',
 
2144
            env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
 
2145
 
 
2146
    def test_no_working_dir_passed_as_None(self):
 
2147
        self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
 
2148
 
 
2149
    def test_no_working_dir_passed_through(self):
 
2150
        self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
 
2151
            working_dir='dir')
 
2152
 
 
2153
    def test_run_bzr_subprocess_no_plugins(self):
 
2154
        self.assertRunBzrSubprocess({'allow_plugins': False},
 
2155
            StubProcess(), '')
 
2156
 
 
2157
    def test_allow_plugins(self):
 
2158
        self.assertRunBzrSubprocess({'allow_plugins': True},
 
2159
            StubProcess(), '', allow_plugins=True)
 
2160
 
 
2161
 
 
2162
class _DontSpawnProcess(Exception):
 
2163
    """A simple exception which just allows us to skip unnecessary steps"""
 
2164
 
 
2165
 
 
2166
class TestStartBzrSubProcess(tests.TestCase):
 
2167
 
 
2168
    def check_popen_state(self):
 
2169
        """Replace to make assertions when popen is called."""
 
2170
 
 
2171
    def _popen(self, *args, **kwargs):
 
2172
        """Record the command that is run, so that we can ensure it is correct"""
 
2173
        self.check_popen_state()
 
2174
        self._popen_args = args
 
2175
        self._popen_kwargs = kwargs
 
2176
        raise _DontSpawnProcess()
 
2177
 
 
2178
    def test_run_bzr_subprocess_no_plugins(self):
 
2179
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
 
2180
        command = self._popen_args[0]
 
2181
        self.assertEqual(sys.executable, command[0])
 
2182
        self.assertEqual(self.get_bzr_path(), command[1])
 
2183
        self.assertEqual(['--no-plugins'], command[2:])
 
2184
 
 
2185
    def test_allow_plugins(self):
 
2186
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2187
            allow_plugins=True)
 
2188
        command = self._popen_args[0]
 
2189
        self.assertEqual([], command[2:])
 
2190
 
 
2191
    def test_set_env(self):
 
2192
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2193
        # set in the child
 
2194
        def check_environment():
 
2195
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
 
2196
        self.check_popen_state = check_environment
 
2197
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2198
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
 
2199
        # not set in theparent
 
2200
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2201
 
 
2202
    def test_run_bzr_subprocess_env_del(self):
 
2203
        """run_bzr_subprocess can remove environment variables too."""
 
2204
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
 
2205
        def check_environment():
 
2206
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2207
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
 
2208
        self.check_popen_state = check_environment
 
2209
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2210
            env_changes={'EXISTANT_ENV_VAR':None})
 
2211
        # Still set in parent
 
2212
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
 
2213
        del os.environ['EXISTANT_ENV_VAR']
 
2214
 
 
2215
    def test_env_del_missing(self):
 
2216
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
 
2217
        def check_environment():
 
2218
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
 
2219
        self.check_popen_state = check_environment
 
2220
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2221
            env_changes={'NON_EXISTANT_ENV_VAR':None})
 
2222
 
 
2223
    def test_working_dir(self):
 
2224
        """Test that we can specify the working dir for the child"""
 
2225
        orig_getcwd = osutils.getcwd
 
2226
        orig_chdir = os.chdir
 
2227
        chdirs = []
 
2228
        def chdir(path):
 
2229
            chdirs.append(path)
 
2230
        os.chdir = chdir
 
2231
        try:
 
2232
            def getcwd():
 
2233
                return 'current'
 
2234
            osutils.getcwd = getcwd
 
2235
            try:
 
2236
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2237
                    working_dir='foo')
 
2238
            finally:
 
2239
                osutils.getcwd = orig_getcwd
 
2240
        finally:
 
2241
            os.chdir = orig_chdir
 
2242
        self.assertEqual(['foo', 'current'], chdirs)
 
2243
 
 
2244
 
 
2245
class TestBzrSubprocess(tests.TestCaseWithTransport):
 
2246
 
 
2247
    def test_start_and_stop_bzr_subprocess(self):
 
2248
        """We can start and perform other test actions while that process is
 
2249
        still alive.
 
2250
        """
 
2251
        process = self.start_bzr_subprocess(['--version'])
 
2252
        result = self.finish_bzr_subprocess(process)
 
2253
        self.assertContainsRe(result[0], 'is free software')
 
2254
        self.assertEqual('', result[1])
 
2255
 
 
2256
    def test_start_and_stop_bzr_subprocess_with_error(self):
 
2257
        """finish_bzr_subprocess allows specification of the desired exit code.
 
2258
        """
 
2259
        process = self.start_bzr_subprocess(['--versionn'])
 
2260
        result = self.finish_bzr_subprocess(process, retcode=3)
 
2261
        self.assertEqual('', result[0])
 
2262
        self.assertContainsRe(result[1], 'unknown command')
 
2263
 
 
2264
    def test_start_and_stop_bzr_subprocess_ignoring_retcode(self):
 
2265
        """finish_bzr_subprocess allows the exit code to be ignored."""
 
2266
        process = self.start_bzr_subprocess(['--versionn'])
 
2267
        result = self.finish_bzr_subprocess(process, retcode=None)
 
2268
        self.assertEqual('', result[0])
 
2269
        self.assertContainsRe(result[1], 'unknown command')
 
2270
 
 
2271
    def test_start_and_stop_bzr_subprocess_with_unexpected_retcode(self):
 
2272
        """finish_bzr_subprocess raises self.failureException if the retcode is
 
2273
        not the expected one.
 
2274
        """
 
2275
        process = self.start_bzr_subprocess(['--versionn'])
 
2276
        self.assertRaises(self.failureException, self.finish_bzr_subprocess,
 
2277
                          process)
 
2278
 
 
2279
    def test_start_and_stop_bzr_subprocess_send_signal(self):
 
2280
        """finish_bzr_subprocess raises self.failureException if the retcode is
 
2281
        not the expected one.
 
2282
        """
 
2283
        process = self.start_bzr_subprocess(['wait-until-signalled'],
 
2284
                                            skip_if_plan_to_signal=True)
 
2285
        self.assertEqual('running\n', process.stdout.readline())
 
2286
        result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
 
2287
                                            retcode=3)
 
2288
        self.assertEqual('', result[0])
 
2289
        self.assertEqual('bzr: interrupted\n', result[1])
 
2290
 
 
2291
    def test_start_and_stop_working_dir(self):
 
2292
        cwd = osutils.getcwd()
 
2293
        self.make_branch_and_tree('one')
 
2294
        process = self.start_bzr_subprocess(['root'], working_dir='one')
 
2295
        result = self.finish_bzr_subprocess(process, universal_newlines=True)
 
2296
        self.assertEndsWith(result[0], 'one\n')
 
2297
        self.assertEqual('', result[1])
 
2298
 
1816
2299
 
1817
2300
class TestKnownFailure(tests.TestCase):
1818
2301
 
1883
2366
        tests.TestCase.setUp(self)
1884
2367
        self.suite = TestUtil.TestSuite()
1885
2368
        self.loader = TestUtil.TestLoader()
1886
 
        self.suite.addTest(self.loader.loadTestsFromModuleNames([
1887
 
            'bzrlib.tests.test_selftest']))
 
2369
        self.suite.addTest(self.loader.loadTestsFromModule(
 
2370
            sys.modules['bzrlib.tests.test_selftest']))
1888
2371
        self.all_names = _test_ids(self.suite)
1889
2372
 
1890
2373
    def test_condition_id_re(self):
2201
2684
class TestTestSuite(tests.TestCase):
2202
2685
 
2203
2686
    def test_test_suite(self):
2204
 
        # This test is slow, so we do a single test with one test in each
2205
 
        # category
 
2687
        # This test is slow - it loads the entire test suite to operate, so we
 
2688
        # do a single test with one test in each category
2206
2689
        test_list = [
2207
2690
            # testmod_names
2208
2691
            'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2218
2701
        self.assertEquals(test_list, _test_ids(suite))
2219
2702
 
2220
2703
    def test_test_suite_list_and_start(self):
 
2704
        # We cannot test this at the same time as the main load, because we want
 
2705
        # to know that starting_with == None works. So a second full load is
 
2706
        # incurred.
2221
2707
        test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
2222
2708
        suite = tests.test_suite(test_list,
2223
2709
                                 ['bzrlib.tests.test_selftest.TestTestSuite'])
2368
2854
                return tests.ExtendedTestResult(self.stream, self.descriptions,
2369
2855
                                                self.verbosity)
2370
2856
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2371
 
        self.assertEqual(calls, [suite])
 
2857
        self.assertLength(1, calls)
2372
2858
 
2373
2859
    def test_done(self):
2374
2860
        """run_suite should call result.done()"""