609
617
test = TestDanglingLock('test_function')
610
618
result = test.run()
619
total_failures = result.errors + result.failures
611
620
if self._lock_check_thorough:
612
self.assertEqual(1, len(result.errors))
621
self.assertEqual(1, len(total_failures))
614
623
# When _lock_check_thorough is disabled, then we don't trigger a
616
self.assertEqual(0, len(result.errors))
625
self.assertEqual(0, len(total_failures))
619
628
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
620
629
"""Tests for the convenience functions TestCaseWithTransport introduces."""
622
631
def test_get_readonly_url_none(self):
    """Readonly URLs decorate the VFS server when no readonly server is set.

    With ``transport_readonly_server`` left as None, the transports opened
    from ``get_readonly_url()`` are expected to be
    ``ReadonlyTransportDecorator`` instances, and a relative path passed to
    ``get_readonly_url()`` must resolve below the base readonly URL.
    """
    from bzrlib.transport.readonly import ReadonlyTransportDecorator
    self.vfs_transport_factory = memory.MemoryServer
    self.transport_readonly_server = None
    # calling get_readonly_url() constructs a readonly decorator on the url
    # of the VFS server
    url = self.get_readonly_url()
    url2 = self.get_readonly_url('foo/bar')
    t = transport.get_transport(url)
    t2 = transport.get_transport(url2)
    self.assertTrue(isinstance(t, ReadonlyTransportDecorator))
    self.assertTrue(isinstance(t2, ReadonlyTransportDecorator))
    # t2.base carries a trailing '/', which abspath() does not produce.
    self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
645
def test_get_readonly_url_http(self):
    """Readonly URLs come from the configured readonly (HTTP) server.

    With ``transport_readonly_server`` set to ``HttpServer``, the transports
    opened from ``get_readonly_url()`` are expected to be
    ``HttpTransportBase`` instances, and a relative path passed to
    ``get_readonly_url()`` must resolve below the base readonly URL.
    """
    from bzrlib.tests.http_server import HttpServer
    from bzrlib.transport.http import HttpTransportBase
    self.transport_server = test_server.LocalURLServer
    self.transport_readonly_server = HttpServer
    # calling get_readonly_url() gives us a HTTP server instance.
    url = self.get_readonly_url()
    url2 = self.get_readonly_url('foo/bar')
    # the transport returned may be any HttpTransportBase subclass
    t = transport.get_transport(url)
    t2 = transport.get_transport(url2)
    self.assertTrue(isinstance(t, HttpTransportBase))
    self.assertTrue(isinstance(t2, HttpTransportBase))
    # t2.base carries a trailing '/', which abspath() does not produce.
    self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
836
840
self.assertContainsRe(output,
837
841
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
843
def test_uses_time_from_testtools(self):
    """Test case timings in verbose results should use testtools times"""
    # Imported here so the test does not rely on a module-level import.
    import datetime
    class TimeAddedVerboseTestResult(tests.VerboseTestResult):
        """Verbose result that feeds fixed timestamps through time()."""
        def startTest(self, test):
            self.time(datetime.datetime.utcfromtimestamp(1.145))
            super(TimeAddedVerboseTestResult, self).startTest(test)
        def addSuccess(self, test):
            self.time(datetime.datetime.utcfromtimestamp(51.147))
            super(TimeAddedVerboseTestResult, self).addSuccess(test)
        def report_tests_starting(self): pass
    # Stream that captures the verbose report so its tail can be checked.
    sio = StringIO()
    self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
    # 51.147s - 1.145s = 50.002s, i.e. the injected times, not wall clock.
    self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
839
858
def test_known_failure(self):
840
859
"""A KnownFailure being raised should trigger several result actions."""
841
860
class InstrumentedTestResult(tests.ExtendedTestResult):
842
861
def stopTestRun(self): pass
843
def startTests(self): pass
844
def report_test_start(self, test): pass
862
def report_tests_starting(self): pass
845
863
def report_known_failure(self, test, err=None, details=None):
846
864
self._call = test, 'known failure'
847
865
result = InstrumentedTestResult(None, None, None, None)
1214
1242
self.assertContainsRe(output_string, "--date [0-9.]+")
1215
1243
self.assertLength(1, self._get_source_tree_calls)
1245
def test_verbose_test_count(self):
    """A verbose test run reports the right test count at the start"""
    suite = TestUtil.TestSuite([
        unittest.FunctionTestCase(lambda:None),
        unittest.FunctionTestCase(lambda:None)])
    self.assertEqual(suite.countTestCases(), 2)
    # Stream that captures the runner output inspected below.
    stream = StringIO()
    runner = tests.TextTestRunner(stream=stream, verbosity=2)
    # Need to use the CountingDecorator as that's what sets num_tests
    self.run_test_runner(runner, tests.CountingDecorator(suite))
    self.assertStartsWith(stream.getvalue(), "running 2 tests")
1217
1257
def test_startTestRun(self):
1218
1258
"""run should call result.startTestRun()"""
1220
class LoggingDecorator(tests.ForwardingResult):
1260
class LoggingDecorator(ExtendedToOriginalDecorator):
1221
1261
def startTestRun(self):
1222
tests.ForwardingResult.startTestRun(self)
1262
ExtendedToOriginalDecorator.startTestRun(self)
1223
1263
calls.append('startTestRun')
1224
1264
test = unittest.FunctionTestCase(lambda:None)
1225
1265
stream = StringIO()
1656
1713
self.assertEqual('original', obj.test_attr)
1716
class _MissingFeature(tests.Feature):
    """A feature that always reports itself as unavailable.

    Used by the example tests to exercise the "missing feature" outcome.
    """

    def _probe(self):
        # NOTE(review): assumes tests.Feature decides availability through
        # _probe() -- confirm against the Feature base class.
        return False
1719
missing_feature = _MissingFeature()
1722
def _get_test(name):
    """Get an instance of a specific example test.

    We protect this in a function so that they don't auto-run in the test
    suite.

    :param name: Name of the ExampleTests method to instantiate, e.g.
        'test_fail' or 'test_success'.
    :return: An ExampleTests instance bound to that method.
    """

    class ExampleTests(tests.TestCase):
        """One test per outcome; each writes a marker line via mutter()."""

        def test_fail(self):
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_missing_feature(self):
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_skip(self):
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_success(self):
            mutter('this test succeeds')

        def test_xfail(self):
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            mutter('test with unexpected success')
            self.expectFailure('should_fail', lambda: None)

    return ExampleTests(name)
1761
class TestTestCaseLogDetails(tests.TestCase):
    """Check which test outcomes carry the test log in their details.

    Each test runs one of the example tests from _get_test() against a
    plain testtools.TestResult and inspects what was recorded.
    """

    def _run_test(self, test_name):
        """Run the named example test and return the populated result.

        :param test_name: Name of an ExampleTests method, see _get_test().
        :return: The testtools.TestResult the test was run against.
        """
        test = _get_test(test_name)
        result = testtools.TestResult()
        test.run(result)
        return result

    def test_fail_has_log(self):
        result = self._run_test('test_fail')
        self.assertEqual(1, len(result.failures))
        result_content = result.failures[0][1]
        self.assertContainsRe(result_content, 'Text attachment: log')
        self.assertContainsRe(result_content, 'this was a failing test')

    def test_error_has_log(self):
        result = self._run_test('test_error')
        self.assertEqual(1, len(result.errors))
        result_content = result.errors[0][1]
        self.assertContainsRe(result_content, 'Text attachment: log')
        self.assertContainsRe(result_content, 'this test errored')

    def test_skip_has_no_log(self):
        result = self._run_test('test_skip')
        self.assertEqual(['reason'], result.skip_reasons.keys())
        skips = result.skip_reasons['reason']
        self.assertEqual(1, len(skips))
        # The single skipped test is the one whose details we inspect.
        test = skips[0]
        self.assertFalse('log' in test.getDetails())

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so it just gets
        # considered as a skip
        result = self._run_test('test_missing_feature')
        self.assertEqual([missing_feature], result.skip_reasons.keys())
        skips = result.skip_reasons[missing_feature]
        self.assertEqual(1, len(skips))
        # The single skipped test is the one whose details we inspect.
        test = skips[0]
        self.assertFalse('log' in test.getDetails())

    def test_xfail_has_no_log(self):
        result = self._run_test('test_xfail')
        self.assertEqual(1, len(result.expectedFailures))
        result_content = result.expectedFailures[0][1]
        self.assertNotContainsRe(result_content, 'Text attachment: log')
        self.assertNotContainsRe(result_content, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        result = self._run_test('test_unexpected_success')
        self.assertEqual(1, len(result.unexpectedSuccesses))
        # Inconsistency, unexpectedSuccesses is a list of tests,
        # expectedFailures is a list of reasons?
        test = result.unexpectedSuccesses[0]
        details = test.getDetails()
        self.assertTrue('log' in details)
1818
class TestTestCloning(tests.TestCase):
    """Tests that test cloning of TestCases (as used by multiply_tests)."""

    def test_cloned_testcase_does_not_share_details(self):
        """A TestCase cloned with clone_test does not share mutable attributes
        such as details or cleanups.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
        orig_test = Test('test_foo')
        # Clone before running, so only the original ever gains the detail.
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
        orig_test.run(unittest.TestResult())
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
        self.assertEqual(None, cloned_test.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Applying two levels of scenarios to a test preserves the attributes
        added by both scenarios.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                pass
        test = Test('test_foo')
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
        all_tests = list(tests.iter_suite_tests(suite))
        # 2 x-scenarios times 2 y-scenarios.
        self.assertLength(4, all_tests)
        all_xys = sorted((t.x, t.y) for t in all_tests)
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1659
1852
# NB: Don't delete this; it's not actually from 0.11!
1660
1853
@deprecated_function(deprecated_in((0, 11, 0)))
1661
1854
def sample_deprecated_function():
1972
2165
load_list='missing file name', list_only=True)
2168
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check which outcomes carry the test log in the subunit stream.

    Each test runs one example test from _get_test() through the subunit
    runner, then inspects both the raw stream and the result rebuilt from it.
    """

    # NOTE(review): presumably makes these tests skip when subunit is not
    # installed -- confirm against tests.TestCase.
    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run the named example test via subunit.

        :param test_name: Name of an ExampleTests method, see _get_test().
        :return: A (content, result) tuple: the raw stream text, and the
            testtools.TestResult obtained by replaying that stream.
        """
        from subunit import ProtocolTestCase
        def factory():
            return TestUtil.TestSuite([_get_test(test_name)])
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=factory)
        # Replay the stream into a fresh result so outcomes can be checked.
        test = ProtocolTestCase(stream)
        result = testtools.TestResult()
        test.run(result)
        content = stream.getvalue()
        return content, result

    def test_fail_has_log(self):
        content, result = self.run_subunit_stream('test_fail')
        self.assertEqual(1, len(result.failures))
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test will fail')

    def test_error_has_log(self):
        content, result = self.run_subunit_stream('test_error')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test errored')

    def test_skip_has_no_log(self):
        content, result = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test will be skipped')
        self.assertEqual(['reason'], result.skip_reasons.keys())
        skips = result.skip_reasons['reason']
        self.assertEqual(1, len(skips))
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertFalse('log' in test.getDetails())

    def test_missing_feature_has_no_log(self):
        content, result = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'missing the feature')
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
        skips = result.skip_reasons['_MissingFeature\n']
        self.assertEqual(1, len(skips))
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertFalse('log' in test.getDetails())

    def test_xfail_has_no_log(self):
        content, result = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'test with expected failure')
        self.assertEqual(1, len(result.expectedFailures))
        result_content = result.expectedFailures[0][1]
        self.assertNotContainsRe(result_content, 'Text attachment: log')
        self.assertNotContainsRe(result_content, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        content, result = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'test with unexpected success')
        # NOTE(review): presumably the lines after expectFailure only run
        # once subunit starts reporting unexpected successes -- confirm.
        self.expectFailure('subunit treats "unexpectedSuccess"'
                           ' as a plain success',
            self.assertEqual, 1, len(result.unexpectedSuccesses))
        self.assertEqual(1, len(result.unexpectedSuccesses))
        test = result.unexpectedSuccesses[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertTrue('log' in test.getDetails())

    def test_success_has_no_log(self):
        content, result = self.run_subunit_stream('test_success')
        self.assertEqual(1, result.testsRun)
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test succeeds')
1975
2245
class TestRunBzr(tests.TestCase):
2950
3236
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3239
class TestThreadLeakDetection(tests.TestCase):
3240
"""Ensure when tests leak threads we detect and report it"""
3242
class LeakRecordingResult(tests.ExtendedTestResult):
3244
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3246
def _report_thread_leak(self, test, leaks, alive):
3247
self.leaks.append((test, leaks))
3249
def test_testcase_without_addCleanups(self):
3250
"""Check old TestCase instances don't break with leak detection"""
3251
class Test(unittest.TestCase):
3254
addCleanup = None # for when on Python 2.7 with native addCleanup
3255
result = self.LeakRecordingResult()
3257
self.assertIs(getattr(test, "addCleanup", None), None)
3258
result.startTestRun()
3260
result.stopTestRun()
3261
self.assertEqual(result._tests_leaking_threads_count, 0)
3262
self.assertEqual(result.leaks, [])
3264
def test_thread_leak(self):
3265
"""Ensure a thread that outlives the running of a test is reported
3267
Uses a thread that blocks on an event, and is started by the inner
3268
test case. As the thread outlives the inner case's run, it should be
3269
detected as a leak, but the event is then set so that the thread can
3270
be safely joined in cleanup so it's not leaked for real.
3272
event = threading.Event()
3273
thread = threading.Thread(name="Leaker", target=event.wait)
3274
class Test(tests.TestCase):
3275
def test_leak(self):
3277
result = self.LeakRecordingResult()
3278
test = Test("test_leak")
3279
self.addCleanup(thread.join)
3280
self.addCleanup(event.set)
3281
result.startTestRun()
3283
result.stopTestRun()
3284
self.assertEqual(result._tests_leaking_threads_count, 1)
3285
self.assertEqual(result._first_thread_leaker_id, test.id())
3286
self.assertEqual(result.leaks, [(test, set([thread]))])
3287
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3289
def test_multiple_leaks(self):
3290
"""Check multiple leaks are blamed on the test cases at fault
3292
Same concept as the previous test, but has one inner test method that
3293
leaks two threads, and one that doesn't leak at all.
3295
event = threading.Event()
3296
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3297
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3298
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3299
class Test(tests.TestCase):
3300
def test_first_leak(self):
3302
def test_second_no_leak(self):
3304
def test_third_leak(self):
3307
result = self.LeakRecordingResult()
3308
first_test = Test("test_first_leak")
3309
third_test = Test("test_third_leak")
3310
self.addCleanup(thread_a.join)
3311
self.addCleanup(thread_b.join)
3312
self.addCleanup(thread_c.join)
3313
self.addCleanup(event.set)
3314
result.startTestRun()
3316
[first_test, Test("test_second_no_leak"), third_test]
3318
result.stopTestRun()
3319
self.assertEqual(result._tests_leaking_threads_count, 2)
3320
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3321
self.assertEqual(result.leaks, [
3322
(first_test, set([thread_b])),
3323
(third_test, set([thread_a, thread_c]))])
3324
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3327
class TestPostMortemDebugging(tests.TestCase):
3328
"""Check post mortem debugging works when tests fail or error"""
3330
class TracebackRecordingResult(tests.ExtendedTestResult):
3332
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3333
self.postcode = None
3334
def _post_mortem(self, tb=None):
3335
"""Record the code object at the end of the current traceback"""
3336
tb = tb or sys.exc_info()[2]
3339
while next is not None:
3342
self.postcode = tb.tb_frame.f_code
3343
def report_error(self, test, err):
3345
def report_failure(self, test, err):
3348
def test_location_unittest_error(self):
3349
"""Needs right post mortem traceback with erroring unittest case"""
3350
class Test(unittest.TestCase):
3353
result = self.TracebackRecordingResult()
3355
self.assertEqual(result.postcode, Test.runTest.func_code)
3357
def test_location_unittest_failure(self):
3358
"""Needs right post mortem traceback with failing unittest case"""
3359
class Test(unittest.TestCase):
3361
raise self.failureException
3362
result = self.TracebackRecordingResult()
3364
self.assertEqual(result.postcode, Test.runTest.func_code)
3366
def test_location_bt_error(self):
3367
"""Needs right post mortem traceback with erroring bzrlib.tests case"""
3368
class Test(tests.TestCase):
3369
def test_error(self):
3371
result = self.TracebackRecordingResult()
3372
Test("test_error").run(result)
3373
self.assertEqual(result.postcode, Test.test_error.func_code)
3375
def test_location_bt_failure(self):
    """Needs right post mortem traceback with failing bzrlib.tests case"""
    class Test(tests.TestCase):
        def test_failure(self):
            raise self.failureException
    recorder = self.TracebackRecordingResult()
    failing_case = Test("test_failure")
    failing_case.run(recorder)
    # The recorded code object must be the failing test method itself.
    expected_code = Test.test_failure.func_code
    self.assertEqual(recorder.postcode, expected_code)
3384
def test_env_var_triggers_post_mortem(self):
    """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
    # Imported here so the test does not rely on a module-level import.
    import pdb
    result = tests.ExtendedTestResult(StringIO(), 0, 1)
    post_mortem_calls = []
    # Record the argument of each post_mortem call instead of debugging.
    self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
    # Clear the variable for the first call below.
    # NOTE(review): presumably set_or_unset_env returns the previous value,
    # so the cleanup restores it -- confirm against osutils.
    self.addCleanup(osutils.set_or_unset_env, "BZR_TEST_PDB",
        osutils.set_or_unset_env("BZR_TEST_PDB", None))
    result._post_mortem(1)
    os.environ["BZR_TEST_PDB"] = "on"
    result._post_mortem(2)
    # Only the call made while the variable was set reaches pdb.
    self.assertEqual([2], post_mortem_calls)
2953
3398
class TestRunSuite(tests.TestCase):
2955
3400
def test_runner_class(self):