867
test = self.get_passing_test()
868
result.startTest(test)
869
prefix = len(result_stream.getvalue())
870
# the err parameter has the shape:
871
# (class, exception object, traceback)
872
# KnownFailures don't get their tracebacks shown though, so we
874
err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
875
result.report_known_failure(test, err)
876
output = result_stream.getvalue()[prefix:]
877
lines = output.splitlines()
878
self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
879
if sys.version_info > (2, 7):
880
self.expectFailure("_ExpectedFailure on 2.7 loses the message",
881
self.assertNotEqual, lines[1], ' ')
882
self.assertEqual(lines[1], ' foo')
883
self.assertEqual(2, len(lines))
859
_get_test("test_xfail").run(result)
860
self.assertContainsRe(result_stream.getvalue(),
861
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
862
"\\s*(?:Text attachment: )?reason"
885
867
def get_passing_test(self):
886
868
"""Return a test object that can't be run usefully."""
1742
1729
result = self._run_test('test_fail')
1743
1730
self.assertEqual(1, len(result.failures))
1744
1731
result_content = result.failures[0][1]
1745
self.assertContainsRe(result_content, 'Text attachment: log')
1732
self.assertContainsRe(result_content,
1733
'(?m)^(?:Text attachment: )?log(?:$|: )')
1746
1734
self.assertContainsRe(result_content, 'this was a failing test')
1748
1736
def test_error_has_log(self):
    # Running 'test_error' must record exactly one error, and the text
    # stored with that error must include the test's log.
    result = self._run_test('test_error')
    self.assertEqual(1, len(result.errors))
    result_content = result.errors[0][1]
    # The log heading is accepted at the start of a line either as
    # 'Text attachment: log' or as a bare 'log' (alone on the line or
    # followed by ': '), so no stricter prefix-only check is made.
    self.assertContainsRe(result_content,
        '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertContainsRe(result_content, 'this test errored')
1755
1744
def test_skip_has_no_log(self):
2607
2598
self.assertEqual('bzr: interrupted\n', result[1])
2610
class TestFeature(tests.TestCase):
2612
def test_caching(self):
2613
"""Feature._probe is called by the feature at most once."""
2614
class InstrumentedFeature(tests.Feature):
2616
super(InstrumentedFeature, self).__init__()
2619
self.calls.append('_probe')
2621
feature = InstrumentedFeature()
2623
self.assertEqual(['_probe'], feature.calls)
2625
self.assertEqual(['_probe'], feature.calls)
2627
def test_named_str(self):
2628
"""Feature.__str__ should thunk to feature_name()."""
2629
class NamedFeature(tests.Feature):
2630
def feature_name(self):
2632
feature = NamedFeature()
2633
self.assertEqual('symlinks', str(feature))
2635
def test_default_str(self):
2636
"""Feature.__str__ should default to __class__.__name__."""
2637
class NamedFeature(tests.Feature):
2639
feature = NamedFeature()
2640
self.assertEqual('NamedFeature', str(feature))
2643
class TestUnavailableFeature(tests.TestCase):
    """Checks on the UnavailableFeature exception."""

    def test_access_feature(self):
        # The feature handed to the exception must come back, by identity,
        # as the first entry of the exception's args.
        missing = tests.Feature()
        error = tests.UnavailableFeature(missing)
        first_arg = error.args[0]
        self.assertIs(missing, first_arg)
2651
simple_thunk_feature = tests._CompatabilityThunkFeature(
2652
deprecated_in((2, 1, 0)),
2653
'bzrlib.tests.test_selftest',
2654
'simple_thunk_feature','UnicodeFilename',
2655
replacement_module='bzrlib.tests'
2658
class Test_CompatibilityFeature(tests.TestCase):
    """Exercise the module-level simple_thunk_feature compatibility thunk."""

    def test_does_thunk(self):
        # callDeprecated is given the deprecation message expected when
        # available() is reached through the thunk; its result must equal
        # what the replacement feature itself reports.
        expected_warnings = [
            'bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
            ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.',
        ]
        thunked = self.callDeprecated(
            expected_warnings, simple_thunk_feature.available)
        self.assertEqual(tests.UnicodeFilename.available(), thunked)
2668
class TestModuleAvailableFeature(tests.TestCase):
    """Checks ModuleAvailableFeature for a present and an absent module."""

    def test_available_module(self):
        # The assertions expect the feature to report the name it was given,
        # be available, and expose the very object bound to ``tests`` here.
        name = 'bzrlib.tests'
        present = tests.ModuleAvailableFeature(name)
        self.assertEqual(name, present.module_name)
        self.assertEqual(name, str(present))
        self.assertTrue(present.available())
        self.assertIs(tests, present.module)

    def test_unavailable_module(self):
        # A module name that cannot be found still stringifies to that name,
        # but the feature is unavailable and its module is None.
        name = 'bzrlib.no_such_module_exists'
        absent = tests.ModuleAvailableFeature(name)
        self.assertEqual(name, str(absent))
        self.assertFalse(absent.available())
        self.assertIs(None, absent.module)
2684
2601
class TestSelftestFiltering(tests.TestCase):
2686
2603
def setUp(self):
3385
3302
self.assertLength(1, calls)
3305
class _Selftest(object):
3306
"""Mixin for tests needing full selftest output"""
3308
def _inject_stream_into_subunit(self, stream):
3309
"""To be overridden by subclasses that run tests out of process"""
3311
def _run_selftest(self, **kwargs):
3313
self._inject_stream_into_subunit(sio)
3314
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3315
return sio.getvalue()
3318
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        ProtocolTestCase.__init__ is overridden with a wrapper that calls the
        original initialiser and then sets ``_passthrough`` to ``stream``.

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.

        :param stream: Object assigned as ``_passthrough`` on each
            ProtocolTestCase initialised while the override is in place.
        """
        from subunit import ProtocolTestCase
        _original_init = ProtocolTestCase.__init__
        # The wrapper's first parameter is named ``case`` so it does not
        # shadow the test's own ``self`` used for overrideAttr below.
        def _init_with_passthrough(case, *args, **kwargs):
            _original_init(case, *args, **kwargs)
            case._passthrough = stream
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)

    def _run_selftest(self, **kwargs):
        """Run selftest with tests.fork_decorator added to the decorators.

        :param kwargs: Passed on to the parent class's _run_selftest. Any
            ``suite_decorators`` supplied is copied rather than modified in
            place, then extended with tests.fork_decorator.
        :return: Whatever the parent class's _run_selftest returns.
        :raises tests.TestNotApplicable: If ``os`` has no usable ``fork``.
        """
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        if getattr(os, "fork", None) is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Make sure the fork code is actually invoked by claiming two cores
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        # Build a fresh list instead of appending to a caller-owned one, so
        # a list reused across runs does not accumulate fork decorators.
        decorators = list(kwargs.get("suite_decorators", []))
        decorators.append(tests.fork_decorator)
        kwargs["suite_decorators"] = decorators
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3346
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3347
"""Check operation of --parallel=fork selftest option"""
3349
def test_error_in_child_during_fork(self):
3350
"""Error in a forked child during test setup should get reported"""
3351
class Test(tests.TestCase):
3352
def testMethod(self):
3354
# We don't care what, just break something that a child will run
3355
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3356
out = self._run_selftest(test_suite_factory=Test)
3357
# Lines from the tracebacks of the two child processes may be mixed
3358
# together due to the way subunit parses and forwards the streams,
3359
# so permit extra lines between each part of the error output.
3360
self.assertContainsRe(out,
3363
".+ in fork_for_tests\n"
3365
"\s*workaround_zealous_crypto_random\(\)\n"
3370
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3371
"""Check a test case still alive after being run emits a warning"""
3373
class Test(tests.TestCase):
3374
def test_pass(self):
3376
def test_self_ref(self):
3377
self.also_self = self.test_self_ref
3378
def test_skip(self):
3379
self.skip("Don't need")
3381
def _get_suite(self):
3382
return TestUtil.TestSuite([
3383
self.Test("test_pass"),
3384
self.Test("test_self_ref"),
3385
self.Test("test_skip"),
3388
def _run_selftest_with_suite(self, **kwargs):
3389
old_flags = tests.selftest_debug_flags
3390
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3391
gc_on = gc.isenabled()
3395
output = self._run_selftest(test_suite_factory=self._get_suite,
3400
tests.selftest_debug_flags = old_flags
3401
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3402
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3405
def test_testsuite(self):
3406
self._run_selftest_with_suite()
3408
def test_pattern(self):
3409
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3410
self.assertNotContainsRe(out, "test_skip")
3412
def test_exclude_pattern(self):
3413
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3414
self.assertNotContainsRe(out, "test_skip")
3416
def test_random_seed(self):
3417
self._run_selftest_with_suite(random_seed="now")
3419
def test_matching_tests_first(self):
3420
self._run_selftest_with_suite(matching_tests_first=True,
3421
pattern="test_self_ref$")
3423
def test_starting_with_and_exclude(self):
3424
out = self._run_selftest_with_suite(starting_with=["bt."],
3425
exclude_pattern="test_skip$")
3426
self.assertNotContainsRe(out, "test_skip")
3428
def test_additonal_decorator(self):
3429
out = self._run_selftest_with_suite(
3430
suite_decorators=[tests.TestDecorator])
3433
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Uncollected test case warnings must also appear under subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        # Same run as the parent class, but forcing the subunit runner.
        parent = super(TestUncollectedWarningsSubunit, self)
        return parent._run_selftest_with_suite(
            runner_class=tests.SubUnitBzrRunner, **kwargs)
3443
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Uncollected test case warnings must also appear when forking"""
3388
3447
class TestEnvironHandling(tests.TestCase):
3390
3449
def test_overrideEnv_None_called_twice_doesnt_leak(self):