867
test = self.get_passing_test()
868
result.startTest(test)
869
prefix = len(result_stream.getvalue())
870
# the err parameter has the shape:
871
# (class, exception object, traceback)
872
# KnownFailures don't get their tracebacks shown though, so we
874
err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
875
result.report_known_failure(test, err)
876
output = result_stream.getvalue()[prefix:]
877
lines = output.splitlines()
878
self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
879
if sys.version_info > (2, 7):
880
self.expectFailure("_ExpectedFailure on 2.7 loses the message",
881
self.assertNotEqual, lines[1], ' ')
882
self.assertEqual(lines[1], ' foo')
883
self.assertEqual(2, len(lines))
859
_get_test("test_xfail").run(result)
860
self.assertContainsRe(result_stream.getvalue(),
861
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
862
"\\s*(?:Text attachment: )?reason"
885
867
def get_passing_test(self):
886
868
"""Return a test object that can't be run usefully."""
1754
1729
result = self._run_test('test_fail')
1755
1730
self.assertEqual(1, len(result.failures))
1756
1731
result_content = result.failures[0][1]
1757
self.assertContainsRe(result_content, 'Text attachment: log')
1732
self.assertContainsRe(result_content,
1733
'(?m)^(?:Text attachment: )?log(?:$|: )')
1758
1734
self.assertContainsRe(result_content, 'this was a failing test')
1760
1736
def test_error_has_log(self):
    """The text recorded for an erroring test includes its log.

    Runs the 'test_error' case through self._run_test and inspects the
    text stored with the single recorded error.
    """
    outcome = self._run_test('test_error')
    self.assertEqual(1, len(outcome.errors))
    # Only the element at index 1 of the first recorded error is examined.
    error_text = outcome.errors[0][1]
    # NOTE(review): this literal check requires the 'Text attachment: '
    # prefix that the pattern below treats as optional -- confirm whether
    # both assertions are meant to coexist.
    self.assertContainsRe(error_text, 'Text attachment: log')
    log_header_re = '(?m)^(?:Text attachment: )?log(?:$|: )'
    self.assertContainsRe(error_text, log_header_re)
    self.assertContainsRe(error_text, 'this test errored')
1767
1744
def test_skip_has_no_log(self):
2619
2598
self.assertEqual('bzr: interrupted\n', result[1])
2622
class TestFeature(tests.TestCase):
2624
def test_caching(self):
2625
"""Feature._probe is called by the feature at most once."""
2626
class InstrumentedFeature(tests.Feature):
2628
super(InstrumentedFeature, self).__init__()
2631
self.calls.append('_probe')
2633
feature = InstrumentedFeature()
2635
self.assertEqual(['_probe'], feature.calls)
2637
self.assertEqual(['_probe'], feature.calls)
2639
def test_named_str(self):
2640
"""Feature.__str__ should thunk to feature_name()."""
2641
class NamedFeature(tests.Feature):
2642
def feature_name(self):
2644
feature = NamedFeature()
2645
self.assertEqual('symlinks', str(feature))
2647
def test_default_str(self):
2648
"""Feature.__str__ should default to __class__.__name__."""
2649
class NamedFeature(tests.Feature):
2651
feature = NamedFeature()
2652
self.assertEqual('NamedFeature', str(feature))
2655
class TestUnavailableFeature(tests.TestCase):
    """Checks on the tests.UnavailableFeature exception."""

    def test_access_feature(self):
        """The feature passed to the exception is exposed as args[0]."""
        probe = tests.Feature()
        error = tests.UnavailableFeature(probe)
        carried = error.args[0]
        self.assertIs(probe, carried)
2663
simple_thunk_feature = tests._CompatabilityThunkFeature(
2664
deprecated_in((2, 1, 0)),
2665
'bzrlib.tests.test_selftest',
2666
'simple_thunk_feature','UnicodeFilename',
2667
replacement_module='bzrlib.tests'
2670
class Test_CompatibilityFeature(tests.TestCase):
    """Checks on the module-level simple_thunk_feature compatibility thunk."""

    def test_does_thunk(self):
        """The thunk's available() matches tests.UnicodeFilename.available().

        The call goes through self.callDeprecated together with the
        deprecation message expected for simple_thunk_feature.
        """
        expected_warning = (
            'bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
            ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.')
        thunked = self.callDeprecated(
            [expected_warning], simple_thunk_feature.available)
        self.assertEqual(tests.UnicodeFilename.available(), thunked)
2680
class TestModuleAvailableFeature(tests.TestCase):
    """Checks on tests.ModuleAvailableFeature for present and absent modules."""

    def test_available_module(self):
        """A feature for 'bzrlib.tests' is available and exposes the module."""
        module_name = 'bzrlib.tests'
        present = tests.ModuleAvailableFeature(module_name)
        self.assertEqual(module_name, present.module_name)
        self.assertEqual(module_name, str(present))
        self.assertTrue(present.available())
        self.assertIs(tests, present.module)

    def test_unavailable_module(self):
        """A feature for a nonexistent module is unavailable, module is None."""
        module_name = 'bzrlib.no_such_module_exists'
        absent = tests.ModuleAvailableFeature(module_name)
        self.assertEqual(module_name, str(absent))
        self.assertFalse(absent.available())
        self.assertIs(None, absent.module)
2696
2601
class TestSelftestFiltering(tests.TestCase):
2698
2603
def setUp(self):
3397
3302
self.assertLength(1, calls)
3305
class _Selftest(object):
3306
"""Mixin for tests needing full selftest output"""
3308
def _inject_stream_into_subunit(self, stream):
3309
"""To be overridden by subclasses that run tests out of process"""
3311
def _run_selftest(self, **kwargs):
3313
self._inject_stream_into_subunit(sio)
3314
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3315
return sio.getvalue()
3318
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        ProtocolTestCase.__init__ is replaced (through overrideAttr) by a
        wrapper that runs the original initialiser and then sets the
        instance's _passthrough attribute to stream.

        This is a stop-gap: some APIs need rewriting so the stream param can
        be passed from run_tests down into ProtocolTestCase instead.
        """
        from subunit import ProtocolTestCase
        wrapped_init = ProtocolTestCase.__init__

        def _init_with_passthrough(self, *args, **kwargs):
            wrapped_init(self, *args, **kwargs)
            self._passthrough = stream

        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)

    def _run_selftest(self, **kwargs):
        """Run the parent's _run_selftest with tests.fork_decorator appended.

        Raises tests.TestNotApplicable when the os module has no usable
        fork attribute.
        """
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        fork = getattr(os, "fork", None)
        if fork is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Claim two cores so the fork code path is actually invoked.
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        decorators = kwargs.setdefault("suite_decorators", [])
        decorators.append(tests.fork_decorator)
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3346
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3347
"""Check operation of --parallel=fork selftest option"""
3349
def test_error_in_child_during_fork(self):
3350
"""Error in a forked child during test setup should get reported"""
3351
class Test(tests.TestCase):
3352
def testMethod(self):
3354
# We don't care what, just break something that a child will run
3355
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3356
out = self._run_selftest(test_suite_factory=Test)
3357
# Lines from the tracebacks of the two child processes may be mixed
3358
# together due to the way subunit parses and forwards the streams,
3359
# so permit extra lines between each part of the error output.
3360
self.assertContainsRe(out,
3363
".+ in fork_for_tests\n"
3365
"\s*workaround_zealous_crypto_random\(\)\n"
3370
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3371
"""Check a test case still alive after being run emits a warning"""
3373
class Test(tests.TestCase):
3374
def test_pass(self):
3376
def test_self_ref(self):
3377
self.also_self = self.test_self_ref
3378
def test_skip(self):
3379
self.skip("Don't need")
3381
def _get_suite(self):
3382
return TestUtil.TestSuite([
3383
self.Test("test_pass"),
3384
self.Test("test_self_ref"),
3385
self.Test("test_skip"),
3388
def _run_selftest_with_suite(self, **kwargs):
3389
old_flags = tests.selftest_debug_flags
3390
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3391
gc_on = gc.isenabled()
3395
output = self._run_selftest(test_suite_factory=self._get_suite,
3400
tests.selftest_debug_flags = old_flags
3401
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3402
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3405
def test_testsuite(self):
    """Run the shared suite checks with no extra selftest options."""
    self._run_selftest_with_suite()
3408
def test_pattern(self):
    """With a pattern option, 'test_skip' is absent from the output."""
    selected = "test_(?:pass|self_ref)$"
    output = self._run_selftest_with_suite(pattern=selected)
    self.assertNotContainsRe(output, "test_skip")
3412
def test_exclude_pattern(self):
    """With exclude_pattern set, 'test_skip' is absent from the output."""
    excluded = "test_skip$"
    output = self._run_selftest_with_suite(exclude_pattern=excluded)
    self.assertNotContainsRe(output, "test_skip")
3416
def test_random_seed(self):
    """Run the shared suite checks with random_seed="now"."""
    seed = "now"
    self._run_selftest_with_suite(random_seed=seed)
3419
def test_matching_tests_first(self):
    """Run the shared suite checks with matching_tests_first and a pattern."""
    options = dict(matching_tests_first=True, pattern="test_self_ref$")
    self._run_selftest_with_suite(**options)
3423
def test_starting_with_and_exclude(self):
    """With starting_with plus exclude_pattern, 'test_skip' is not output."""
    prefixes = ["bt."]
    output = self._run_selftest_with_suite(
        starting_with=prefixes, exclude_pattern="test_skip$")
    self.assertNotContainsRe(output, "test_skip")
3428
def test_additonal_decorator(self):
    """Run the shared suite checks with tests.TestDecorator supplied."""
    extra_decorators = [tests.TestDecorator]
    self._run_selftest_with_suite(suite_decorators=extra_decorators)
3433
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Delegate to the base class with runner_class=tests.SubUnitBzrRunner.

        Any other keyword arguments are forwarded unchanged.
        """
        run_with_suite = TestUncollectedWarnings._run_selftest_with_suite
        return run_with_suite(
            self, runner_class=tests.SubUnitBzrRunner, **kwargs)
3443
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted when forking

    Combines the _ForkedSelftest mixin with the TestUncollectedWarnings
    checks; the class adds no members of its own.
    """
3400
3447
class TestEnvironHandling(tests.TestCase):
3402
3449
def test_overrideEnv_None_called_twice_doesnt_leak(self):