325
326
from bzrlib.tests.per_interrepository import make_scenarios
328
formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
329
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
329
330
scenarios = make_scenarios(server1, server2, formats)
330
331
self.assertEqual([
332
333
{'repository_format': 'C1',
333
334
'repository_format_to': 'C2',
334
335
'transport_readonly_server': 'b',
335
'transport_server': 'a'}),
336
'transport_server': 'a',
337
'extra_setup': 'C3'}),
337
339
{'repository_format': 'D1',
338
340
'repository_format_to': 'D2',
339
341
'transport_readonly_server': 'b',
340
'transport_server': 'a'})],
342
'transport_server': 'a',
343
'extra_setup': 'D3'})],
850
853
"""A KnownFailure being raised should trigger several result actions."""
851
854
class InstrumentedTestResult(tests.ExtendedTestResult):
852
855
def stopTestRun(self): pass
853
def startTests(self): pass
854
def report_test_start(self, test): pass
856
def report_tests_starting(self): pass
855
857
def report_known_failure(self, test, err=None, details=None):
856
858
self._call = test, 'known failure'
857
859
result = InstrumentedTestResult(None, None, None, None)
907
909
"""Test the behaviour of invoking addNotSupported."""
908
910
class InstrumentedTestResult(tests.ExtendedTestResult):
909
911
def stopTestRun(self): pass
910
def startTests(self): pass
911
def report_test_start(self, test): pass
912
def report_tests_starting(self): pass
912
913
def report_unsupported(self, test, feature):
913
914
self._call = test, feature
914
915
result = InstrumentedTestResult(None, None, None, None)
953
954
"""An UnavailableFeature being raised should invoke addNotSupported."""
954
955
class InstrumentedTestResult(tests.ExtendedTestResult):
955
956
def stopTestRun(self): pass
956
def startTests(self): pass
957
def report_test_start(self, test): pass
957
def report_tests_starting(self): pass
958
958
def addNotSupported(self, test, feature):
959
959
self._call = test, feature
960
960
result = InstrumentedTestResult(None, None, None, None)
1010
1009
test.run(result)
1011
1010
self.assertEquals(1, result.calls)
1012
def test_startTests_only_once(self):
1013
"""With multiple tests startTests should still only be called once"""
1014
class InstrumentedTestResult(tests.ExtendedTestResult):
1016
def startTests(self): self.calls += 1
1017
result = InstrumentedTestResult(None, None, None, None)
1018
suite = unittest.TestSuite([
1019
unittest.FunctionTestCase(lambda: None),
1020
unittest.FunctionTestCase(lambda: None)])
1022
self.assertEquals(1, result.calls)
1023
self.assertEquals(2, result.count)
1014
1026
class TestUnicodeFilenameFeature(tests.TestCase):
1227
1236
self.assertContainsRe(output_string, "--date [0-9.]+")
1228
1237
self.assertLength(1, self._get_source_tree_calls)
1239
def test_verbose_test_count(self):
1240
"""A verbose test run reports the right test count at the start"""
1241
suite = TestUtil.TestSuite([
1242
unittest.FunctionTestCase(lambda:None),
1243
unittest.FunctionTestCase(lambda:None)])
1244
self.assertEqual(suite.countTestCases(), 2)
1246
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1247
# Need to use the CountingDecorator as that's what sets num_tests
1248
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1249
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1230
1251
def test_startTestRun(self):
1231
1252
"""run should call result.startTestRun()"""
1669
1690
self.assertEqual('original', obj.test_attr)
1693
class TestTestCloning(tests.TestCase):
1694
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1696
def test_cloned_testcase_does_not_share_details(self):
1697
"""A TestCase cloned with clone_test does not share mutable attributes
1698
such as details or cleanups.
1700
class Test(tests.TestCase):
1702
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1703
orig_test = Test('test_foo')
1704
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1705
orig_test.run(unittest.TestResult())
1706
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1707
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1709
def test_double_apply_scenario_preserves_first_scenario(self):
1710
"""Applying two levels of scenarios to a test preserves the attributes
1711
added by both scenarios.
1713
class Test(tests.TestCase):
1716
test = Test('test_foo')
1717
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1718
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1719
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1720
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1721
all_tests = list(tests.iter_suite_tests(suite))
1722
self.assertLength(4, all_tests)
1723
all_xys = sorted((t.x, t.y) for t in all_tests)
1724
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1672
1727
# NB: Don't delete this; it's not actually from 0.11!
1673
1728
@deprecated_function(deprecated_in((0, 11, 0)))
1674
1729
def sample_deprecated_function():
2979
3034
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3037
class TestThreadLeakDetection(tests.TestCase):
3038
"""Ensure when tests leak threads we detect and report it"""
3040
class LeakRecordingResult(tests.ExtendedTestResult):
3042
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3044
def _report_thread_leak(self, test, leaks, alive):
3045
self.leaks.append((test, leaks))
3047
def test_testcase_without_addCleanups(self):
3048
"""Check old TestCase instances don't break with leak detection"""
3049
class Test(unittest.TestCase):
3052
addCleanup = None # for when on Python 2.7 with native addCleanup
3053
result = self.LeakRecordingResult()
3055
self.assertIs(getattr(test, "addCleanup", None), None)
3056
result.startTestRun()
3058
result.stopTestRun()
3059
self.assertEqual(result._tests_leaking_threads_count, 0)
3060
self.assertEqual(result.leaks, [])
3062
def test_thread_leak(self):
3063
"""Ensure a thread that outlives the running of a test is reported
3065
Uses a thread that blocks on an event, and is started by the inner
3066
test case. As the thread outlives the inner case's run, it should be
3067
detected as a leak, but the event is then set so that the thread can
3068
be safely joined in cleanup so it's not leaked for real.
3070
event = threading.Event()
3071
thread = threading.Thread(name="Leaker", target=event.wait)
3072
class Test(tests.TestCase):
3073
def test_leak(self):
3075
result = self.LeakRecordingResult()
3076
test = Test("test_leak")
3077
self.addCleanup(thread.join)
3078
self.addCleanup(event.set)
3079
result.startTestRun()
3081
result.stopTestRun()
3082
self.assertEqual(result._tests_leaking_threads_count, 1)
3083
self.assertEqual(result._first_thread_leaker_id, test.id())
3084
self.assertEqual(result.leaks, [(test, set([thread]))])
3085
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3087
def test_multiple_leaks(self):
3088
"""Check multiple leaks are blamed on the test cases at fault
3090
Same concept as the previous test, but has one inner test method that
3091
leaks two threads, and one that doesn't leak at all.
3093
event = threading.Event()
3094
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3095
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3096
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3097
class Test(tests.TestCase):
3098
def test_first_leak(self):
3100
def test_second_no_leak(self):
3102
def test_third_leak(self):
3105
result = self.LeakRecordingResult()
3106
first_test = Test("test_first_leak")
3107
third_test = Test("test_third_leak")
3108
self.addCleanup(thread_a.join)
3109
self.addCleanup(thread_b.join)
3110
self.addCleanup(thread_c.join)
3111
self.addCleanup(event.set)
3112
result.startTestRun()
3114
[first_test, Test("test_second_no_leak"), third_test]
3116
result.stopTestRun()
3117
self.assertEqual(result._tests_leaking_threads_count, 2)
3118
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3119
self.assertEqual(result.leaks, [
3120
(first_test, set([thread_b])),
3121
(third_test, set([thread_a, thread_c]))])
3122
self.assertContainsString(result.stream.getvalue(), "leaking threads")
2982
3125
class TestRunSuite(tests.TestCase):
2984
3127
def test_runner_class(self):