351
350
{'bzrdir_format': formats[1]._matchingbzrdir,
352
351
'transport_readonly_server': 'b',
353
352
'transport_server': 'a',
354
'workingtree_format': formats[1]})],
353
'workingtree_format': formats[1]}),
354
('WorkingTreeFormat6',
355
{'bzrdir_format': formats[2]._matchingbzrdir,
356
'transport_readonly_server': 'b',
357
'transport_server': 'a',
358
'workingtree_format': formats[2]}),
359
('WorkingTreeFormat6,remote',
360
{'bzrdir_format': formats[2]._matchingbzrdir,
361
'repo_is_remote': True,
362
'transport_readonly_server': 'd',
363
'transport_server': 'c',
364
'vfs_transport_factory': 'e',
365
'workingtree_format': formats[2]}),
358
369
class TestTreeScenarios(tests.TestCase):
360
371
def test_scenarios(self):
361
372
# the tree implementation scenario generator is meant to setup one
362
# instance for each working tree format, and one additional instance
373
# instance for each working tree format, one additional instance
363
374
# that will use the default wt format, but create a revision tree for
364
# the tests. this means that the wt ones should have the
365
# workingtree_to_test_tree attribute set to 'return_parameter' and the
366
# revision one set to revision_tree_from_workingtree.
375
# the tests, and one more that uses the default wt format as a
376
# lightweight checkout of a remote repository. This means that the wt
377
# ones should have the workingtree_to_test_tree attribute set to
378
# 'return_parameter' and the revision one set to
379
# revision_tree_from_workingtree.
368
381
from bzrlib.tests.per_tree import (
369
382
_dirstate_tree_from_workingtree,
378
formats = [workingtree.WorkingTreeFormat4(),
379
workingtree.WorkingTreeFormat3(),]
391
smart_server = test_server.SmartTCPServer_for_testing
392
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
393
mem_server = memory.MemoryServer
394
formats = [workingtree_4.WorkingTreeFormat4(),
395
workingtree_3.WorkingTreeFormat3(),]
380
396
scenarios = make_scenarios(server1, server2, formats)
381
self.assertEqual(7, len(scenarios))
397
self.assertEqual(8, len(scenarios))
382
398
default_wt_format = workingtree.format_registry.get_default()
383
wt4_format = workingtree.WorkingTreeFormat4()
384
wt5_format = workingtree.WorkingTreeFormat5()
399
wt4_format = workingtree_4.WorkingTreeFormat4()
400
wt5_format = workingtree_4.WorkingTreeFormat5()
401
wt6_format = workingtree_4.WorkingTreeFormat6()
385
402
expected_scenarios = [
386
403
('WorkingTreeFormat4',
387
404
{'bzrdir_format': formats[0]._matchingbzrdir,
863
test = self.get_passing_test()
864
result.startTest(test)
865
prefix = len(result_stream.getvalue())
866
# the err parameter has the shape:
867
# (class, exception object, traceback)
868
# KnownFailures dont get their tracebacks shown though, so we
870
err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
871
result.report_known_failure(test, err)
872
output = result_stream.getvalue()[prefix:]
873
lines = output.splitlines()
874
self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
875
if sys.version_info > (2, 7):
876
self.expectFailure("_ExpectedFailure on 2.7 loses the message",
877
self.assertNotEqual, lines[1], ' ')
878
self.assertEqual(lines[1], ' foo')
879
self.assertEqual(2, len(lines))
890
_get_test("test_xfail").run(result)
891
self.assertContainsRe(result_stream.getvalue(),
892
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
893
"\\s*(?:Text attachment: )?reason"
881
898
def get_passing_test(self):
882
899
"""Return a test object that can't be run usefully."""
1479
1507
transport_server = memory.MemoryServer()
1480
1508
transport_server.start_server()
1481
1509
self.addCleanup(transport_server.stop_server)
1482
t = transport.get_transport(transport_server.get_url())
1483
bzrdir.BzrDir.create(t.base)
1510
t = transport.get_transport_from_url(transport_server.get_url())
1511
controldir.ControlDir.create(t.base)
1484
1512
self.assertRaises(errors.BzrError,
1485
bzrdir.BzrDir.open_from_transport, t)
1513
controldir.ControlDir.open_from_transport, t)
1486
1514
# But if we declare this as safe, we can open the bzrdir.
1487
1515
self.permit_url(t.base)
1488
1516
self._bzr_selftest_roots.append(t.base)
1489
bzrdir.BzrDir.open_from_transport(t)
1517
controldir.ControlDir.open_from_transport(t)
1491
1519
def test_requireFeature_available(self):
1492
1520
"""self.requireFeature(available) is a no-op."""
1493
class Available(tests.Feature):
1521
class Available(features.Feature):
1494
1522
def _probe(self):return True
1495
1523
feature = Available()
1496
1524
self.requireFeature(feature)
1498
1526
def test_requireFeature_unavailable(self):
1499
1527
"""self.requireFeature(unavailable) raises UnavailableFeature."""
1500
class Unavailable(tests.Feature):
1528
class Unavailable(features.Feature):
1501
1529
def _probe(self):return False
1502
1530
feature = Unavailable()
1503
1531
self.assertRaises(tests.UnavailableFeature,
1651
1684
class Test(tests.TestCase):
1653
1686
def setUp(self):
1654
tests.TestCase.setUp(self)
1687
super(Test, self).setUp()
1655
1688
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1657
1690
def test_value(self):
1658
1691
self.assertEqual('original', self.orig)
1659
1692
self.assertEqual('modified', obj.test_attr)
1661
test = Test('test_value')
1662
test.run(unittest.TestResult())
1694
self._run_successful_test(Test('test_value'))
1663
1695
self.assertEqual('original', obj.test_attr)
1666
class _MissingFeature(tests.Feature):
1697
def test_overrideAttr_with_no_existing_value_and_value(self):
1698
# Do not define the test_attribute
1699
obj = self # Make 'obj' visible to the embedded test
1700
class Test(tests.TestCase):
1703
tests.TestCase.setUp(self)
1704
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1706
def test_value(self):
1707
self.assertEqual(tests._unitialized_attr, self.orig)
1708
self.assertEqual('modified', obj.test_attr)
1710
self._run_successful_test(Test('test_value'))
1711
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1713
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1714
# Do not define the test_attribute
1715
obj = self # Make 'obj' visible to the embedded test
1716
class Test(tests.TestCase):
1719
tests.TestCase.setUp(self)
1720
self.orig = self.overrideAttr(obj, 'test_attr')
1722
def test_value(self):
1723
self.assertEqual(tests._unitialized_attr, self.orig)
1724
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1726
self._run_successful_test(Test('test_value'))
1727
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1729
def test_recordCalls(self):
1730
from bzrlib.tests import test_selftest
1731
calls = self.recordCalls(
1732
test_selftest, '_add_numbers')
1733
self.assertEqual(test_selftest._add_numbers(2, 10),
1735
self.assertEquals(calls, [((2, 10), {})])
1738
def _add_numbers(a, b):
1742
class _MissingFeature(features.Feature):
1667
1743
def _probe(self):
1669
1745
missing_feature = _MissingFeature()
1720
1796
result = self._run_test('test_fail')
1721
1797
self.assertEqual(1, len(result.failures))
1722
1798
result_content = result.failures[0][1]
1723
self.assertContainsRe(result_content, 'Text attachment: log')
1799
self.assertContainsRe(result_content,
1800
'(?m)^(?:Text attachment: )?log(?:$|: )')
1724
1801
self.assertContainsRe(result_content, 'this was a failing test')
1726
1803
def test_error_has_log(self):
1727
1804
result = self._run_test('test_error')
1728
1805
self.assertEqual(1, len(result.errors))
1729
1806
result_content = result.errors[0][1]
1730
self.assertContainsRe(result_content, 'Text attachment: log')
1807
self.assertContainsRe(result_content,
1808
'(?m)^(?:Text attachment: )?log(?:$|: )')
1731
1809
self.assertContainsRe(result_content, 'this test errored')
1733
1811
def test_skip_has_no_log(self):
2167
2246
self.assertNotContainsRe(content, 'test with expected failure')
2168
2247
self.assertEqual(1, len(result.expectedFailures))
2169
2248
result_content = result.expectedFailures[0][1]
2170
self.assertNotContainsRe(result_content, 'Text attachment: log')
2249
self.assertNotContainsRe(result_content,
2250
'(?m)^(?:Text attachment: )?log(?:$|: )')
2171
2251
self.assertNotContainsRe(result_content, 'test with expected failure')
2173
2253
def test_unexpected_success_has_log(self):
2174
2254
content, result = self.run_subunit_stream('test_unexpected_success')
2175
2255
self.assertContainsRe(content, '(?m)^log$')
2176
2256
self.assertContainsRe(content, 'test with unexpected success')
2177
self.expectFailure('subunit treats "unexpectedSuccess"'
2178
' as a plain success',
2179
self.assertEqual, 1, len(result.unexpectedSuccesses))
2257
# GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2258
# success, if a min version check is added remove this
2259
from subunit import TestProtocolClient as _Client
2260
if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
2261
self.expectFailure('subunit treats "unexpectedSuccess"'
2262
' as a plain success',
2263
self.assertEqual, 1, len(result.unexpectedSuccesses))
2180
2264
self.assertEqual(1, len(result.unexpectedSuccesses))
2181
2265
test = result.unexpectedSuccesses[0]
2182
2266
# RemotedTestCase doesn't preserve the "details"
2482
2566
class TestStartBzrSubProcess(tests.TestCase):
2567
"""Stub test start_bzr_subprocess."""
2484
def check_popen_state(self):
2485
"""Replace to make assertions when popen is called."""
2569
def _subprocess_log_cleanup(self):
2570
"""Inhibits the base version as we don't produce a log file."""
2487
2572
def _popen(self, *args, **kwargs):
2488
"""Record the command that is run, so that we can ensure it is correct"""
2573
"""Override the base version to record the command that is run.
2575
From there we can ensure it is correct without spawning a real process.
2489
2577
self.check_popen_state()
2490
2578
self._popen_args = args
2491
2579
self._popen_kwargs = kwargs
2492
2580
raise _DontSpawnProcess()
2582
def check_popen_state(self):
2583
"""Replace to make assertions when popen is called."""
2494
2585
def test_run_bzr_subprocess_no_plugins(self):
2495
2586
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2496
2587
command = self._popen_args[0]
2580
2665
self.assertEqual('bzr: interrupted\n', result[1])
2583
class TestFeature(tests.TestCase):
2585
def test_caching(self):
2586
"""Feature._probe is called by the feature at most once."""
2587
class InstrumentedFeature(tests.Feature):
2589
super(InstrumentedFeature, self).__init__()
2592
self.calls.append('_probe')
2594
feature = InstrumentedFeature()
2596
self.assertEqual(['_probe'], feature.calls)
2598
self.assertEqual(['_probe'], feature.calls)
2600
def test_named_str(self):
2601
"""Feature.__str__ should thunk to feature_name()."""
2602
class NamedFeature(tests.Feature):
2603
def feature_name(self):
2605
feature = NamedFeature()
2606
self.assertEqual('symlinks', str(feature))
2608
def test_default_str(self):
2609
"""Feature.__str__ should default to __class__.__name__."""
2610
class NamedFeature(tests.Feature):
2612
feature = NamedFeature()
2613
self.assertEqual('NamedFeature', str(feature))
2616
class TestUnavailableFeature(tests.TestCase):
2618
def test_access_feature(self):
2619
feature = tests.Feature()
2620
exception = tests.UnavailableFeature(feature)
2621
self.assertIs(feature, exception.args[0])
2624
simple_thunk_feature = tests._CompatabilityThunkFeature(
2625
deprecated_in((2, 1, 0)),
2626
'bzrlib.tests.test_selftest',
2627
'simple_thunk_feature','UnicodeFilename',
2628
replacement_module='bzrlib.tests'
2631
class Test_CompatibilityFeature(tests.TestCase):
2633
def test_does_thunk(self):
2634
res = self.callDeprecated(
2635
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2636
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2637
simple_thunk_feature.available)
2638
self.assertEqual(tests.UnicodeFilename.available(), res)
2641
class TestModuleAvailableFeature(tests.TestCase):
2643
def test_available_module(self):
2644
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2645
self.assertEqual('bzrlib.tests', feature.module_name)
2646
self.assertEqual('bzrlib.tests', str(feature))
2647
self.assertTrue(feature.available())
2648
self.assertIs(tests, feature.module)
2650
def test_unavailable_module(self):
2651
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2652
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2653
self.assertFalse(feature.available())
2654
self.assertIs(None, feature.module)
2657
2668
class TestSelftestFiltering(tests.TestCase):
2659
2670
def setUp(self):
2660
tests.TestCase.setUp(self)
2671
super(TestSelftestFiltering, self).setUp()
2661
2672
self.suite = TestUtil.TestSuite()
2662
2673
self.loader = TestUtil.TestLoader()
2663
2674
self.suite.addTest(self.loader.loadTestsFromModule(
3358
3369
self.assertLength(1, calls)
3372
class _Selftest(object):
3373
"""Mixin for tests needing full selftest output"""
3375
def _inject_stream_into_subunit(self, stream):
3376
"""To be overridden by subclasses that run tests out of process"""
3378
def _run_selftest(self, **kwargs):
3380
self._inject_stream_into_subunit(sio)
3381
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3382
return sio.getvalue()
3385
class _ForkedSelftest(_Selftest):
3386
"""Mixin for tests needing full selftest output with forked children"""
3388
_test_needs_features = [features.subunit]
3390
def _inject_stream_into_subunit(self, stream):
3391
"""Monkey-patch subunit so the extra output goes to stream not stdout
3393
Some APIs need rewriting so this kind of bogus hackery can be replaced
3394
by passing the stream param from run_tests down into ProtocolTestCase.
3396
from subunit import ProtocolTestCase
3397
_original_init = ProtocolTestCase.__init__
3398
def _init_with_passthrough(self, *args, **kwargs):
3399
_original_init(self, *args, **kwargs)
3400
self._passthrough = stream
3401
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3403
def _run_selftest(self, **kwargs):
3404
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3405
if getattr(os, "fork", None) is None:
3406
raise tests.TestNotApplicable("Platform doesn't support forking")
3407
# Make sure the fork code is actually invoked by claiming two cores
3408
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3409
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3410
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3413
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3414
"""Check operation of --parallel=fork selftest option"""
3416
def test_error_in_child_during_fork(self):
3417
"""Error in a forked child during test setup should get reported"""
3418
class Test(tests.TestCase):
3419
def testMethod(self):
3421
# We don't care what, just break something that a child will run
3422
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3423
out = self._run_selftest(test_suite_factory=Test)
3424
# Lines from the tracebacks of the two child processes may be mixed
3425
# together due to the way subunit parses and forwards the streams,
3426
# so permit extra lines between each part of the error output.
3427
self.assertContainsRe(out,
3430
".+ in fork_for_tests\n"
3432
"\s*workaround_zealous_crypto_random\(\)\n"
3437
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3438
"""Check a test case still alive after being run emits a warning"""
3440
class Test(tests.TestCase):
3441
def test_pass(self):
3443
def test_self_ref(self):
3444
self.also_self = self.test_self_ref
3445
def test_skip(self):
3446
self.skip("Don't need")
3448
def _get_suite(self):
3449
return TestUtil.TestSuite([
3450
self.Test("test_pass"),
3451
self.Test("test_self_ref"),
3452
self.Test("test_skip"),
3455
def _run_selftest_with_suite(self, **kwargs):
3456
old_flags = tests.selftest_debug_flags
3457
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3458
gc_on = gc.isenabled()
3462
output = self._run_selftest(test_suite_factory=self._get_suite,
3467
tests.selftest_debug_flags = old_flags
3468
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3469
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3472
def test_testsuite(self):
3473
self._run_selftest_with_suite()
3475
def test_pattern(self):
3476
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3477
self.assertNotContainsRe(out, "test_skip")
3479
def test_exclude_pattern(self):
3480
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3481
self.assertNotContainsRe(out, "test_skip")
3483
def test_random_seed(self):
3484
self._run_selftest_with_suite(random_seed="now")
3486
def test_matching_tests_first(self):
3487
self._run_selftest_with_suite(matching_tests_first=True,
3488
pattern="test_self_ref$")
3490
def test_starting_with_and_exclude(self):
3491
out = self._run_selftest_with_suite(starting_with=["bt."],
3492
exclude_pattern="test_skip$")
3493
self.assertNotContainsRe(out, "test_skip")
3495
def test_additonal_decorator(self):
3496
out = self._run_selftest_with_suite(
3497
suite_decorators=[tests.TestDecorator])
3500
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3501
"""Check warnings from tests staying alive are emitted with subunit"""
3503
_test_needs_features = [features.subunit]
3505
def _run_selftest_with_suite(self, **kwargs):
3506
return TestUncollectedWarnings._run_selftest_with_suite(self,
3507
runner_class=tests.SubUnitBzrRunner, **kwargs)
3510
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3511
"""Check warnings from tests staying alive are emitted when forking"""
3361
3514
class TestEnvironHandling(tests.TestCase):
3363
3516
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3497
3650
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3498
3651
# tests.DocTestSuite sees None
3499
3652
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3655
class TestSelftestExcludePatterns(tests.TestCase):
3658
super(TestSelftestExcludePatterns, self).setUp()
3659
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3661
def suite_factory(self, keep_only=None, starting_with=None):
3662
"""A test suite factory with only a few tests."""
3663
class Test(tests.TestCase):
3665
# We don't need the full class path
3666
return self._testMethodName
3673
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3675
def assertTestList(self, expected, *selftest_args):
3676
# We rely on setUp installing the right test suite factory so we can
3677
# test at the command level without loading the whole test suite
3678
out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3679
actual = out.splitlines()
3680
self.assertEquals(expected, actual)
3682
def test_full_list(self):
3683
self.assertTestList(['a', 'b', 'c'])
3685
def test_single_exclude(self):
3686
self.assertTestList(['b', 'c'], '-x', 'a')
3688
def test_mutiple_excludes(self):
3689
self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3692
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3694
_test_needs_features = [features.subunit]
3697
super(TestCounterHooks, self).setUp()
3698
class Test(tests.TestCase):
3701
super(Test, self).setUp()
3702
self.hooks = hooks.Hooks()
3703
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3704
self.install_counter_hook(self.hooks, 'myhook')
3709
def run_hook_once(self):
3710
for hook in self.hooks['myhook']:
3713
self.test_class = Test
3715
def assertHookCalls(self, expected_calls, test_name):
3716
test = self.test_class(test_name)
3717
result = unittest.TestResult()
3719
self.assertTrue(hasattr(test, '_counters'))
3720
self.assertTrue(test._counters.has_key('myhook'))
3721
self.assertEquals(expected_calls, test._counters['myhook'])
3723
def test_no_hook(self):
3724
self.assertHookCalls(0, 'no_hook')
3726
def test_run_hook_once(self):
3727
tt = features.testtools
3728
if tt.module.__version__ < (0, 9, 8):
3729
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3730
self.assertHookCalls(1, 'run_hook_once')