346
350
{'bzrdir_format': formats[1]._matchingbzrdir,
347
351
'transport_readonly_server': 'b',
348
352
'transport_server': 'a',
349
'workingtree_format': formats[1]})],
353
'workingtree_format': formats[1]}),
354
('WorkingTreeFormat6',
355
{'bzrdir_format': formats[2]._matchingbzrdir,
356
'transport_readonly_server': 'b',
357
'transport_server': 'a',
358
'workingtree_format': formats[2]}),
359
('WorkingTreeFormat6,remote',
360
{'bzrdir_format': formats[2]._matchingbzrdir,
361
'repo_is_remote': True,
362
'transport_readonly_server': 'd',
363
'transport_server': 'c',
364
'vfs_transport_factory': 'e',
365
'workingtree_format': formats[2]}),
353
369
class TestTreeScenarios(tests.TestCase):
355
371
def test_scenarios(self):
356
372
# the tree implementation scenario generator is meant to setup one
357
# instance for each working tree format, and one additional instance
373
# instance for each working tree format, one additional instance
358
374
# that will use the default wt format, but create a revision tree for
359
# the tests. this means that the wt ones should have the
360
# workingtree_to_test_tree attribute set to 'return_parameter' and the
361
# revision one set to revision_tree_from_workingtree.
375
# the tests, and one more that uses the default wt format as a
376
# lightweight checkout of a remote repository. This means that the wt
377
# ones should have the workingtree_to_test_tree attribute set to
378
# 'return_parameter' and the revision one set to
379
# revision_tree_from_workingtree.
363
381
from bzrlib.tests.per_tree import (
364
382
_dirstate_tree_from_workingtree,
1062
1093
self.expectFailure("No absolute truth", self.assertTrue, True)
1063
1094
runner = tests.TextTestRunner(stream=StringIO())
1064
1095
result = self.run_test_runner(runner, Test("test_truth"))
1065
if testtools_version[:3] <= (0, 9, 11):
1066
self.assertContainsRe(runner.stream.getvalue(),
1068
"FAIL: \\S+\.test_truth\n"
1071
"No absolute truth\n"
1074
"Ran 1 test in .*\n"
1076
"FAILED \\(failures=1\\)\n\\Z")
1078
self.assertContainsRe(runner.stream.getvalue(),
1080
"FAIL: \\S+\.test_truth\n"
1082
"Empty attachments:\n"
1085
"reason: {{{No absolute truth}}}\n"
1087
"Ran 1 test in .*\n"
1089
"FAILED \\(failures=1\\)\n\\Z")
1096
self.assertContainsRe(runner.stream.getvalue(),
1098
"FAIL: \\S+\.test_truth\n"
1101
"\\s*(?:Text attachment: )?reason"
1107
"Ran 1 test in .*\n"
1109
"FAILED \\(failures=1\\)\n\\Z")
1091
1111
def test_result_decorator(self):
1092
1112
# decorate results
1490
1507
transport_server = memory.MemoryServer()
1491
1508
transport_server.start_server()
1492
1509
self.addCleanup(transport_server.stop_server)
1493
t = transport.get_transport(transport_server.get_url())
1494
bzrdir.BzrDir.create(t.base)
1510
t = transport.get_transport_from_url(transport_server.get_url())
1511
controldir.ControlDir.create(t.base)
1495
1512
self.assertRaises(errors.BzrError,
1496
bzrdir.BzrDir.open_from_transport, t)
1513
controldir.ControlDir.open_from_transport, t)
1497
1514
# But if we declare this as safe, we can open the bzrdir.
1498
1515
self.permit_url(t.base)
1499
1516
self._bzr_selftest_roots.append(t.base)
1500
bzrdir.BzrDir.open_from_transport(t)
1517
controldir.ControlDir.open_from_transport(t)
1502
1519
def test_requireFeature_available(self):
1503
1520
"""self.requireFeature(available) is a no-op."""
1662
1684
class Test(tests.TestCase):
1664
1686
def setUp(self):
1665
tests.TestCase.setUp(self)
1687
super(Test, self).setUp()
1666
1688
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1668
1690
def test_value(self):
1669
1691
self.assertEqual('original', self.orig)
1670
1692
self.assertEqual('modified', obj.test_attr)
1672
test = Test('test_value')
1673
test.run(unittest.TestResult())
1694
self._run_successful_test(Test('test_value'))
1674
1695
self.assertEqual('original', obj.test_attr)
1697
def test_overrideAttr_with_no_existing_value_and_value(self):
1698
# Do not define the test_attribute
1699
obj = self # Make 'obj' visible to the embedded test
1700
class Test(tests.TestCase):
1703
tests.TestCase.setUp(self)
1704
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1706
def test_value(self):
1707
self.assertEqual(tests._unitialized_attr, self.orig)
1708
self.assertEqual('modified', obj.test_attr)
1710
self._run_successful_test(Test('test_value'))
1711
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1713
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1714
# Do not define the test_attribute
1715
obj = self # Make 'obj' visible to the embedded test
1716
class Test(tests.TestCase):
1719
tests.TestCase.setUp(self)
1720
self.orig = self.overrideAttr(obj, 'test_attr')
1722
def test_value(self):
1723
self.assertEqual(tests._unitialized_attr, self.orig)
1724
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1726
self._run_successful_test(Test('test_value'))
1727
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1676
1729
def test_recordCalls(self):
1677
1730
from bzrlib.tests import test_selftest
1678
1731
calls = self.recordCalls(
1679
1732
test_selftest, '_add_numbers')
1680
1733
self.assertEqual(test_selftest._add_numbers(2, 10),
1682
self.assertEquals(calls, [((2, 10), {})])
1735
self.assertEqual(calls, [((2, 10), {})])
1685
1738
def _add_numbers(a, b):
1743
1796
result = self._run_test('test_fail')
1744
1797
self.assertEqual(1, len(result.failures))
1745
1798
result_content = result.failures[0][1]
1746
if testtools_version < (0, 9, 12):
1747
self.assertContainsRe(result_content, 'Text attachment: log')
1799
self.assertContainsRe(result_content,
1800
'(?m)^(?:Text attachment: )?log(?:$|: )')
1748
1801
self.assertContainsRe(result_content, 'this was a failing test')
1750
1803
def test_error_has_log(self):
1751
1804
result = self._run_test('test_error')
1752
1805
self.assertEqual(1, len(result.errors))
1753
1806
result_content = result.errors[0][1]
1754
if testtools_version < (0, 9, 12):
1755
self.assertContainsRe(result_content, 'Text attachment: log')
1807
self.assertContainsRe(result_content,
1808
'(?m)^(?:Text attachment: )?log(?:$|: )')
1756
1809
self.assertContainsRe(result_content, 'this test errored')
1758
1811
def test_skip_has_no_log(self):
3079
3134
loader = self._create_loader('bzrlib.tests.test_samp')
3081
3136
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3082
self.assertEquals(test_list, _test_ids(suite))
3137
self.assertEqual(test_list, _test_ids(suite))
3084
3139
def test_load_tests_inside_module(self):
3085
3140
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
3086
3141
loader = self._create_loader('bzrlib.tests.test_sampler.Demo')
3088
3143
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3089
self.assertEquals(test_list, _test_ids(suite))
3144
self.assertEqual(test_list, _test_ids(suite))
3091
3146
def test_exclude_tests(self):
3092
3147
test_list = ['bogus']
3093
3148
loader = self._create_loader('bogus')
3095
3150
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3096
self.assertEquals([], _test_ids(suite))
3151
self.assertEqual([], _test_ids(suite))
3099
3154
class TestTestPrefixRegistry(tests.TestCase):
3133
3188
def test_predefined_prefixes(self):
3134
3189
tpr = tests.test_prefix_alias_registry
3135
self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib'))
3136
self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd'))
3137
self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu'))
3138
self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt'))
3139
self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
3140
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3190
self.assertEqual('bzrlib', tpr.resolve_alias('bzrlib'))
3191
self.assertEqual('bzrlib.doc', tpr.resolve_alias('bd'))
3192
self.assertEqual('bzrlib.utils', tpr.resolve_alias('bu'))
3193
self.assertEqual('bzrlib.tests', tpr.resolve_alias('bt'))
3194
self.assertEqual('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
3195
self.assertEqual('bzrlib.plugins', tpr.resolve_alias('bp'))
3143
3198
class TestThreadLeakDetection(tests.TestCase):
3314
3369
self.assertLength(1, calls)
3372
class _Selftest(object):
3373
"""Mixin for tests needing full selftest output"""
3375
def _inject_stream_into_subunit(self, stream):
3376
"""To be overridden by subclasses that run tests out of process"""
3378
def _run_selftest(self, **kwargs):
3380
self._inject_stream_into_subunit(sio)
3381
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3382
return sio.getvalue()
3385
class _ForkedSelftest(_Selftest):
3386
"""Mixin for tests needing full selftest output with forked children"""
3388
_test_needs_features = [features.subunit]
3390
def _inject_stream_into_subunit(self, stream):
3391
"""Monkey-patch subunit so the extra output goes to stream not stdout
3393
Some APIs need rewriting so this kind of bogus hackery can be replaced
3394
by passing the stream param from run_tests down into ProtocolTestCase.
3396
from subunit import ProtocolTestCase
3397
_original_init = ProtocolTestCase.__init__
3398
def _init_with_passthrough(self, *args, **kwargs):
3399
_original_init(self, *args, **kwargs)
3400
self._passthrough = stream
3401
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3403
def _run_selftest(self, **kwargs):
3404
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3405
if getattr(os, "fork", None) is None:
3406
raise tests.TestNotApplicable("Platform doesn't support forking")
3407
# Make sure the fork code is actually invoked by claiming two cores
3408
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3409
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3410
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3413
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3414
"""Check operation of --parallel=fork selftest option"""
3416
def test_error_in_child_during_fork(self):
3417
"""Error in a forked child during test setup should get reported"""
3418
class Test(tests.TestCase):
3419
def testMethod(self):
3421
# We don't care what, just break something that a child will run
3422
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3423
out = self._run_selftest(test_suite_factory=Test)
3424
# Lines from the tracebacks of the two child processes may be mixed
3425
# together due to the way subunit parses and forwards the streams,
3426
# so permit extra lines between each part of the error output.
3427
self.assertContainsRe(out,
3430
".+ in fork_for_tests\n"
3432
"\s*workaround_zealous_crypto_random\(\)\n"
3437
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3438
"""Check a test case still alive after being run emits a warning"""
3440
class Test(tests.TestCase):
3441
def test_pass(self):
3443
def test_self_ref(self):
3444
self.also_self = self.test_self_ref
3445
def test_skip(self):
3446
self.skip("Don't need")
3448
def _get_suite(self):
3449
return TestUtil.TestSuite([
3450
self.Test("test_pass"),
3451
self.Test("test_self_ref"),
3452
self.Test("test_skip"),
3455
def _run_selftest_with_suite(self, **kwargs):
3456
old_flags = tests.selftest_debug_flags
3457
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3458
gc_on = gc.isenabled()
3462
output = self._run_selftest(test_suite_factory=self._get_suite,
3467
tests.selftest_debug_flags = old_flags
3468
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3469
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3472
def test_testsuite(self):
3473
self._run_selftest_with_suite()
3475
def test_pattern(self):
3476
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3477
self.assertNotContainsRe(out, "test_skip")
3479
def test_exclude_pattern(self):
3480
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3481
self.assertNotContainsRe(out, "test_skip")
3483
def test_random_seed(self):
3484
self._run_selftest_with_suite(random_seed="now")
3486
def test_matching_tests_first(self):
3487
self._run_selftest_with_suite(matching_tests_first=True,
3488
pattern="test_self_ref$")
3490
def test_starting_with_and_exclude(self):
3491
out = self._run_selftest_with_suite(starting_with=["bt."],
3492
exclude_pattern="test_skip$")
3493
self.assertNotContainsRe(out, "test_skip")
3495
def test_additonal_decorator(self):
3496
out = self._run_selftest_with_suite(
3497
suite_decorators=[tests.TestDecorator])
3500
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3501
"""Check warnings from tests staying alive are emitted with subunit"""
3503
_test_needs_features = [features.subunit]
3505
def _run_selftest_with_suite(self, **kwargs):
3506
return TestUncollectedWarnings._run_selftest_with_suite(self,
3507
runner_class=tests.SubUnitBzrRunner, **kwargs)
3510
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3511
"""Check warnings from tests staying alive are emitted when forking"""
3317
3514
class TestEnvironHandling(tests.TestCase):
3319
3516
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3324
3521
def test_me(self):
3325
3522
# The first call save the 42 value
3326
3523
self.overrideEnv('MYVAR', None)
3327
self.assertEquals(None, os.environ.get('MYVAR'))
3524
self.assertEqual(None, os.environ.get('MYVAR'))
3328
3525
# Make sure we can call it twice
3329
3526
self.overrideEnv('MYVAR', None)
3330
self.assertEquals(None, os.environ.get('MYVAR'))
3527
self.assertEqual(None, os.environ.get('MYVAR'))
3331
3528
output = StringIO()
3332
3529
result = tests.TextTestResult(output, 0, 1)
3333
3530
Test('test_me').run(result)
3334
3531
if not result.wasStrictlySuccessful():
3335
3532
self.fail(output.getvalue())
3336
3533
# We get our value back
3337
self.assertEquals('42', os.environ.get('MYVAR'))
3534
self.assertEqual('42', os.environ.get('MYVAR'))
3340
3537
class TestIsolatedEnv(tests.TestCase):
3356
3553
# Make sure we know the definition of BZR_HOME: not part of os.environ
3357
3554
# for tests.TestCase.
3358
3555
self.assertTrue('BZR_HOME' in tests.isolated_environ)
3359
self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3556
self.assertEqual(None, tests.isolated_environ['BZR_HOME'])
3360
3557
# Being part of isolated_environ, BZR_HOME should not appear here
3361
3558
self.assertFalse('BZR_HOME' in os.environ)
3362
3559
# Make sure we know the definition of LINES: part of os.environ for
3363
3560
# tests.TestCase
3364
3561
self.assertTrue('LINES' in tests.isolated_environ)
3365
self.assertEquals('25', tests.isolated_environ['LINES'])
3366
self.assertEquals('25', os.environ['LINES'])
3562
self.assertEqual('25', tests.isolated_environ['LINES'])
3563
self.assertEqual('25', os.environ['LINES'])
3368
3565
def test_injecting_unknown_variable(self):
3369
3566
# BZR_HOME is known to be absent from os.environ
3370
3567
test = self.ScratchMonkey('test_me')
3371
3568
tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3372
self.assertEquals('foo', os.environ['BZR_HOME'])
3569
self.assertEqual('foo', os.environ['BZR_HOME'])
3373
3570
tests.restore_os_environ(test)
3374
3571
self.assertFalse('BZR_HOME' in os.environ)