610
626
test = TestDanglingLock('test_function')
611
627
result = test.run()
628
total_failures = result.errors + result.failures
612
629
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
630
self.assertEqual(1, len(total_failures))
615
632
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
634
self.assertEqual(0, len(total_failures))
620
637
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
638
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
640
def test_get_readonly_url_none(self):
624
from bzrlib.transport import get_transport
625
from bzrlib.transport.memory import MemoryServer
626
641
from bzrlib.transport.readonly import ReadonlyTransportDecorator
627
self.vfs_transport_factory = MemoryServer
642
self.vfs_transport_factory = memory.MemoryServer
628
643
self.transport_readonly_server = None
629
644
# calling get_readonly_transport() constructs a decorator on the url
631
646
url = self.get_readonly_url()
632
647
url2 = self.get_readonly_url('foo/bar')
633
t = get_transport(url)
634
t2 = get_transport(url2)
635
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
636
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
648
t = transport.get_transport(url)
649
t2 = transport.get_transport(url2)
650
self.assertIsInstance(t, ReadonlyTransportDecorator)
651
self.assertIsInstance(t2, ReadonlyTransportDecorator)
637
652
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
639
654
def test_get_readonly_url_http(self):
640
655
from bzrlib.tests.http_server import HttpServer
641
from bzrlib.transport import get_transport
642
from bzrlib.transport.local import LocalURLServer
643
656
from bzrlib.transport.http import HttpTransportBase
644
self.transport_server = LocalURLServer
657
self.transport_server = test_server.LocalURLServer
645
658
self.transport_readonly_server = HttpServer
646
659
# calling get_readonly_transport() gives us a HTTP server instance.
647
660
url = self.get_readonly_url()
648
661
url2 = self.get_readonly_url('foo/bar')
649
662
# the transport returned may be any HttpTransportBase subclass
650
t = get_transport(url)
651
t2 = get_transport(url2)
652
self.failUnless(isinstance(t, HttpTransportBase))
653
self.failUnless(isinstance(t2, HttpTransportBase))
663
t = transport.get_transport(url)
664
t2 = transport.get_transport(url2)
665
self.assertIsInstance(t, HttpTransportBase)
666
self.assertIsInstance(t2, HttpTransportBase)
654
667
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
656
669
def test_is_directory(self):
752
764
self.check_timing(ShortDelayTestCase('test_short_delay'),
755
def _patch_get_bzr_source_tree(self):
756
# Reading from the actual source tree breaks isolation, but we don't
757
# want to assume that thats *all* that would happen.
758
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
760
def test_assigned_benchmark_file_stores_date(self):
761
self._patch_get_bzr_source_tree()
763
result = bzrlib.tests.TextTestResult(self._log_file,
768
output_string = output.getvalue()
769
# if you are wondering about the regexp please read the comment in
770
# test_bench_history (bzrlib.tests.test_selftest.TestRunner)
771
# XXX: what comment? -- Andrew Bennetts
772
self.assertContainsRe(output_string, "--date [0-9.]+")
774
def test_benchhistory_records_test_times(self):
775
self._patch_get_bzr_source_tree()
776
result_stream = StringIO()
777
result = bzrlib.tests.TextTestResult(
781
bench_history=result_stream
784
# we want profile a call and check that its test duration is recorded
785
# make a new test instance that when run will generate a benchmark
786
example_test_case = TestTestResult("_time_hello_world_encoding")
787
# execute the test, which should succeed and record times
788
example_test_case.run(result)
789
lines = result_stream.getvalue().splitlines()
790
self.assertEqual(2, len(lines))
791
self.assertContainsRe(lines[1],
792
" *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
793
"._time_hello_world_encoding")
795
767
def _time_hello_world_encoding(self):
796
768
"""Profile two sleep calls
837
809
self.assertContainsRe(output,
838
810
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
812
def test_uses_time_from_testtools(self):
813
"""Test case timings in verbose results should use testtools times"""
815
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
816
def startTest(self, test):
817
self.time(datetime.datetime.utcfromtimestamp(1.145))
818
super(TimeAddedVerboseTestResult, self).startTest(test)
819
def addSuccess(self, test):
820
self.time(datetime.datetime.utcfromtimestamp(51.147))
821
super(TimeAddedVerboseTestResult, self).addSuccess(test)
822
def report_tests_starting(self): pass
824
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
825
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
840
827
def test_known_failure(self):
841
"""A KnownFailure being raised should trigger several result actions."""
828
"""Using knownFailure should trigger several result actions."""
842
829
class InstrumentedTestResult(tests.ExtendedTestResult):
843
830
def stopTestRun(self): pass
844
def startTests(self): pass
845
def report_test_start(self, test): pass
831
def report_tests_starting(self): pass
846
832
def report_known_failure(self, test, err=None, details=None):
847
833
self._call = test, 'known failure'
848
834
result = InstrumentedTestResult(None, None, None, None)
849
835
class Test(tests.TestCase):
850
836
def test_function(self):
851
raise tests.KnownFailure('failed!')
837
self.knownFailure('failed!')
852
838
test = Test("test_function")
854
840
# it should invoke 'report_known_failure'.
1195
def _patch_get_bzr_source_tree(self):
1196
# Reading from the actual source tree breaks isolation, but we don't
1197
# want to assume that thats *all* that would happen.
1198
self._get_source_tree_calls = []
1200
self._get_source_tree_calls.append("called")
1202
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', new_get)
1204
def test_bench_history(self):
1205
# tests that the running the benchmark passes bench_history into
1206
# the test result object. We can tell that happens if
1207
# _get_bzr_source_tree is called.
1208
self._patch_get_bzr_source_tree()
1209
test = TestRunner('dummy_test')
1211
runner = tests.TextTestRunner(stream=self._log_file,
1212
bench_history=output)
1213
result = self.run_test_runner(runner, test)
1214
output_string = output.getvalue()
1215
self.assertContainsRe(output_string, "--date [0-9.]+")
1216
self.assertLength(1, self._get_source_tree_calls)
1203
def test_verbose_test_count(self):
1204
"""A verbose test run reports the right test count at the start"""
1205
suite = TestUtil.TestSuite([
1206
unittest.FunctionTestCase(lambda:None),
1207
unittest.FunctionTestCase(lambda:None)])
1208
self.assertEqual(suite.countTestCases(), 2)
1210
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1211
# Need to use the CountingDecorator as that's what sets num_tests
1212
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1213
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1218
1215
def test_startTestRun(self):
1219
1216
"""run should call result.startTestRun()"""
1221
class LoggingDecorator(tests.ForwardingResult):
1218
class LoggingDecorator(ExtendedToOriginalDecorator):
1222
1219
def startTestRun(self):
1223
tests.ForwardingResult.startTestRun(self)
1220
ExtendedToOriginalDecorator.startTestRun(self)
1224
1221
calls.append('startTestRun')
1225
1222
test = unittest.FunctionTestCase(lambda:None)
1226
1223
stream = StringIO()
1656
1673
test.run(unittest.TestResult())
1657
1674
self.assertEqual('original', obj.test_attr)
1676
def test_recordCalls(self):
1677
from bzrlib.tests import test_selftest
1678
calls = self.recordCalls(
1679
test_selftest, '_add_numbers')
1680
self.assertEqual(test_selftest._add_numbers(2, 10),
1682
self.assertEquals(calls, [((2, 10), {})])
1685
def _add_numbers(a, b):
1689
class _MissingFeature(features.Feature):
1692
missing_feature = _MissingFeature()
1695
def _get_test(name):
1696
"""Get an instance of a specific example test.
1698
We protect this in a function so that they don't auto-run in the test
1702
class ExampleTests(tests.TestCase):
1704
def test_fail(self):
1705
mutter('this was a failing test')
1706
self.fail('this test will fail')
1708
def test_error(self):
1709
mutter('this test errored')
1710
raise RuntimeError('gotcha')
1712
def test_missing_feature(self):
1713
mutter('missing the feature')
1714
self.requireFeature(missing_feature)
1716
def test_skip(self):
1717
mutter('this test will be skipped')
1718
raise tests.TestSkipped('reason')
1720
def test_success(self):
1721
mutter('this test succeeds')
1723
def test_xfail(self):
1724
mutter('test with expected failure')
1725
self.knownFailure('this_fails')
1727
def test_unexpected_success(self):
1728
mutter('test with unexpected success')
1729
self.expectFailure('should_fail', lambda: None)
1731
return ExampleTests(name)
1734
class TestTestCaseLogDetails(tests.TestCase):
1736
def _run_test(self, test_name):
1737
test = _get_test(test_name)
1738
result = testtools.TestResult()
1742
def test_fail_has_log(self):
1743
result = self._run_test('test_fail')
1744
self.assertEqual(1, len(result.failures))
1745
result_content = result.failures[0][1]
1746
if testtools_version < (0, 9, 12):
1747
self.assertContainsRe(result_content, 'Text attachment: log')
1748
self.assertContainsRe(result_content, 'this was a failing test')
1750
def test_error_has_log(self):
1751
result = self._run_test('test_error')
1752
self.assertEqual(1, len(result.errors))
1753
result_content = result.errors[0][1]
1754
if testtools_version < (0, 9, 12):
1755
self.assertContainsRe(result_content, 'Text attachment: log')
1756
self.assertContainsRe(result_content, 'this test errored')
1758
def test_skip_has_no_log(self):
1759
result = self._run_test('test_skip')
1760
self.assertEqual(['reason'], result.skip_reasons.keys())
1761
skips = result.skip_reasons['reason']
1762
self.assertEqual(1, len(skips))
1764
self.assertFalse('log' in test.getDetails())
1766
def test_missing_feature_has_no_log(self):
1767
# testtools doesn't know about addNotSupported, so it just gets
1768
# considered as a skip
1769
result = self._run_test('test_missing_feature')
1770
self.assertEqual([missing_feature], result.skip_reasons.keys())
1771
skips = result.skip_reasons[missing_feature]
1772
self.assertEqual(1, len(skips))
1774
self.assertFalse('log' in test.getDetails())
1776
def test_xfail_has_no_log(self):
1777
result = self._run_test('test_xfail')
1778
self.assertEqual(1, len(result.expectedFailures))
1779
result_content = result.expectedFailures[0][1]
1780
self.assertNotContainsRe(result_content, 'Text attachment: log')
1781
self.assertNotContainsRe(result_content, 'test with expected failure')
1783
def test_unexpected_success_has_log(self):
1784
result = self._run_test('test_unexpected_success')
1785
self.assertEqual(1, len(result.unexpectedSuccesses))
1786
# Inconsistency, unexpectedSuccesses is a list of tests,
1787
# expectedFailures is a list of reasons?
1788
test = result.unexpectedSuccesses[0]
1789
details = test.getDetails()
1790
self.assertTrue('log' in details)
1793
class TestTestCloning(tests.TestCase):
1794
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1796
def test_cloned_testcase_does_not_share_details(self):
1797
"""A TestCase cloned with clone_test does not share mutable attributes
1798
such as details or cleanups.
1800
class Test(tests.TestCase):
1802
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1803
orig_test = Test('test_foo')
1804
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1805
orig_test.run(unittest.TestResult())
1806
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1807
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1809
def test_double_apply_scenario_preserves_first_scenario(self):
1810
"""Applying two levels of scenarios to a test preserves the attributes
1811
added by both scenarios.
1813
class Test(tests.TestCase):
1816
test = Test('test_foo')
1817
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1818
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1819
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1820
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1821
all_tests = list(tests.iter_suite_tests(suite))
1822
self.assertLength(4, all_tests)
1823
all_xys = sorted((t.x, t.y) for t in all_tests)
1824
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1660
1827
# NB: Don't delete this; it's not actually from 0.11!
1661
1828
@deprecated_function(deprecated_in((0, 11, 0)))
1973
2137
load_list='missing file name', list_only=True)
2140
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2142
_test_needs_features = [features.subunit]
2144
def run_subunit_stream(self, test_name):
2145
from subunit import ProtocolTestCase
2147
return TestUtil.TestSuite([_get_test(test_name)])
2148
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2149
test_suite_factory=factory)
2150
test = ProtocolTestCase(stream)
2151
result = testtools.TestResult()
2153
content = stream.getvalue()
2154
return content, result
2156
def test_fail_has_log(self):
2157
content, result = self.run_subunit_stream('test_fail')
2158
self.assertEqual(1, len(result.failures))
2159
self.assertContainsRe(content, '(?m)^log$')
2160
self.assertContainsRe(content, 'this test will fail')
2162
def test_error_has_log(self):
2163
content, result = self.run_subunit_stream('test_error')
2164
self.assertContainsRe(content, '(?m)^log$')
2165
self.assertContainsRe(content, 'this test errored')
2167
def test_skip_has_no_log(self):
2168
content, result = self.run_subunit_stream('test_skip')
2169
self.assertNotContainsRe(content, '(?m)^log$')
2170
self.assertNotContainsRe(content, 'this test will be skipped')
2171
self.assertEqual(['reason'], result.skip_reasons.keys())
2172
skips = result.skip_reasons['reason']
2173
self.assertEqual(1, len(skips))
2175
# RemotedTestCase doesn't preserve the "details"
2176
## self.assertFalse('log' in test.getDetails())
2178
def test_missing_feature_has_no_log(self):
2179
content, result = self.run_subunit_stream('test_missing_feature')
2180
self.assertNotContainsRe(content, '(?m)^log$')
2181
self.assertNotContainsRe(content, 'missing the feature')
2182
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2183
skips = result.skip_reasons['_MissingFeature\n']
2184
self.assertEqual(1, len(skips))
2186
# RemotedTestCase doesn't preserve the "details"
2187
## self.assertFalse('log' in test.getDetails())
2189
def test_xfail_has_no_log(self):
2190
content, result = self.run_subunit_stream('test_xfail')
2191
self.assertNotContainsRe(content, '(?m)^log$')
2192
self.assertNotContainsRe(content, 'test with expected failure')
2193
self.assertEqual(1, len(result.expectedFailures))
2194
result_content = result.expectedFailures[0][1]
2195
self.assertNotContainsRe(result_content, 'Text attachment: log')
2196
self.assertNotContainsRe(result_content, 'test with expected failure')
2198
def test_unexpected_success_has_log(self):
2199
content, result = self.run_subunit_stream('test_unexpected_success')
2200
self.assertContainsRe(content, '(?m)^log$')
2201
self.assertContainsRe(content, 'test with unexpected success')
2202
# GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2203
# success, if a min version check is added remove this
2204
from subunit import TestProtocolClient as _Client
2205
if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
2206
self.expectFailure('subunit treats "unexpectedSuccess"'
2207
' as a plain success',
2208
self.assertEqual, 1, len(result.unexpectedSuccesses))
2209
self.assertEqual(1, len(result.unexpectedSuccesses))
2210
test = result.unexpectedSuccesses[0]
2211
# RemotedTestCase doesn't preserve the "details"
2212
## self.assertTrue('log' in test.getDetails())
2214
def test_success_has_no_log(self):
2215
content, result = self.run_subunit_stream('test_success')
2216
self.assertEqual(1, result.testsRun)
2217
self.assertNotContainsRe(content, '(?m)^log$')
2218
self.assertNotContainsRe(content, 'this test succeeds')
1976
2221
class TestRunBzr(tests.TestCase):
2285
2537
def test_allow_plugins(self):
2286
2538
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2288
2540
command = self._popen_args[0]
2289
2541
self.assertEqual([], command[2:])
2291
2543
def test_set_env(self):
2292
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2544
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2293
2545
# set in the child
2294
2546
def check_environment():
2295
2547
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2296
2548
self.check_popen_state = check_environment
2297
2549
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2298
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2550
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2299
2551
# not set in theparent
2300
2552
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2302
2554
def test_run_bzr_subprocess_env_del(self):
2303
2555
"""run_bzr_subprocess can remove environment variables too."""
2304
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2556
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2305
2557
def check_environment():
2306
2558
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2307
2559
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2308
2560
self.check_popen_state = check_environment
2309
2561
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2310
env_changes={'EXISTANT_ENV_VAR':None})
2562
env_changes={'EXISTANT_ENV_VAR':None})
2311
2563
# Still set in parent
2312
2564
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2313
2565
del os.environ['EXISTANT_ENV_VAR']
2315
2567
def test_env_del_missing(self):
2316
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2568
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2317
2569
def check_environment():
2318
2570
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2319
2571
self.check_popen_state = check_environment
2320
2572
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2321
env_changes={'NON_EXISTANT_ENV_VAR':None})
2573
env_changes={'NON_EXISTANT_ENV_VAR':None})
2323
2575
def test_working_dir(self):
2324
2576
"""Test that we can specify the working dir for the child"""
2359
2610
self.assertEqual('bzr: interrupted\n', result[1])
2362
class TestFeature(tests.TestCase):
2364
def test_caching(self):
2365
"""Feature._probe is called by the feature at most once."""
2366
class InstrumentedFeature(tests.Feature):
2368
super(InstrumentedFeature, self).__init__()
2371
self.calls.append('_probe')
2373
feature = InstrumentedFeature()
2375
self.assertEqual(['_probe'], feature.calls)
2377
self.assertEqual(['_probe'], feature.calls)
2379
def test_named_str(self):
2380
"""Feature.__str__ should thunk to feature_name()."""
2381
class NamedFeature(tests.Feature):
2382
def feature_name(self):
2384
feature = NamedFeature()
2385
self.assertEqual('symlinks', str(feature))
2387
def test_default_str(self):
2388
"""Feature.__str__ should default to __class__.__name__."""
2389
class NamedFeature(tests.Feature):
2391
feature = NamedFeature()
2392
self.assertEqual('NamedFeature', str(feature))
2395
class TestUnavailableFeature(tests.TestCase):
2397
def test_access_feature(self):
2398
feature = tests.Feature()
2399
exception = tests.UnavailableFeature(feature)
2400
self.assertIs(feature, exception.args[0])
2403
simple_thunk_feature = tests._CompatabilityThunkFeature(
2404
deprecated_in((2, 1, 0)),
2405
'bzrlib.tests.test_selftest',
2406
'simple_thunk_feature','UnicodeFilename',
2407
replacement_module='bzrlib.tests'
2410
class Test_CompatibilityFeature(tests.TestCase):
2412
def test_does_thunk(self):
2413
res = self.callDeprecated(
2414
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2415
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2416
simple_thunk_feature.available)
2417
self.assertEqual(tests.UnicodeFilename.available(), res)
2420
class TestModuleAvailableFeature(tests.TestCase):
2422
def test_available_module(self):
2423
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2424
self.assertEqual('bzrlib.tests', feature.module_name)
2425
self.assertEqual('bzrlib.tests', str(feature))
2426
self.assertTrue(feature.available())
2427
self.assertIs(tests, feature.module)
2429
def test_unavailable_module(self):
2430
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2431
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2432
self.assertFalse(feature.available())
2433
self.assertIs(None, feature.module)
2436
2613
class TestSelftestFiltering(tests.TestCase):
2438
2615
def setUp(self):
2953
3140
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3143
class TestThreadLeakDetection(tests.TestCase):
3144
"""Ensure when tests leak threads we detect and report it"""
3146
class LeakRecordingResult(tests.ExtendedTestResult):
3148
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3150
def _report_thread_leak(self, test, leaks, alive):
3151
self.leaks.append((test, leaks))
3153
def test_testcase_without_addCleanups(self):
3154
"""Check old TestCase instances don't break with leak detection"""
3155
class Test(unittest.TestCase):
3158
result = self.LeakRecordingResult()
3160
result.startTestRun()
3162
result.stopTestRun()
3163
self.assertEqual(result._tests_leaking_threads_count, 0)
3164
self.assertEqual(result.leaks, [])
3166
def test_thread_leak(self):
3167
"""Ensure a thread that outlives the running of a test is reported
3169
Uses a thread that blocks on an event, and is started by the inner
3170
test case. As the thread outlives the inner case's run, it should be
3171
detected as a leak, but the event is then set so that the thread can
3172
be safely joined in cleanup so it's not leaked for real.
3174
event = threading.Event()
3175
thread = threading.Thread(name="Leaker", target=event.wait)
3176
class Test(tests.TestCase):
3177
def test_leak(self):
3179
result = self.LeakRecordingResult()
3180
test = Test("test_leak")
3181
self.addCleanup(thread.join)
3182
self.addCleanup(event.set)
3183
result.startTestRun()
3185
result.stopTestRun()
3186
self.assertEqual(result._tests_leaking_threads_count, 1)
3187
self.assertEqual(result._first_thread_leaker_id, test.id())
3188
self.assertEqual(result.leaks, [(test, set([thread]))])
3189
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3191
def test_multiple_leaks(self):
3192
"""Check multiple leaks are blamed on the test cases at fault
3194
Same concept as the previous test, but has one inner test method that
3195
leaks two threads, and one that doesn't leak at all.
3197
event = threading.Event()
3198
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3199
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3200
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3201
class Test(tests.TestCase):
3202
def test_first_leak(self):
3204
def test_second_no_leak(self):
3206
def test_third_leak(self):
3209
result = self.LeakRecordingResult()
3210
first_test = Test("test_first_leak")
3211
third_test = Test("test_third_leak")
3212
self.addCleanup(thread_a.join)
3213
self.addCleanup(thread_b.join)
3214
self.addCleanup(thread_c.join)
3215
self.addCleanup(event.set)
3216
result.startTestRun()
3218
[first_test, Test("test_second_no_leak"), third_test]
3220
result.stopTestRun()
3221
self.assertEqual(result._tests_leaking_threads_count, 2)
3222
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3223
self.assertEqual(result.leaks, [
3224
(first_test, set([thread_b])),
3225
(third_test, set([thread_a, thread_c]))])
3226
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3229
class TestPostMortemDebugging(tests.TestCase):
3230
"""Check post mortem debugging works when tests fail or error"""
3232
class TracebackRecordingResult(tests.ExtendedTestResult):
3234
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3235
self.postcode = None
3236
def _post_mortem(self, tb=None):
3237
"""Record the code object at the end of the current traceback"""
3238
tb = tb or sys.exc_info()[2]
3241
while next is not None:
3244
self.postcode = tb.tb_frame.f_code
3245
def report_error(self, test, err):
3247
def report_failure(self, test, err):
3250
def test_location_unittest_error(self):
3251
"""Needs right post mortem traceback with erroring unittest case"""
3252
class Test(unittest.TestCase):
3255
result = self.TracebackRecordingResult()
3257
self.assertEqual(result.postcode, Test.runTest.func_code)
3259
def test_location_unittest_failure(self):
3260
"""Needs right post mortem traceback with failing unittest case"""
3261
class Test(unittest.TestCase):
3263
raise self.failureException
3264
result = self.TracebackRecordingResult()
3266
self.assertEqual(result.postcode, Test.runTest.func_code)
3268
def test_location_bt_error(self):
3269
"""Needs right post mortem traceback with erroring bzrlib.tests case"""
3270
class Test(tests.TestCase):
3271
def test_error(self):
3273
result = self.TracebackRecordingResult()
3274
Test("test_error").run(result)
3275
self.assertEqual(result.postcode, Test.test_error.func_code)
3277
def test_location_bt_failure(self):
3278
"""Needs right post mortem traceback with failing bzrlib.tests case"""
3279
class Test(tests.TestCase):
3280
def test_failure(self):
3281
raise self.failureException
3282
result = self.TracebackRecordingResult()
3283
Test("test_failure").run(result)
3284
self.assertEqual(result.postcode, Test.test_failure.func_code)
3286
def test_env_var_triggers_post_mortem(self):
3287
"""Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3289
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3290
post_mortem_calls = []
3291
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3292
self.overrideEnv('BZR_TEST_PDB', None)
3293
result._post_mortem(1)
3294
self.overrideEnv('BZR_TEST_PDB', 'on')
3295
result._post_mortem(2)
3296
self.assertEqual([2], post_mortem_calls)
2956
3299
class TestRunSuite(tests.TestCase):
2958
3301
def test_runner_class(self):
2969
3312
self.verbosity)
2970
3313
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2971
3314
self.assertLength(1, calls)
3317
class TestEnvironHandling(tests.TestCase):
3319
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3320
self.assertFalse('MYVAR' in os.environ)
3321
self.overrideEnv('MYVAR', '42')
3322
# We use an embedded test to make sure we fix the _captureVar bug
3323
class Test(tests.TestCase):
3325
# The first call save the 42 value
3326
self.overrideEnv('MYVAR', None)
3327
self.assertEquals(None, os.environ.get('MYVAR'))
3328
# Make sure we can call it twice
3329
self.overrideEnv('MYVAR', None)
3330
self.assertEquals(None, os.environ.get('MYVAR'))
3332
result = tests.TextTestResult(output, 0, 1)
3333
Test('test_me').run(result)
3334
if not result.wasStrictlySuccessful():
3335
self.fail(output.getvalue())
3336
# We get our value back
3337
self.assertEquals('42', os.environ.get('MYVAR'))
3340
class TestIsolatedEnv(tests.TestCase):
3341
"""Test isolating tests from os.environ.
3343
Since we use tests that are already isolated from os.environ a bit of care
3344
should be taken when designing the tests to avoid bootstrap side-effects.
3345
The tests start an already clean os.environ which allow doing valid
3346
assertions about which variables are present or not and design tests around
3350
class ScratchMonkey(tests.TestCase):
3355
def test_basics(self):
3356
# Make sure we know the definition of BZR_HOME: not part of os.environ
3357
# for tests.TestCase.
3358
self.assertTrue('BZR_HOME' in tests.isolated_environ)
3359
self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3360
# Being part of isolated_environ, BZR_HOME should not appear here
3361
self.assertFalse('BZR_HOME' in os.environ)
3362
# Make sure we know the definition of LINES: part of os.environ for
3364
self.assertTrue('LINES' in tests.isolated_environ)
3365
self.assertEquals('25', tests.isolated_environ['LINES'])
3366
self.assertEquals('25', os.environ['LINES'])
3368
def test_injecting_unknown_variable(self):
3369
# BZR_HOME is known to be absent from os.environ
3370
test = self.ScratchMonkey('test_me')
3371
tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3372
self.assertEquals('foo', os.environ['BZR_HOME'])
3373
tests.restore_os_environ(test)
3374
self.assertFalse('BZR_HOME' in os.environ)
3376
def test_injecting_known_variable(self):
3377
test = self.ScratchMonkey('test_me')
3378
# LINES is known to be present in os.environ
3379
tests.override_os_environ(test, {'LINES': '42'})
3380
self.assertEquals('42', os.environ['LINES'])
3381
tests.restore_os_environ(test)
3382
self.assertEquals('25', os.environ['LINES'])
3384
def test_deleting_variable(self):
3385
test = self.ScratchMonkey('test_me')
3386
# LINES is known to be present in os.environ
3387
tests.override_os_environ(test, {'LINES': None})
3388
self.assertTrue('LINES' not in os.environ)
3389
tests.restore_os_environ(test)
3390
self.assertEquals('25', os.environ['LINES'])
3393
class TestDocTestSuiteIsolation(tests.TestCase):
3394
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3396
Since tests.TestCase alreay provides an isolation from os.environ, we use
3397
the clean environment as a base for testing. To precisely capture the
3398
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3401
We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3402
not `os.environ` so each test overrides it to suit its needs.
3406
def get_doctest_suite_for_string(self, klass, string):
3407
class Finder(doctest.DocTestFinder):
3409
def find(*args, **kwargs):
3410
test = doctest.DocTestParser().get_doctest(
3411
string, {}, 'foo', 'foo.py', 0)
3414
suite = klass(test_finder=Finder())
3417
def run_doctest_suite_for_string(self, klass, string):
3418
suite = self.get_doctest_suite_for_string(klass, string)
3420
result = tests.TextTestResult(output, 0, 1)
3422
return result, output
3424
def assertDocTestStringSucceds(self, klass, string):
3425
result, output = self.run_doctest_suite_for_string(klass, string)
3426
if not result.wasStrictlySuccessful():
3427
self.fail(output.getvalue())
3429
def assertDocTestStringFails(self, klass, string):
3430
result, output = self.run_doctest_suite_for_string(klass, string)
3431
if result.wasStrictlySuccessful():
3432
self.fail(output.getvalue())
3434
def test_injected_variable(self):
3435
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3438
>>> os.environ['LINES']
3441
# doctest.DocTestSuite fails as it sees '25'
3442
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3443
# tests.DocTestSuite sees '42'
3444
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3446
def test_deleted_variable(self):
3447
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3450
>>> os.environ.get('LINES')
3452
# doctest.DocTestSuite fails as it sees '25'
3453
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3454
# tests.DocTestSuite sees None
3455
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3458
class TestSelftestExcludePatterns(tests.TestCase):
3461
super(TestSelftestExcludePatterns, self).setUp()
3462
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3464
def suite_factory(self, keep_only=None, starting_with=None):
3465
"""A test suite factory with only a few tests."""
3466
class Test(tests.TestCase):
3468
# We don't need the full class path
3469
return self._testMethodName
3476
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3478
def assertTestList(self, expected, *selftest_args):
3479
# We rely on setUp installing the right test suite factory so we can
3480
# test at the command level without loading the whole test suite
3481
out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3482
actual = out.splitlines()
3483
self.assertEquals(expected, actual)
3485
def test_full_list(self):
3486
self.assertTestList(['a', 'b', 'c'])
3488
def test_single_exclude(self):
3489
self.assertTestList(['b', 'c'], '-x', 'a')
3491
def test_mutiple_excludes(self):
3492
self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3495
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3497
_test_needs_features = [features.subunit]
3500
super(TestCounterHooks, self).setUp()
3501
class Test(tests.TestCase):
3504
super(Test, self).setUp()
3505
self.hooks = hooks.Hooks()
3506
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3507
self.install_counter_hook(self.hooks, 'myhook')
3512
def run_hook_once(self):
3513
for hook in self.hooks['myhook']:
3516
self.test_class = Test
3518
def assertHookCalls(self, expected_calls, test_name):
3519
test = self.test_class(test_name)
3520
result = unittest.TestResult()
3522
self.assertTrue(hasattr(test, '_counters'))
3523
self.assertTrue(test._counters.has_key('myhook'))
3524
self.assertEquals(expected_calls, test._counters['myhook'])
3526
def test_no_hook(self):
3527
self.assertHookCalls(0, 'no_hook')
3529
def test_run_hook_once(self):
3530
tt = features.testtools
3531
if tt.module.__version__ < (0, 9, 8):
3532
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3533
self.assertHookCalls(1, 'run_hook_once')