1215
def test_bench_history(self):
1216
# tests that running the benchmark produces a history file
1217
# containing a timestamp and the revision id of the bzrlib source which
1219
workingtree = _get_bzr_source_tree()
1220
test = TestRunner('dummy_test')
1222
runner = tests.TextTestRunner(stream=self._log_file,
1223
bench_history=output)
1224
result = self.run_test_runner(runner, test)
1225
output_string = output.getvalue()
1226
self.assertContainsRe(output_string, "--date [0-9.]+")
1227
if workingtree is not None:
1228
revision_id = workingtree.get_parent_ids()[0]
1229
self.assertEndsWith(output_string.rstrip(), revision_id)
1231
def assertLogDeleted(self, test):
    """Assert that ``test`` no longer carries its log.

    Checks that ``test._get_log()`` returns the placeholder message, that
    ``test._log_contents`` is the empty string and that
    ``test._log_file_name`` is None.
    """
    log = test._get_log()
    self.assertEqual("DELETED log file to reduce memory footprint", log)
    self.assertEqual('', test._log_contents)
    self.assertIs(None, test._log_file_name)
1237
def test_success_log_deleted(self):
1238
"""Successful tests have their log deleted"""
1240
class LogTester(tests.TestCase):
1242
def test_success(self):
1243
self.log('this will be removed\n')
1246
runner = tests.TextTestRunner(stream=sio)
1247
test = LogTester('test_success')
1248
result = self.run_test_runner(runner, test)
1250
self.assertLogDeleted(test)
1252
def test_skipped_log_deleted(self):
1253
"""Skipped tests have their log deleted"""
1255
class LogTester(tests.TestCase):
1257
def test_skipped(self):
1258
self.log('this will be removed\n')
1259
raise tests.TestSkipped()
1262
runner = tests.TextTestRunner(stream=sio)
1263
test = LogTester('test_skipped')
1264
result = self.run_test_runner(runner, test)
1266
self.assertLogDeleted(test)
1268
def test_not_aplicable_log_deleted(self):
1269
"""Not applicable tests have their log deleted"""
1271
class LogTester(tests.TestCase):
1273
def test_not_applicable(self):
1274
self.log('this will be removed\n')
1275
raise tests.TestNotApplicable()
1278
runner = tests.TextTestRunner(stream=sio)
1279
test = LogTester('test_not_applicable')
1280
result = self.run_test_runner(runner, test)
1282
self.assertLogDeleted(test)
1284
def test_known_failure_log_deleted(self):
1285
"""Know failure tests have their log deleted"""
1287
class LogTester(tests.TestCase):
1289
def test_known_failure(self):
1290
self.log('this will be removed\n')
1291
raise tests.KnownFailure()
1294
runner = tests.TextTestRunner(stream=sio)
1295
test = LogTester('test_known_failure')
1296
result = self.run_test_runner(runner, test)
1298
self.assertLogDeleted(test)
1300
def test_fail_log_kept(self):
1301
"""Failed tests have their log kept"""
1303
class LogTester(tests.TestCase):
1305
def test_fail(self):
1306
self.log('this will be kept\n')
1307
self.fail('this test fails')
1310
runner = tests.TextTestRunner(stream=sio)
1311
test = LogTester('test_fail')
1312
result = self.run_test_runner(runner, test)
1314
text = sio.getvalue()
1315
self.assertContainsRe(text, 'this will be kept')
1316
self.assertContainsRe(text, 'this test fails')
1318
log = test._get_log()
1319
self.assertContainsRe(log, 'this will be kept')
1320
self.assertEqual(log, test._log_contents)
1322
def test_error_log_kept(self):
1323
"""Tests with errors have their log kept"""
1325
class LogTester(tests.TestCase):
1327
def test_error(self):
1328
self.log('this will be kept\n')
1329
raise ValueError('random exception raised')
1332
runner = tests.TextTestRunner(stream=sio)
1333
test = LogTester('test_error')
1334
result = self.run_test_runner(runner, test)
1336
text = sio.getvalue()
1337
self.assertContainsRe(text, 'this will be kept')
1338
self.assertContainsRe(text, 'random exception raised')
1340
log = test._get_log()
1341
self.assertContainsRe(log, 'this will be kept')
1342
self.assertEqual(log, test._log_contents)
1204
def test_verbose_test_count(self):
1205
"""A verbose test run reports the right test count at the start"""
1206
suite = TestUtil.TestSuite([
1207
unittest.FunctionTestCase(lambda:None),
1208
unittest.FunctionTestCase(lambda:None)])
1209
self.assertEqual(suite.countTestCases(), 2)
1211
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1212
# Need to use the CountingDecorator as that's what sets num_tests
1213
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1214
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1216
def test_startTestRun(self):
1217
"""run should call result.startTestRun()"""
1219
class LoggingDecorator(ExtendedToOriginalDecorator):
1220
def startTestRun(self):
1221
ExtendedToOriginalDecorator.startTestRun(self)
1222
calls.append('startTestRun')
1223
test = unittest.FunctionTestCase(lambda:None)
1225
runner = tests.TextTestRunner(stream=stream,
1226
result_decorators=[LoggingDecorator])
1227
result = self.run_test_runner(runner, test)
1228
self.assertLength(1, calls)
1230
def test_stopTestRun(self):
1231
"""run should call result.stopTestRun()"""
1233
class LoggingDecorator(ExtendedToOriginalDecorator):
1234
def stopTestRun(self):
1235
ExtendedToOriginalDecorator.stopTestRun(self)
1236
calls.append('stopTestRun')
1237
test = unittest.FunctionTestCase(lambda:None)
1239
runner = tests.TextTestRunner(stream=stream,
1240
result_decorators=[LoggingDecorator])
1241
result = self.run_test_runner(runner, test)
1242
self.assertLength(1, calls)
1244
def test_unicode_test_output_on_ascii_stream(self):
1245
"""Showing results should always succeed even on an ascii console"""
1246
class FailureWithUnicode(tests.TestCase):
1247
def test_log_unicode(self):
1249
self.fail("Now print that log!")
1251
self.overrideAttr(osutils, "get_terminal_encoding",
1252
lambda trace=False: "ascii")
1253
result = self.run_test_runner(tests.TextTestRunner(stream=out),
1254
FailureWithUnicode("test_log_unicode"))
1255
if testtools_version[:3] > (0, 9, 11):
1256
self.assertContainsRe(out.getvalue(), "log: {{{\d+\.\d+ \\\\u2606}}}")
1258
self.assertContainsRe(out.getvalue(),
1259
"Text attachment: log\n"
1261
"\d+\.\d+ \\\\u2606\n"
1345
1265
class SampleTestCase(tests.TestCase):
1653
1638
self.assertRaises(AssertionError,
1654
1639
self.assertListRaises, _TestException, success_generator)
1641
def test_overrideAttr_without_value(self):
1642
self.test_attr = 'original' # Define a test attribute
1643
obj = self # Make 'obj' visible to the embedded test
1644
class Test(tests.TestCase):
1647
tests.TestCase.setUp(self)
1648
self.orig = self.overrideAttr(obj, 'test_attr')
1650
def test_value(self):
1651
self.assertEqual('original', self.orig)
1652
self.assertEqual('original', obj.test_attr)
1653
obj.test_attr = 'modified'
1654
self.assertEqual('modified', obj.test_attr)
1656
test = Test('test_value')
1657
test.run(unittest.TestResult())
1658
self.assertEqual('original', obj.test_attr)
1660
def test_overrideAttr_with_value(self):
1661
self.test_attr = 'original' # Define a test attribute
1662
obj = self # Make 'obj' visible to the embedded test
1663
class Test(tests.TestCase):
1666
tests.TestCase.setUp(self)
1667
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1669
def test_value(self):
1670
self.assertEqual('original', self.orig)
1671
self.assertEqual('modified', obj.test_attr)
1673
test = Test('test_value')
1674
test.run(unittest.TestResult())
1675
self.assertEqual('original', obj.test_attr)
1677
def test_recordCalls(self):
1678
from bzrlib.tests import test_selftest
1679
calls = self.recordCalls(
1680
test_selftest, '_add_numbers')
1681
self.assertEqual(test_selftest._add_numbers(2, 10),
1683
self.assertEquals(calls, [((2, 10), {})])
1686
def _add_numbers(a, b):
1690
class _MissingFeature(features.Feature):
1693
missing_feature = _MissingFeature()
1696
def _get_test(name):
1697
"""Get an instance of a specific example test.
1699
We protect this in a function so that they don't auto-run in the test
1703
class ExampleTests(tests.TestCase):
1705
def test_fail(self):
1706
mutter('this was a failing test')
1707
self.fail('this test will fail')
1709
def test_error(self):
1710
mutter('this test errored')
1711
raise RuntimeError('gotcha')
1713
def test_missing_feature(self):
1714
mutter('missing the feature')
1715
self.requireFeature(missing_feature)
1717
def test_skip(self):
1718
mutter('this test will be skipped')
1719
raise tests.TestSkipped('reason')
1721
def test_success(self):
1722
mutter('this test succeeds')
1724
def test_xfail(self):
1725
mutter('test with expected failure')
1726
self.knownFailure('this_fails')
1728
def test_unexpected_success(self):
1729
mutter('test with unexpected success')
1730
self.expectFailure('should_fail', lambda: None)
1732
return ExampleTests(name)
1735
class TestTestCaseLogDetails(tests.TestCase):
1737
def _run_test(self, test_name):
1738
test = _get_test(test_name)
1739
result = testtools.TestResult()
1743
def test_fail_has_log(self):
    """A failing test reports its log text in the failure content."""
    result = self._run_test('test_fail')
    self.assertEqual(1, len(result.failures))
    result_content = result.failures[0][1]
    # The 'Text attachment' heading is only checked for older testtools.
    if testtools_version < (0, 9, 12):
        self.assertContainsRe(result_content, 'Text attachment: log')
    self.assertContainsRe(result_content, 'this was a failing test')
1751
def test_error_has_log(self):
    """An erroring test reports its log text in the error content."""
    result = self._run_test('test_error')
    self.assertEqual(1, len(result.errors))
    result_content = result.errors[0][1]
    # The 'Text attachment' heading is only checked for older testtools.
    if testtools_version < (0, 9, 12):
        self.assertContainsRe(result_content, 'Text attachment: log')
    self.assertContainsRe(result_content, 'this test errored')
1759
def test_skip_has_no_log(self):
1760
result = self._run_test('test_skip')
1761
self.assertEqual(['reason'], result.skip_reasons.keys())
1762
skips = result.skip_reasons['reason']
1763
self.assertEqual(1, len(skips))
1765
self.assertFalse('log' in test.getDetails())
1767
def test_missing_feature_has_no_log(self):
1768
# testtools doesn't know about addNotSupported, so it just gets
1769
# considered as a skip
1770
result = self._run_test('test_missing_feature')
1771
self.assertEqual([missing_feature], result.skip_reasons.keys())
1772
skips = result.skip_reasons[missing_feature]
1773
self.assertEqual(1, len(skips))
1775
self.assertFalse('log' in test.getDetails())
1777
def test_xfail_has_no_log(self):
    """An expected failure does not carry the log in its content."""
    result = self._run_test('test_xfail')
    self.assertEqual(1, len(result.expectedFailures))
    result_content = result.expectedFailures[0][1]
    self.assertNotContainsRe(result_content, 'Text attachment: log')
    self.assertNotContainsRe(result_content, 'test with expected failure')
1784
def test_unexpected_success_has_log(self):
    """An unexpected success keeps a 'log' entry in the test details."""
    result = self._run_test('test_unexpected_success')
    self.assertEqual(1, len(result.unexpectedSuccesses))
    # Inconsistency, unexpectedSuccesses is a list of tests,
    # expectedFailures is a list of reasons?
    test = result.unexpectedSuccesses[0]
    details = test.getDetails()
    self.assertTrue('log' in details)
1794
class TestTestCloning(tests.TestCase):
1795
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1797
def test_cloned_testcase_does_not_share_details(self):
1798
"""A TestCase cloned with clone_test does not share mutable attributes
1799
such as details or cleanups.
1801
class Test(tests.TestCase):
1803
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1804
orig_test = Test('test_foo')
1805
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1806
orig_test.run(unittest.TestResult())
1807
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1808
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1810
def test_double_apply_scenario_preserves_first_scenario(self):
1811
"""Applying two levels of scenarios to a test preserves the attributes
1812
added by both scenarios.
1814
class Test(tests.TestCase):
1817
test = Test('test_foo')
1818
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1819
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1820
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1821
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1822
all_tests = list(tests.iter_suite_tests(suite))
1823
self.assertLength(4, all_tests)
1824
all_xys = sorted((t.x, t.y) for t in all_tests)
1825
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1657
1828
# NB: Don't delete this; it's not actually from 0.11!
1658
1829
@deprecated_function(deprecated_in((0, 11, 0)))
1813
2012
test_suite_factory=factory)
1814
2013
self.assertEqual([True], factory_called)
1817
class TestKnownFailure(tests.TestCase):
1819
def test_known_failure(self):
1820
"""Check that KnownFailure is defined appropriately."""
1821
# a KnownFailure is an assertion error for compatibility with unaware
1823
self.assertIsInstance(tests.KnownFailure(""), AssertionError)
1825
def test_expect_failure(self):
1827
self.expectFailure("Doomed to failure", self.assertTrue, False)
1828
except tests.KnownFailure, e:
1829
self.assertEqual('Doomed to failure', e.args[0])
1831
self.expectFailure("Doomed to failure", self.assertTrue, True)
1832
except AssertionError, e:
1833
self.assertEqual('Unexpected success. Should have failed:'
1834
' Doomed to failure', e.args[0])
2016
"""A test suite factory."""
2017
class Test(tests.TestCase):
2024
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2026
def test_list_only(self):
2027
output = self.run_selftest(test_suite_factory=self.factory,
2029
self.assertEqual(3, len(output.readlines()))
2031
def test_list_only_filtered(self):
    """list_only combined with pattern lists only the matching test."""
    output = self.run_selftest(test_suite_factory=self.factory,
        list_only=True, pattern="Test.b")
    self.assertEndsWith(output.getvalue(), "Test.b\n")
    self.assertLength(1, output.readlines())
2037
def test_list_only_excludes(self):
    """list_only combined with exclude_pattern omits the excluded test."""
    output = self.run_selftest(test_suite_factory=self.factory,
        list_only=True, exclude_pattern="Test.b")
    # The listing is the haystack and "Test.b" the pattern; the original
    # passed them the other way round, which made the check vacuous.
    self.assertNotContainsRe(output.getvalue(), "Test.b")
    self.assertLength(2, output.readlines())
2043
def test_lsprof_tests(self):
2044
self.requireFeature(features.lsprof_feature)
2047
def __call__(test, result):
2049
def run(test, result):
2050
results.append(result)
2051
def countTestCases(self):
2053
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2054
self.assertLength(1, results)
2055
self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
2057
def test_random(self):
    """Listing with two different random seeds gives different output."""
    # test randomising by listing a number of tests.
    output_123 = self.run_selftest(test_suite_factory=self.factory,
        list_only=True, random_seed="123")
    output_234 = self.run_selftest(test_suite_factory=self.factory,
        list_only=True, random_seed="234")
    # Compare the text that was written; comparing the stream objects
    # themselves (as the original did) is always unequal and proves
    # nothing.
    # NOTE(review): presumably the header line includes the seed, so the
    # two texts differ even if the test order coincides -- confirm.
    self.assertNotEqual(output_123.getvalue(), output_234.getvalue())
    # "Randomizing test order..\n\n" header, then one line per test
    self.assertLength(5, output_123.readlines())
    self.assertLength(5, output_234.readlines())
2068
def test_random_reuse_is_same_order(self):
    """Listing twice with the same random seed gives identical output."""
    expected = self.run_selftest(test_suite_factory=self.factory,
        list_only=True, random_seed="123")
    repeated = self.run_selftest(test_suite_factory=self.factory,
        list_only=True, random_seed="123")
    self.assertEqual(expected.getvalue(), repeated.getvalue())
2076
def test_runner_class(self):
2077
self.requireFeature(features.subunit)
2078
from subunit import ProtocolTestCase
2079
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2080
test_suite_factory=self.factory)
2081
test = ProtocolTestCase(stream)
2082
result = unittest.TestResult()
2084
self.assertEqual(3, result.testsRun)
2086
def test_starting_with_single_argument(self):
2087
output = self.run_selftest(test_suite_factory=self.factory,
2088
starting_with=['bzrlib.tests.test_selftest.Test.a'],
2090
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
2093
def test_starting_with_multiple_argument(self):
2094
output = self.run_selftest(test_suite_factory=self.factory,
2095
starting_with=['bzrlib.tests.test_selftest.Test.a',
2096
'bzrlib.tests.test_selftest.Test.b'],
2098
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
2099
'bzrlib.tests.test_selftest.Test.b\n',
2102
def check_transport_set(self, transport_server):
2103
captured_transport = []
2104
def seen_transport(a_transport):
2105
captured_transport.append(a_transport)
2106
class Capture(tests.TestCase):
2108
seen_transport(bzrlib.tests.default_transport)
2110
return TestUtil.TestSuite([Capture("a")])
2111
self.run_selftest(transport=transport_server, test_suite_factory=factory)
2112
self.assertEqual(transport_server, captured_transport[0])
2114
def test_transport_sftp(self):
    """The SFTP stub server can be selected as the selftest transport."""
    self.requireFeature(features.paramiko)
    # Imported only after the feature check above has passed.
    from bzrlib.tests import stub_sftp
    self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
2119
def test_transport_memory(self):
    """The memory server can be selected as the selftest transport."""
    self.check_transport_set(memory.MemoryServer)
2123
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
2124
# Does IO: reads test.list
2126
def test_load_list(self):
    """load_list restricts the listed tests to the ids in the file."""
    # Provide a list with one test - this test.
    test_id_line = '%s\n' % self.id()
    self.build_tree_contents([('test.list', test_id_line)])
    # And generate a list of the tests in the suite.
    stream = self.run_selftest(load_list='test.list', list_only=True)
    self.assertEqual(test_id_line, stream.getvalue())
2134
def test_load_unknown(self):
    """load_list naming a file that was never created raises NoSuchFile."""
    # No list file is written here, so loading it must fail.
    self.assertRaises(errors.NoSuchFile, self.run_selftest,
        load_list='missing file name', list_only=True)
2141
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2143
_test_needs_features = [features.subunit]
2145
def run_subunit_stream(self, test_name):
2146
from subunit import ProtocolTestCase
2148
return TestUtil.TestSuite([_get_test(test_name)])
2149
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2150
test_suite_factory=factory)
2151
test = ProtocolTestCase(stream)
2152
result = testtools.TestResult()
2154
content = stream.getvalue()
2155
return content, result
2157
def test_fail_has_log(self):
    """A failing test's subunit stream contains a log section."""
    content, result = self.run_subunit_stream('test_fail')
    self.assertEqual(1, len(result.failures))
    self.assertContainsRe(content, '(?m)^log$')
    self.assertContainsRe(content, 'this test will fail')
2163
def test_error_has_log(self):
    """An erroring test's subunit stream contains a log section."""
    content, result = self.run_subunit_stream('test_error')
    self.assertContainsRe(content, '(?m)^log$')
    self.assertContainsRe(content, 'this test errored')
2168
def test_skip_has_no_log(self):
2169
content, result = self.run_subunit_stream('test_skip')
2170
self.assertNotContainsRe(content, '(?m)^log$')
2171
self.assertNotContainsRe(content, 'this test will be skipped')
2172
self.assertEqual(['reason'], result.skip_reasons.keys())
2173
skips = result.skip_reasons['reason']
2174
self.assertEqual(1, len(skips))
2176
# RemotedTestCase doesn't preserve the "details"
2177
## self.assertFalse('log' in test.getDetails())
2179
def test_missing_feature_has_no_log(self):
2180
content, result = self.run_subunit_stream('test_missing_feature')
2181
self.assertNotContainsRe(content, '(?m)^log$')
2182
self.assertNotContainsRe(content, 'missing the feature')
2183
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2184
skips = result.skip_reasons['_MissingFeature\n']
2185
self.assertEqual(1, len(skips))
2187
# RemotedTestCase doesn't preserve the "details"
2188
## self.assertFalse('log' in test.getDetails())
2190
def test_xfail_has_no_log(self):
    """An expected failure puts no log in the stream or the result."""
    content, result = self.run_subunit_stream('test_xfail')
    self.assertNotContainsRe(content, '(?m)^log$')
    self.assertNotContainsRe(content, 'test with expected failure')
    self.assertEqual(1, len(result.expectedFailures))
    result_content = result.expectedFailures[0][1]
    self.assertNotContainsRe(result_content, 'Text attachment: log')
    self.assertNotContainsRe(result_content, 'test with expected failure')
2199
def test_unexpected_success_has_log(self):
    """An unexpected success's subunit stream contains a log section."""
    content, result = self.run_subunit_stream('test_unexpected_success')
    self.assertContainsRe(content, '(?m)^log$')
    self.assertContainsRe(content, 'test with unexpected success')
    # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
    #                success, if a min version check is added remove this
    from subunit import TestProtocolClient as _Client
    # Both methods sharing one function means this subunit cannot report
    # an unexpected success, so the count check is an expected failure.
    if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
        self.expectFailure('subunit treats "unexpectedSuccess"'
                           ' as a plain success',
            self.assertEqual, 1, len(result.unexpectedSuccesses))
    self.assertEqual(1, len(result.unexpectedSuccesses))
    test = result.unexpectedSuccesses[0]
    # RemotedTestCase doesn't preserve the "details"
    ## self.assertTrue('log' in test.getDetails())
2215
def test_success_has_no_log(self):
    """A passing test's subunit stream contains no log section."""
    content, result = self.run_subunit_stream('test_success')
    self.assertEqual(1, result.testsRun)
    self.assertNotContainsRe(content, '(?m)^log$')
    self.assertNotContainsRe(content, 'this test succeeds')
2222
class TestRunBzr(tests.TestCase):
2227
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
2229
"""Override _run_bzr_core to test how it is invoked by run_bzr.
2231
Attempts to run bzr from inside this class don't actually run it.
2233
We test how run_bzr actually invokes bzr in another location. Here we
2234
only need to test that it passes the right parameters to run_bzr.
2236
self.argv = list(argv)
2237
self.retcode = retcode
2238
self.encoding = encoding
2240
self.working_dir = working_dir
2241
return self.retcode, self.out, self.err
2243
def test_run_bzr_error(self):
    """run_bzr_error forwards argv and retcode and returns (out, err)."""
    self.out = "It sure does!\n"
    out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
    self.assertEqual(['rocks'], self.argv)
    self.assertEqual(34, self.retcode)
    self.assertEqual('It sure does!\n', out)
    # assertEqual replaces the assertEquals alias, matching the
    # neighbouring assertions.
    self.assertEqual(out, self.out)
    self.assertEqual('', err)
    self.assertEqual(err, self.err)
2253
def test_run_bzr_error_regexes(self):
2255
self.err = "bzr: ERROR: foobarbaz is not versioned"
2256
out, err = self.run_bzr_error(
2257
["bzr: ERROR: foobarbaz is not versioned"],
2258
['file-id', 'foobarbaz'])
2260
def test_encoding(self):
    """Test that run_bzr passes encoding to _run_bzr_core"""
    # With no encoding argument, None is recorded.
    self.run_bzr('foo bar')
    self.assertEqual(None, self.encoding)
    self.assertEqual(['foo', 'bar'], self.argv)

    # An explicit encoding is recorded unchanged.
    self.run_bzr('foo bar', encoding='baz')
    self.assertEqual('baz', self.encoding)
    self.assertEqual(['foo', 'bar'], self.argv)
2270
def test_retcode(self):
    """Test that run_bzr passes retcode to _run_bzr_core"""
    # Default is retcode == 0
    self.run_bzr('foo bar')
    self.assertEqual(0, self.retcode)
    self.assertEqual(['foo', 'bar'], self.argv)

    self.run_bzr('foo bar', retcode=1)
    self.assertEqual(1, self.retcode)
    self.assertEqual(['foo', 'bar'], self.argv)

    self.run_bzr('foo bar', retcode=None)
    self.assertEqual(None, self.retcode)
    self.assertEqual(['foo', 'bar'], self.argv)

    # The command may also be given as a list of arguments.
    self.run_bzr(['foo', 'bar'], retcode=3)
    self.assertEqual(3, self.retcode)
    self.assertEqual(['foo', 'bar'], self.argv)
2289
def test_stdin(self):
2290
# test that the stdin keyword to run_bzr is passed through to
2291
# _run_bzr_core as-is. We do this by overriding
2292
# _run_bzr_core in this class, and then calling run_bzr,
2293
# which is a convenience function for _run_bzr_core, so
2295
self.run_bzr('foo bar', stdin='gam')
2296
self.assertEqual('gam', self.stdin)
2297
self.assertEqual(['foo', 'bar'], self.argv)
2299
self.run_bzr('foo bar', stdin='zippy')
2300
self.assertEqual('zippy', self.stdin)
2301
self.assertEqual(['foo', 'bar'], self.argv)
2303
def test_working_dir(self):
    """Test that run_bzr passes working_dir to _run_bzr_core"""
    # With no working_dir argument, None is recorded.
    self.run_bzr('foo bar')
    self.assertEqual(None, self.working_dir)
    self.assertEqual(['foo', 'bar'], self.argv)

    # An explicit working_dir is recorded unchanged.
    self.run_bzr('foo bar', working_dir='baz')
    self.assertEqual('baz', self.working_dir)
    self.assertEqual(['foo', 'bar'], self.argv)
2313
def test_reject_extra_keyword_arguments(self):
    """run_bzr raises TypeError when given the error_regex keyword."""
    self.assertRaises(TypeError, self.run_bzr, "foo bar",
                      error_regex=['error message'])
2318
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2319
# Does IO when testing the working_dir parameter.
2321
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2322
a_callable=None, *args, **kwargs):
2324
self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
2325
self.factory = bzrlib.ui.ui_factory
2326
self.working_dir = osutils.getcwd()
2327
stdout.write('foo\n')
2328
stderr.write('bar\n')
2331
def test_stdin(self):
    """The stdin string reaches apply_redirected as a readable stream."""
    # test that the stdin keyword to _run_bzr_core is passed through to
    # apply_redirected as a StringIO. We do this by overriding
    # apply_redirected in this class, and then calling _run_bzr_core,
    # which calls apply_redirected.
    self.run_bzr(['foo', 'bar'], stdin='gam')
    self.assertEqual('gam', self.stdin.read())
    # The UI factory must be reading from that very same stream.
    self.assertIs(self.stdin, self.factory_stdin)
    self.run_bzr(['foo', 'bar'], stdin='zippy')
    self.assertEqual('zippy', self.stdin.read())
    self.assertIs(self.stdin, self.factory_stdin)
2343
def test_ui_factory(self):
    """Each run_bzr call gets a fresh TestUIFactory with its own streams."""
    # each invocation of self.run_bzr should get its
    # own UI factory, which is an instance of TestUIFactory,
    # with stdin, stdout and stderr attached to the stdin,
    # stdout and stderr of the invoked run_bzr
    current_factory = bzrlib.ui.ui_factory
    self.run_bzr(['foo'])
    self.assertFalse(current_factory is self.factory)
    self.assertNotEqual(sys.stdout, self.factory.stdout)
    self.assertNotEqual(sys.stderr, self.factory.stderr)
    # 'foo\n' and 'bar\n' are what this class's apply_redirected
    # override writes to stdout and stderr.
    self.assertEqual('foo\n', self.factory.stdout.getvalue())
    self.assertEqual('bar\n', self.factory.stderr.getvalue())
    self.assertIsInstance(self.factory, tests.TestUIFactory)
2357
def test_working_dir(self):
    """working_dir changes the directory for the call only."""
    self.build_tree(['one/', 'two/'])
    cwd = osutils.getcwd()

    # Default is to work in the current directory
    self.run_bzr(['foo', 'bar'])
    self.assertEqual(cwd, self.working_dir)

    self.run_bzr(['foo', 'bar'], working_dir=None)
    self.assertEqual(cwd, self.working_dir)

    # The function should be run in the alternative directory
    # but afterwards the current working dir shouldn't be changed
    self.run_bzr(['foo', 'bar'], working_dir='one')
    self.assertNotEqual(cwd, self.working_dir)
    self.assertEndsWith(self.working_dir, 'one')
    self.assertEqual(cwd, osutils.getcwd())

    self.run_bzr(['foo', 'bar'], working_dir='two')
    self.assertNotEqual(cwd, self.working_dir)
    self.assertEndsWith(self.working_dir, 'two')
    self.assertEqual(cwd, osutils.getcwd())
2381
class StubProcess(object):
2382
"""A stub process for testing run_bzr_subprocess."""
2384
def __init__(self, out="", err="", retcode=0):
2387
self.returncode = retcode
2389
def communicate(self):
2390
return self.out, self.err
2393
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
2394
"""Base class for tests testing how we might run bzr."""
2397
tests.TestCaseWithTransport.setUp(self)
2398
self.subprocess_calls = []
2400
def start_bzr_subprocess(self, process_args, env_changes=None,
2401
skip_if_plan_to_signal=False,
2403
allow_plugins=False):
2404
"""capture what run_bzr_subprocess tries to do."""
2405
self.subprocess_calls.append({'process_args':process_args,
2406
'env_changes':env_changes,
2407
'skip_if_plan_to_signal':skip_if_plan_to_signal,
2408
'working_dir':working_dir, 'allow_plugins':allow_plugins})
2409
return self.next_subprocess
2412
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
2414
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2415
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2417
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2418
that will return static results. This assertion method populates those
2419
results and also checks the arguments run_bzr_subprocess generates.
2421
self.next_subprocess = process
2423
result = self.run_bzr_subprocess(*args, **kwargs)
2425
self.next_subprocess = None
2426
for key, expected in expected_args.iteritems():
2427
self.assertEqual(expected, self.subprocess_calls[-1][key])
1836
self.fail('Assertion not raised')
1839
class TestFeature(tests.TestCase):
1841
def test_caching(self):
1842
"""Feature._probe is called by the feature at most once."""
1843
class InstrumentedFeature(tests.Feature):
1845
super(InstrumentedFeature, self).__init__()
1848
self.calls.append('_probe')
1850
feature = InstrumentedFeature()
1852
self.assertEqual(['_probe'], feature.calls)
1854
self.assertEqual(['_probe'], feature.calls)
1856
def test_named_str(self):
1857
"""Feature.__str__ should thunk to feature_name()."""
1858
class NamedFeature(tests.Feature):
1859
def feature_name(self):
1861
feature = NamedFeature()
1862
self.assertEqual('symlinks', str(feature))
1864
def test_default_str(self):
1865
"""Feature.__str__ should default to __class__.__name__."""
1866
class NamedFeature(tests.Feature):
1868
feature = NamedFeature()
1869
self.assertEqual('NamedFeature', str(feature))
1872
class TestUnavailableFeature(tests.TestCase):
    """Tests for the UnavailableFeature exception."""

    def test_access_feature(self):
        """The feature given to the exception is available as args[0]."""
        feature = tests.Feature()
        exception = tests.UnavailableFeature(feature)
        self.assertIs(feature, exception.args[0])
2430
self.next_subprocess = None
2431
for key, expected in expected_args.iteritems():
2432
self.assertEqual(expected, self.subprocess_calls[-1][key])
2435
def test_run_bzr_subprocess(self):
2436
"""The run_bzr_helper_external command behaves nicely."""
2437
self.assertRunBzrSubprocess({'process_args':['--version']},
2438
StubProcess(), '--version')
2439
self.assertRunBzrSubprocess({'process_args':['--version']},
2440
StubProcess(), ['--version'])
2441
# retcode=None disables retcode checking
2442
result = self.assertRunBzrSubprocess({},
2443
StubProcess(retcode=3), '--version', retcode=None)
2444
result = self.assertRunBzrSubprocess({},
2445
StubProcess(out="is free software"), '--version')
2446
self.assertContainsRe(result[0], 'is free software')
2447
# Running a subcommand that is missing errors
2448
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2449
{'process_args':['--versionn']}, StubProcess(retcode=3),
2451
# Unless it is told to expect the error from the subprocess
2452
result = self.assertRunBzrSubprocess({},
2453
StubProcess(retcode=3), '--versionn', retcode=3)
2454
# Or to ignore retcode checking
2455
result = self.assertRunBzrSubprocess({},
2456
StubProcess(err="unknown command", retcode=3), '--versionn',
2458
self.assertContainsRe(result[1], 'unknown command')
2460
def test_env_change_passes_through(self):
2461
self.assertRunBzrSubprocess(
2462
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2464
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2466
def test_no_working_dir_passed_as_None(self):
    """With no working_dir argument, None is passed on."""
    self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2469
def test_no_working_dir_passed_through(self):
2470
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2473
def test_run_bzr_subprocess_no_plugins(self):
2474
self.assertRunBzrSubprocess({'allow_plugins': False},
2477
def test_allow_plugins(self):
    """allow_plugins=True is passed on unchanged."""
    self.assertRunBzrSubprocess({'allow_plugins': True},
        StubProcess(), '', allow_plugins=True)
2482
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2484
def test_finish_bzr_subprocess_with_error(self):
2485
"""finish_bzr_subprocess allows specification of the desired exit code.
2487
process = StubProcess(err="unknown command", retcode=3)
2488
result = self.finish_bzr_subprocess(process, retcode=3)
2489
self.assertEqual('', result[0])
2490
self.assertContainsRe(result[1], 'unknown command')
2492
def test_finish_bzr_subprocess_ignoring_retcode(self):
    """finish_bzr_subprocess allows the exit code to be ignored."""
    process = StubProcess(err="unknown command", retcode=3)
    # retcode=None: the stub's exit code of 3 must not cause a failure.
    result = self.finish_bzr_subprocess(process, retcode=None)
    self.assertEqual('', result[0])
    self.assertContainsRe(result[1], 'unknown command')
2499
def test_finish_subprocess_with_unexpected_retcode(self):
2500
"""finish_bzr_subprocess raises self.failureException if the retcode is
2501
not the expected one.
2503
process = StubProcess(err="unknown command", retcode=3)
2504
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2508
class _DontSpawnProcess(Exception):
    """Marker exception used to stop before a real process is spawned.

    It carries no extra state; tests only check that it was raised.
    """
2512
class TestStartBzrSubProcess(tests.TestCase):
2513
"""Stub test start_bzr_subprocess."""
2515
def _subprocess_log_cleanup(self):
    """Do nothing: these tests do not produce a subprocess log file.

    Defined here so that the base version is inhibited.
    """
    return None
2518
def _popen(self, *args, **kwargs):
2519
"""Override the base version to record the command that is run.
2521
From there we can ensure it is correct without spawning a real process.
2523
self.check_popen_state()
2524
self._popen_args = args
2525
self._popen_kwargs = kwargs
2526
raise _DontSpawnProcess()
2528
def check_popen_state(self):
    """Hook invoked by _popen; does nothing by default.

    Tests replace this attribute with a callable that makes assertions
    about the state at the moment popen is called.
    """
    return None
2531
def test_run_bzr_subprocess_no_plugins(self):
    """With no options the recorded command ends with --no-plugins.

    _popen records its arguments and raises _DontSpawnProcess, so the
    command line can be inspected without running anything.
    """
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    argv = self._popen_args[0]
    # Interpreter first, then the bzr script, then the options.
    self.assertEqual(sys.executable, argv[0])
    self.assertEqual(self.get_bzr_path(), argv[1])
    self.assertEqual(['--no-plugins'], argv[2:])
2538
def test_allow_plugins(self):
2539
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2541
command = self._popen_args[0]
2542
self.assertEqual([], command[2:])
2544
def test_set_env(self):
2545
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2547
def check_environment():
2548
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2549
self.check_popen_state = check_environment
2550
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2551
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2552
# not set in the parent
2553
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2555
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too.

    EXISTANT_ENV_VAR is set in this process, then start_bzr_subprocess is
    asked to delete it via env_changes.  The check_popen_state hook
    verifies it is absent when popen is called, and afterwards it must
    still be set in this (parent) process.
    """
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    try:
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR': None})
        # Still set in parent
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    finally:
        # Always undo the change made above, even when an assertion fails,
        # so the variable cannot leak into later tests.  pop() with a
        # default avoids masking the original failure with a KeyError.
        os.environ.pop('EXISTANT_ENV_VAR', None)
2568
def test_env_del_missing(self):
    """Requesting deletion of a variable that is not set is harmless.

    The variable must be absent before the call and also at the time
    popen is invoked.
    """
    name = 'NON_EXISTANT_ENV_VAR'
    self.assertFalse(name in os.environ)
    def variable_absent_at_popen():
        self.assertFalse(name in os.environ)
    self.check_popen_state = variable_absent_at_popen
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={name: None})
2576
def test_working_dir(self):
2577
"""Test that we can specify the working dir for the child"""
2578
orig_getcwd = osutils.getcwd
2579
orig_chdir = os.chdir
2583
self.overrideAttr(os, 'chdir', chdir)
2586
self.overrideAttr(osutils, 'getcwd', getcwd)
2587
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2589
self.assertEqual(['foo', 'current'], chdirs)
2591
def test_get_bzr_path_with_cwd_bzrlib(self):
    """get_bzr_path() gives "bzr" when the source path is empty.

    get_source_path is replaced to return "" and os.path.isfile is
    overridden to accept every path.
    """
    def empty_source_path():
        return ""
    def always_a_file(path):
        return True
    self.get_source_path = empty_source_path
    self.overrideAttr(os.path, "isfile", always_a_file)
    self.assertEqual(self.get_bzr_path(), "bzr")
2597
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2598
"""Tests that really need to do things with an external bzr."""
2600
def test_start_and_stop_bzr_subprocess_send_signal(self):
2601
"""finish_bzr_subprocess raises self.failureException if the retcode is
2602
not the expected one.
2604
self.disable_missing_extensions_warning()
2605
process = self.start_bzr_subprocess(['wait-until-signalled'],
2606
skip_if_plan_to_signal=True)
2607
self.assertEqual('running\n', process.stdout.readline())
2608
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2610
self.assertEqual('', result[0])
2611
self.assertEqual('bzr: interrupted\n', result[1])
1880
2614
class TestSelftestFiltering(tests.TestCase):
2353
3141
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3144
class TestThreadLeakDetection(tests.TestCase):
3145
"""Ensure when tests leak threads we detect and report it"""
3147
class LeakRecordingResult(tests.ExtendedTestResult):
3149
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3151
def _report_thread_leak(self, test, leaks, alive):
3152
self.leaks.append((test, leaks))
3154
def test_testcase_without_addCleanups(self):
3155
"""Check old TestCase instances don't break with leak detection"""
3156
class Test(unittest.TestCase):
3159
result = self.LeakRecordingResult()
3161
result.startTestRun()
3163
result.stopTestRun()
3164
self.assertEqual(result._tests_leaking_threads_count, 0)
3165
self.assertEqual(result.leaks, [])
3167
def test_thread_leak(self):
3168
"""Ensure a thread that outlives the running of a test is reported
3170
Uses a thread that blocks on an event, and is started by the inner
3171
test case. As the thread outlives the inner case's run, it should be
3172
detected as a leak, but the event is then set so that the thread can
3173
be safely joined in cleanup so it's not leaked for real.
3175
event = threading.Event()
3176
thread = threading.Thread(name="Leaker", target=event.wait)
3177
class Test(tests.TestCase):
3178
def test_leak(self):
3180
result = self.LeakRecordingResult()
3181
test = Test("test_leak")
3182
self.addCleanup(thread.join)
3183
self.addCleanup(event.set)
3184
result.startTestRun()
3186
result.stopTestRun()
3187
self.assertEqual(result._tests_leaking_threads_count, 1)
3188
self.assertEqual(result._first_thread_leaker_id, test.id())
3189
self.assertEqual(result.leaks, [(test, set([thread]))])
3190
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3192
def test_multiple_leaks(self):
3193
"""Check multiple leaks are blamed on the test cases at fault
3195
Same concept as the previous test, but has one inner test method that
3196
leaks two threads, and one that doesn't leak at all.
3198
event = threading.Event()
3199
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3200
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3201
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3202
class Test(tests.TestCase):
3203
def test_first_leak(self):
3205
def test_second_no_leak(self):
3207
def test_third_leak(self):
3210
result = self.LeakRecordingResult()
3211
first_test = Test("test_first_leak")
3212
third_test = Test("test_third_leak")
3213
self.addCleanup(thread_a.join)
3214
self.addCleanup(thread_b.join)
3215
self.addCleanup(thread_c.join)
3216
self.addCleanup(event.set)
3217
result.startTestRun()
3219
[first_test, Test("test_second_no_leak"), third_test]
3221
result.stopTestRun()
3222
self.assertEqual(result._tests_leaking_threads_count, 2)
3223
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3224
self.assertEqual(result.leaks, [
3225
(first_test, set([thread_b])),
3226
(third_test, set([thread_a, thread_c]))])
3227
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3230
class TestPostMortemDebugging(tests.TestCase):
3231
"""Check post mortem debugging works when tests fail or error"""
3233
class TracebackRecordingResult(tests.ExtendedTestResult):
3235
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3236
self.postcode = None
3237
def _post_mortem(self, tb=None):
3238
"""Record the code object at the end of the current traceback"""
3239
tb = tb or sys.exc_info()[2]
3242
while next is not None:
3245
self.postcode = tb.tb_frame.f_code
3246
def report_error(self, test, err):
3248
def report_failure(self, test, err):
3251
def test_location_unittest_error(self):
3252
"""Needs right post mortem traceback with erroring unittest case"""
3253
class Test(unittest.TestCase):
3256
result = self.TracebackRecordingResult()
3258
self.assertEqual(result.postcode, Test.runTest.func_code)
3260
def test_location_unittest_failure(self):
3261
"""Needs right post mortem traceback with failing unittest case"""
3262
class Test(unittest.TestCase):
3264
raise self.failureException
3265
result = self.TracebackRecordingResult()
3267
self.assertEqual(result.postcode, Test.runTest.func_code)
3269
def test_location_bt_error(self):
3270
"""Needs right post mortem traceback with erroring bzrlib.tests case"""
3271
class Test(tests.TestCase):
3272
def test_error(self):
3274
result = self.TracebackRecordingResult()
3275
Test("test_error").run(result)
3276
self.assertEqual(result.postcode, Test.test_error.func_code)
3278
def test_location_bt_failure(self):
    """Post mortem must land in a failing bzrlib.tests case's own frame.

    An inner test raises failureException; the code object recorded by
    TracebackRecordingResult must be that of the failing test method.
    """
    class Test(tests.TestCase):
        def test_failure(self):
            raise self.failureException
    recorder = self.TracebackRecordingResult()
    failing_case = Test("test_failure")
    failing_case.run(recorder)
    expected_code = Test.test_failure.func_code
    self.assertEqual(recorder.postcode, expected_code)
3287
def test_env_var_triggers_post_mortem(self):
3288
"""Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3290
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3291
post_mortem_calls = []
3292
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3293
self.overrideEnv('BZR_TEST_PDB', None)
3294
result._post_mortem(1)
3295
self.overrideEnv('BZR_TEST_PDB', 'on')
3296
result._post_mortem(2)
3297
self.assertEqual([2], post_mortem_calls)
2356
3300
class TestRunSuite(tests.TestCase):
2358
3302
def test_runner_class(self):
2368
3312
return tests.ExtendedTestResult(self.stream, self.descriptions,
2369
3313
self.verbosity)
2370
3314
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2371
self.assertEqual(calls, [suite])
2373
def test_done(self):
2374
"""run_suite should call result.done()"""
2376
def one_more_call(): self.calls += 1
2377
def test_function():
2379
test = unittest.FunctionTestCase(test_function)
2380
class InstrumentedTestResult(tests.ExtendedTestResult):
2381
def done(self): one_more_call()
2382
class MyRunner(tests.TextTestRunner):
2383
def run(self, test):
2384
return InstrumentedTestResult(self.stream, self.descriptions,
2386
tests.run_suite(test, runner_class=MyRunner, stream=StringIO())
2387
self.assertEquals(1, self.calls)
3315
self.assertLength(1, calls)
3318
class TestUncollectedWarnings(tests.TestCase):
3319
"""Check a test case still alive after being run emits a warning"""
3321
class Test(tests.TestCase):
3322
def test_pass(self):
3324
def test_self_ref(self):
3325
self.also_self = self.test_self_ref
3326
def test_skip(self):
3327
self.skip("Don't need")
3329
def _get_suite(self):
3330
return TestUtil.TestSuite([
3331
self.Test("test_pass"),
3332
self.Test("test_self_ref"),
3333
self.Test("test_skip"),
3336
def _inject_stream_into_subunit(self, stream):
    """Hook for subclasses that run tests out of process.

    The default implementation ignores ``stream`` and does nothing.
    """
    return None
3339
def _run_selftest_with_suite(self, **kwargs):
3341
self._inject_stream_into_subunit(sio)
3342
old_flags = tests.selftest_debug_flags
3343
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3344
gc_on = gc.isenabled()
3348
tests.selftest(test_suite_factory=self._get_suite, stream=sio,
3349
stop_on_failure=False, **kwargs)
3353
tests.selftest_debug_flags = old_flags
3354
output = sio.getvalue()
3355
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3356
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3359
def test_testsuite(self):
    """Run the uncollected-test-case checks with no extra selftest options."""
    run_checks = self._run_selftest_with_suite
    run_checks()
3362
def test_pattern(self):
    """Selecting only test_pass and test_self_ref keeps test_skip out."""
    only_pass_and_self_ref = "test_(?:pass|self_ref)$"
    output = self._run_selftest_with_suite(pattern=only_pass_and_self_ref)
    self.assertNotContainsRe(output, "test_skip")
3366
def test_exclude_pattern(self):
    """Excluding test_skip by pattern keeps it out of the output."""
    skip_only = "test_skip$"
    output = self._run_selftest_with_suite(exclude_pattern=skip_only)
    self.assertNotContainsRe(output, "test_skip")
3370
def test_random_seed(self):
    """Run the uncollected-test-case checks with random_seed="now"."""
    seed = "now"
    self._run_selftest_with_suite(random_seed=seed)
3373
def test_matching_tests_first(self):
    """Run the checks with matching_tests_first and a test_self_ref pattern."""
    options = {
        "matching_tests_first": True,
        "pattern": "test_self_ref$",
    }
    self._run_selftest_with_suite(**options)
3377
def test_starting_with_and_exclude(self):
    """Combine starting_with=["bt."] with an exclude pattern for test_skip."""
    prefixes = ["bt."]
    output = self._run_selftest_with_suite(
        starting_with=prefixes, exclude_pattern="test_skip$")
    self.assertNotContainsRe(output, "test_skip")
3382
def test_additonal_decorator(self):
    """Run the uncollected-test-case checks with an extra suite decorator.

    Only the assertions made inside _run_selftest_with_suite matter here,
    so its return value is deliberately not kept.
    """
    self._run_selftest_with_suite(
        suite_decorators=[tests.TestDecorator])
3387
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Repeat the uncollected-test-case checks using the subunit runner."""

    # The whole class depends on the subunit feature.
    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Delegate to the base class, forcing tests.SubUnitBzrRunner."""
        base_run = TestUncollectedWarnings._run_selftest_with_suite
        return base_run(self, runner_class=tests.SubUnitBzrRunner, **kwargs)
3397
class TestUncollectedWarningsForking(TestUncollectedWarnings):
3398
"""Check warnings from tests staying alive are emitted when forking"""
3400
_test_needs_features = [features.subunit]
3402
def _inject_stream_into_subunit(self, stream):
3403
"""Monkey-patch subunit so the extra output goes to stream not stdout
3405
Some APIs need rewriting so this kind of bogus hackery can be replaced
3406
by passing the stream param from run_tests down into ProtocolTestCase.
3408
from subunit import ProtocolTestCase
3409
_original_init = ProtocolTestCase.__init__
3410
def _init_with_passthrough(self, *args, **kwargs):
3411
_original_init(self, *args, **kwargs)
3412
self._passthrough = stream
3413
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3415
def _run_selftest_with_suite(self, **kwargs):
    """Run the base checks with tests.fork_decorator added to the suite.

    Raises tests.TestNotApplicable when the platform has no os.fork.
    Any suite_decorators supplied by the caller are kept and the fork
    decorator is appended after them; the caller's own sequence is
    copied rather than modified in place.
    """
    # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
    if getattr(os, "fork", None) is None:
        raise tests.TestNotApplicable("Platform doesn't support forking")
    # Make sure the fork code is actually invoked by claiming two cores
    self.overrideAttr(osutils, "local_concurrency", lambda: 2)
    decorators = list(kwargs.get("suite_decorators") or [])
    decorators.append(tests.fork_decorator)
    kwargs["suite_decorators"] = decorators
    return TestUncollectedWarnings._run_selftest_with_suite(self, **kwargs)
3425
class TestEnvironHandling(tests.TestCase):
3427
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3428
self.assertFalse('MYVAR' in os.environ)
3429
self.overrideEnv('MYVAR', '42')
3430
# We use an embedded test to make sure we fix the _captureVar bug
3431
class Test(tests.TestCase):
3433
# The first call saves the 42 value
3434
self.overrideEnv('MYVAR', None)
3435
self.assertEquals(None, os.environ.get('MYVAR'))
3436
# Make sure we can call it twice
3437
self.overrideEnv('MYVAR', None)
3438
self.assertEquals(None, os.environ.get('MYVAR'))
3440
result = tests.TextTestResult(output, 0, 1)
3441
Test('test_me').run(result)
3442
if not result.wasStrictlySuccessful():
3443
self.fail(output.getvalue())
3444
# We get our value back
3445
self.assertEquals('42', os.environ.get('MYVAR'))
3448
class TestIsolatedEnv(tests.TestCase):
3449
"""Test isolating tests from os.environ.
3451
Since we use tests that are already isolated from os.environ a bit of care
3452
should be taken when designing the tests to avoid bootstrap side-effects.
3453
The tests start with an already clean os.environ, which allows making valid
3454
assertions about which variables are present or not and design tests around
3458
class ScratchMonkey(tests.TestCase):
3463
def test_basics(self):
3464
# Make sure we know the definition of BZR_HOME: not part of os.environ
3465
# for tests.TestCase.
3466
self.assertTrue('BZR_HOME' in tests.isolated_environ)
3467
self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3468
# Being part of isolated_environ, BZR_HOME should not appear here
3469
self.assertFalse('BZR_HOME' in os.environ)
3470
# Make sure we know the definition of LINES: part of os.environ for
3472
self.assertTrue('LINES' in tests.isolated_environ)
3473
self.assertEquals('25', tests.isolated_environ['LINES'])
3474
self.assertEquals('25', os.environ['LINES'])
3476
def test_injecting_unknown_variable(self):
    """A variable absent from os.environ is injected, then removed again."""
    # BZR_HOME is known to be absent from os.environ
    monkey = self.ScratchMonkey('test_me')
    injected = {'BZR_HOME': 'foo'}
    tests.override_os_environ(monkey, injected)
    self.assertEqual('foo', os.environ['BZR_HOME'])
    tests.restore_os_environ(monkey)
    self.assertFalse('BZR_HOME' in os.environ)
3484
def test_injecting_known_variable(self):
    """A variable already present is overridden, then gets its value back."""
    monkey = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    replacement = {'LINES': '42'}
    tests.override_os_environ(monkey, replacement)
    self.assertEqual('42', os.environ['LINES'])
    tests.restore_os_environ(monkey)
    self.assertEqual('25', os.environ['LINES'])
3492
def test_deleting_variable(self):
    """Overriding with None removes the variable until it is restored."""
    monkey = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    removal = {'LINES': None}
    tests.override_os_environ(monkey, removal)
    self.assertTrue('LINES' not in os.environ)
    tests.restore_os_environ(monkey)
    self.assertEqual('25', os.environ['LINES'])
3501
class TestDocTestSuiteIsolation(tests.TestCase):
3502
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3504
Since tests.TestCase already provides isolation from os.environ, we use
3505
the clean environment as a base for testing. To precisely capture the
3506
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3509
We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
3510
not `os.environ` so each test overrides it to suit its needs.
3514
def get_doctest_suite_for_string(self, klass, string):
3515
class Finder(doctest.DocTestFinder):
3517
def find(*args, **kwargs):
3518
test = doctest.DocTestParser().get_doctest(
3519
string, {}, 'foo', 'foo.py', 0)
3522
suite = klass(test_finder=Finder())
3525
def run_doctest_suite_for_string(self, klass, string):
3526
suite = self.get_doctest_suite_for_string(klass, string)
3528
result = tests.TextTestResult(output, 0, 1)
3530
return result, output
3532
def assertDocTestStringSucceds(self, klass, string):
    """Fail unless the doctest in ``string`` is strictly successful.

    The doctest is run through run_doctest_suite_for_string with the
    suite class ``klass``; on failure the captured output is used as the
    failure message.
    """
    result, output = self.run_doctest_suite_for_string(klass, string)
    if result.wasStrictlySuccessful():
        return
    self.fail(output.getvalue())
3537
def assertDocTestStringFails(self, klass, string):
    """Fail if the doctest in ``string`` is strictly successful.

    The doctest is run through run_doctest_suite_for_string with the
    suite class ``klass``; when it unexpectedly succeeds the captured
    output is used as the failure message.
    """
    result, output = self.run_doctest_suite_for_string(klass, string)
    if not result.wasStrictlySuccessful():
        return
    self.fail(output.getvalue())
3542
def test_injected_variable(self):
3543
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3546
>>> os.environ['LINES']
3549
# doctest.DocTestSuite fails as it sees '25'
3550
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3551
# tests.DocTestSuite sees '42'
3552
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3554
def test_deleted_variable(self):
3555
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3558
>>> os.environ.get('LINES')
3560
# doctest.DocTestSuite fails as it sees '25'
3561
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3562
# tests.DocTestSuite sees None
3563
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3566
class TestSelftestExcludePatterns(tests.TestCase):
3569
super(TestSelftestExcludePatterns, self).setUp()
3570
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3572
def suite_factory(self, keep_only=None, starting_with=None):
3573
"""A test suite factory with only a few tests."""
3574
class Test(tests.TestCase):
3576
# We don't need the full class path
3577
return self._testMethodName
3584
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3586
def assertTestList(self, expected, *selftest_args):
    """Check the test ids printed by ``bzr selftest --list``.

    ``selftest_args`` are appended to the command; the standard output,
    split into lines, must equal ``expected``.
    """
    # We rely on setUp installing the right test suite factory so we can
    # test at the command level without loading the whole test suite
    command = ('selftest', '--list') + selftest_args
    out, err = self.run_bzr(command)
    self.assertEqual(expected, out.splitlines())
3593
def test_full_list(self):
    """Without exclusions all three tests are listed."""
    everything = ['a', 'b', 'c']
    self.assertTestList(everything)
3596
def test_single_exclude(self):
    """A single -x option removes the matching test from the list."""
    exclude_a = ('-x', 'a')
    self.assertTestList(['b', 'c'], *exclude_a)
3599
def test_mutiple_excludes(self):
    """Repeated -x options each remove their matching test."""
    exclude_a_and_b = ('-x', 'a', '-x', 'b')
    self.assertTestList(['c'], *exclude_a_and_b)
3603
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3605
_test_needs_features = [features.subunit]
3608
super(TestCounterHooks, self).setUp()
3609
class Test(tests.TestCase):
3612
super(Test, self).setUp()
3613
self.hooks = hooks.Hooks()
3614
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3615
self.install_counter_hook(self.hooks, 'myhook')
3620
def run_hook_once(self):
3621
for hook in self.hooks['myhook']:
3624
self.test_class = Test
3626
def assertHookCalls(self, expected_calls, test_name):
3627
test = self.test_class(test_name)
3628
result = unittest.TestResult()
3630
self.assertTrue(hasattr(test, '_counters'))
3631
self.assertTrue(test._counters.has_key('myhook'))
3632
self.assertEquals(expected_calls, test._counters['myhook'])
3634
def test_no_hook(self):
    """The inner 'no_hook' test records zero calls of 'myhook'."""
    self.assertHookCalls(expected_calls=0, test_name='no_hook')
3637
def test_run_hook_once(self):
3638
tt = features.testtools
3639
if tt.module.__version__ < (0, 9, 8):
3640
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3641
self.assertHookCalls(1, 'run_hook_once')