80
66
return [t.id() for t in tests.iter_suite_tests(test_suite)]
69
class SelftestTests(tests.TestCase):
71
def test_import_tests(self):
72
mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
73
self.assertEqual(mod.SelftestTests, SelftestTests)
75
def test_import_test_failure(self):
76
self.assertRaises(ImportError,
77
TestUtil._load_module_by_name,
83
80
class MetaTestLog(tests.TestCase):
85
82
def test_logging(self):
86
83
"""Test logs are captured when a test fails."""
87
84
self.log('a test message')
88
details = self.getDetails()
90
self.assertThat(log.content_type, Equals(ContentType(
91
"text", "plain", {"charset": "utf8"})))
92
self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
93
self.assertThat(self.get_log(),
94
DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
85
self._log_file.flush()
86
self.assertContainsRe(self._get_log(keep_log_file=True),
90
class TestUnicodeFilename(tests.TestCase):
92
def test_probe_passes(self):
93
"""UnicodeFilename._probe passes."""
94
# We can't test much more than that because the behaviour depends
96
tests.UnicodeFilename._probe()
97
99
class TestTreeShape(tests.TestCaseInTempDir):
99
101
def test_unicode_paths(self):
100
self.requireFeature(features.UnicodeFilenameFeature)
102
self.requireFeature(tests.UnicodeFilename)
102
104
filename = u'hell\u00d8'
103
105
self.build_tree_contents([(filename, 'contents of hello')])
104
self.assertPathExists(filename)
107
class TestClassesAvailable(tests.TestCase):
108
"""As a convenience we expose Test* classes from bzrlib.tests"""
110
def test_test_case(self):
111
from bzrlib.tests import TestCase
113
def test_test_loader(self):
114
from bzrlib.tests import TestLoader
116
def test_test_suite(self):
117
from bzrlib.tests import TestSuite
106
self.failUnlessExists(filename)
120
109
class TestTransportScenarios(tests.TestCase):
626
599
test = TestDanglingLock('test_function')
627
600
result = test.run()
628
total_failures = result.errors + result.failures
629
601
if self._lock_check_thorough:
630
self.assertEqual(1, len(total_failures))
602
self.assertEqual(1, len(result.errors))
632
604
# When _lock_check_thorough is disabled, then we don't trigger a
634
self.assertEqual(0, len(total_failures))
606
self.assertEqual(0, len(result.errors))
637
609
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
638
610
"""Tests for the convenience functions TestCaseWithTransport introduces."""
640
612
def test_get_readonly_url_none(self):
613
from bzrlib.transport import get_transport
614
from bzrlib.transport.memory import MemoryServer
641
615
from bzrlib.transport.readonly import ReadonlyTransportDecorator
642
self.vfs_transport_factory = memory.MemoryServer
616
self.vfs_transport_factory = MemoryServer
643
617
self.transport_readonly_server = None
644
618
# calling get_readonly_transport() constructs a decorator on the url
646
620
url = self.get_readonly_url()
647
621
url2 = self.get_readonly_url('foo/bar')
648
t = transport.get_transport(url)
649
t2 = transport.get_transport(url2)
650
self.assertIsInstance(t, ReadonlyTransportDecorator)
651
self.assertIsInstance(t2, ReadonlyTransportDecorator)
622
t = get_transport(url)
623
t2 = get_transport(url2)
624
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
625
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
652
626
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
628
def test_get_readonly_url_http(self):
655
629
from bzrlib.tests.http_server import HttpServer
630
from bzrlib.transport import get_transport
631
from bzrlib.transport.local import LocalURLServer
656
632
from bzrlib.transport.http import HttpTransportBase
657
self.transport_server = test_server.LocalURLServer
633
self.transport_server = LocalURLServer
658
634
self.transport_readonly_server = HttpServer
659
635
# calling get_readonly_transport() gives us a HTTP server instance.
660
636
url = self.get_readonly_url()
661
637
url2 = self.get_readonly_url('foo/bar')
662
638
# the transport returned may be any HttpTransportBase subclass
663
t = transport.get_transport(url)
664
t2 = transport.get_transport(url2)
665
self.assertIsInstance(t, HttpTransportBase)
666
self.assertIsInstance(t2, HttpTransportBase)
639
t = get_transport(url)
640
t2 = get_transport(url2)
641
self.failUnless(isinstance(t, HttpTransportBase))
642
self.failUnless(isinstance(t2, HttpTransportBase))
667
643
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
669
645
def test_is_directory(self):
809
797
self.assertContainsRe(output,
810
798
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
812
def test_uses_time_from_testtools(self):
813
"""Test case timings in verbose results should use testtools times"""
815
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
816
def startTest(self, test):
817
self.time(datetime.datetime.utcfromtimestamp(1.145))
818
super(TimeAddedVerboseTestResult, self).startTest(test)
819
def addSuccess(self, test):
820
self.time(datetime.datetime.utcfromtimestamp(51.147))
821
super(TimeAddedVerboseTestResult, self).addSuccess(test)
822
def report_tests_starting(self): pass
824
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
825
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
827
800
def test_known_failure(self):
828
"""Using knownFailure should trigger several result actions."""
801
"""A KnownFailure being raised should trigger several result actions."""
829
802
class InstrumentedTestResult(tests.ExtendedTestResult):
830
def stopTestRun(self): pass
831
def report_tests_starting(self): pass
832
def report_known_failure(self, test, err=None, details=None):
833
self._call = test, 'known failure'
804
def startTests(self): pass
805
def report_test_start(self, test): pass
806
def report_known_failure(self, test, err):
807
self._call = test, err
834
808
result = InstrumentedTestResult(None, None, None, None)
835
class Test(tests.TestCase):
836
def test_function(self):
837
self.knownFailure('failed!')
838
test = Test("test_function")
810
raise tests.KnownFailure('failed!')
811
test = unittest.FunctionTestCase(test_function)
840
813
# it should invoke 'report_known_failure'.
841
814
self.assertEqual(2, len(result._call))
842
self.assertEqual(test.id(), result._call[0].id())
843
self.assertEqual('known failure', result._call[1])
815
self.assertEqual(test, result._call[0])
816
self.assertEqual(tests.KnownFailure, result._call[1][0])
817
self.assertIsInstance(result._call[1][1], tests.KnownFailure)
844
818
# we dont introspec the traceback, if the rest is ok, it would be
845
819
# exceptional for it not to be.
846
820
# it should update the known_failure_count on the object.
899
881
# verbose test output formatting
900
882
result_stream = StringIO()
901
883
result = bzrlib.tests.VerboseTestResult(
884
unittest._WritelnDecorator(result_stream),
906
888
test = self.get_passing_test()
907
feature = features.Feature()
889
feature = tests.Feature()
908
890
result.startTest(test)
909
891
prefix = len(result_stream.getvalue())
910
892
result.report_unsupported(test, feature)
911
893
output = result_stream.getvalue()[prefix:]
912
894
lines = output.splitlines()
913
# We don't check for the final '0ms' since it may fail on slow hosts
914
self.assertStartsWith(lines[0], 'NODEP')
915
self.assertEqual(lines[1],
916
" The feature 'Feature' is not available.")
895
self.assertEqual(lines, ['NODEP 0ms',
896
" The feature 'Feature' is not available."])
918
898
def test_unavailable_exception(self):
919
899
"""An UnavailableFeature being raised should invoke addNotSupported."""
920
900
class InstrumentedTestResult(tests.ExtendedTestResult):
921
def stopTestRun(self): pass
922
def report_tests_starting(self): pass
902
def startTests(self): pass
903
def report_test_start(self, test): pass
923
904
def addNotSupported(self, test, feature):
924
905
self._call = test, feature
925
906
result = InstrumentedTestResult(None, None, None, None)
926
feature = features.Feature()
927
class Test(tests.TestCase):
928
def test_function(self):
929
raise tests.UnavailableFeature(feature)
930
test = Test("test_function")
907
feature = tests.Feature()
909
raise tests.UnavailableFeature(feature)
910
test = unittest.FunctionTestCase(test_function)
932
912
# it should invoke 'addNotSupported'.
933
913
self.assertEqual(2, len(result._call))
934
self.assertEqual(test.id(), result._call[0].id())
914
self.assertEqual(test, result._call[0])
935
915
self.assertEqual(feature, result._call[1])
936
916
# and not count as an error
937
917
self.assertEqual(0, result.error_count)
1203
def test_verbose_test_count(self):
1204
"""A verbose test run reports the right test count at the start"""
1205
suite = TestUtil.TestSuite([
1206
unittest.FunctionTestCase(lambda:None),
1207
unittest.FunctionTestCase(lambda:None)])
1208
self.assertEqual(suite.countTestCases(), 2)
1210
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1211
# Need to use the CountingDecorator as that's what sets num_tests
1212
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1213
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1215
def test_startTestRun(self):
1216
"""run should call result.startTestRun()"""
1218
class LoggingDecorator(ExtendedToOriginalDecorator):
1219
def startTestRun(self):
1220
ExtendedToOriginalDecorator.startTestRun(self)
1221
calls.append('startTestRun')
1222
test = unittest.FunctionTestCase(lambda:None)
1224
runner = tests.TextTestRunner(stream=stream,
1225
result_decorators=[LoggingDecorator])
1226
result = self.run_test_runner(runner, test)
1227
self.assertLength(1, calls)
1229
def test_stopTestRun(self):
1230
"""run should call result.stopTestRun()"""
1232
class LoggingDecorator(ExtendedToOriginalDecorator):
1233
def stopTestRun(self):
1234
ExtendedToOriginalDecorator.stopTestRun(self)
1235
calls.append('stopTestRun')
1236
test = unittest.FunctionTestCase(lambda:None)
1238
runner = tests.TextTestRunner(stream=stream,
1239
result_decorators=[LoggingDecorator])
1240
result = self.run_test_runner(runner, test)
1241
self.assertLength(1, calls)
1243
def test_unicode_test_output_on_ascii_stream(self):
1244
"""Showing results should always succeed even on an ascii console"""
1245
class FailureWithUnicode(tests.TestCase):
1246
def test_log_unicode(self):
1248
self.fail("Now print that log!")
1250
self.overrideAttr(osutils, "get_terminal_encoding",
1251
lambda trace=False: "ascii")
1252
result = self.run_test_runner(tests.TextTestRunner(stream=out),
1253
FailureWithUnicode("test_log_unicode"))
1254
if testtools_version[:3] > (0, 9, 11):
1255
self.assertContainsRe(out.getvalue(), "log: {{{\d+\.\d+ \\\\u2606}}}")
1257
self.assertContainsRe(out.getvalue(),
1258
"Text attachment: log\n"
1260
"\d+\.\d+ \\\\u2606\n"
1135
def test_bench_history(self):
1136
# tests that the running the benchmark produces a history file
1137
# containing a timestamp and the revision id of the bzrlib source which
1139
workingtree = _get_bzr_source_tree()
1140
test = TestRunner('dummy_test')
1142
runner = tests.TextTestRunner(stream=self._log_file,
1143
bench_history=output)
1144
result = self.run_test_runner(runner, test)
1145
output_string = output.getvalue()
1146
self.assertContainsRe(output_string, "--date [0-9.]+")
1147
if workingtree is not None:
1148
revision_id = workingtree.get_parent_ids()[0]
1149
self.assertEndsWith(output_string.rstrip(), revision_id)
1151
def assertLogDeleted(self, test):
1152
log = test._get_log()
1153
self.assertEqual("DELETED log file to reduce memory footprint", log)
1154
self.assertEqual('', test._log_contents)
1155
self.assertIs(None, test._log_file_name)
1157
def test_success_log_deleted(self):
1158
"""Successful tests have their log deleted"""
1160
class LogTester(tests.TestCase):
1162
def test_success(self):
1163
self.log('this will be removed\n')
1166
runner = tests.TextTestRunner(stream=sio)
1167
test = LogTester('test_success')
1168
result = self.run_test_runner(runner, test)
1170
self.assertLogDeleted(test)
1172
def test_skipped_log_deleted(self):
1173
"""Skipped tests have their log deleted"""
1175
class LogTester(tests.TestCase):
1177
def test_skipped(self):
1178
self.log('this will be removed\n')
1179
raise tests.TestSkipped()
1182
runner = tests.TextTestRunner(stream=sio)
1183
test = LogTester('test_skipped')
1184
result = self.run_test_runner(runner, test)
1186
self.assertLogDeleted(test)
1188
def test_not_aplicable_log_deleted(self):
1189
"""Not applicable tests have their log deleted"""
1191
class LogTester(tests.TestCase):
1193
def test_not_applicable(self):
1194
self.log('this will be removed\n')
1195
raise tests.TestNotApplicable()
1198
runner = tests.TextTestRunner(stream=sio)
1199
test = LogTester('test_not_applicable')
1200
result = self.run_test_runner(runner, test)
1202
self.assertLogDeleted(test)
1204
def test_known_failure_log_deleted(self):
1205
"""Know failure tests have their log deleted"""
1207
class LogTester(tests.TestCase):
1209
def test_known_failure(self):
1210
self.log('this will be removed\n')
1211
raise tests.KnownFailure()
1214
runner = tests.TextTestRunner(stream=sio)
1215
test = LogTester('test_known_failure')
1216
result = self.run_test_runner(runner, test)
1218
self.assertLogDeleted(test)
1220
def test_fail_log_kept(self):
1221
"""Failed tests have their log kept"""
1223
class LogTester(tests.TestCase):
1225
def test_fail(self):
1226
self.log('this will be kept\n')
1227
self.fail('this test fails')
1230
runner = tests.TextTestRunner(stream=sio)
1231
test = LogTester('test_fail')
1232
result = self.run_test_runner(runner, test)
1234
text = sio.getvalue()
1235
self.assertContainsRe(text, 'this will be kept')
1236
self.assertContainsRe(text, 'this test fails')
1238
log = test._get_log()
1239
self.assertContainsRe(log, 'this will be kept')
1240
self.assertEqual(log, test._log_contents)
1242
def test_error_log_kept(self):
1243
"""Tests with errors have their log kept"""
1245
class LogTester(tests.TestCase):
1247
def test_error(self):
1248
self.log('this will be kept\n')
1249
raise ValueError('random exception raised')
1252
runner = tests.TextTestRunner(stream=sio)
1253
test = LogTester('test_error')
1254
result = self.run_test_runner(runner, test)
1256
text = sio.getvalue()
1257
self.assertContainsRe(text, 'this will be kept')
1258
self.assertContainsRe(text, 'random exception raised')
1260
log = test._get_log()
1261
self.assertContainsRe(log, 'this will be kept')
1262
self.assertEqual(log, test._log_contents)
1264
1265
class SampleTestCase(tests.TestCase):
1476
1480
self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
1477
1481
self.assertIsInstance(self._benchcalls[0][1], bzrlib.lsprof.Stats)
1478
1482
self.assertIsInstance(self._benchcalls[1][1], bzrlib.lsprof.Stats)
1479
del self._benchcalls[:]
1481
1484
def test_knownFailure(self):
1482
1485
"""Self.knownFailure() should raise a KnownFailure exception."""
1483
1486
self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
1485
def test_open_bzrdir_safe_roots(self):
1486
# even a memory transport should fail to open when its url isn't
1488
# Manually set one up (TestCase doesn't and shouldn't provide magic
1490
transport_server = memory.MemoryServer()
1491
transport_server.start_server()
1492
self.addCleanup(transport_server.stop_server)
1493
t = transport.get_transport(transport_server.get_url())
1494
bzrdir.BzrDir.create(t.base)
1495
self.assertRaises(errors.BzrError,
1496
bzrdir.BzrDir.open_from_transport, t)
1497
# But if we declare this as safe, we can open the bzrdir.
1498
self.permit_url(t.base)
1499
self._bzr_selftest_roots.append(t.base)
1500
bzrdir.BzrDir.open_from_transport(t)
1502
1488
def test_requireFeature_available(self):
1503
1489
"""self.requireFeature(available) is a no-op."""
1504
class Available(features.Feature):
1490
class Available(tests.Feature):
1505
1491
def _probe(self):return True
1506
1492
feature = Available()
1507
1493
self.requireFeature(feature)
1509
1495
def test_requireFeature_unavailable(self):
1510
1496
"""self.requireFeature(unavailable) raises UnavailableFeature."""
1511
class Unavailable(features.Feature):
1497
class Unavailable(tests.Feature):
1512
1498
def _probe(self):return False
1513
1499
feature = Unavailable()
1514
1500
self.assertRaises(tests.UnavailableFeature,
1637
1612
self.assertRaises(AssertionError,
1638
1613
self.assertListRaises, _TestException, success_generator)
1640
def test_overrideAttr_without_value(self):
1641
self.test_attr = 'original' # Define a test attribute
1642
obj = self # Make 'obj' visible to the embedded test
1643
class Test(tests.TestCase):
1646
tests.TestCase.setUp(self)
1647
self.orig = self.overrideAttr(obj, 'test_attr')
1649
def test_value(self):
1650
self.assertEqual('original', self.orig)
1651
self.assertEqual('original', obj.test_attr)
1652
obj.test_attr = 'modified'
1653
self.assertEqual('modified', obj.test_attr)
1655
test = Test('test_value')
1656
test.run(unittest.TestResult())
1657
self.assertEqual('original', obj.test_attr)
1659
def test_overrideAttr_with_value(self):
1660
self.test_attr = 'original' # Define a test attribute
1661
obj = self # Make 'obj' visible to the embedded test
1662
class Test(tests.TestCase):
1665
tests.TestCase.setUp(self)
1666
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1668
def test_value(self):
1669
self.assertEqual('original', self.orig)
1670
self.assertEqual('modified', obj.test_attr)
1672
test = Test('test_value')
1673
test.run(unittest.TestResult())
1674
self.assertEqual('original', obj.test_attr)
1676
def test_recordCalls(self):
1677
from bzrlib.tests import test_selftest
1678
calls = self.recordCalls(
1679
test_selftest, '_add_numbers')
1680
self.assertEqual(test_selftest._add_numbers(2, 10),
1682
self.assertEquals(calls, [((2, 10), {})])
1685
def _add_numbers(a, b):
1689
class _MissingFeature(features.Feature):
1692
missing_feature = _MissingFeature()
1695
def _get_test(name):
1696
"""Get an instance of a specific example test.
1698
We protect this in a function so that they don't auto-run in the test
1702
class ExampleTests(tests.TestCase):
1704
def test_fail(self):
1705
mutter('this was a failing test')
1706
self.fail('this test will fail')
1708
def test_error(self):
1709
mutter('this test errored')
1710
raise RuntimeError('gotcha')
1712
def test_missing_feature(self):
1713
mutter('missing the feature')
1714
self.requireFeature(missing_feature)
1716
def test_skip(self):
1717
mutter('this test will be skipped')
1718
raise tests.TestSkipped('reason')
1720
def test_success(self):
1721
mutter('this test succeeds')
1723
def test_xfail(self):
1724
mutter('test with expected failure')
1725
self.knownFailure('this_fails')
1727
def test_unexpected_success(self):
1728
mutter('test with unexpected success')
1729
self.expectFailure('should_fail', lambda: None)
1731
return ExampleTests(name)
1734
class TestTestCaseLogDetails(tests.TestCase):
1736
def _run_test(self, test_name):
1737
test = _get_test(test_name)
1738
result = testtools.TestResult()
1742
def test_fail_has_log(self):
1743
result = self._run_test('test_fail')
1744
self.assertEqual(1, len(result.failures))
1745
result_content = result.failures[0][1]
1746
if testtools_version < (0, 9, 12):
1747
self.assertContainsRe(result_content, 'Text attachment: log')
1748
self.assertContainsRe(result_content, 'this was a failing test')
1750
def test_error_has_log(self):
1751
result = self._run_test('test_error')
1752
self.assertEqual(1, len(result.errors))
1753
result_content = result.errors[0][1]
1754
if testtools_version < (0, 9, 12):
1755
self.assertContainsRe(result_content, 'Text attachment: log')
1756
self.assertContainsRe(result_content, 'this test errored')
1758
def test_skip_has_no_log(self):
1759
result = self._run_test('test_skip')
1760
self.assertEqual(['reason'], result.skip_reasons.keys())
1761
skips = result.skip_reasons['reason']
1762
self.assertEqual(1, len(skips))
1764
self.assertFalse('log' in test.getDetails())
1766
def test_missing_feature_has_no_log(self):
1767
# testtools doesn't know about addNotSupported, so it just gets
1768
# considered as a skip
1769
result = self._run_test('test_missing_feature')
1770
self.assertEqual([missing_feature], result.skip_reasons.keys())
1771
skips = result.skip_reasons[missing_feature]
1772
self.assertEqual(1, len(skips))
1774
self.assertFalse('log' in test.getDetails())
1776
def test_xfail_has_no_log(self):
1777
result = self._run_test('test_xfail')
1778
self.assertEqual(1, len(result.expectedFailures))
1779
result_content = result.expectedFailures[0][1]
1780
self.assertNotContainsRe(result_content, 'Text attachment: log')
1781
self.assertNotContainsRe(result_content, 'test with expected failure')
1783
def test_unexpected_success_has_log(self):
1784
result = self._run_test('test_unexpected_success')
1785
self.assertEqual(1, len(result.unexpectedSuccesses))
1786
# Inconsistency, unexpectedSuccesses is a list of tests,
1787
# expectedFailures is a list of reasons?
1788
test = result.unexpectedSuccesses[0]
1789
details = test.getDetails()
1790
self.assertTrue('log' in details)
1793
class TestTestCloning(tests.TestCase):
1794
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1796
def test_cloned_testcase_does_not_share_details(self):
1797
"""A TestCase cloned with clone_test does not share mutable attributes
1798
such as details or cleanups.
1800
class Test(tests.TestCase):
1802
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1803
orig_test = Test('test_foo')
1804
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1805
orig_test.run(unittest.TestResult())
1806
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1807
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1809
def test_double_apply_scenario_preserves_first_scenario(self):
1810
"""Applying two levels of scenarios to a test preserves the attributes
1811
added by both scenarios.
1813
class Test(tests.TestCase):
1816
test = Test('test_foo')
1817
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1818
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1819
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1820
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1821
all_tests = list(tests.iter_suite_tests(suite))
1822
self.assertLength(4, all_tests)
1823
all_xys = sorted((t.x, t.y) for t in all_tests)
1824
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1827
1616
# NB: Don't delete this; it's not actually from 0.11!
1828
1617
@deprecated_function(deprecated_in((0, 11, 0)))
2137
1903
load_list='missing file name', list_only=True)
2140
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2142
_test_needs_features = [features.subunit]
2144
def run_subunit_stream(self, test_name):
2145
from subunit import ProtocolTestCase
2147
return TestUtil.TestSuite([_get_test(test_name)])
2148
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2149
test_suite_factory=factory)
2150
test = ProtocolTestCase(stream)
2151
result = testtools.TestResult()
2153
content = stream.getvalue()
2154
return content, result
2156
def test_fail_has_log(self):
2157
content, result = self.run_subunit_stream('test_fail')
2158
self.assertEqual(1, len(result.failures))
2159
self.assertContainsRe(content, '(?m)^log$')
2160
self.assertContainsRe(content, 'this test will fail')
2162
def test_error_has_log(self):
2163
content, result = self.run_subunit_stream('test_error')
2164
self.assertContainsRe(content, '(?m)^log$')
2165
self.assertContainsRe(content, 'this test errored')
2167
def test_skip_has_no_log(self):
2168
content, result = self.run_subunit_stream('test_skip')
2169
self.assertNotContainsRe(content, '(?m)^log$')
2170
self.assertNotContainsRe(content, 'this test will be skipped')
2171
self.assertEqual(['reason'], result.skip_reasons.keys())
2172
skips = result.skip_reasons['reason']
2173
self.assertEqual(1, len(skips))
2175
# RemotedTestCase doesn't preserve the "details"
2176
## self.assertFalse('log' in test.getDetails())
2178
def test_missing_feature_has_no_log(self):
2179
content, result = self.run_subunit_stream('test_missing_feature')
2180
self.assertNotContainsRe(content, '(?m)^log$')
2181
self.assertNotContainsRe(content, 'missing the feature')
2182
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2183
skips = result.skip_reasons['_MissingFeature\n']
2184
self.assertEqual(1, len(skips))
2186
# RemotedTestCase doesn't preserve the "details"
2187
## self.assertFalse('log' in test.getDetails())
2189
def test_xfail_has_no_log(self):
2190
content, result = self.run_subunit_stream('test_xfail')
2191
self.assertNotContainsRe(content, '(?m)^log$')
2192
self.assertNotContainsRe(content, 'test with expected failure')
2193
self.assertEqual(1, len(result.expectedFailures))
2194
result_content = result.expectedFailures[0][1]
2195
self.assertNotContainsRe(result_content, 'Text attachment: log')
2196
self.assertNotContainsRe(result_content, 'test with expected failure')
2198
def test_unexpected_success_has_log(self):
2199
content, result = self.run_subunit_stream('test_unexpected_success')
2200
self.assertContainsRe(content, '(?m)^log$')
2201
self.assertContainsRe(content, 'test with unexpected success')
2202
# GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2203
# success, if a min version check is added remove this
2204
from subunit import TestProtocolClient as _Client
2205
if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
2206
self.expectFailure('subunit treats "unexpectedSuccess"'
2207
' as a plain success',
2208
self.assertEqual, 1, len(result.unexpectedSuccesses))
2209
self.assertEqual(1, len(result.unexpectedSuccesses))
2210
test = result.unexpectedSuccesses[0]
2211
# RemotedTestCase doesn't preserve the "details"
2212
## self.assertTrue('log' in test.getDetails())
2214
def test_success_has_no_log(self):
2215
content, result = self.run_subunit_stream('test_success')
2216
self.assertEqual(1, result.testsRun)
2217
self.assertNotContainsRe(content, '(?m)^log$')
2218
self.assertNotContainsRe(content, 'this test succeeds')
2221
1906
class TestRunBzr(tests.TestCase):
2478
2157
StubProcess(), '', allow_plugins=True)
2481
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2483
def test_finish_bzr_subprocess_with_error(self):
2484
"""finish_bzr_subprocess allows specification of the desired exit code.
2486
process = StubProcess(err="unknown command", retcode=3)
2487
result = self.finish_bzr_subprocess(process, retcode=3)
2488
self.assertEqual('', result[0])
2489
self.assertContainsRe(result[1], 'unknown command')
2491
def test_finish_bzr_subprocess_ignoring_retcode(self):
2492
"""finish_bzr_subprocess allows the exit code to be ignored."""
2493
process = StubProcess(err="unknown command", retcode=3)
2494
result = self.finish_bzr_subprocess(process, retcode=None)
2495
self.assertEqual('', result[0])
2496
self.assertContainsRe(result[1], 'unknown command')
2498
def test_finish_subprocess_with_unexpected_retcode(self):
2499
"""finish_bzr_subprocess raises self.failureException if the retcode is
2500
not the expected one.
2502
process = StubProcess(err="unknown command", retcode=3)
2503
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2507
2160
class _DontSpawnProcess(Exception):
2508
2161
"""A simple exception which just allows us to skip unnecessary steps"""
2511
2164
class TestStartBzrSubProcess(tests.TestCase):
2512
"""Stub test start_bzr_subprocess."""
2514
def _subprocess_log_cleanup(self):
2515
"""Inhibits the base version as we don't produce a log file."""
2166
def check_popen_state(self):
2167
"""Replace to make assertions when popen is called."""
2517
2169
def _popen(self, *args, **kwargs):
2518
"""Override the base version to record the command that is run.
2520
From there we can ensure it is correct without spawning a real process.
2170
"""Record the command that is run, so that we can ensure it is correct"""
2522
2171
self.check_popen_state()
2523
2172
self._popen_args = args
2524
2173
self._popen_kwargs = kwargs
2525
2174
raise _DontSpawnProcess()
2527
def check_popen_state(self):
2528
"""Replace to make assertions when popen is called."""
2530
2176
def test_run_bzr_subprocess_no_plugins(self):
2531
2177
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2532
2178
command = self._popen_args[0]
2537
2183
def test_allow_plugins(self):
2538
2184
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2540
2186
command = self._popen_args[0]
2541
2187
self.assertEqual([], command[2:])
2543
2189
def test_set_env(self):
2544
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2190
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2545
2191
# set in the child
2546
2192
def check_environment():
2547
2193
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2548
2194
self.check_popen_state = check_environment
2549
2195
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2550
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2196
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2551
2197
# not set in theparent
2552
2198
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2554
2200
def test_run_bzr_subprocess_env_del(self):
2555
2201
"""run_bzr_subprocess can remove environment variables too."""
2556
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2202
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2557
2203
def check_environment():
2558
2204
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2559
2205
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2560
2206
self.check_popen_state = check_environment
2561
2207
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2562
env_changes={'EXISTANT_ENV_VAR':None})
2208
env_changes={'EXISTANT_ENV_VAR':None})
2563
2209
# Still set in parent
2564
2210
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2565
2211
del os.environ['EXISTANT_ENV_VAR']
2567
2213
def test_env_del_missing(self):
2568
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2214
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2569
2215
def check_environment():
2570
2216
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2571
2217
self.check_popen_state = check_environment
2572
2218
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2573
env_changes={'NON_EXISTANT_ENV_VAR':None})
2219
env_changes={'NON_EXISTANT_ENV_VAR':None})
2575
2221
def test_working_dir(self):
2576
2222
"""Test that we can specify the working dir for the child"""
2580
2226
def chdir(path):
2581
2227
chdirs.append(path)
2582
self.overrideAttr(os, 'chdir', chdir)
2585
self.overrideAttr(osutils, 'getcwd', getcwd)
2586
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2232
osutils.getcwd = getcwd
2234
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2237
osutils.getcwd = orig_getcwd
2239
os.chdir = orig_chdir
2588
2240
self.assertEqual(['foo', 'current'], chdirs)
2590
def test_get_bzr_path_with_cwd_bzrlib(self):
2591
self.get_source_path = lambda: ""
2592
self.overrideAttr(os.path, "isfile", lambda path: True)
2593
self.assertEqual(self.get_bzr_path(), "bzr")
2596
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2597
"""Tests that really need to do things with an external bzr."""
2243
class TestBzrSubprocess(tests.TestCaseWithTransport):
2245
def test_start_and_stop_bzr_subprocess(self):
2246
"""We can start and perform other test actions while that process is
2249
process = self.start_bzr_subprocess(['--version'])
2250
result = self.finish_bzr_subprocess(process)
2251
self.assertContainsRe(result[0], 'is free software')
2252
self.assertEqual('', result[1])
2254
def test_start_and_stop_bzr_subprocess_with_error(self):
2255
"""finish_bzr_subprocess allows specification of the desired exit code.
2257
process = self.start_bzr_subprocess(['--versionn'])
2258
result = self.finish_bzr_subprocess(process, retcode=3)
2259
self.assertEqual('', result[0])
2260
self.assertContainsRe(result[1], 'unknown command')
2262
def test_start_and_stop_bzr_subprocess_ignoring_retcode(self):
2263
"""finish_bzr_subprocess allows the exit code to be ignored."""
2264
process = self.start_bzr_subprocess(['--versionn'])
2265
result = self.finish_bzr_subprocess(process, retcode=None)
2266
self.assertEqual('', result[0])
2267
self.assertContainsRe(result[1], 'unknown command')
2269
def test_start_and_stop_bzr_subprocess_with_unexpected_retcode(self):
2270
"""finish_bzr_subprocess raises self.failureException if the retcode is
2271
not the expected one.
2273
process = self.start_bzr_subprocess(['--versionn'])
2274
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2599
2277
def test_start_and_stop_bzr_subprocess_send_signal(self):
2600
2278
"""finish_bzr_subprocess raises self.failureException if the retcode is
2601
2279
not the expected one.
2603
self.disable_missing_extensions_warning()
2604
2281
process = self.start_bzr_subprocess(['wait-until-signalled'],
2605
2282
skip_if_plan_to_signal=True)
2606
2283
self.assertEqual('running\n', process.stdout.readline())
2609
2286
self.assertEqual('', result[0])
2610
2287
self.assertEqual('bzr: interrupted\n', result[1])
2289
def test_start_and_stop_working_dir(self):
2290
cwd = osutils.getcwd()
2291
self.make_branch_and_tree('one')
2292
process = self.start_bzr_subprocess(['root'], working_dir='one')
2293
result = self.finish_bzr_subprocess(process, universal_newlines=True)
2294
self.assertEndsWith(result[0], 'one\n')
2295
self.assertEqual('', result[1])
2298
class TestKnownFailure(tests.TestCase):
2300
def test_known_failure(self):
2301
"""Check that KnownFailure is defined appropriately."""
2302
# a KnownFailure is an assertion error for compatability with unaware
2304
self.assertIsInstance(tests.KnownFailure(""), AssertionError)
2306
def test_expect_failure(self):
2308
self.expectFailure("Doomed to failure", self.assertTrue, False)
2309
except tests.KnownFailure, e:
2310
self.assertEqual('Doomed to failure', e.args[0])
2312
self.expectFailure("Doomed to failure", self.assertTrue, True)
2313
except AssertionError, e:
2314
self.assertEqual('Unexpected success. Should have failed:'
2315
' Doomed to failure', e.args[0])
2317
self.fail('Assertion not raised')
2320
class TestFeature(tests.TestCase):
2322
def test_caching(self):
2323
"""Feature._probe is called by the feature at most once."""
2324
class InstrumentedFeature(tests.Feature):
2326
super(InstrumentedFeature, self).__init__()
2329
self.calls.append('_probe')
2331
feature = InstrumentedFeature()
2333
self.assertEqual(['_probe'], feature.calls)
2335
self.assertEqual(['_probe'], feature.calls)
2337
def test_named_str(self):
2338
"""Feature.__str__ should thunk to feature_name()."""
2339
class NamedFeature(tests.Feature):
2340
def feature_name(self):
2342
feature = NamedFeature()
2343
self.assertEqual('symlinks', str(feature))
2345
def test_default_str(self):
2346
"""Feature.__str__ should default to __class__.__name__."""
2347
class NamedFeature(tests.Feature):
2349
feature = NamedFeature()
2350
self.assertEqual('NamedFeature', str(feature))
2353
class TestUnavailableFeature(tests.TestCase):
2355
def test_access_feature(self):
2356
feature = tests.Feature()
2357
exception = tests.UnavailableFeature(feature)
2358
self.assertIs(feature, exception.args[0])
2613
2361
class TestSelftestFiltering(tests.TestCase):
2938
2682
class TestTestSuite(tests.TestCase):
2940
def test__test_suite_testmod_names(self):
2941
# Test that a plausible list of test module names are returned
2942
# by _test_suite_testmod_names.
2943
test_list = tests._test_suite_testmod_names()
2945
'bzrlib.tests.blackbox',
2946
'bzrlib.tests.per_transport',
2947
'bzrlib.tests.test_selftest',
2951
def test__test_suite_modules_to_doctest(self):
2952
# Test that a plausible list of modules to doctest is returned
2953
# by _test_suite_modules_to_doctest.
2954
test_list = tests._test_suite_modules_to_doctest()
2956
# When docstrings are stripped, there are no modules to doctest
2957
self.assertEqual([], test_list)
2964
2684
def test_test_suite(self):
2965
# test_suite() loads the entire test suite to operate. To avoid this
2966
# overhead, and yet still be confident that things are happening,
2967
# we temporarily replace two functions used by test_suite with
2968
# test doubles that supply a few sample tests to load, and check they
2971
def testmod_names():
2972
calls.append("testmod_names")
2974
'bzrlib.tests.blackbox.test_branch',
2975
'bzrlib.tests.per_transport',
2976
'bzrlib.tests.test_selftest',
2978
self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
2980
calls.append("modules_to_doctest")
2983
return ['bzrlib.timestamp']
2984
self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
2985
expected_test_list = [
2685
# This test is slow - it loads the entire test suite to operate, so we
2686
# do a single test with one test in each category
2986
2688
# testmod_names
2987
2689
'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2988
2690
('bzrlib.tests.per_transport.TransportTests'
2989
'.test_abspath(LocalTransport,LocalURLServer)'),
2691
'.test_abspath(LocalURLServer)'),
2990
2692
'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
2693
# modules_to_doctest
2694
'bzrlib.timestamp.format_highres_date',
2991
2695
# plugins can't be tested that way since selftest may be run with
2994
if __doc__ is not None:
2995
expected_test_list.extend([
2996
# modules_to_doctest
2997
'bzrlib.timestamp.format_highres_date',
2999
suite = tests.test_suite()
3000
self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
3002
self.assertSubset(expected_test_list, _test_ids(suite))
2698
suite = tests.test_suite(test_list)
2699
self.assertEquals(test_list, _test_ids(suite))
3004
2701
def test_test_suite_list_and_start(self):
3005
2702
# We cannot test this at the same time as the main load, because we want
3006
# to know that starting_with == None works. So a second load is
3007
# incurred - note that the starting_with parameter causes a partial load
3008
# rather than a full load so this test should be pretty quick.
2703
# to know that starting_with == None works. So a second full load is
3009
2705
test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
3010
2706
suite = tests.test_suite(test_list,
3011
2707
['bzrlib.tests.test_selftest.TestTestSuite'])
3140
2837
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3143
class TestThreadLeakDetection(tests.TestCase):
3144
"""Ensure when tests leak threads we detect and report it"""
3146
class LeakRecordingResult(tests.ExtendedTestResult):
3148
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3150
def _report_thread_leak(self, test, leaks, alive):
3151
self.leaks.append((test, leaks))
3153
def test_testcase_without_addCleanups(self):
3154
"""Check old TestCase instances don't break with leak detection"""
3155
class Test(unittest.TestCase):
3158
result = self.LeakRecordingResult()
3160
result.startTestRun()
3162
result.stopTestRun()
3163
self.assertEqual(result._tests_leaking_threads_count, 0)
3164
self.assertEqual(result.leaks, [])
3166
def test_thread_leak(self):
3167
"""Ensure a thread that outlives the running of a test is reported
3169
Uses a thread that blocks on an event, and is started by the inner
3170
test case. As the thread outlives the inner case's run, it should be
3171
detected as a leak, but the event is then set so that the thread can
3172
be safely joined in cleanup so it's not leaked for real.
3174
event = threading.Event()
3175
thread = threading.Thread(name="Leaker", target=event.wait)
3176
class Test(tests.TestCase):
3177
def test_leak(self):
3179
result = self.LeakRecordingResult()
3180
test = Test("test_leak")
3181
self.addCleanup(thread.join)
3182
self.addCleanup(event.set)
3183
result.startTestRun()
3185
result.stopTestRun()
3186
self.assertEqual(result._tests_leaking_threads_count, 1)
3187
self.assertEqual(result._first_thread_leaker_id, test.id())
3188
self.assertEqual(result.leaks, [(test, set([thread]))])
3189
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3191
def test_multiple_leaks(self):
3192
"""Check multiple leaks are blamed on the test cases at fault
3194
Same concept as the previous test, but has one inner test method that
3195
leaks two threads, and one that doesn't leak at all.
3197
event = threading.Event()
3198
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3199
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3200
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3201
class Test(tests.TestCase):
3202
def test_first_leak(self):
3204
def test_second_no_leak(self):
3206
def test_third_leak(self):
3209
result = self.LeakRecordingResult()
3210
first_test = Test("test_first_leak")
3211
third_test = Test("test_third_leak")
3212
self.addCleanup(thread_a.join)
3213
self.addCleanup(thread_b.join)
3214
self.addCleanup(thread_c.join)
3215
self.addCleanup(event.set)
3216
result.startTestRun()
3218
[first_test, Test("test_second_no_leak"), third_test]
3220
result.stopTestRun()
3221
self.assertEqual(result._tests_leaking_threads_count, 2)
3222
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3223
self.assertEqual(result.leaks, [
3224
(first_test, set([thread_b])),
3225
(third_test, set([thread_a, thread_c]))])
3226
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3229
class TestPostMortemDebugging(tests.TestCase):
3230
"""Check post mortem debugging works when tests fail or error"""
3232
class TracebackRecordingResult(tests.ExtendedTestResult):
3234
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3235
self.postcode = None
3236
def _post_mortem(self, tb=None):
3237
"""Record the code object at the end of the current traceback"""
3238
tb = tb or sys.exc_info()[2]
3241
while next is not None:
3244
self.postcode = tb.tb_frame.f_code
3245
def report_error(self, test, err):
3247
def report_failure(self, test, err):
3250
def test_location_unittest_error(self):
3251
"""Needs right post mortem traceback with erroring unittest case"""
3252
class Test(unittest.TestCase):
3255
result = self.TracebackRecordingResult()
3257
self.assertEqual(result.postcode, Test.runTest.func_code)
3259
def test_location_unittest_failure(self):
3260
"""Needs right post mortem traceback with failing unittest case"""
3261
class Test(unittest.TestCase):
3263
raise self.failureException
3264
result = self.TracebackRecordingResult()
3266
self.assertEqual(result.postcode, Test.runTest.func_code)
3268
def test_location_bt_error(self):
3269
"""Needs right post mortem traceback with erroring bzrlib.tests case"""
3270
class Test(tests.TestCase):
3271
def test_error(self):
3273
result = self.TracebackRecordingResult()
3274
Test("test_error").run(result)
3275
self.assertEqual(result.postcode, Test.test_error.func_code)
3277
def test_location_bt_failure(self):
3278
"""Needs right post mortem traceback with failing bzrlib.tests case"""
3279
class Test(tests.TestCase):
3280
def test_failure(self):
3281
raise self.failureException
3282
result = self.TracebackRecordingResult()
3283
Test("test_failure").run(result)
3284
self.assertEqual(result.postcode, Test.test_failure.func_code)
3286
def test_env_var_triggers_post_mortem(self):
3287
"""Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3289
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3290
post_mortem_calls = []
3291
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3292
self.overrideEnv('BZR_TEST_PDB', None)
3293
result._post_mortem(1)
3294
self.overrideEnv('BZR_TEST_PDB', 'on')
3295
result._post_mortem(2)
3296
self.assertEqual([2], post_mortem_calls)
3299
2840
class TestRunSuite(tests.TestCase):
3301
2842
def test_runner_class(self):
3313
2854
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3314
2855
self.assertLength(1, calls)
3317
class TestEnvironHandling(tests.TestCase):
3319
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3320
self.assertFalse('MYVAR' in os.environ)
3321
self.overrideEnv('MYVAR', '42')
3322
# We use an embedded test to make sure we fix the _captureVar bug
3323
class Test(tests.TestCase):
3325
# The first call save the 42 value
3326
self.overrideEnv('MYVAR', None)
3327
self.assertEquals(None, os.environ.get('MYVAR'))
3328
# Make sure we can call it twice
3329
self.overrideEnv('MYVAR', None)
3330
self.assertEquals(None, os.environ.get('MYVAR'))
3332
result = tests.TextTestResult(output, 0, 1)
3333
Test('test_me').run(result)
3334
if not result.wasStrictlySuccessful():
3335
self.fail(output.getvalue())
3336
# We get our value back
3337
self.assertEquals('42', os.environ.get('MYVAR'))
3340
class TestIsolatedEnv(tests.TestCase):
3341
"""Test isolating tests from os.environ.
3343
Since we use tests that are already isolated from os.environ a bit of care
3344
should be taken when designing the tests to avoid bootstrap side-effects.
3345
The tests start an already clean os.environ which allow doing valid
3346
assertions about which variables are present or not and design tests around
3350
class ScratchMonkey(tests.TestCase):
2857
def test_done(self):
2858
"""run_suite should call result.done()"""
2860
def one_more_call(): self.calls += 1
2861
def test_function():
3355
def test_basics(self):
3356
# Make sure we know the definition of BZR_HOME: not part of os.environ
3357
# for tests.TestCase.
3358
self.assertTrue('BZR_HOME' in tests.isolated_environ)
3359
self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3360
# Being part of isolated_environ, BZR_HOME should not appear here
3361
self.assertFalse('BZR_HOME' in os.environ)
3362
# Make sure we know the definition of LINES: part of os.environ for
3364
self.assertTrue('LINES' in tests.isolated_environ)
3365
self.assertEquals('25', tests.isolated_environ['LINES'])
3366
self.assertEquals('25', os.environ['LINES'])
3368
def test_injecting_unknown_variable(self):
3369
# BZR_HOME is known to be absent from os.environ
3370
test = self.ScratchMonkey('test_me')
3371
tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3372
self.assertEquals('foo', os.environ['BZR_HOME'])
3373
tests.restore_os_environ(test)
3374
self.assertFalse('BZR_HOME' in os.environ)
3376
def test_injecting_known_variable(self):
3377
test = self.ScratchMonkey('test_me')
3378
# LINES is known to be present in os.environ
3379
tests.override_os_environ(test, {'LINES': '42'})
3380
self.assertEquals('42', os.environ['LINES'])
3381
tests.restore_os_environ(test)
3382
self.assertEquals('25', os.environ['LINES'])
3384
def test_deleting_variable(self):
3385
test = self.ScratchMonkey('test_me')
3386
# LINES is known to be present in os.environ
3387
tests.override_os_environ(test, {'LINES': None})
3388
self.assertTrue('LINES' not in os.environ)
3389
tests.restore_os_environ(test)
3390
self.assertEquals('25', os.environ['LINES'])
3393
class TestDocTestSuiteIsolation(tests.TestCase):
3394
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3396
Since tests.TestCase alreay provides an isolation from os.environ, we use
3397
the clean environment as a base for testing. To precisely capture the
3398
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3401
We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3402
not `os.environ` so each test overrides it to suit its needs.
3406
def get_doctest_suite_for_string(self, klass, string):
3407
class Finder(doctest.DocTestFinder):
3409
def find(*args, **kwargs):
3410
test = doctest.DocTestParser().get_doctest(
3411
string, {}, 'foo', 'foo.py', 0)
3414
suite = klass(test_finder=Finder())
3417
def run_doctest_suite_for_string(self, klass, string):
3418
suite = self.get_doctest_suite_for_string(klass, string)
3420
result = tests.TextTestResult(output, 0, 1)
3422
return result, output
3424
def assertDocTestStringSucceds(self, klass, string):
3425
result, output = self.run_doctest_suite_for_string(klass, string)
3426
if not result.wasStrictlySuccessful():
3427
self.fail(output.getvalue())
3429
def assertDocTestStringFails(self, klass, string):
3430
result, output = self.run_doctest_suite_for_string(klass, string)
3431
if result.wasStrictlySuccessful():
3432
self.fail(output.getvalue())
3434
def test_injected_variable(self):
3435
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3438
>>> os.environ['LINES']
3441
# doctest.DocTestSuite fails as it sees '25'
3442
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3443
# tests.DocTestSuite sees '42'
3444
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3446
def test_deleted_variable(self):
3447
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3450
>>> os.environ.get('LINES')
3452
# doctest.DocTestSuite fails as it sees '25'
3453
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3454
# tests.DocTestSuite sees None
3455
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3458
class TestSelftestExcludePatterns(tests.TestCase):
3461
super(TestSelftestExcludePatterns, self).setUp()
3462
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3464
def suite_factory(self, keep_only=None, starting_with=None):
3465
"""A test suite factory with only a few tests."""
3466
class Test(tests.TestCase):
3468
# We don't need the full class path
3469
return self._testMethodName
3476
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3478
def assertTestList(self, expected, *selftest_args):
3479
# We rely on setUp installing the right test suite factory so we can
3480
# test at the command level without loading the whole test suite
3481
out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3482
actual = out.splitlines()
3483
self.assertEquals(expected, actual)
3485
def test_full_list(self):
3486
self.assertTestList(['a', 'b', 'c'])
3488
def test_single_exclude(self):
3489
self.assertTestList(['b', 'c'], '-x', 'a')
3491
def test_mutiple_excludes(self):
3492
self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3495
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3497
_test_needs_features = [features.subunit]
3500
super(TestCounterHooks, self).setUp()
3501
class Test(tests.TestCase):
3504
super(Test, self).setUp()
3505
self.hooks = hooks.Hooks()
3506
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3507
self.install_counter_hook(self.hooks, 'myhook')
3512
def run_hook_once(self):
3513
for hook in self.hooks['myhook']:
3516
self.test_class = Test
3518
def assertHookCalls(self, expected_calls, test_name):
3519
test = self.test_class(test_name)
3520
result = unittest.TestResult()
3522
self.assertTrue(hasattr(test, '_counters'))
3523
self.assertTrue(test._counters.has_key('myhook'))
3524
self.assertEquals(expected_calls, test._counters['myhook'])
3526
def test_no_hook(self):
3527
self.assertHookCalls(0, 'no_hook')
3529
def test_run_hook_once(self):
3530
tt = features.testtools
3531
if tt.module.__version__ < (0, 9, 8):
3532
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3533
self.assertHookCalls(1, 'run_hook_once')
2863
test = unittest.FunctionTestCase(test_function)
2864
class InstrumentedTestResult(tests.ExtendedTestResult):
2865
def done(self): one_more_call()
2866
class MyRunner(tests.TextTestRunner):
2867
def run(self, test):
2868
return InstrumentedTestResult(self.stream, self.descriptions,
2870
tests.run_suite(test, runner_class=MyRunner, stream=StringIO())
2871
self.assertEquals(1, self.calls)