337
337
formats = [workingtree_4.WorkingTreeFormat4(),
338
workingtree_3.WorkingTreeFormat3(),]
339
scenarios = make_scenarios(server1, server2, formats)
338
workingtree_3.WorkingTreeFormat3(),
339
workingtree_4.WorkingTreeFormat6()]
340
scenarios = make_scenarios(server1, server2, formats,
341
remote_server='c', remote_readonly_server='d',
342
remote_backing_server='e')
340
343
self.assertEqual([
341
344
('WorkingTreeFormat4',
342
345
{'bzrdir_format': formats[0]._matchingbzrdir,
347
350
{'bzrdir_format': formats[1]._matchingbzrdir,
348
351
'transport_readonly_server': 'b',
349
352
'transport_server': 'a',
350
'workingtree_format': formats[1]})],
353
'workingtree_format': formats[1]}),
354
('WorkingTreeFormat6',
355
{'bzrdir_format': formats[2]._matchingbzrdir,
356
'transport_readonly_server': 'b',
357
'transport_server': 'a',
358
'workingtree_format': formats[2]}),
359
('WorkingTreeFormat6,remote',
360
{'bzrdir_format': formats[2]._matchingbzrdir,
361
'repo_is_remote': True,
362
'transport_readonly_server': 'd',
363
'transport_server': 'c',
364
'vfs_transport_factory': 'e',
365
'workingtree_format': formats[2]}),
354
369
class TestTreeScenarios(tests.TestCase):
356
371
def test_scenarios(self):
357
372
# the tree implementation scenario generator is meant to set up one
358
# instance for each working tree format, and one additional instance
373
# instance for each working tree format, one additional instance
359
374
# that will use the default wt format, but create a revision tree for
360
# the tests. this means that the wt ones should have the
361
# workingtree_to_test_tree attribute set to 'return_parameter' and the
362
# revision one set to revision_tree_from_workingtree.
375
# the tests, and one more that uses the default wt format as a
376
# lightweight checkout of a remote repository. This means that the wt
377
# ones should have the workingtree_to_test_tree attribute set to
378
# 'return_parameter' and the revision one set to
379
# revision_tree_from_workingtree.
364
381
from bzrlib.tests.per_tree import (
365
382
_dirstate_tree_from_workingtree,
391
smart_server = test_server.SmartTCPServer_for_testing
392
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
393
mem_server = memory.MemoryServer
374
394
formats = [workingtree_4.WorkingTreeFormat4(),
375
395
workingtree_3.WorkingTreeFormat3(),]
376
396
scenarios = make_scenarios(server1, server2, formats)
377
self.assertEqual(7, len(scenarios))
397
self.assertEqual(8, len(scenarios))
378
398
default_wt_format = workingtree.format_registry.get_default()
379
399
wt4_format = workingtree_4.WorkingTreeFormat4()
380
400
wt5_format = workingtree_4.WorkingTreeFormat5()
401
wt6_format = workingtree_4.WorkingTreeFormat6()
381
402
expected_scenarios = [
382
403
('WorkingTreeFormat4',
383
404
{'bzrdir_format': formats[0]._matchingbzrdir,
393
414
'workingtree_format': formats[1],
394
415
'_workingtree_to_test_tree': return_parameter,
417
('WorkingTreeFormat6,remote',
418
{'bzrdir_format': wt6_format._matchingbzrdir,
419
'repo_is_remote': True,
420
'transport_readonly_server': smart_readonly_server,
421
'transport_server': smart_server,
422
'vfs_transport_factory': mem_server,
423
'workingtree_format': wt6_format,
424
'_workingtree_to_test_tree': return_parameter,
397
427
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
398
428
'bzrdir_format': default_wt_format._matchingbzrdir,
610
640
# Guard against regression into MemoryTransport leaking
611
641
# files to disk instead of keeping them in memory.
612
642
self.assertFalse(osutils.lexists('dir'))
613
dir_format = bzrdir.format_registry.make_bzrdir('knit')
643
dir_format = controldir.format_registry.make_bzrdir('knit')
614
644
self.assertEqual(dir_format.repository_format.__class__,
615
645
the_branch.repository._format.__class__)
616
646
self.assertEqual('Bazaar-NG Knit Repository Format 1',
679
709
builder = self.make_branch_builder('dir')
680
710
rev_id = builder.build_commit()
681
711
self.assertPathExists('dir')
682
a_dir = bzrdir.BzrDir.open('dir')
712
a_dir = controldir.ControlDir.open('dir')
683
713
self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
684
714
a_branch = a_dir.open_branch()
685
715
builder_branch = builder.get_branch()
984
1014
unittest.FunctionTestCase(lambda: None),
985
1015
unittest.FunctionTestCase(lambda: None)])
986
1016
suite.run(result)
987
self.assertEquals(1, result.calls)
988
self.assertEquals(2, result.count)
1017
self.assertEqual(1, result.calls)
1018
self.assertEqual(2, result.count)
991
1021
class TestRunner(tests.TestCase):
1063
1093
self.expectFailure("No absolute truth", self.assertTrue, True)
1064
1094
runner = tests.TextTestRunner(stream=StringIO())
1065
1095
result = self.run_test_runner(runner, Test("test_truth"))
1066
if testtools_version[:3] <= (0, 9, 11):
1067
self.assertContainsRe(runner.stream.getvalue(),
1069
"FAIL: \\S+\.test_truth\n"
1072
"No absolute truth\n"
1075
"Ran 1 test in .*\n"
1077
"FAILED \\(failures=1\\)\n\\Z")
1079
self.assertContainsRe(runner.stream.getvalue(),
1081
"FAIL: \\S+\.test_truth\n"
1083
"Empty attachments:\n"
1086
"reason: {{{No absolute truth}}}\n"
1088
"Ran 1 test in .*\n"
1090
"FAILED \\(failures=1\\)\n\\Z")
1096
self.assertContainsRe(runner.stream.getvalue(),
1098
"FAIL: \\S+\.test_truth\n"
1101
"\\s*(?:Text attachment: )?reason"
1107
"Ran 1 test in .*\n"
1109
"FAILED \\(failures=1\\)\n\\Z")
1092
1111
def test_result_decorator(self):
1093
1112
# decorate results
1252
1271
lambda trace=False: "ascii")
1253
1272
result = self.run_test_runner(tests.TextTestRunner(stream=out),
1254
1273
FailureWithUnicode("test_log_unicode"))
1255
if testtools_version[:3] > (0, 9, 11):
1256
self.assertContainsRe(out.getvalue(), "log: {{{\d+\.\d+ \\\\u2606}}}")
1258
self.assertContainsRe(out.getvalue(),
1259
"Text attachment: log\n"
1261
"\d+\.\d+ \\\\u2606\n"
1274
self.assertContainsRe(out.getvalue(),
1275
"(?:Text attachment: )?log"
1277
"\d+\.\d+ \\\\u2606"
1265
1281
class SampleTestCase(tests.TestCase):
1492
1508
transport_server.start_server()
1493
1509
self.addCleanup(transport_server.stop_server)
1494
1510
t = transport.get_transport_from_url(transport_server.get_url())
1495
bzrdir.BzrDir.create(t.base)
1511
controldir.ControlDir.create(t.base)
1496
1512
self.assertRaises(errors.BzrError,
1497
bzrdir.BzrDir.open_from_transport, t)
1513
controldir.ControlDir.open_from_transport, t)
1498
1514
# But if we declare this as safe, we can open the bzrdir.
1499
1515
self.permit_url(t.base)
1500
1516
self._bzr_selftest_roots.append(t.base)
1501
bzrdir.BzrDir.open_from_transport(t)
1517
controldir.ControlDir.open_from_transport(t)
1503
1519
def test_requireFeature_available(self):
1504
1520
"""self.requireFeature(available) is a no-op."""
1638
1654
self.assertRaises(AssertionError,
1639
1655
self.assertListRaises, _TestException, success_generator)
1657
def _run_successful_test(self, test):
1658
result = testtools.TestResult()
1660
self.assertTrue(result.wasSuccessful())
1641
1663
def test_overrideAttr_without_value(self):
1642
1664
self.test_attr = 'original' # Define a test attribute
1643
1665
obj = self # Make 'obj' visible to the embedded test
1644
1666
class Test(tests.TestCase):
1646
1668
def setUp(self):
1647
tests.TestCase.setUp(self)
1669
super(Test, self).setUp()
1648
1670
self.orig = self.overrideAttr(obj, 'test_attr')
1650
1672
def test_value(self):
1663
1684
class Test(tests.TestCase):
1665
1686
def setUp(self):
1666
tests.TestCase.setUp(self)
1687
super(Test, self).setUp()
1667
1688
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1669
1690
def test_value(self):
1670
1691
self.assertEqual('original', self.orig)
1671
1692
self.assertEqual('modified', obj.test_attr)
1673
test = Test('test_value')
1674
test.run(unittest.TestResult())
1694
self._run_successful_test(Test('test_value'))
1675
1695
self.assertEqual('original', obj.test_attr)
1697
def test_overrideAttr_with_no_existing_value_and_value(self):
1698
# Do not define the test_attribute
1699
obj = self # Make 'obj' visible to the embedded test
1700
class Test(tests.TestCase):
1703
tests.TestCase.setUp(self)
1704
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1706
def test_value(self):
1707
self.assertEqual(tests._unitialized_attr, self.orig)
1708
self.assertEqual('modified', obj.test_attr)
1710
self._run_successful_test(Test('test_value'))
1711
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1713
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1714
# Do not define the test_attribute
1715
obj = self # Make 'obj' visible to the embedded test
1716
class Test(tests.TestCase):
1719
tests.TestCase.setUp(self)
1720
self.orig = self.overrideAttr(obj, 'test_attr')
1722
def test_value(self):
1723
self.assertEqual(tests._unitialized_attr, self.orig)
1724
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1726
self._run_successful_test(Test('test_value'))
1727
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1677
1729
def test_recordCalls(self):
1678
1730
from bzrlib.tests import test_selftest
1679
1731
calls = self.recordCalls(
1680
1732
test_selftest, '_add_numbers')
1681
1733
self.assertEqual(test_selftest._add_numbers(2, 10),
1683
self.assertEquals(calls, [((2, 10), {})])
1735
self.assertEqual(calls, [((2, 10), {})])
1686
1738
def _add_numbers(a, b):
1744
1796
result = self._run_test('test_fail')
1745
1797
self.assertEqual(1, len(result.failures))
1746
1798
result_content = result.failures[0][1]
1747
if testtools_version < (0, 9, 12):
1748
self.assertContainsRe(result_content, 'Text attachment: log')
1799
self.assertContainsRe(result_content,
1800
'(?m)^(?:Text attachment: )?log(?:$|: )')
1749
1801
self.assertContainsRe(result_content, 'this was a failing test')
1751
1803
def test_error_has_log(self):
1752
1804
result = self._run_test('test_error')
1753
1805
self.assertEqual(1, len(result.errors))
1754
1806
result_content = result.errors[0][1]
1755
if testtools_version < (0, 9, 12):
1756
self.assertContainsRe(result_content, 'Text attachment: log')
1807
self.assertContainsRe(result_content,
1808
'(?m)^(?:Text attachment: )?log(?:$|: )')
1757
1809
self.assertContainsRe(result_content, 'this test errored')
1759
1811
def test_skip_has_no_log(self):
1778
1830
result = self._run_test('test_xfail')
1779
1831
self.assertEqual(1, len(result.expectedFailures))
1780
1832
result_content = result.expectedFailures[0][1]
1781
self.assertNotContainsRe(result_content, 'Text attachment: log')
1833
self.assertNotContainsRe(result_content,
1834
'(?m)^(?:Text attachment: )?log(?:$|: )')
1782
1835
self.assertNotContainsRe(result_content, 'test with expected failure')
1784
1837
def test_unexpected_success_has_log(self):
1859
1912
self.assertIsInstance(2, int)
1860
1913
self.assertIsInstance(u'', basestring)
1861
1914
e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
1862
self.assertEquals(str(e),
1915
self.assertEqual(str(e),
1863
1916
"None is an instance of <type 'NoneType'> rather than <type 'int'>")
1864
1917
self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
1865
1918
e = self.assertRaises(AssertionError,
1866
1919
self.assertIsInstance, None, int, "it's just not")
1867
self.assertEquals(str(e),
1920
self.assertEqual(str(e),
1868
1921
"None is an instance of <type 'NoneType'> rather than <type 'int'>"
1869
1922
": it's just not")
1875
1928
def test_assertEqualDiff(self):
1876
1929
e = self.assertRaises(AssertionError,
1877
1930
self.assertEqualDiff, '', '\n')
1878
self.assertEquals(str(e),
1931
self.assertEqual(str(e),
1879
1932
# Don't blink ! The '+' applies to the second string
1880
1933
'first string is missing a final newline.\n+ \n')
1881
1934
e = self.assertRaises(AssertionError,
1882
1935
self.assertEqualDiff, '\n', '')
1883
self.assertEquals(str(e),
1936
self.assertEqual(str(e),
1884
1937
# Don't blink ! The '-' applies to the second string
1885
1938
'second string is missing a final newline.\n- \n')
1943
1996
warnings.warn("this is your last warning")
1945
1998
wlist, result = self.callCatchWarnings(meth, 1, 2)
1946
self.assertEquals(3, result)
1999
self.assertEqual(3, result)
1947
2000
# would like just to compare them, but UserWarning doesn't implement
1950
2003
self.assertIsInstance(w0, UserWarning)
1951
self.assertEquals("this is your last warning", str(w0))
2004
self.assertEqual("this is your last warning", str(w0))
1954
2007
class TestConvenienceMakers(tests.TestCaseWithTransport):
1957
2010
def test_make_branch_and_tree_with_format(self):
1958
2011
# we should be able to supply a format to make_branch_and_tree
1959
2012
self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1960
self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
2013
self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
1961
2014
bzrlib.bzrdir.BzrDirMetaFormat1)
1963
2016
def test_make_branch_and_memory_tree(self):
1975
2028
tree = self.make_branch_and_tree('t1')
1976
2029
base = tree.bzrdir.root_transport.base
1977
2030
self.assertStartsWith(base, 'file://')
1978
self.assertEquals(tree.bzrdir.root_transport,
2031
self.assertEqual(tree.bzrdir.root_transport,
1979
2032
tree.branch.bzrdir.root_transport)
1980
self.assertEquals(tree.bzrdir.root_transport,
2033
self.assertEqual(tree.bzrdir.root_transport,
1981
2034
tree.branch.repository.bzrdir.root_transport)
2193
2246
self.assertNotContainsRe(content, 'test with expected failure')
2194
2247
self.assertEqual(1, len(result.expectedFailures))
2195
2248
result_content = result.expectedFailures[0][1]
2196
self.assertNotContainsRe(result_content, 'Text attachment: log')
2249
self.assertNotContainsRe(result_content,
2250
'(?m)^(?:Text attachment: )?log(?:$|: )')
2197
2251
self.assertNotContainsRe(result_content, 'test with expected failure')
2199
2253
def test_unexpected_success_has_log(self):
2246
2300
self.assertEqual(['rocks'], self.argv)
2247
2301
self.assertEqual(34, self.retcode)
2248
2302
self.assertEqual('It sure does!\n', out)
2249
self.assertEquals(out, self.out)
2303
self.assertEqual(out, self.out)
2250
2304
self.assertEqual('', err)
2251
self.assertEquals(err, self.err)
2305
self.assertEqual(err, self.err)
2253
2307
def test_run_bzr_error_regexes(self):
2884
2938
def test_empty_list(self):
2885
2939
id_list = self._create_id_list([])
2886
self.assertEquals({}, id_list.tests)
2887
self.assertEquals({}, id_list.modules)
2940
self.assertEqual({}, id_list.tests)
2941
self.assertEqual({}, id_list.modules)
2889
2943
def test_valid_list(self):
2890
2944
id_list = self._create_id_list(
2917
2971
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing',
2919
2973
not_found, duplicates = tests.suite_matches_id_list(suite, test_list)
2920
self.assertEquals(['bogus'], not_found)
2921
self.assertEquals([], duplicates)
2974
self.assertEqual(['bogus'], not_found)
2975
self.assertEqual([], duplicates)
2923
2977
def test_suite_matches_id_list_with_duplicates(self):
2924
2978
loader = TestUtil.TestLoader()
2931
2985
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing',]
2932
2986
not_found, duplicates = tests.suite_matches_id_list(
2933
2987
dupes, test_list)
2934
self.assertEquals([], not_found)
2935
self.assertEquals(['bzrlib.tests.test_sampler.DemoTest.test_nothing'],
2988
self.assertEqual([], not_found)
2989
self.assertEqual(['bzrlib.tests.test_sampler.DemoTest.test_nothing'],
3030
3084
self._create_test_list_file(test_list_fname,
3031
3085
'mod1.cl1.meth1\nmod2.cl2.meth2\n')
3032
3086
tlist = tests.load_test_id_list(test_list_fname)
3033
self.assertEquals(2, len(tlist))
3034
self.assertEquals('mod1.cl1.meth1', tlist[0])
3035
self.assertEquals('mod2.cl2.meth2', tlist[1])
3087
self.assertEqual(2, len(tlist))
3088
self.assertEqual('mod1.cl1.meth1', tlist[0])
3089
self.assertEqual('mod2.cl2.meth2', tlist[1])
3037
3091
def test_load_dirty_file(self):
3038
3092
test_list_fname = 'test.list'
3040
3094
' mod1.cl1.meth1\n\nmod2.cl2.meth2 \n'
3042
3096
tlist = tests.load_test_id_list(test_list_fname)
3043
self.assertEquals(4, len(tlist))
3044
self.assertEquals('mod1.cl1.meth1', tlist[0])
3045
self.assertEquals('', tlist[1])
3046
self.assertEquals('mod2.cl2.meth2', tlist[2])
3047
self.assertEquals('bar baz', tlist[3])
3097
self.assertEqual(4, len(tlist))
3098
self.assertEqual('mod1.cl1.meth1', tlist[0])
3099
self.assertEqual('', tlist[1])
3100
self.assertEqual('mod2.cl2.meth2', tlist[2])
3101
self.assertEqual('bar baz', tlist[3])
3050
3104
class TestFilteredByModuleTestLoader(tests.TestCase):
3058
3112
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
3059
3113
loader = self._create_loader(test_list)
3060
3114
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3061
self.assertEquals(test_list, _test_ids(suite))
3115
self.assertEqual(test_list, _test_ids(suite))
3063
3117
def test_exclude_tests(self):
3064
3118
test_list = ['bogus']
3065
3119
loader = self._create_loader(test_list)
3066
3120
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3067
self.assertEquals([], _test_ids(suite))
3121
self.assertEqual([], _test_ids(suite))
3070
3124
class TestFilteredByNameStartTestLoader(tests.TestCase):
3080
3134
loader = self._create_loader('bzrlib.tests.test_samp')
3082
3136
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3083
self.assertEquals(test_list, _test_ids(suite))
3137
self.assertEqual(test_list, _test_ids(suite))
3085
3139
def test_load_tests_inside_module(self):
3086
3140
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
3087
3141
loader = self._create_loader('bzrlib.tests.test_sampler.Demo')
3089
3143
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3090
self.assertEquals(test_list, _test_ids(suite))
3144
self.assertEqual(test_list, _test_ids(suite))
3092
3146
def test_exclude_tests(self):
3093
3147
test_list = ['bogus']
3094
3148
loader = self._create_loader('bogus')
3096
3150
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3097
self.assertEquals([], _test_ids(suite))
3151
self.assertEqual([], _test_ids(suite))
3100
3154
class TestTestPrefixRegistry(tests.TestCase):
3106
3160
def test_register_new_prefix(self):
3107
3161
tpr = self._get_registry()
3108
3162
tpr.register('foo', 'fff.ooo.ooo')
3109
self.assertEquals('fff.ooo.ooo', tpr.get('foo'))
3163
self.assertEqual('fff.ooo.ooo', tpr.get('foo'))
3111
3165
def test_register_existing_prefix(self):
3112
3166
tpr = self._get_registry()
3113
3167
tpr.register('bar', 'bbb.aaa.rrr')
3114
3168
tpr.register('bar', 'bBB.aAA.rRR')
3115
self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3169
self.assertEqual('bbb.aaa.rrr', tpr.get('bar'))
3116
3170
self.assertThat(self.get_log(),
3117
3171
DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3118
3172
doctest.ELLIPSIS))
3134
3188
def test_predefined_prefixes(self):
3135
3189
tpr = tests.test_prefix_alias_registry
3136
self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib'))
3137
self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd'))
3138
self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu'))
3139
self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt'))
3140
self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
3141
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3190
self.assertEqual('bzrlib', tpr.resolve_alias('bzrlib'))
3191
self.assertEqual('bzrlib.doc', tpr.resolve_alias('bd'))
3192
self.assertEqual('bzrlib.utils', tpr.resolve_alias('bu'))
3193
self.assertEqual('bzrlib.tests', tpr.resolve_alias('bt'))
3194
self.assertEqual('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
3195
self.assertEqual('bzrlib.plugins', tpr.resolve_alias('bp'))
3144
3198
class TestThreadLeakDetection(tests.TestCase):
3315
3369
self.assertLength(1, calls)
3318
class TestUncollectedWarnings(tests.TestCase):
3372
class _Selftest(object):
3373
"""Mixin for tests needing full selftest output"""
3375
def _inject_stream_into_subunit(self, stream):
3376
"""To be overridden by subclasses that run tests out of process"""
3378
def _run_selftest(self, **kwargs):
3380
self._inject_stream_into_subunit(sio)
3381
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3382
return sio.getvalue()
3385
class _ForkedSelftest(_Selftest):
3386
"""Mixin for tests needing full selftest output with forked children"""
3388
_test_needs_features = [features.subunit]
3390
def _inject_stream_into_subunit(self, stream):
3391
"""Monkey-patch subunit so the extra output goes to stream not stdout
3393
Some APIs need rewriting so this kind of bogus hackery can be replaced
3394
by passing the stream param from run_tests down into ProtocolTestCase.
3396
from subunit import ProtocolTestCase
3397
_original_init = ProtocolTestCase.__init__
3398
def _init_with_passthrough(self, *args, **kwargs):
3399
_original_init(self, *args, **kwargs)
3400
self._passthrough = stream
3401
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3403
def _run_selftest(self, **kwargs):
3404
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3405
if getattr(os, "fork", None) is None:
3406
raise tests.TestNotApplicable("Platform doesn't support forking")
3407
# Make sure the fork code is actually invoked by claiming two cores
3408
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3409
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3410
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3413
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3414
"""Check operation of --parallel=fork selftest option"""
3416
def test_error_in_child_during_fork(self):
3417
"""Error in a forked child during test setup should get reported"""
3418
class Test(tests.TestCase):
3419
def testMethod(self):
3421
# We don't care what, just break something that a child will run
3422
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3423
out = self._run_selftest(test_suite_factory=Test)
3424
# Lines from the tracebacks of the two child processes may be mixed
3425
# together due to the way subunit parses and forwards the streams,
3426
# so permit extra lines between each part of the error output.
3427
self.assertContainsRe(out,
3430
".+ in fork_for_tests\n"
3432
"\s*workaround_zealous_crypto_random\(\)\n"
3437
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3319
3438
"""Check a test case still alive after being run emits a warning"""
3321
3440
class Test(tests.TestCase):
3333
3452
self.Test("test_skip"),
3336
def _inject_stream_into_subunit(self, stream):
3337
"""To be overridden by subclasses that run tests out of process"""
3339
3455
def _run_selftest_with_suite(self, **kwargs):
3341
self._inject_stream_into_subunit(sio)
3342
3456
old_flags = tests.selftest_debug_flags
3343
3457
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3344
3458
gc_on = gc.isenabled()
3348
tests.selftest(test_suite_factory=self._get_suite, stream=sio,
3349
stop_on_failure=False, **kwargs)
3462
output = self._run_selftest(test_suite_factory=self._get_suite,
3353
3467
tests.selftest_debug_flags = old_flags
3354
output = sio.getvalue()
3355
3468
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3356
3469
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3394
3507
runner_class=tests.SubUnitBzrRunner, **kwargs)
3397
class TestUncollectedWarningsForking(TestUncollectedWarnings):
3510
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3398
3511
"""Check warnings from tests staying alive are emitted when forking"""
3400
_test_needs_features = [features.subunit]
3402
def _inject_stream_into_subunit(self, stream):
3403
"""Monkey-patch subunit so the extra output goes to stream not stdout
3405
Some APIs need rewriting so this kind of bogus hackery can be replaced
3406
by passing the stream param from run_tests down into ProtocolTestCase.
3408
from subunit import ProtocolTestCase
3409
_original_init = ProtocolTestCase.__init__
3410
def _init_with_passthrough(self, *args, **kwargs):
3411
_original_init(self, *args, **kwargs)
3412
self._passthrough = stream
3413
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3415
def _run_selftest_with_suite(self, **kwargs):
3416
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3417
if getattr(os, "fork", None) is None:
3418
raise tests.TestNotApplicable("Platform doesn't support forking")
3419
# Make sure the fork code is actually invoked by claiming two cores
3420
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3421
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3422
return TestUncollectedWarnings._run_selftest_with_suite(self, **kwargs)
3425
3514
class TestEnvironHandling(tests.TestCase):
3432
3521
def test_me(self):
3433
3522
# The first call saves the 42 value
3434
3523
self.overrideEnv('MYVAR', None)
3435
self.assertEquals(None, os.environ.get('MYVAR'))
3524
self.assertEqual(None, os.environ.get('MYVAR'))
3436
3525
# Make sure we can call it twice
3437
3526
self.overrideEnv('MYVAR', None)
3438
self.assertEquals(None, os.environ.get('MYVAR'))
3527
self.assertEqual(None, os.environ.get('MYVAR'))
3439
3528
output = StringIO()
3440
3529
result = tests.TextTestResult(output, 0, 1)
3441
3530
Test('test_me').run(result)
3442
3531
if not result.wasStrictlySuccessful():
3443
3532
self.fail(output.getvalue())
3444
3533
# We get our value back
3445
self.assertEquals('42', os.environ.get('MYVAR'))
3534
self.assertEqual('42', os.environ.get('MYVAR'))
3448
3537
class TestIsolatedEnv(tests.TestCase):
3464
3553
# Make sure we know the definition of BZR_HOME: not part of os.environ
3465
3554
# for tests.TestCase.
3466
3555
self.assertTrue('BZR_HOME' in tests.isolated_environ)
3467
self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3556
self.assertEqual(None, tests.isolated_environ['BZR_HOME'])
3468
3557
# Being part of isolated_environ, BZR_HOME should not appear here
3469
3558
self.assertFalse('BZR_HOME' in os.environ)
3470
3559
# Make sure we know the definition of LINES: part of os.environ for
3471
3560
# tests.TestCase
3472
3561
self.assertTrue('LINES' in tests.isolated_environ)
3473
self.assertEquals('25', tests.isolated_environ['LINES'])
3474
self.assertEquals('25', os.environ['LINES'])
3562
self.assertEqual('25', tests.isolated_environ['LINES'])
3563
self.assertEqual('25', os.environ['LINES'])
3476
3565
def test_injecting_unknown_variable(self):
3477
3566
# BZR_HOME is known to be absent from os.environ
3478
3567
test = self.ScratchMonkey('test_me')
3479
3568
tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3480
self.assertEquals('foo', os.environ['BZR_HOME'])
3569
self.assertEqual('foo', os.environ['BZR_HOME'])
3481
3570
tests.restore_os_environ(test)
3482
3571
self.assertFalse('BZR_HOME' in os.environ)
3485
3574
test = self.ScratchMonkey('test_me')
3486
3575
# LINES is known to be present in os.environ
3487
3576
tests.override_os_environ(test, {'LINES': '42'})
3488
self.assertEquals('42', os.environ['LINES'])
3577
self.assertEqual('42', os.environ['LINES'])
3489
3578
tests.restore_os_environ(test)
3490
self.assertEquals('25', os.environ['LINES'])
3579
self.assertEqual('25', os.environ['LINES'])
3492
3581
def test_deleting_variable(self):
3493
3582
test = self.ScratchMonkey('test_me')
3588
3677
# test at the command level without loading the whole test suite
3589
3678
out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3590
3679
actual = out.splitlines()
3591
self.assertEquals(expected, actual)
3680
self.assertEqual(expected, actual)
3593
3682
def test_full_list(self):
3594
3683
self.assertTestList(['a', 'b', 'c'])