354
350
{'bzrdir_format': formats[1]._matchingbzrdir,
355
351
'transport_readonly_server': 'b',
356
352
'transport_server': 'a',
357
'workingtree_format': formats[1]})],
353
'workingtree_format': formats[1]}),
354
('WorkingTreeFormat6',
355
{'bzrdir_format': formats[2]._matchingbzrdir,
356
'transport_readonly_server': 'b',
357
'transport_server': 'a',
358
'workingtree_format': formats[2]}),
359
('WorkingTreeFormat6,remote',
360
{'bzrdir_format': formats[2]._matchingbzrdir,
361
'repo_is_remote': True,
362
'transport_readonly_server': 'd',
363
'transport_server': 'c',
364
'vfs_transport_factory': 'e',
365
'workingtree_format': formats[2]}),
361
369
class TestTreeScenarios(tests.TestCase):
363
371
def test_scenarios(self):
364
372
# the tree implementation scenario generator is meant to setup one
365
# instance for each working tree format, and one additional instance
373
# instance for each working tree format, one additional instance
366
374
# that will use the default wt format, but create a revision tree for
367
# the tests. this means that the wt ones should have the
368
# workingtree_to_test_tree attribute set to 'return_parameter' and the
369
# revision one set to revision_tree_from_workingtree.
375
# the tests, and one more that uses the default wt format as a
376
# lightweight checkout of a remote repository. This means that the wt
377
# ones should have the workingtree_to_test_tree attribute set to
378
# 'return_parameter' and the revision one set to
379
# revision_tree_from_workingtree.
371
381
from bzrlib.tests.per_tree import (
372
382
_dirstate_tree_from_workingtree,
867
test = self.get_passing_test()
868
result.startTest(test)
869
prefix = len(result_stream.getvalue())
870
# the err parameter has the shape:
871
# (class, exception object, traceback)
872
# KnownFailures dont get their tracebacks shown though, so we
874
err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
875
result.report_known_failure(test, err)
876
output = result_stream.getvalue()[prefix:]
877
lines = output.splitlines()
878
self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
879
if sys.version_info > (2, 7):
880
self.expectFailure("_ExpectedFailure on 2.7 loses the message",
881
self.assertNotEqual, lines[1], ' ')
882
self.assertEqual(lines[1], ' foo')
883
self.assertEqual(2, len(lines))
890
_get_test("test_xfail").run(result)
891
self.assertContainsRe(result_stream.getvalue(),
892
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
893
"\\s*(?:Text attachment: )?reason"
885
898
def get_passing_test(self):
886
899
"""Return a test object that can't be run usefully."""
1501
1507
transport_server = memory.MemoryServer()
1502
1508
transport_server.start_server()
1503
1509
self.addCleanup(transport_server.stop_server)
1504
t = transport.get_transport(transport_server.get_url())
1505
bzrdir.BzrDir.create(t.base)
1510
t = transport.get_transport_from_url(transport_server.get_url())
1511
controldir.ControlDir.create(t.base)
1506
1512
self.assertRaises(errors.BzrError,
1507
bzrdir.BzrDir.open_from_transport, t)
1513
controldir.ControlDir.open_from_transport, t)
1508
1514
# But if we declare this as safe, we can open the bzrdir.
1509
1515
self.permit_url(t.base)
1510
1516
self._bzr_selftest_roots.append(t.base)
1511
bzrdir.BzrDir.open_from_transport(t)
1517
controldir.ControlDir.open_from_transport(t)
1513
1519
def test_requireFeature_available(self):
1514
1520
"""self.requireFeature(available) is a no-op."""
1515
class Available(tests.Feature):
1521
class Available(features.Feature):
1516
1522
def _probe(self):return True
1517
1523
feature = Available()
1518
1524
self.requireFeature(feature)
1520
1526
def test_requireFeature_unavailable(self):
1521
1527
"""self.requireFeature(unavailable) raises UnavailableFeature."""
1522
class Unavailable(tests.Feature):
1528
class Unavailable(features.Feature):
1523
1529
def _probe(self):return False
1524
1530
feature = Unavailable()
1525
1531
self.assertRaises(tests.UnavailableFeature,
1742
1760
result = self._run_test('test_fail')
1743
1761
self.assertEqual(1, len(result.failures))
1744
1762
result_content = result.failures[0][1]
1745
self.assertContainsRe(result_content, 'Text attachment: log')
1763
self.assertContainsRe(result_content,
1764
'(?m)^(?:Text attachment: )?log(?:$|: )')
1746
1765
self.assertContainsRe(result_content, 'this was a failing test')
1748
1767
def test_error_has_log(self):
1749
1768
result = self._run_test('test_error')
1750
1769
self.assertEqual(1, len(result.errors))
1751
1770
result_content = result.errors[0][1]
1752
self.assertContainsRe(result_content, 'Text attachment: log')
1771
self.assertContainsRe(result_content,
1772
'(?m)^(?:Text attachment: )?log(?:$|: )')
1753
1773
self.assertContainsRe(result_content, 'this test errored')
1755
1775
def test_skip_has_no_log(self):
2607
2629
self.assertEqual('bzr: interrupted\n', result[1])
2610
class TestFeature(tests.TestCase):
2612
def test_caching(self):
2613
"""Feature._probe is called by the feature at most once."""
2614
class InstrumentedFeature(tests.Feature):
2616
super(InstrumentedFeature, self).__init__()
2619
self.calls.append('_probe')
2621
feature = InstrumentedFeature()
2623
self.assertEqual(['_probe'], feature.calls)
2625
self.assertEqual(['_probe'], feature.calls)
2627
def test_named_str(self):
2628
"""Feature.__str__ should thunk to feature_name()."""
2629
class NamedFeature(tests.Feature):
2630
def feature_name(self):
2632
feature = NamedFeature()
2633
self.assertEqual('symlinks', str(feature))
2635
def test_default_str(self):
2636
"""Feature.__str__ should default to __class__.__name__."""
2637
class NamedFeature(tests.Feature):
2639
feature = NamedFeature()
2640
self.assertEqual('NamedFeature', str(feature))
2643
class TestUnavailableFeature(tests.TestCase):
2645
def test_access_feature(self):
2646
feature = tests.Feature()
2647
exception = tests.UnavailableFeature(feature)
2648
self.assertIs(feature, exception.args[0])
2651
simple_thunk_feature = tests._CompatabilityThunkFeature(
2652
deprecated_in((2, 1, 0)),
2653
'bzrlib.tests.test_selftest',
2654
'simple_thunk_feature','UnicodeFilename',
2655
replacement_module='bzrlib.tests'
2658
class Test_CompatibilityFeature(tests.TestCase):
2660
def test_does_thunk(self):
2661
res = self.callDeprecated(
2662
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2663
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2664
simple_thunk_feature.available)
2665
self.assertEqual(tests.UnicodeFilename.available(), res)
2668
class TestModuleAvailableFeature(tests.TestCase):
2670
def test_available_module(self):
2671
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2672
self.assertEqual('bzrlib.tests', feature.module_name)
2673
self.assertEqual('bzrlib.tests', str(feature))
2674
self.assertTrue(feature.available())
2675
self.assertIs(tests, feature.module)
2677
def test_unavailable_module(self):
2678
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2679
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2680
self.assertFalse(feature.available())
2681
self.assertIs(None, feature.module)
2684
2632
class TestSelftestFiltering(tests.TestCase):
2686
2634
def setUp(self):
2687
tests.TestCase.setUp(self)
2635
super(TestSelftestFiltering, self).setUp()
2688
2636
self.suite = TestUtil.TestSuite()
2689
2637
self.loader = TestUtil.TestLoader()
2690
2638
self.suite.addTest(self.loader.loadTestsFromModule(
3385
3333
self.assertLength(1, calls)
3336
class _Selftest(object):
3337
"""Mixin for tests needing full selftest output"""
3339
def _inject_stream_into_subunit(self, stream):
3340
"""To be overridden by subclasses that run tests out of process"""
3342
def _run_selftest(self, **kwargs):
3344
self._inject_stream_into_subunit(sio)
3345
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3346
return sio.getvalue()
3349
class _ForkedSelftest(_Selftest):
3350
"""Mixin for tests needing full selftest output with forked children"""
3352
_test_needs_features = [features.subunit]
3354
def _inject_stream_into_subunit(self, stream):
3355
"""Monkey-patch subunit so the extra output goes to stream not stdout
3357
Some APIs need rewriting so this kind of bogus hackery can be replaced
3358
by passing the stream param from run_tests down into ProtocolTestCase.
3360
from subunit import ProtocolTestCase
3361
_original_init = ProtocolTestCase.__init__
3362
def _init_with_passthrough(self, *args, **kwargs):
3363
_original_init(self, *args, **kwargs)
3364
self._passthrough = stream
3365
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3367
def _run_selftest(self, **kwargs):
3368
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3369
if getattr(os, "fork", None) is None:
3370
raise tests.TestNotApplicable("Platform doesn't support forking")
3371
# Make sure the fork code is actually invoked by claiming two cores
3372
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3373
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3374
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3377
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3378
"""Check operation of --parallel=fork selftest option"""
3380
def test_error_in_child_during_fork(self):
3381
"""Error in a forked child during test setup should get reported"""
3382
class Test(tests.TestCase):
3383
def testMethod(self):
3385
# We don't care what, just break something that a child will run
3386
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3387
out = self._run_selftest(test_suite_factory=Test)
3388
# Lines from the tracebacks of the two child processes may be mixed
3389
# together due to the way subunit parses and forwards the streams,
3390
# so permit extra lines between each part of the error output.
3391
self.assertContainsRe(out,
3394
".+ in fork_for_tests\n"
3396
"\s*workaround_zealous_crypto_random\(\)\n"
3401
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3402
"""Check a test case still alive after being run emits a warning"""
3404
class Test(tests.TestCase):
3405
def test_pass(self):
3407
def test_self_ref(self):
3408
self.also_self = self.test_self_ref
3409
def test_skip(self):
3410
self.skip("Don't need")
3412
def _get_suite(self):
3413
return TestUtil.TestSuite([
3414
self.Test("test_pass"),
3415
self.Test("test_self_ref"),
3416
self.Test("test_skip"),
3419
def _run_selftest_with_suite(self, **kwargs):
3420
old_flags = tests.selftest_debug_flags
3421
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3422
gc_on = gc.isenabled()
3426
output = self._run_selftest(test_suite_factory=self._get_suite,
3431
tests.selftest_debug_flags = old_flags
3432
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3433
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3436
def test_testsuite(self):
3437
self._run_selftest_with_suite()
3439
def test_pattern(self):
3440
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3441
self.assertNotContainsRe(out, "test_skip")
3443
def test_exclude_pattern(self):
3444
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3445
self.assertNotContainsRe(out, "test_skip")
3447
def test_random_seed(self):
3448
self._run_selftest_with_suite(random_seed="now")
3450
def test_matching_tests_first(self):
3451
self._run_selftest_with_suite(matching_tests_first=True,
3452
pattern="test_self_ref$")
3454
def test_starting_with_and_exclude(self):
3455
out = self._run_selftest_with_suite(starting_with=["bt."],
3456
exclude_pattern="test_skip$")
3457
self.assertNotContainsRe(out, "test_skip")
3459
def test_additonal_decorator(self):
3460
out = self._run_selftest_with_suite(
3461
suite_decorators=[tests.TestDecorator])
3464
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3465
"""Check warnings from tests staying alive are emitted with subunit"""
3467
_test_needs_features = [features.subunit]
3469
def _run_selftest_with_suite(self, **kwargs):
3470
return TestUncollectedWarnings._run_selftest_with_suite(self,
3471
runner_class=tests.SubUnitBzrRunner, **kwargs)
3474
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3475
"""Check warnings from tests staying alive are emitted when forking"""
3388
3478
class TestEnvironHandling(tests.TestCase):
3390
3480
def test_overrideEnv_None_called_twice_doesnt_leak(self):