350
354
{'bzrdir_format': formats[1]._matchingbzrdir,
351
355
'transport_readonly_server': 'b',
352
356
'transport_server': 'a',
353
'workingtree_format': formats[1]}),
354
('WorkingTreeFormat6',
355
{'bzrdir_format': formats[2]._matchingbzrdir,
356
'transport_readonly_server': 'b',
357
'transport_server': 'a',
358
'workingtree_format': formats[2]}),
359
('WorkingTreeFormat6,remote',
360
{'bzrdir_format': formats[2]._matchingbzrdir,
361
'repo_is_remote': True,
362
'transport_readonly_server': 'd',
363
'transport_server': 'c',
364
'vfs_transport_factory': 'e',
365
'workingtree_format': formats[2]}),
357
'workingtree_format': formats[1]})],
369
361
class TestTreeScenarios(tests.TestCase):
371
363
def test_scenarios(self):
372
364
# the tree implementation scenario generator is meant to setup one
373
# instance for each working tree format, one additional instance
365
# instance for each working tree format, and one additional instance
374
366
# that will use the default wt format, but create a revision tree for
375
# the tests, and one more that uses the default wt format as a
376
# lightweight checkout of a remote repository. This means that the wt
377
# ones should have the workingtree_to_test_tree attribute set to
378
# 'return_parameter' and the revision one set to
379
# revision_tree_from_workingtree.
367
# the tests. this means that the wt ones should have the
368
# workingtree_to_test_tree attribute set to 'return_parameter' and the
369
# revision one set to revision_tree_from_workingtree.
381
371
from bzrlib.tests.per_tree import (
382
372
_dirstate_tree_from_workingtree,
890
_get_test("test_xfail").run(result)
891
self.assertContainsRe(result_stream.getvalue(),
892
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
893
"\\s*(?:Text attachment: )?reason"
867
test = self.get_passing_test()
868
result.startTest(test)
869
prefix = len(result_stream.getvalue())
870
# the err parameter has the shape:
871
# (class, exception object, traceback)
872
# KnownFailures dont get their tracebacks shown though, so we
874
err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
875
result.report_known_failure(test, err)
876
output = result_stream.getvalue()[prefix:]
877
lines = output.splitlines()
878
self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
879
if sys.version_info > (2, 7):
880
self.expectFailure("_ExpectedFailure on 2.7 loses the message",
881
self.assertNotEqual, lines[1], ' ')
882
self.assertEqual(lines[1], ' foo')
883
self.assertEqual(2, len(lines))
898
885
def get_passing_test(self):
899
886
"""Return a test object that can't be run usefully."""
1507
1501
transport_server = memory.MemoryServer()
1508
1502
transport_server.start_server()
1509
1503
self.addCleanup(transport_server.stop_server)
1510
t = transport.get_transport_from_url(transport_server.get_url())
1511
controldir.ControlDir.create(t.base)
1504
t = transport.get_transport(transport_server.get_url())
1505
bzrdir.BzrDir.create(t.base)
1512
1506
self.assertRaises(errors.BzrError,
1513
controldir.ControlDir.open_from_transport, t)
1507
bzrdir.BzrDir.open_from_transport, t)
1514
1508
# But if we declare this as safe, we can open the bzrdir.
1515
1509
self.permit_url(t.base)
1516
1510
self._bzr_selftest_roots.append(t.base)
1517
controldir.ControlDir.open_from_transport(t)
1511
bzrdir.BzrDir.open_from_transport(t)
1519
1513
def test_requireFeature_available(self):
1520
1514
"""self.requireFeature(available) is a no-op."""
1521
class Available(features.Feature):
1515
class Available(tests.Feature):
1522
1516
def _probe(self):return True
1523
1517
feature = Available()
1524
1518
self.requireFeature(feature)
1526
1520
def test_requireFeature_unavailable(self):
1527
1521
"""self.requireFeature(unavailable) raises UnavailableFeature."""
1528
class Unavailable(features.Feature):
1522
class Unavailable(tests.Feature):
1529
1523
def _probe(self):return False
1530
1524
feature = Unavailable()
1531
1525
self.assertRaises(tests.UnavailableFeature,
1684
1673
class Test(tests.TestCase):
1686
1675
def setUp(self):
1687
super(Test, self).setUp()
1676
tests.TestCase.setUp(self)
1688
1677
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1690
1679
def test_value(self):
1691
1680
self.assertEqual('original', self.orig)
1692
1681
self.assertEqual('modified', obj.test_attr)
1694
self._run_successful_test(Test('test_value'))
1683
test = Test('test_value')
1684
test.run(unittest.TestResult())
1695
1685
self.assertEqual('original', obj.test_attr)
1697
def test_overrideAttr_with_no_existing_value_and_value(self):
1698
# Do not define the test_attribute
1699
obj = self # Make 'obj' visible to the embedded test
1700
class Test(tests.TestCase):
1703
tests.TestCase.setUp(self)
1704
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1706
def test_value(self):
1707
self.assertEqual(tests._unitialized_attr, self.orig)
1708
self.assertEqual('modified', obj.test_attr)
1710
self._run_successful_test(Test('test_value'))
1711
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1713
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1714
# Do not define the test_attribute
1715
obj = self # Make 'obj' visible to the embedded test
1716
class Test(tests.TestCase):
1719
tests.TestCase.setUp(self)
1720
self.orig = self.overrideAttr(obj, 'test_attr')
1722
def test_value(self):
1723
self.assertEqual(tests._unitialized_attr, self.orig)
1724
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1726
self._run_successful_test(Test('test_value'))
1727
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1729
def test_recordCalls(self):
1730
from bzrlib.tests import test_selftest
1731
calls = self.recordCalls(
1732
test_selftest, '_add_numbers')
1733
self.assertEqual(test_selftest._add_numbers(2, 10),
1735
self.assertEquals(calls, [((2, 10), {})])
1738
def _add_numbers(a, b):
1742
class _MissingFeature(features.Feature):
1688
class _MissingFeature(tests.Feature):
1743
1689
def _probe(self):
1745
1691
missing_feature = _MissingFeature()
1796
1742
result = self._run_test('test_fail')
1797
1743
self.assertEqual(1, len(result.failures))
1798
1744
result_content = result.failures[0][1]
1799
self.assertContainsRe(result_content,
1800
'(?m)^(?:Text attachment: )?log(?:$|: )')
1745
self.assertContainsRe(result_content, 'Text attachment: log')
1801
1746
self.assertContainsRe(result_content, 'this was a failing test')
1803
1748
def test_error_has_log(self):
1804
1749
result = self._run_test('test_error')
1805
1750
self.assertEqual(1, len(result.errors))
1806
1751
result_content = result.errors[0][1]
1807
self.assertContainsRe(result_content,
1808
'(?m)^(?:Text attachment: )?log(?:$|: )')
1752
self.assertContainsRe(result_content, 'Text attachment: log')
1809
1753
self.assertContainsRe(result_content, 'this test errored')
1811
1755
def test_skip_has_no_log(self):
2665
2607
self.assertEqual('bzr: interrupted\n', result[1])
2610
class TestFeature(tests.TestCase):
2612
def test_caching(self):
2613
"""Feature._probe is called by the feature at most once."""
2614
class InstrumentedFeature(tests.Feature):
2616
super(InstrumentedFeature, self).__init__()
2619
self.calls.append('_probe')
2621
feature = InstrumentedFeature()
2623
self.assertEqual(['_probe'], feature.calls)
2625
self.assertEqual(['_probe'], feature.calls)
2627
def test_named_str(self):
2628
"""Feature.__str__ should thunk to feature_name()."""
2629
class NamedFeature(tests.Feature):
2630
def feature_name(self):
2632
feature = NamedFeature()
2633
self.assertEqual('symlinks', str(feature))
2635
def test_default_str(self):
2636
"""Feature.__str__ should default to __class__.__name__."""
2637
class NamedFeature(tests.Feature):
2639
feature = NamedFeature()
2640
self.assertEqual('NamedFeature', str(feature))
2643
class TestUnavailableFeature(tests.TestCase):
2645
def test_access_feature(self):
2646
feature = tests.Feature()
2647
exception = tests.UnavailableFeature(feature)
2648
self.assertIs(feature, exception.args[0])
2651
simple_thunk_feature = tests._CompatabilityThunkFeature(
2652
deprecated_in((2, 1, 0)),
2653
'bzrlib.tests.test_selftest',
2654
'simple_thunk_feature','UnicodeFilename',
2655
replacement_module='bzrlib.tests'
2658
class Test_CompatibilityFeature(tests.TestCase):
2660
def test_does_thunk(self):
2661
res = self.callDeprecated(
2662
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2663
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2664
simple_thunk_feature.available)
2665
self.assertEqual(tests.UnicodeFilename.available(), res)
2668
class TestModuleAvailableFeature(tests.TestCase):
2670
def test_available_module(self):
2671
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2672
self.assertEqual('bzrlib.tests', feature.module_name)
2673
self.assertEqual('bzrlib.tests', str(feature))
2674
self.assertTrue(feature.available())
2675
self.assertIs(tests, feature.module)
2677
def test_unavailable_module(self):
2678
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2679
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2680
self.assertFalse(feature.available())
2681
self.assertIs(None, feature.module)
2668
2684
class TestSelftestFiltering(tests.TestCase):
2670
2686
def setUp(self):
2671
super(TestSelftestFiltering, self).setUp()
2687
tests.TestCase.setUp(self)
2672
2688
self.suite = TestUtil.TestSuite()
2673
2689
self.loader = TestUtil.TestLoader()
2674
2690
self.suite.addTest(self.loader.loadTestsFromModule(
3369
3385
self.assertLength(1, calls)
3372
class _Selftest(object):
3373
"""Mixin for tests needing full selftest output"""
3375
def _inject_stream_into_subunit(self, stream):
3376
"""To be overridden by subclasses that run tests out of process"""
3378
def _run_selftest(self, **kwargs):
3380
self._inject_stream_into_subunit(sio)
3381
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3382
return sio.getvalue()
3385
class _ForkedSelftest(_Selftest):
3386
"""Mixin for tests needing full selftest output with forked children"""
3388
_test_needs_features = [features.subunit]
3390
def _inject_stream_into_subunit(self, stream):
3391
"""Monkey-patch subunit so the extra output goes to stream not stdout
3393
Some APIs need rewriting so this kind of bogus hackery can be replaced
3394
by passing the stream param from run_tests down into ProtocolTestCase.
3396
from subunit import ProtocolTestCase
3397
_original_init = ProtocolTestCase.__init__
3398
def _init_with_passthrough(self, *args, **kwargs):
3399
_original_init(self, *args, **kwargs)
3400
self._passthrough = stream
3401
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3403
def _run_selftest(self, **kwargs):
3404
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3405
if getattr(os, "fork", None) is None:
3406
raise tests.TestNotApplicable("Platform doesn't support forking")
3407
# Make sure the fork code is actually invoked by claiming two cores
3408
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3409
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3410
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3413
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3414
"""Check operation of --parallel=fork selftest option"""
3416
def test_error_in_child_during_fork(self):
3417
"""Error in a forked child during test setup should get reported"""
3418
class Test(tests.TestCase):
3419
def testMethod(self):
3421
# We don't care what, just break something that a child will run
3422
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3423
out = self._run_selftest(test_suite_factory=Test)
3424
# Lines from the tracebacks of the two child processes may be mixed
3425
# together due to the way subunit parses and forwards the streams,
3426
# so permit extra lines between each part of the error output.
3427
self.assertContainsRe(out,
3430
".+ in fork_for_tests\n"
3432
"\s*workaround_zealous_crypto_random\(\)\n"
3437
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3438
"""Check a test case still alive after being run emits a warning"""
3440
class Test(tests.TestCase):
3441
def test_pass(self):
3443
def test_self_ref(self):
3444
self.also_self = self.test_self_ref
3445
def test_skip(self):
3446
self.skip("Don't need")
3448
def _get_suite(self):
3449
return TestUtil.TestSuite([
3450
self.Test("test_pass"),
3451
self.Test("test_self_ref"),
3452
self.Test("test_skip"),
3455
def _run_selftest_with_suite(self, **kwargs):
3456
old_flags = tests.selftest_debug_flags
3457
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3458
gc_on = gc.isenabled()
3462
output = self._run_selftest(test_suite_factory=self._get_suite,
3467
tests.selftest_debug_flags = old_flags
3468
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3469
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3472
def test_testsuite(self):
3473
self._run_selftest_with_suite()
3475
def test_pattern(self):
3476
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3477
self.assertNotContainsRe(out, "test_skip")
3479
def test_exclude_pattern(self):
3480
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3481
self.assertNotContainsRe(out, "test_skip")
3483
def test_random_seed(self):
3484
self._run_selftest_with_suite(random_seed="now")
3486
def test_matching_tests_first(self):
3487
self._run_selftest_with_suite(matching_tests_first=True,
3488
pattern="test_self_ref$")
3490
def test_starting_with_and_exclude(self):
3491
out = self._run_selftest_with_suite(starting_with=["bt."],
3492
exclude_pattern="test_skip$")
3493
self.assertNotContainsRe(out, "test_skip")
3495
def test_additonal_decorator(self):
3496
out = self._run_selftest_with_suite(
3497
suite_decorators=[tests.TestDecorator])
3500
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3501
"""Check warnings from tests staying alive are emitted with subunit"""
3503
_test_needs_features = [features.subunit]
3505
def _run_selftest_with_suite(self, **kwargs):
3506
return TestUncollectedWarnings._run_selftest_with_suite(self,
3507
runner_class=tests.SubUnitBzrRunner, **kwargs)
3510
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3511
"""Check warnings from tests staying alive are emitted when forking"""
3514
3388
class TestEnvironHandling(tests.TestCase):
3516
3390
def test_overrideEnv_None_called_twice_doesnt_leak(self):