313
326
from bzrlib.tests.per_interrepository import make_scenarios
316
formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
329
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
317
330
scenarios = make_scenarios(server1, server2, formats)
318
331
self.assertEqual([
320
333
{'repository_format': 'C1',
321
334
'repository_format_to': 'C2',
322
335
'transport_readonly_server': 'b',
323
'transport_server': 'a'}),
336
'transport_server': 'a',
337
'extra_setup': 'C3'}),
325
339
{'repository_format': 'D1',
326
340
'repository_format_to': 'D2',
327
341
'transport_readonly_server': 'b',
328
'transport_server': 'a'})],
342
'transport_server': 'a',
343
'extra_setup': 'D3'})],
1656
1690
self.assertEqual('original', obj.test_attr)
1693
class _MissingFeature(tests.Feature):
1696
missing_feature = _MissingFeature()
1699
def _get_test(name):
1700
"""Get an instance of a specific example test.
1702
We protect this in a function so that they don't auto-run in the test
1706
class ExampleTests(tests.TestCase):
1708
def test_fail(self):
1709
mutter('this was a failing test')
1710
self.fail('this test will fail')
1712
def test_error(self):
1713
mutter('this test errored')
1714
raise RuntimeError('gotcha')
1716
def test_missing_feature(self):
1717
mutter('missing the feature')
1718
self.requireFeature(missing_feature)
1720
def test_skip(self):
1721
mutter('this test will be skipped')
1722
raise tests.TestSkipped('reason')
1724
def test_success(self):
1725
mutter('this test succeeds')
1727
def test_xfail(self):
1728
mutter('test with expected failure')
1729
self.knownFailure('this_fails')
1731
def test_unexpected_success(self):
1732
mutter('test with unexpected success')
1733
self.expectFailure('should_fail', lambda: None)
1735
return ExampleTests(name)
1738
class TestTestCaseLogDetails(tests.TestCase):
1740
def _run_test(self, test_name):
1741
test = _get_test(test_name)
1742
result = testtools.TestResult()
1746
def test_fail_has_log(self):
1747
result = self._run_test('test_fail')
1748
self.assertEqual(1, len(result.failures))
1749
result_content = result.failures[0][1]
1750
self.assertContainsRe(result_content, 'Text attachment: log')
1751
self.assertContainsRe(result_content, 'this was a failing test')
1753
def test_error_has_log(self):
1754
result = self._run_test('test_error')
1755
self.assertEqual(1, len(result.errors))
1756
result_content = result.errors[0][1]
1757
self.assertContainsRe(result_content, 'Text attachment: log')
1758
self.assertContainsRe(result_content, 'this test errored')
1760
def test_skip_has_no_log(self):
1761
result = self._run_test('test_skip')
1762
self.assertEqual(['reason'], result.skip_reasons.keys())
1763
skips = result.skip_reasons['reason']
1764
self.assertEqual(1, len(skips))
1766
self.assertFalse('log' in test.getDetails())
1768
def test_missing_feature_has_no_log(self):
1769
# testtools doesn't know about addNotSupported, so it just gets
1770
# considered as a skip
1771
result = self._run_test('test_missing_feature')
1772
self.assertEqual([missing_feature], result.skip_reasons.keys())
1773
skips = result.skip_reasons[missing_feature]
1774
self.assertEqual(1, len(skips))
1776
self.assertFalse('log' in test.getDetails())
1778
def test_xfail_has_no_log(self):
1779
result = self._run_test('test_xfail')
1780
self.assertEqual(1, len(result.expectedFailures))
1781
result_content = result.expectedFailures[0][1]
1782
self.assertNotContainsRe(result_content, 'Text attachment: log')
1783
self.assertNotContainsRe(result_content, 'test with expected failure')
1785
def test_unexpected_success_has_log(self):
1786
result = self._run_test('test_unexpected_success')
1787
self.assertEqual(1, len(result.unexpectedSuccesses))
1788
# Inconsistency, unexpectedSuccesses is a list of tests,
1789
# expectedFailures is a list of reasons?
1790
test = result.unexpectedSuccesses[0]
1791
details = test.getDetails()
1792
self.assertTrue('log' in details)
1795
class TestTestCloning(tests.TestCase):
1796
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1798
def test_cloned_testcase_does_not_share_details(self):
1799
"""A TestCase cloned with clone_test does not share mutable attributes
1800
such as details or cleanups.
1802
class Test(tests.TestCase):
1804
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1805
orig_test = Test('test_foo')
1806
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1807
orig_test.run(unittest.TestResult())
1808
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1809
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1811
def test_double_apply_scenario_preserves_first_scenario(self):
1812
"""Applying two levels of scenarios to a test preserves the attributes
1813
added by both scenarios.
1815
class Test(tests.TestCase):
1818
test = Test('test_foo')
1819
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1820
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1821
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1822
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1823
all_tests = list(tests.iter_suite_tests(suite))
1824
self.assertLength(4, all_tests)
1825
all_xys = sorted((t.x, t.y) for t in all_tests)
1826
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1659
1829
# NB: Don't delete this; it's not actually from 0.11!
1660
1830
@deprecated_function(deprecated_in((0, 11, 0)))
1661
1831
def sample_deprecated_function():
1971
2142
load_list='missing file name', list_only=True)
2145
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2147
_test_needs_features = [features.subunit]
2149
def run_subunit_stream(self, test_name):
2150
from subunit import ProtocolTestCase
2152
return TestUtil.TestSuite([_get_test(test_name)])
2153
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2154
test_suite_factory=factory)
2155
test = ProtocolTestCase(stream)
2156
result = testtools.TestResult()
2158
content = stream.getvalue()
2159
return content, result
2161
def test_fail_has_log(self):
2162
content, result = self.run_subunit_stream('test_fail')
2163
self.assertEqual(1, len(result.failures))
2164
self.assertContainsRe(content, '(?m)^log$')
2165
self.assertContainsRe(content, 'this test will fail')
2167
def test_error_has_log(self):
2168
content, result = self.run_subunit_stream('test_error')
2169
self.assertContainsRe(content, '(?m)^log$')
2170
self.assertContainsRe(content, 'this test errored')
2172
def test_skip_has_no_log(self):
2173
content, result = self.run_subunit_stream('test_skip')
2174
self.assertNotContainsRe(content, '(?m)^log$')
2175
self.assertNotContainsRe(content, 'this test will be skipped')
2176
self.assertEqual(['reason'], result.skip_reasons.keys())
2177
skips = result.skip_reasons['reason']
2178
self.assertEqual(1, len(skips))
2180
# RemotedTestCase doesn't preserve the "details"
2181
## self.assertFalse('log' in test.getDetails())
2183
def test_missing_feature_has_no_log(self):
2184
content, result = self.run_subunit_stream('test_missing_feature')
2185
self.assertNotContainsRe(content, '(?m)^log$')
2186
self.assertNotContainsRe(content, 'missing the feature')
2187
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2188
skips = result.skip_reasons['_MissingFeature\n']
2189
self.assertEqual(1, len(skips))
2191
# RemotedTestCase doesn't preserve the "details"
2192
## self.assertFalse('log' in test.getDetails())
2194
def test_xfail_has_no_log(self):
2195
content, result = self.run_subunit_stream('test_xfail')
2196
self.assertNotContainsRe(content, '(?m)^log$')
2197
self.assertNotContainsRe(content, 'test with expected failure')
2198
self.assertEqual(1, len(result.expectedFailures))
2199
result_content = result.expectedFailures[0][1]
2200
self.assertNotContainsRe(result_content, 'Text attachment: log')
2201
self.assertNotContainsRe(result_content, 'test with expected failure')
2203
def test_unexpected_success_has_log(self):
2204
content, result = self.run_subunit_stream('test_unexpected_success')
2205
self.assertContainsRe(content, '(?m)^log$')
2206
self.assertContainsRe(content, 'test with unexpected success')
2207
self.expectFailure('subunit treats "unexpectedSuccess"'
2208
' as a plain success',
2209
self.assertEqual, 1, len(result.unexpectedSuccesses))
2210
self.assertEqual(1, len(result.unexpectedSuccesses))
2211
test = result.unexpectedSuccesses[0]
2212
# RemotedTestCase doesn't preserve the "details"
2213
## self.assertTrue('log' in test.getDetails())
2215
def test_success_has_no_log(self):
2216
content, result = self.run_subunit_stream('test_success')
2217
self.assertEqual(1, result.testsRun)
2218
self.assertNotContainsRe(content, '(?m)^log$')
2219
self.assertNotContainsRe(content, 'this test succeeds')
1974
2222
class TestRunBzr(tests.TestCase):
2951
3213
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3216
class TestThreadLeakDetection(tests.TestCase):
3217
"""Ensure when tests leak threads we detect and report it"""
3219
class LeakRecordingResult(tests.ExtendedTestResult):
3221
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3223
def _report_thread_leak(self, test, leaks, alive):
3224
self.leaks.append((test, leaks))
3226
def test_testcase_without_addCleanups(self):
3227
"""Check old TestCase instances don't break with leak detection"""
3228
class Test(unittest.TestCase):
3231
addCleanup = None # for when on Python 2.7 with native addCleanup
3232
result = self.LeakRecordingResult()
3234
self.assertIs(getattr(test, "addCleanup", None), None)
3235
result.startTestRun()
3237
result.stopTestRun()
3238
self.assertEqual(result._tests_leaking_threads_count, 0)
3239
self.assertEqual(result.leaks, [])
3241
def test_thread_leak(self):
3242
"""Ensure a thread that outlives the running of a test is reported
3244
Uses a thread that blocks on an event, and is started by the inner
3245
test case. As the thread outlives the inner case's run, it should be
3246
detected as a leak, but the event is then set so that the thread can
3247
be safely joined in cleanup so it's not leaked for real.
3249
event = threading.Event()
3250
thread = threading.Thread(name="Leaker", target=event.wait)
3251
class Test(tests.TestCase):
3252
def test_leak(self):
3254
result = self.LeakRecordingResult()
3255
test = Test("test_leak")
3256
self.addCleanup(thread.join)
3257
self.addCleanup(event.set)
3258
result.startTestRun()
3260
result.stopTestRun()
3261
self.assertEqual(result._tests_leaking_threads_count, 1)
3262
self.assertEqual(result._first_thread_leaker_id, test.id())
3263
self.assertEqual(result.leaks, [(test, set([thread]))])
3264
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3266
def test_multiple_leaks(self):
3267
"""Check multiple leaks are blamed on the test cases at fault
3269
Same concept as the previous test, but has one inner test method that
3270
leaks two threads, and one that doesn't leak at all.
3272
event = threading.Event()
3273
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3274
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3275
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3276
class Test(tests.TestCase):
3277
def test_first_leak(self):
3279
def test_second_no_leak(self):
3281
def test_third_leak(self):
3284
result = self.LeakRecordingResult()
3285
first_test = Test("test_first_leak")
3286
third_test = Test("test_third_leak")
3287
self.addCleanup(thread_a.join)
3288
self.addCleanup(thread_b.join)
3289
self.addCleanup(thread_c.join)
3290
self.addCleanup(event.set)
3291
result.startTestRun()
3293
[first_test, Test("test_second_no_leak"), third_test]
3295
result.stopTestRun()
3296
self.assertEqual(result._tests_leaking_threads_count, 2)
3297
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3298
self.assertEqual(result.leaks, [
3299
(first_test, set([thread_b])),
3300
(third_test, set([thread_a, thread_c]))])
3301
self.assertContainsString(result.stream.getvalue(), "leaking threads")
2954
3304
class TestRunSuite(tests.TestCase):
2956
3306
def test_runner_class(self):