337
337
formats = [workingtree_4.WorkingTreeFormat4(),
338
workingtree_3.WorkingTreeFormat3(),]
339
scenarios = make_scenarios(server1, server2, formats)
338
workingtree_3.WorkingTreeFormat3(),
339
workingtree_4.WorkingTreeFormat6()]
340
scenarios = make_scenarios(server1, server2, formats,
341
remote_server='c', remote_readonly_server='d',
342
remote_backing_server='e')
340
343
self.assertEqual([
341
344
('WorkingTreeFormat4',
342
345
{'bzrdir_format': formats[0]._matchingbzrdir,
347
350
{'bzrdir_format': formats[1]._matchingbzrdir,
348
351
'transport_readonly_server': 'b',
349
352
'transport_server': 'a',
350
'workingtree_format': formats[1]})],
353
'workingtree_format': formats[1]}),
354
('WorkingTreeFormat6',
355
{'bzrdir_format': formats[2]._matchingbzrdir,
356
'transport_readonly_server': 'b',
357
'transport_server': 'a',
358
'workingtree_format': formats[2]}),
359
('WorkingTreeFormat6,remote',
360
{'bzrdir_format': formats[2]._matchingbzrdir,
361
'repo_is_remote': True,
362
'transport_readonly_server': 'd',
363
'transport_server': 'c',
364
'vfs_transport_factory': 'e',
365
'workingtree_format': formats[2]}),
354
369
class TestTreeScenarios(tests.TestCase):
356
371
def test_scenarios(self):
357
372
# the tree implementation scenario generator is meant to setup one
358
# instance for each working tree format, and one additional instance
373
# instance for each working tree format, one additional instance
359
374
# that will use the default wt format, but create a revision tree for
360
# the tests. this means that the wt ones should have the
361
# workingtree_to_test_tree attribute set to 'return_parameter' and the
362
# revision one set to revision_tree_from_workingtree.
375
# the tests, and one more that uses the default wt format as a
376
# lightweight checkout of a remote repository. This means that the wt
377
# ones should have the workingtree_to_test_tree attribute set to
378
# 'return_parameter' and the revision one set to
379
# revision_tree_from_workingtree.
364
381
from bzrlib.tests.per_tree import (
365
382
_dirstate_tree_from_workingtree,
391
smart_server = test_server.SmartTCPServer_for_testing
392
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
393
mem_server = memory.MemoryServer
374
394
formats = [workingtree_4.WorkingTreeFormat4(),
375
395
workingtree_3.WorkingTreeFormat3(),]
376
396
scenarios = make_scenarios(server1, server2, formats)
377
self.assertEqual(7, len(scenarios))
397
self.assertEqual(8, len(scenarios))
378
398
default_wt_format = workingtree.format_registry.get_default()
379
399
wt4_format = workingtree_4.WorkingTreeFormat4()
380
400
wt5_format = workingtree_4.WorkingTreeFormat5()
401
wt6_format = workingtree_4.WorkingTreeFormat6()
381
402
expected_scenarios = [
382
403
('WorkingTreeFormat4',
383
404
{'bzrdir_format': formats[0]._matchingbzrdir,
393
414
'workingtree_format': formats[1],
394
415
'_workingtree_to_test_tree': return_parameter,
417
('WorkingTreeFormat6,remote',
418
{'bzrdir_format': wt6_format._matchingbzrdir,
419
'repo_is_remote': True,
420
'transport_readonly_server': smart_readonly_server,
421
'transport_server': smart_server,
422
'vfs_transport_factory': mem_server,
423
'workingtree_format': wt6_format,
424
'_workingtree_to_test_tree': return_parameter,
397
427
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
398
428
'bzrdir_format': default_wt_format._matchingbzrdir,
610
640
# Guard against regression into MemoryTransport leaking
611
641
# files to disk instead of keeping them in memory.
612
642
self.assertFalse(osutils.lexists('dir'))
613
dir_format = bzrdir.format_registry.make_bzrdir('knit')
643
dir_format = controldir.format_registry.make_bzrdir('knit')
614
644
self.assertEqual(dir_format.repository_format.__class__,
615
645
the_branch.repository._format.__class__)
616
646
self.assertEqual('Bazaar-NG Knit Repository Format 1',
679
709
builder = self.make_branch_builder('dir')
680
710
rev_id = builder.build_commit()
681
711
self.assertPathExists('dir')
682
a_dir = bzrdir.BzrDir.open('dir')
712
a_dir = controldir.ControlDir.open('dir')
683
713
self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
684
714
a_branch = a_dir.open_branch()
685
715
builder_branch = builder.get_branch()
1063
1093
self.expectFailure("No absolute truth", self.assertTrue, True)
1064
1094
runner = tests.TextTestRunner(stream=StringIO())
1065
1095
result = self.run_test_runner(runner, Test("test_truth"))
1066
if testtools_version[:3] <= (0, 9, 11):
1067
self.assertContainsRe(runner.stream.getvalue(),
1069
"FAIL: \\S+\.test_truth\n"
1072
"No absolute truth\n"
1075
"Ran 1 test in .*\n"
1077
"FAILED \\(failures=1\\)\n\\Z")
1079
self.assertContainsRe(runner.stream.getvalue(),
1081
"FAIL: \\S+\.test_truth\n"
1083
"Empty attachments:\n"
1086
"reason: {{{No absolute truth}}}\n"
1088
"Ran 1 test in .*\n"
1090
"FAILED \\(failures=1\\)\n\\Z")
1096
self.assertContainsRe(runner.stream.getvalue(),
1098
"FAIL: \\S+\.test_truth\n"
1101
"\\s*(?:Text attachment: )?reason"
1107
"Ran 1 test in .*\n"
1109
"FAILED \\(failures=1\\)\n\\Z")
1092
1111
def test_result_decorator(self):
1093
1112
# decorate results
1252
1271
lambda trace=False: "ascii")
1253
1272
result = self.run_test_runner(tests.TextTestRunner(stream=out),
1254
1273
FailureWithUnicode("test_log_unicode"))
1255
if testtools_version[:3] > (0, 9, 11):
1256
self.assertContainsRe(out.getvalue(), "log: {{{\d+\.\d+ \\\\u2606}}}")
1258
self.assertContainsRe(out.getvalue(),
1259
"Text attachment: log\n"
1261
"\d+\.\d+ \\\\u2606\n"
1274
self.assertContainsRe(out.getvalue(),
1275
"(?:Text attachment: )?log"
1277
"\d+\.\d+ \\\\u2606"
1265
1281
class SampleTestCase(tests.TestCase):
1492
1508
transport_server.start_server()
1493
1509
self.addCleanup(transport_server.stop_server)
1494
1510
t = transport.get_transport_from_url(transport_server.get_url())
1495
bzrdir.BzrDir.create(t.base)
1511
controldir.ControlDir.create(t.base)
1496
1512
self.assertRaises(errors.BzrError,
1497
bzrdir.BzrDir.open_from_transport, t)
1513
controldir.ControlDir.open_from_transport, t)
1498
1514
# But if we declare this as safe, we can open the bzrdir.
1499
1515
self.permit_url(t.base)
1500
1516
self._bzr_selftest_roots.append(t.base)
1501
bzrdir.BzrDir.open_from_transport(t)
1517
controldir.ControlDir.open_from_transport(t)
1503
1519
def test_requireFeature_available(self):
1504
1520
"""self.requireFeature(available) is a no-op."""
1638
1654
self.assertRaises(AssertionError,
1639
1655
self.assertListRaises, _TestException, success_generator)
1657
def _run_successful_test(self, test):
1658
result = testtools.TestResult()
1660
self.assertTrue(result.wasSuccessful())
1641
1663
def test_overrideAttr_without_value(self):
1642
1664
self.test_attr = 'original' # Define a test attribute
1643
1665
obj = self # Make 'obj' visible to the embedded test
1644
1666
class Test(tests.TestCase):
1646
1668
def setUp(self):
1647
tests.TestCase.setUp(self)
1669
super(Test, self).setUp()
1648
1670
self.orig = self.overrideAttr(obj, 'test_attr')
1650
1672
def test_value(self):
1663
1684
class Test(tests.TestCase):
1665
1686
def setUp(self):
1666
tests.TestCase.setUp(self)
1687
super(Test, self).setUp()
1667
1688
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1669
1690
def test_value(self):
1670
1691
self.assertEqual('original', self.orig)
1671
1692
self.assertEqual('modified', obj.test_attr)
1673
test = Test('test_value')
1674
test.run(unittest.TestResult())
1694
self._run_successful_test(Test('test_value'))
1675
1695
self.assertEqual('original', obj.test_attr)
1697
def test_overrideAttr_with_no_existing_value_and_value(self):
1698
# Do not define the test_attribute
1699
obj = self # Make 'obj' visible to the embedded test
1700
class Test(tests.TestCase):
1703
tests.TestCase.setUp(self)
1704
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1706
def test_value(self):
1707
self.assertEqual(tests._unitialized_attr, self.orig)
1708
self.assertEqual('modified', obj.test_attr)
1710
self._run_successful_test(Test('test_value'))
1711
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1713
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1714
# Do not define the test_attribute
1715
obj = self # Make 'obj' visible to the embedded test
1716
class Test(tests.TestCase):
1719
tests.TestCase.setUp(self)
1720
self.orig = self.overrideAttr(obj, 'test_attr')
1722
def test_value(self):
1723
self.assertEqual(tests._unitialized_attr, self.orig)
1724
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1726
self._run_successful_test(Test('test_value'))
1727
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1677
1729
def test_recordCalls(self):
1678
1730
from bzrlib.tests import test_selftest
1679
1731
calls = self.recordCalls(
1744
1796
result = self._run_test('test_fail')
1745
1797
self.assertEqual(1, len(result.failures))
1746
1798
result_content = result.failures[0][1]
1747
if testtools_version < (0, 9, 12):
1748
self.assertContainsRe(result_content, 'Text attachment: log')
1799
self.assertContainsRe(result_content,
1800
'(?m)^(?:Text attachment: )?log(?:$|: )')
1749
1801
self.assertContainsRe(result_content, 'this was a failing test')
1751
1803
def test_error_has_log(self):
1752
1804
result = self._run_test('test_error')
1753
1805
self.assertEqual(1, len(result.errors))
1754
1806
result_content = result.errors[0][1]
1755
if testtools_version < (0, 9, 12):
1756
self.assertContainsRe(result_content, 'Text attachment: log')
1807
self.assertContainsRe(result_content,
1808
'(?m)^(?:Text attachment: )?log(?:$|: )')
1757
1809
self.assertContainsRe(result_content, 'this test errored')
1759
1811
def test_skip_has_no_log(self):
1778
1830
result = self._run_test('test_xfail')
1779
1831
self.assertEqual(1, len(result.expectedFailures))
1780
1832
result_content = result.expectedFailures[0][1]
1781
self.assertNotContainsRe(result_content, 'Text attachment: log')
1833
self.assertNotContainsRe(result_content,
1834
'(?m)^(?:Text attachment: )?log(?:$|: )')
1782
1835
self.assertNotContainsRe(result_content, 'test with expected failure')
1784
1837
def test_unexpected_success_has_log(self):
1957
2010
def test_make_branch_and_tree_with_format(self):
1958
2011
# we should be able to supply a format to make_branch_and_tree
1959
2012
self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1960
self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
2013
self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
1961
2014
bzrlib.bzrdir.BzrDirMetaFormat1)
1963
2016
def test_make_branch_and_memory_tree(self):
2193
2246
self.assertNotContainsRe(content, 'test with expected failure')
2194
2247
self.assertEqual(1, len(result.expectedFailures))
2195
2248
result_content = result.expectedFailures[0][1]
2196
self.assertNotContainsRe(result_content, 'Text attachment: log')
2249
self.assertNotContainsRe(result_content,
2250
'(?m)^(?:Text attachment: )?log(?:$|: )')
2197
2251
self.assertNotContainsRe(result_content, 'test with expected failure')
2199
2253
def test_unexpected_success_has_log(self):
3315
3369
self.assertLength(1, calls)
3318
class TestUncollectedWarnings(tests.TestCase):
3372
class _Selftest(object):
3373
"""Mixin for tests needing full selftest output"""
3375
def _inject_stream_into_subunit(self, stream):
3376
"""To be overridden by subclasses that run tests out of process"""
3378
def _run_selftest(self, **kwargs):
3380
self._inject_stream_into_subunit(sio)
3381
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3382
return sio.getvalue()
3385
class _ForkedSelftest(_Selftest):
3386
"""Mixin for tests needing full selftest output with forked children"""
3388
_test_needs_features = [features.subunit]
3390
def _inject_stream_into_subunit(self, stream):
3391
"""Monkey-patch subunit so the extra output goes to stream not stdout
3393
Some APIs need rewriting so this kind of bogus hackery can be replaced
3394
by passing the stream param from run_tests down into ProtocolTestCase.
3396
from subunit import ProtocolTestCase
3397
_original_init = ProtocolTestCase.__init__
3398
def _init_with_passthrough(self, *args, **kwargs):
3399
_original_init(self, *args, **kwargs)
3400
self._passthrough = stream
3401
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3403
def _run_selftest(self, **kwargs):
3404
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3405
if getattr(os, "fork", None) is None:
3406
raise tests.TestNotApplicable("Platform doesn't support forking")
3407
# Make sure the fork code is actually invoked by claiming two cores
3408
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3409
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3410
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3413
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3414
"""Check operation of --parallel=fork selftest option"""
3416
def test_error_in_child_during_fork(self):
3417
"""Error in a forked child during test setup should get reported"""
3418
class Test(tests.TestCase):
3419
def testMethod(self):
3421
# We don't care what, just break something that a child will run
3422
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3423
out = self._run_selftest(test_suite_factory=Test)
3424
# Lines from the tracebacks of the two child processes may be mixed
3425
# together due to the way subunit parses and forwards the streams,
3426
# so permit extra lines between each part of the error output.
3427
self.assertContainsRe(out,
3430
".+ in fork_for_tests\n"
3432
"\s*workaround_zealous_crypto_random\(\)\n"
3437
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3319
3438
"""Check a test case still alive after being run emits a warning"""
3321
3440
class Test(tests.TestCase):
3333
3452
self.Test("test_skip"),
3336
def _inject_stream_into_subunit(self, stream):
3337
"""To be overridden by subclasses that run tests out of process"""
3339
3455
def _run_selftest_with_suite(self, **kwargs):
3341
self._inject_stream_into_subunit(sio)
3342
3456
old_flags = tests.selftest_debug_flags
3343
3457
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3344
3458
gc_on = gc.isenabled()
3348
tests.selftest(test_suite_factory=self._get_suite, stream=sio,
3349
stop_on_failure=False, **kwargs)
3462
output = self._run_selftest(test_suite_factory=self._get_suite,
3353
3467
tests.selftest_debug_flags = old_flags
3354
output = sio.getvalue()
3355
3468
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3356
3469
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3394
3507
runner_class=tests.SubUnitBzrRunner, **kwargs)
3397
class TestUncollectedWarningsForking(TestUncollectedWarnings):
3510
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3398
3511
"""Check warnings from tests staying alive are emitted when forking"""
3400
_test_needs_features = [features.subunit]
3402
def _inject_stream_into_subunit(self, stream):
3403
"""Monkey-patch subunit so the extra output goes to stream not stdout
3405
Some APIs need rewriting so this kind of bogus hackery can be replaced
3406
by passing the stream param from run_tests down into ProtocolTestCase.
3408
from subunit import ProtocolTestCase
3409
_original_init = ProtocolTestCase.__init__
3410
def _init_with_passthrough(self, *args, **kwargs):
3411
_original_init(self, *args, **kwargs)
3412
self._passthrough = stream
3413
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3415
def _run_selftest_with_suite(self, **kwargs):
3416
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3417
if getattr(os, "fork", None) is None:
3418
raise tests.TestNotApplicable("Platform doesn't support forking")
3419
# Make sure the fork code is actually invoked by claiming two cores
3420
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3421
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3422
return TestUncollectedWarnings._run_selftest_with_suite(self, **kwargs)
3425
3514
class TestEnvironHandling(tests.TestCase):