620
580
def test_dangling_locks_cause_failures(self):
621
581
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
622
582
def test_function(self):
623
t = self.get_transport_from_path('.')
583
t = self.get_transport('.')
624
584
l = lockdir.LockDir(t, 'lock')
627
587
test = TestDanglingLock('test_function')
628
588
result = test.run()
629
total_failures = result.errors + result.failures
630
589
if self._lock_check_thorough:
631
self.assertEqual(1, len(total_failures))
590
self.assertEqual(1, len(result.errors))
633
592
# When _lock_check_thorough is disabled, then we don't trigger a
635
self.assertEqual(0, len(total_failures))
594
self.assertEqual(0, len(result.errors))
638
597
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
639
598
"""Tests for the convenience functions TestCaseWithTransport introduces."""
641
600
def test_get_readonly_url_none(self):
601
from bzrlib.transport import get_transport
602
from bzrlib.transport.memory import MemoryServer
642
603
from bzrlib.transport.readonly import ReadonlyTransportDecorator
643
self.vfs_transport_factory = memory.MemoryServer
604
self.vfs_transport_factory = MemoryServer
644
605
self.transport_readonly_server = None
645
606
# calling get_readonly_transport() constructs a decorator on the url
647
608
url = self.get_readonly_url()
648
609
url2 = self.get_readonly_url('foo/bar')
649
t = transport.get_transport_from_url(url)
650
t2 = transport.get_transport_from_url(url2)
651
self.assertIsInstance(t, ReadonlyTransportDecorator)
652
self.assertIsInstance(t2, ReadonlyTransportDecorator)
610
t = get_transport(url)
611
t2 = get_transport(url2)
612
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
613
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
653
614
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
655
616
def test_get_readonly_url_http(self):
656
617
from bzrlib.tests.http_server import HttpServer
618
from bzrlib.transport import get_transport
619
from bzrlib.transport.local import LocalURLServer
657
620
from bzrlib.transport.http import HttpTransportBase
658
self.transport_server = test_server.LocalURLServer
621
self.transport_server = LocalURLServer
659
622
self.transport_readonly_server = HttpServer
660
623
# calling get_readonly_transport() gives us a HTTP server instance.
661
624
url = self.get_readonly_url()
662
625
url2 = self.get_readonly_url('foo/bar')
663
626
# the transport returned may be any HttpTransportBase subclass
664
t = transport.get_transport_from_url(url)
665
t2 = transport.get_transport_from_url(url2)
666
self.assertIsInstance(t, HttpTransportBase)
667
self.assertIsInstance(t2, HttpTransportBase)
627
t = get_transport(url)
628
t2 = get_transport(url2)
629
self.failUnless(isinstance(t, HttpTransportBase))
630
self.failUnless(isinstance(t2, HttpTransportBase))
668
631
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
670
633
def test_is_directory(self):
810
818
self.assertContainsRe(output,
811
819
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
813
def test_uses_time_from_testtools(self):
814
"""Test case timings in verbose results should use testtools times"""
816
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
817
def startTest(self, test):
818
self.time(datetime.datetime.utcfromtimestamp(1.145))
819
super(TimeAddedVerboseTestResult, self).startTest(test)
820
def addSuccess(self, test):
821
self.time(datetime.datetime.utcfromtimestamp(51.147))
822
super(TimeAddedVerboseTestResult, self).addSuccess(test)
823
def report_tests_starting(self): pass
825
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
826
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
828
821
def test_known_failure(self):
829
"""Using knownFailure should trigger several result actions."""
822
"""A KnownFailure being raised should trigger several result actions."""
830
823
class InstrumentedTestResult(tests.ExtendedTestResult):
831
824
def stopTestRun(self): pass
832
def report_tests_starting(self): pass
833
def report_known_failure(self, test, err=None, details=None):
834
self._call = test, 'known failure'
825
def startTests(self): pass
826
def report_test_start(self, test): pass
827
def report_known_failure(self, test, err):
828
self._call = test, err
835
829
result = InstrumentedTestResult(None, None, None, None)
836
class Test(tests.TestCase):
837
def test_function(self):
838
self.knownFailure('failed!')
839
test = Test("test_function")
831
raise tests.KnownFailure('failed!')
832
test = unittest.FunctionTestCase(test_function)
841
834
# it should invoke 'report_known_failure'.
842
835
self.assertEqual(2, len(result._call))
843
self.assertEqual(test.id(), result._call[0].id())
844
self.assertEqual('known failure', result._call[1])
836
self.assertEqual(test, result._call[0])
837
self.assertEqual(tests.KnownFailure, result._call[1][0])
838
self.assertIsInstance(result._call[1][1], tests.KnownFailure)
845
839
# we dont introspec the traceback, if the rest is ok, it would be
846
840
# exceptional for it not to be.
847
841
# it should update the known_failure_count on the object.
1193
def test_verbose_test_count(self):
1194
"""A verbose test run reports the right test count at the start"""
1195
suite = TestUtil.TestSuite([
1196
unittest.FunctionTestCase(lambda:None),
1197
unittest.FunctionTestCase(lambda:None)])
1198
self.assertEqual(suite.countTestCases(), 2)
1200
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1201
# Need to use the CountingDecorator as that's what sets num_tests
1202
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1203
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1169
def _patch_get_bzr_source_tree(self):
1170
# Reading from the actual source tree breaks isolation, but we don't
1171
# want to assume that thats *all* that would happen.
1172
self._get_source_tree_calls = []
1173
def _get_bzr_source_tree():
1174
self._get_source_tree_calls.append("called")
1176
orig_get_bzr_source_tree = bzrlib.version._get_bzr_source_tree
1177
bzrlib.version._get_bzr_source_tree = _get_bzr_source_tree
1179
bzrlib.version._get_bzr_source_tree = orig_get_bzr_source_tree
1180
self.addCleanup(restore)
1182
def test_bench_history(self):
1183
# tests that the running the benchmark passes bench_history into
1184
# the test result object. We can tell that happens if
1185
# _get_bzr_source_tree is called.
1186
self._patch_get_bzr_source_tree()
1187
test = TestRunner('dummy_test')
1189
runner = tests.TextTestRunner(stream=self._log_file,
1190
bench_history=output)
1191
result = self.run_test_runner(runner, test)
1192
output_string = output.getvalue()
1193
self.assertContainsRe(output_string, "--date [0-9.]+")
1194
self.assertLength(1, self._get_source_tree_calls)
1196
def assertLogDeleted(self, test):
1197
log = test._get_log()
1198
self.assertEqual("DELETED log file to reduce memory footprint", log)
1199
self.assertEqual('', test._log_contents)
1200
self.assertIs(None, test._log_file_name)
1202
def test_success_log_deleted(self):
1203
"""Successful tests have their log deleted"""
1205
class LogTester(tests.TestCase):
1207
def test_success(self):
1208
self.log('this will be removed\n')
1211
runner = tests.TextTestRunner(stream=sio)
1212
test = LogTester('test_success')
1213
result = self.run_test_runner(runner, test)
1215
self.assertLogDeleted(test)
1217
def test_skipped_log_deleted(self):
1218
"""Skipped tests have their log deleted"""
1220
class LogTester(tests.TestCase):
1222
def test_skipped(self):
1223
self.log('this will be removed\n')
1224
raise tests.TestSkipped()
1227
runner = tests.TextTestRunner(stream=sio)
1228
test = LogTester('test_skipped')
1229
result = self.run_test_runner(runner, test)
1231
self.assertLogDeleted(test)
1233
def test_not_aplicable_log_deleted(self):
1234
"""Not applicable tests have their log deleted"""
1236
class LogTester(tests.TestCase):
1238
def test_not_applicable(self):
1239
self.log('this will be removed\n')
1240
raise tests.TestNotApplicable()
1243
runner = tests.TextTestRunner(stream=sio)
1244
test = LogTester('test_not_applicable')
1245
result = self.run_test_runner(runner, test)
1247
self.assertLogDeleted(test)
1249
def test_known_failure_log_deleted(self):
1250
"""Know failure tests have their log deleted"""
1252
class LogTester(tests.TestCase):
1254
def test_known_failure(self):
1255
self.log('this will be removed\n')
1256
raise tests.KnownFailure()
1259
runner = tests.TextTestRunner(stream=sio)
1260
test = LogTester('test_known_failure')
1261
result = self.run_test_runner(runner, test)
1263
self.assertLogDeleted(test)
1265
def test_fail_log_kept(self):
1266
"""Failed tests have their log kept"""
1268
class LogTester(tests.TestCase):
1270
def test_fail(self):
1271
self.log('this will be kept\n')
1272
self.fail('this test fails')
1275
runner = tests.TextTestRunner(stream=sio)
1276
test = LogTester('test_fail')
1277
result = self.run_test_runner(runner, test)
1279
text = sio.getvalue()
1280
self.assertContainsRe(text, 'this will be kept')
1281
self.assertContainsRe(text, 'this test fails')
1283
log = test._get_log()
1284
self.assertContainsRe(log, 'this will be kept')
1285
self.assertEqual(log, test._log_contents)
1287
def test_error_log_kept(self):
1288
"""Tests with errors have their log kept"""
1290
class LogTester(tests.TestCase):
1292
def test_error(self):
1293
self.log('this will be kept\n')
1294
raise ValueError('random exception raised')
1297
runner = tests.TextTestRunner(stream=sio)
1298
test = LogTester('test_error')
1299
result = self.run_test_runner(runner, test)
1301
text = sio.getvalue()
1302
self.assertContainsRe(text, 'this will be kept')
1303
self.assertContainsRe(text, 'random exception raised')
1305
log = test._get_log()
1306
self.assertContainsRe(log, 'this will be kept')
1307
self.assertEqual(log, test._log_contents)
1205
1309
def test_startTestRun(self):
1206
1310
"""run should call result.startTestRun()"""
1208
class LoggingDecorator(ExtendedToOriginalDecorator):
1312
class LoggingDecorator(tests.ForwardingResult):
1209
1313
def startTestRun(self):
1210
ExtendedToOriginalDecorator.startTestRun(self)
1314
tests.ForwardingResult.startTestRun(self)
1211
1315
calls.append('startTestRun')
1212
1316
test = unittest.FunctionTestCase(lambda:None)
1213
1317
stream = StringIO()
1624
1712
self.assertRaises(AssertionError,
1625
1713
self.assertListRaises, _TestException, success_generator)
1627
def test_overrideAttr_without_value(self):
1628
self.test_attr = 'original' # Define a test attribute
1629
obj = self # Make 'obj' visible to the embedded test
1630
class Test(tests.TestCase):
1633
tests.TestCase.setUp(self)
1634
self.orig = self.overrideAttr(obj, 'test_attr')
1636
def test_value(self):
1637
self.assertEqual('original', self.orig)
1638
self.assertEqual('original', obj.test_attr)
1639
obj.test_attr = 'modified'
1640
self.assertEqual('modified', obj.test_attr)
1642
test = Test('test_value')
1643
test.run(unittest.TestResult())
1644
self.assertEqual('original', obj.test_attr)
1646
def test_overrideAttr_with_value(self):
1647
self.test_attr = 'original' # Define a test attribute
1648
obj = self # Make 'obj' visible to the embedded test
1649
class Test(tests.TestCase):
1652
tests.TestCase.setUp(self)
1653
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1655
def test_value(self):
1656
self.assertEqual('original', self.orig)
1657
self.assertEqual('modified', obj.test_attr)
1659
test = Test('test_value')
1660
test.run(unittest.TestResult())
1661
self.assertEqual('original', obj.test_attr)
1663
def test_recordCalls(self):
1664
from bzrlib.tests import test_selftest
1665
calls = self.recordCalls(
1666
test_selftest, '_add_numbers')
1667
self.assertEqual(test_selftest._add_numbers(2, 10),
1669
self.assertEquals(calls, [((2, 10), {})])
1672
def _add_numbers(a, b):
1676
class _MissingFeature(features.Feature):
1679
missing_feature = _MissingFeature()
1682
def _get_test(name):
1683
"""Get an instance of a specific example test.
1685
We protect this in a function so that they don't auto-run in the test
1689
class ExampleTests(tests.TestCase):
1691
def test_fail(self):
1692
mutter('this was a failing test')
1693
self.fail('this test will fail')
1695
def test_error(self):
1696
mutter('this test errored')
1697
raise RuntimeError('gotcha')
1699
def test_missing_feature(self):
1700
mutter('missing the feature')
1701
self.requireFeature(missing_feature)
1703
def test_skip(self):
1704
mutter('this test will be skipped')
1705
raise tests.TestSkipped('reason')
1707
def test_success(self):
1708
mutter('this test succeeds')
1710
def test_xfail(self):
1711
mutter('test with expected failure')
1712
self.knownFailure('this_fails')
1714
def test_unexpected_success(self):
1715
mutter('test with unexpected success')
1716
self.expectFailure('should_fail', lambda: None)
1718
return ExampleTests(name)
1721
class TestTestCaseLogDetails(tests.TestCase):
1723
def _run_test(self, test_name):
1724
test = _get_test(test_name)
1725
result = testtools.TestResult()
1729
def test_fail_has_log(self):
1730
result = self._run_test('test_fail')
1731
self.assertEqual(1, len(result.failures))
1732
result_content = result.failures[0][1]
1733
self.assertContainsRe(result_content,
1734
'(?m)^(?:Text attachment: )?log(?:$|: )')
1735
self.assertContainsRe(result_content, 'this was a failing test')
1737
def test_error_has_log(self):
1738
result = self._run_test('test_error')
1739
self.assertEqual(1, len(result.errors))
1740
result_content = result.errors[0][1]
1741
self.assertContainsRe(result_content,
1742
'(?m)^(?:Text attachment: )?log(?:$|: )')
1743
self.assertContainsRe(result_content, 'this test errored')
1745
def test_skip_has_no_log(self):
1746
result = self._run_test('test_skip')
1747
self.assertEqual(['reason'], result.skip_reasons.keys())
1748
skips = result.skip_reasons['reason']
1749
self.assertEqual(1, len(skips))
1751
self.assertFalse('log' in test.getDetails())
1753
def test_missing_feature_has_no_log(self):
1754
# testtools doesn't know about addNotSupported, so it just gets
1755
# considered as a skip
1756
result = self._run_test('test_missing_feature')
1757
self.assertEqual([missing_feature], result.skip_reasons.keys())
1758
skips = result.skip_reasons[missing_feature]
1759
self.assertEqual(1, len(skips))
1761
self.assertFalse('log' in test.getDetails())
1763
def test_xfail_has_no_log(self):
1764
result = self._run_test('test_xfail')
1765
self.assertEqual(1, len(result.expectedFailures))
1766
result_content = result.expectedFailures[0][1]
1767
self.assertNotContainsRe(result_content,
1768
'(?m)^(?:Text attachment: )?log(?:$|: )')
1769
self.assertNotContainsRe(result_content, 'test with expected failure')
1771
def test_unexpected_success_has_log(self):
1772
result = self._run_test('test_unexpected_success')
1773
self.assertEqual(1, len(result.unexpectedSuccesses))
1774
# Inconsistency, unexpectedSuccesses is a list of tests,
1775
# expectedFailures is a list of reasons?
1776
test = result.unexpectedSuccesses[0]
1777
details = test.getDetails()
1778
self.assertTrue('log' in details)
1781
class TestTestCloning(tests.TestCase):
1782
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1784
def test_cloned_testcase_does_not_share_details(self):
1785
"""A TestCase cloned with clone_test does not share mutable attributes
1786
such as details or cleanups.
1788
class Test(tests.TestCase):
1790
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1791
orig_test = Test('test_foo')
1792
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1793
orig_test.run(unittest.TestResult())
1794
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1795
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1797
def test_double_apply_scenario_preserves_first_scenario(self):
1798
"""Applying two levels of scenarios to a test preserves the attributes
1799
added by both scenarios.
1801
class Test(tests.TestCase):
1804
test = Test('test_foo')
1805
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1806
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1807
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1808
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1809
all_tests = list(tests.iter_suite_tests(suite))
1810
self.assertLength(4, all_tests)
1811
all_xys = sorted((t.x, t.y) for t in all_tests)
1812
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1815
1716
# NB: Don't delete this; it's not actually from 0.11!
1816
1717
@deprecated_function(deprecated_in((0, 11, 0)))
2125
2032
load_list='missing file name', list_only=True)
2128
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2130
_test_needs_features = [features.subunit]
2132
def run_subunit_stream(self, test_name):
2133
from subunit import ProtocolTestCase
2135
return TestUtil.TestSuite([_get_test(test_name)])
2136
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2137
test_suite_factory=factory)
2138
test = ProtocolTestCase(stream)
2139
result = testtools.TestResult()
2141
content = stream.getvalue()
2142
return content, result
2144
def test_fail_has_log(self):
2145
content, result = self.run_subunit_stream('test_fail')
2146
self.assertEqual(1, len(result.failures))
2147
self.assertContainsRe(content, '(?m)^log$')
2148
self.assertContainsRe(content, 'this test will fail')
2150
def test_error_has_log(self):
2151
content, result = self.run_subunit_stream('test_error')
2152
self.assertContainsRe(content, '(?m)^log$')
2153
self.assertContainsRe(content, 'this test errored')
2155
def test_skip_has_no_log(self):
2156
content, result = self.run_subunit_stream('test_skip')
2157
self.assertNotContainsRe(content, '(?m)^log$')
2158
self.assertNotContainsRe(content, 'this test will be skipped')
2159
self.assertEqual(['reason'], result.skip_reasons.keys())
2160
skips = result.skip_reasons['reason']
2161
self.assertEqual(1, len(skips))
2163
# RemotedTestCase doesn't preserve the "details"
2164
## self.assertFalse('log' in test.getDetails())
2166
def test_missing_feature_has_no_log(self):
2167
content, result = self.run_subunit_stream('test_missing_feature')
2168
self.assertNotContainsRe(content, '(?m)^log$')
2169
self.assertNotContainsRe(content, 'missing the feature')
2170
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2171
skips = result.skip_reasons['_MissingFeature\n']
2172
self.assertEqual(1, len(skips))
2174
# RemotedTestCase doesn't preserve the "details"
2175
## self.assertFalse('log' in test.getDetails())
2177
def test_xfail_has_no_log(self):
2178
content, result = self.run_subunit_stream('test_xfail')
2179
self.assertNotContainsRe(content, '(?m)^log$')
2180
self.assertNotContainsRe(content, 'test with expected failure')
2181
self.assertEqual(1, len(result.expectedFailures))
2182
result_content = result.expectedFailures[0][1]
2183
self.assertNotContainsRe(result_content,
2184
'(?m)^(?:Text attachment: )?log(?:$|: )')
2185
self.assertNotContainsRe(result_content, 'test with expected failure')
2187
def test_unexpected_success_has_log(self):
2188
content, result = self.run_subunit_stream('test_unexpected_success')
2189
self.assertContainsRe(content, '(?m)^log$')
2190
self.assertContainsRe(content, 'test with unexpected success')
2191
# GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2192
# success, if a min version check is added remove this
2193
from subunit import TestProtocolClient as _Client
2194
if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
2195
self.expectFailure('subunit treats "unexpectedSuccess"'
2196
' as a plain success',
2197
self.assertEqual, 1, len(result.unexpectedSuccesses))
2198
self.assertEqual(1, len(result.unexpectedSuccesses))
2199
test = result.unexpectedSuccesses[0]
2200
# RemotedTestCase doesn't preserve the "details"
2201
## self.assertTrue('log' in test.getDetails())
2203
def test_success_has_no_log(self):
2204
content, result = self.run_subunit_stream('test_success')
2205
self.assertEqual(1, result.testsRun)
2206
self.assertNotContainsRe(content, '(?m)^log$')
2207
self.assertNotContainsRe(content, 'this test succeeds')
2210
2035
class TestRunBzr(tests.TestCase):
3129
3010
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3132
class TestThreadLeakDetection(tests.TestCase):
3133
"""Ensure when tests leak threads we detect and report it"""
3135
class LeakRecordingResult(tests.ExtendedTestResult):
3137
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3139
def _report_thread_leak(self, test, leaks, alive):
3140
self.leaks.append((test, leaks))
3142
def test_testcase_without_addCleanups(self):
3143
"""Check old TestCase instances don't break with leak detection"""
3144
class Test(unittest.TestCase):
3147
result = self.LeakRecordingResult()
3149
result.startTestRun()
3151
result.stopTestRun()
3152
self.assertEqual(result._tests_leaking_threads_count, 0)
3153
self.assertEqual(result.leaks, [])
3155
def test_thread_leak(self):
3156
"""Ensure a thread that outlives the running of a test is reported
3158
Uses a thread that blocks on an event, and is started by the inner
3159
test case. As the thread outlives the inner case's run, it should be
3160
detected as a leak, but the event is then set so that the thread can
3161
be safely joined in cleanup so it's not leaked for real.
3163
event = threading.Event()
3164
thread = threading.Thread(name="Leaker", target=event.wait)
3165
class Test(tests.TestCase):
3166
def test_leak(self):
3168
result = self.LeakRecordingResult()
3169
test = Test("test_leak")
3170
self.addCleanup(thread.join)
3171
self.addCleanup(event.set)
3172
result.startTestRun()
3174
result.stopTestRun()
3175
self.assertEqual(result._tests_leaking_threads_count, 1)
3176
self.assertEqual(result._first_thread_leaker_id, test.id())
3177
self.assertEqual(result.leaks, [(test, set([thread]))])
3178
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3180
def test_multiple_leaks(self):
3181
"""Check multiple leaks are blamed on the test cases at fault
3183
Same concept as the previous test, but has one inner test method that
3184
leaks two threads, and one that doesn't leak at all.
3186
event = threading.Event()
3187
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3188
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3189
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3190
class Test(tests.TestCase):
3191
def test_first_leak(self):
3193
def test_second_no_leak(self):
3195
def test_third_leak(self):
3198
result = self.LeakRecordingResult()
3199
first_test = Test("test_first_leak")
3200
third_test = Test("test_third_leak")
3201
self.addCleanup(thread_a.join)
3202
self.addCleanup(thread_b.join)
3203
self.addCleanup(thread_c.join)
3204
self.addCleanup(event.set)
3205
result.startTestRun()
3207
[first_test, Test("test_second_no_leak"), third_test]
3209
result.stopTestRun()
3210
self.assertEqual(result._tests_leaking_threads_count, 2)
3211
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3212
self.assertEqual(result.leaks, [
3213
(first_test, set([thread_b])),
3214
(third_test, set([thread_a, thread_c]))])
3215
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3218
class TestPostMortemDebugging(tests.TestCase):
3219
"""Check post mortem debugging works when tests fail or error"""
3221
class TracebackRecordingResult(tests.ExtendedTestResult):
3223
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3224
self.postcode = None
3225
def _post_mortem(self, tb=None):
3226
"""Record the code object at the end of the current traceback"""
3227
tb = tb or sys.exc_info()[2]
3230
while next is not None:
3233
self.postcode = tb.tb_frame.f_code
3234
def report_error(self, test, err):
3236
def report_failure(self, test, err):
3239
def test_location_unittest_error(self):
3240
"""Needs right post mortem traceback with erroring unittest case"""
3241
class Test(unittest.TestCase):
3244
result = self.TracebackRecordingResult()
3246
self.assertEqual(result.postcode, Test.runTest.func_code)
3248
def test_location_unittest_failure(self):
3249
"""Needs right post mortem traceback with failing unittest case"""
3250
class Test(unittest.TestCase):
3252
raise self.failureException
3253
result = self.TracebackRecordingResult()
3255
self.assertEqual(result.postcode, Test.runTest.func_code)
3257
def test_location_bt_error(self):
3258
"""Needs right post mortem traceback with erroring bzrlib.tests case"""
3259
class Test(tests.TestCase):
3260
def test_error(self):
3262
result = self.TracebackRecordingResult()
3263
Test("test_error").run(result)
3264
self.assertEqual(result.postcode, Test.test_error.func_code)
3266
def test_location_bt_failure(self):
3267
"""Needs right post mortem traceback with failing bzrlib.tests case"""
3268
class Test(tests.TestCase):
3269
def test_failure(self):
3270
raise self.failureException
3271
result = self.TracebackRecordingResult()
3272
Test("test_failure").run(result)
3273
self.assertEqual(result.postcode, Test.test_failure.func_code)
3275
def test_env_var_triggers_post_mortem(self):
3276
"""Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3278
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3279
post_mortem_calls = []
3280
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3281
self.overrideEnv('BZR_TEST_PDB', None)
3282
result._post_mortem(1)
3283
self.overrideEnv('BZR_TEST_PDB', 'on')
3284
result._post_mortem(2)
3285
self.assertEqual([2], post_mortem_calls)
3288
3013
class TestRunSuite(tests.TestCase):
3290
3015
def test_runner_class(self):
3301
3026
self.verbosity)
3302
3027
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3303
3028
self.assertLength(1, calls)
3306
class _Selftest(object):
3307
"""Mixin for tests needing full selftest output"""
3309
def _inject_stream_into_subunit(self, stream):
3310
"""To be overridden by subclasses that run tests out of process"""
3312
def _run_selftest(self, **kwargs):
3314
self._inject_stream_into_subunit(sio)
3315
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3316
return sio.getvalue()
3319
class _ForkedSelftest(_Selftest):
3320
"""Mixin for tests needing full selftest output with forked children"""
3322
_test_needs_features = [features.subunit]
3324
def _inject_stream_into_subunit(self, stream):
3325
"""Monkey-patch subunit so the extra output goes to stream not stdout
3327
Some APIs need rewriting so this kind of bogus hackery can be replaced
3328
by passing the stream param from run_tests down into ProtocolTestCase.
3330
from subunit import ProtocolTestCase
3331
_original_init = ProtocolTestCase.__init__
3332
def _init_with_passthrough(self, *args, **kwargs):
3333
_original_init(self, *args, **kwargs)
3334
self._passthrough = stream
3335
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3337
def _run_selftest(self, **kwargs):
3338
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3339
if getattr(os, "fork", None) is None:
3340
raise tests.TestNotApplicable("Platform doesn't support forking")
3341
# Make sure the fork code is actually invoked by claiming two cores
3342
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3343
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3344
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3347
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3348
"""Check operation of --parallel=fork selftest option"""
3350
def test_error_in_child_during_fork(self):
3351
"""Error in a forked child during test setup should get reported"""
3352
class Test(tests.TestCase):
3353
def testMethod(self):
3355
# We don't care what, just break something that a child will run
3356
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3357
out = self._run_selftest(test_suite_factory=Test)
3358
# Lines from the tracebacks of the two child processes may be mixed
3359
# together due to the way subunit parses and forwards the streams,
3360
# so permit extra lines between each part of the error output.
3361
self.assertContainsRe(out,
3364
".+ in fork_for_tests\n"
3366
"\s*workaround_zealous_crypto_random\(\)\n"
3371
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3372
"""Check a test case still alive after being run emits a warning"""
3374
class Test(tests.TestCase):
3375
def test_pass(self):
3377
def test_self_ref(self):
3378
self.also_self = self.test_self_ref
3379
def test_skip(self):
3380
self.skip("Don't need")
3382
def _get_suite(self):
3383
return TestUtil.TestSuite([
3384
self.Test("test_pass"),
3385
self.Test("test_self_ref"),
3386
self.Test("test_skip"),
3389
def _run_selftest_with_suite(self, **kwargs):
3390
old_flags = tests.selftest_debug_flags
3391
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3392
gc_on = gc.isenabled()
3396
output = self._run_selftest(test_suite_factory=self._get_suite,
3401
tests.selftest_debug_flags = old_flags
3402
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3403
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3406
def test_testsuite(self):
3407
self._run_selftest_with_suite()
3409
def test_pattern(self):
3410
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3411
self.assertNotContainsRe(out, "test_skip")
3413
def test_exclude_pattern(self):
3414
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3415
self.assertNotContainsRe(out, "test_skip")
3417
def test_random_seed(self):
3418
self._run_selftest_with_suite(random_seed="now")
3420
def test_matching_tests_first(self):
3421
self._run_selftest_with_suite(matching_tests_first=True,
3422
pattern="test_self_ref$")
3424
def test_starting_with_and_exclude(self):
3425
out = self._run_selftest_with_suite(starting_with=["bt."],
3426
exclude_pattern="test_skip$")
3427
self.assertNotContainsRe(out, "test_skip")
3429
def test_additonal_decorator(self):
3430
out = self._run_selftest_with_suite(
3431
suite_decorators=[tests.TestDecorator])
3434
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3435
"""Check warnings from tests staying alive are emitted with subunit"""
3437
_test_needs_features = [features.subunit]
3439
def _run_selftest_with_suite(self, **kwargs):
3440
return TestUncollectedWarnings._run_selftest_with_suite(self,
3441
runner_class=tests.SubUnitBzrRunner, **kwargs)
3444
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3445
"""Check warnings from tests staying alive are emitted when forking"""
3448
class TestEnvironHandling(tests.TestCase):
3450
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3451
self.assertFalse('MYVAR' in os.environ)
3452
self.overrideEnv('MYVAR', '42')
3453
# We use an embedded test to make sure we fix the _captureVar bug
3454
class Test(tests.TestCase):
3456
# The first call save the 42 value
3457
self.overrideEnv('MYVAR', None)
3458
self.assertEquals(None, os.environ.get('MYVAR'))
3459
# Make sure we can call it twice
3460
self.overrideEnv('MYVAR', None)
3461
self.assertEquals(None, os.environ.get('MYVAR'))
3463
result = tests.TextTestResult(output, 0, 1)
3464
Test('test_me').run(result)
3465
if not result.wasStrictlySuccessful():
3466
self.fail(output.getvalue())
3467
# We get our value back
3468
self.assertEquals('42', os.environ.get('MYVAR'))
3471
class TestIsolatedEnv(tests.TestCase):
3472
"""Test isolating tests from os.environ.
3474
Since we use tests that are already isolated from os.environ a bit of care
3475
should be taken when designing the tests to avoid bootstrap side-effects.
3476
The tests start an already clean os.environ which allow doing valid
3477
assertions about which variables are present or not and design tests around
3481
class ScratchMonkey(tests.TestCase):
3486
def test_basics(self):
3487
# Make sure we know the definition of BZR_HOME: not part of os.environ
3488
# for tests.TestCase.
3489
self.assertTrue('BZR_HOME' in tests.isolated_environ)
3490
self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3491
# Being part of isolated_environ, BZR_HOME should not appear here
3492
self.assertFalse('BZR_HOME' in os.environ)
3493
# Make sure we know the definition of LINES: part of os.environ for
3495
self.assertTrue('LINES' in tests.isolated_environ)
3496
self.assertEquals('25', tests.isolated_environ['LINES'])
3497
self.assertEquals('25', os.environ['LINES'])
3499
def test_injecting_unknown_variable(self):
3500
# BZR_HOME is known to be absent from os.environ
3501
test = self.ScratchMonkey('test_me')
3502
tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3503
self.assertEquals('foo', os.environ['BZR_HOME'])
3504
tests.restore_os_environ(test)
3505
self.assertFalse('BZR_HOME' in os.environ)
3507
def test_injecting_known_variable(self):
3508
test = self.ScratchMonkey('test_me')
3509
# LINES is known to be present in os.environ
3510
tests.override_os_environ(test, {'LINES': '42'})
3511
self.assertEquals('42', os.environ['LINES'])
3512
tests.restore_os_environ(test)
3513
self.assertEquals('25', os.environ['LINES'])
3515
def test_deleting_variable(self):
3516
test = self.ScratchMonkey('test_me')
3517
# LINES is known to be present in os.environ
3518
tests.override_os_environ(test, {'LINES': None})
3519
self.assertTrue('LINES' not in os.environ)
3520
tests.restore_os_environ(test)
3521
self.assertEquals('25', os.environ['LINES'])
3524
class TestDocTestSuiteIsolation(tests.TestCase):
3525
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3527
Since tests.TestCase alreay provides an isolation from os.environ, we use
3528
the clean environment as a base for testing. To precisely capture the
3529
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3532
We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3533
not `os.environ` so each test overrides it to suit its needs.
3537
def get_doctest_suite_for_string(self, klass, string):
3538
class Finder(doctest.DocTestFinder):
3540
def find(*args, **kwargs):
3541
test = doctest.DocTestParser().get_doctest(
3542
string, {}, 'foo', 'foo.py', 0)
3545
suite = klass(test_finder=Finder())
3548
def run_doctest_suite_for_string(self, klass, string):
3549
suite = self.get_doctest_suite_for_string(klass, string)
3551
result = tests.TextTestResult(output, 0, 1)
3553
return result, output
3555
def assertDocTestStringSucceds(self, klass, string):
3556
result, output = self.run_doctest_suite_for_string(klass, string)
3557
if not result.wasStrictlySuccessful():
3558
self.fail(output.getvalue())
3560
def assertDocTestStringFails(self, klass, string):
3561
result, output = self.run_doctest_suite_for_string(klass, string)
3562
if result.wasStrictlySuccessful():
3563
self.fail(output.getvalue())
3565
def test_injected_variable(self):
3566
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3569
>>> os.environ['LINES']
3572
# doctest.DocTestSuite fails as it sees '25'
3573
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3574
# tests.DocTestSuite sees '42'
3575
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3577
def test_deleted_variable(self):
3578
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3581
>>> os.environ.get('LINES')
3583
# doctest.DocTestSuite fails as it sees '25'
3584
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3585
# tests.DocTestSuite sees None
3586
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3589
class TestSelftestExcludePatterns(tests.TestCase):
3592
super(TestSelftestExcludePatterns, self).setUp()
3593
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3595
def suite_factory(self, keep_only=None, starting_with=None):
3596
"""A test suite factory with only a few tests."""
3597
class Test(tests.TestCase):
3599
# We don't need the full class path
3600
return self._testMethodName
3607
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3609
def assertTestList(self, expected, *selftest_args):
3610
# We rely on setUp installing the right test suite factory so we can
3611
# test at the command level without loading the whole test suite
3612
out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3613
actual = out.splitlines()
3614
self.assertEquals(expected, actual)
3616
def test_full_list(self):
3617
self.assertTestList(['a', 'b', 'c'])
3619
def test_single_exclude(self):
3620
self.assertTestList(['b', 'c'], '-x', 'a')
3622
def test_mutiple_excludes(self):
3623
self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3626
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3628
_test_needs_features = [features.subunit]
3631
super(TestCounterHooks, self).setUp()
3632
class Test(tests.TestCase):
3635
super(Test, self).setUp()
3636
self.hooks = hooks.Hooks()
3637
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3638
self.install_counter_hook(self.hooks, 'myhook')
3643
def run_hook_once(self):
3644
for hook in self.hooks['myhook']:
3647
self.test_class = Test
3649
def assertHookCalls(self, expected_calls, test_name):
3650
test = self.test_class(test_name)
3651
result = unittest.TestResult()
3653
self.assertTrue(hasattr(test, '_counters'))
3654
self.assertTrue(test._counters.has_key('myhook'))
3655
self.assertEquals(expected_calls, test._counters['myhook'])
3657
def test_no_hook(self):
3658
self.assertHookCalls(0, 'no_hook')
3660
def test_run_hook_once(self):
3661
tt = features.testtools
3662
if tt.module.__version__ < (0, 9, 8):
3663
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3664
self.assertHookCalls(1, 'run_hook_once')