1249
1228
result = self.run_test_runner(runner, test)
1250
1229
output_string = output.getvalue()
1251
1230
self.assertContainsRe(output_string, "--date [0-9.]+")
1252
self.assertLength(1, self._get_source_tree_calls)
1254
def test_verbose_test_count(self):
1255
"""A verbose test run reports the right test count at the start"""
1256
suite = TestUtil.TestSuite([
1257
unittest.FunctionTestCase(lambda:None),
1258
unittest.FunctionTestCase(lambda:None)])
1259
self.assertEqual(suite.countTestCases(), 2)
1261
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1262
# Need to use the CountingDecorator as that's what sets num_tests
1263
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1264
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1266
def test_startTestRun(self):
1267
"""run should call result.startTestRun()"""
1269
class LoggingDecorator(tests.ForwardingResult):
1270
def startTestRun(self):
1271
tests.ForwardingResult.startTestRun(self)
1272
calls.append('startTestRun')
1273
test = unittest.FunctionTestCase(lambda:None)
1275
runner = tests.TextTestRunner(stream=stream,
1276
result_decorators=[LoggingDecorator])
1277
result = self.run_test_runner(runner, test)
1278
self.assertLength(1, calls)
1280
def test_stopTestRun(self):
1281
"""run should call result.stopTestRun()"""
1283
class LoggingDecorator(tests.ForwardingResult):
1284
def stopTestRun(self):
1285
tests.ForwardingResult.stopTestRun(self)
1286
calls.append('stopTestRun')
1287
test = unittest.FunctionTestCase(lambda:None)
1289
runner = tests.TextTestRunner(stream=stream,
1290
result_decorators=[LoggingDecorator])
1291
result = self.run_test_runner(runner, test)
1292
self.assertLength(1, calls)
1231
if workingtree is not None:
1232
revision_id = workingtree.get_parent_ids()[0]
1233
self.assertEndsWith(output_string.rstrip(), revision_id)
1235
def assertLogDeleted(self, test):
1236
log = test._get_log()
1237
self.assertEqual("DELETED log file to reduce memory footprint", log)
1238
self.assertEqual('', test._log_contents)
1239
self.assertIs(None, test._log_file_name)
1241
def test_success_log_deleted(self):
1242
"""Successful tests have their log deleted"""
1244
class LogTester(tests.TestCase):
1246
def test_success(self):
1247
self.log('this will be removed\n')
1250
runner = tests.TextTestRunner(stream=sio)
1251
test = LogTester('test_success')
1252
result = self.run_test_runner(runner, test)
1254
self.assertLogDeleted(test)
1256
def test_skipped_log_deleted(self):
1257
"""Skipped tests have their log deleted"""
1259
class LogTester(tests.TestCase):
1261
def test_skipped(self):
1262
self.log('this will be removed\n')
1263
raise tests.TestSkipped()
1266
runner = tests.TextTestRunner(stream=sio)
1267
test = LogTester('test_skipped')
1268
result = self.run_test_runner(runner, test)
1270
self.assertLogDeleted(test)
1272
def test_not_aplicable_log_deleted(self):
1273
"""Not applicable tests have their log deleted"""
1275
class LogTester(tests.TestCase):
1277
def test_not_applicable(self):
1278
self.log('this will be removed\n')
1279
raise tests.TestNotApplicable()
1282
runner = tests.TextTestRunner(stream=sio)
1283
test = LogTester('test_not_applicable')
1284
result = self.run_test_runner(runner, test)
1286
self.assertLogDeleted(test)
1288
def test_known_failure_log_deleted(self):
1289
"""Know failure tests have their log deleted"""
1291
class LogTester(tests.TestCase):
1293
def test_known_failure(self):
1294
self.log('this will be removed\n')
1295
raise tests.KnownFailure()
1298
runner = tests.TextTestRunner(stream=sio)
1299
test = LogTester('test_known_failure')
1300
result = self.run_test_runner(runner, test)
1302
self.assertLogDeleted(test)
1304
def test_fail_log_kept(self):
1305
"""Failed tests have their log kept"""
1307
class LogTester(tests.TestCase):
1309
def test_fail(self):
1310
self.log('this will be kept\n')
1311
self.fail('this test fails')
1314
runner = tests.TextTestRunner(stream=sio)
1315
test = LogTester('test_fail')
1316
result = self.run_test_runner(runner, test)
1318
text = sio.getvalue()
1319
self.assertContainsRe(text, 'this will be kept')
1320
self.assertContainsRe(text, 'this test fails')
1322
log = test._get_log()
1323
self.assertContainsRe(log, 'this will be kept')
1324
self.assertEqual(log, test._log_contents)
1326
def test_error_log_kept(self):
1327
"""Tests with errors have their log kept"""
1329
class LogTester(tests.TestCase):
1331
def test_error(self):
1332
self.log('this will be kept\n')
1333
raise ValueError('random exception raised')
1336
runner = tests.TextTestRunner(stream=sio)
1337
test = LogTester('test_error')
1338
result = self.run_test_runner(runner, test)
1340
text = sio.getvalue()
1341
self.assertContainsRe(text, 'this will be kept')
1342
self.assertContainsRe(text, 'random exception raised')
1344
log = test._get_log()
1345
self.assertContainsRe(log, 'this will be kept')
1346
self.assertEqual(log, test._log_contents)
1295
1349
class SampleTestCase(tests.TestCase):
1668
1657
self.assertRaises(AssertionError,
1669
1658
self.assertListRaises, _TestException, success_generator)
1671
def test_overrideAttr_without_value(self):
1672
self.test_attr = 'original' # Define a test attribute
1673
obj = self # Make 'obj' visible to the embedded test
1674
class Test(tests.TestCase):
1677
tests.TestCase.setUp(self)
1678
self.orig = self.overrideAttr(obj, 'test_attr')
1680
def test_value(self):
1681
self.assertEqual('original', self.orig)
1682
self.assertEqual('original', obj.test_attr)
1683
obj.test_attr = 'modified'
1684
self.assertEqual('modified', obj.test_attr)
1686
test = Test('test_value')
1687
test.run(unittest.TestResult())
1688
self.assertEqual('original', obj.test_attr)
1690
def test_overrideAttr_with_value(self):
1691
self.test_attr = 'original' # Define a test attribute
1692
obj = self # Make 'obj' visible to the embedded test
1693
class Test(tests.TestCase):
1696
tests.TestCase.setUp(self)
1697
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1699
def test_value(self):
1700
self.assertEqual('original', self.orig)
1701
self.assertEqual('modified', obj.test_attr)
1703
test = Test('test_value')
1704
test.run(unittest.TestResult())
1705
self.assertEqual('original', obj.test_attr)
1708
class _MissingFeature(tests.Feature):
1711
missing_feature = _MissingFeature()
1714
def _get_test(name):
1715
"""Get an instance of a specific example test.
1717
We protect this in a function so that they don't auto-run in the test
1721
class ExampleTests(tests.TestCase):
1723
def test_fail(self):
1724
mutter('this was a failing test')
1725
self.fail('this test will fail')
1727
def test_error(self):
1728
mutter('this test errored')
1729
raise RuntimeError('gotcha')
1731
def test_missing_feature(self):
1732
mutter('missing the feature')
1733
self.requireFeature(missing_feature)
1735
def test_skip(self):
1736
mutter('this test will be skipped')
1737
raise tests.TestSkipped('reason')
1739
def test_success(self):
1740
mutter('this test succeeds')
1742
def test_xfail(self):
1743
mutter('test with expected failure')
1744
self.knownFailure('this_fails')
1746
def test_unexpected_success(self):
1747
mutter('test with unexpected success')
1748
self.expectFailure('should_fail', lambda: None)
1750
return ExampleTests(name)
1753
class TestTestCaseLogDetails(tests.TestCase):
1755
def _run_test(self, test_name):
1756
test = _get_test(test_name)
1757
result = testtools.TestResult()
1761
def test_fail_has_log(self):
1762
result = self._run_test('test_fail')
1763
self.assertEqual(1, len(result.failures))
1764
result_content = result.failures[0][1]
1765
self.assertContainsRe(result_content, 'Text attachment: log')
1766
self.assertContainsRe(result_content, 'this was a failing test')
1768
def test_error_has_log(self):
1769
result = self._run_test('test_error')
1770
self.assertEqual(1, len(result.errors))
1771
result_content = result.errors[0][1]
1772
self.assertContainsRe(result_content, 'Text attachment: log')
1773
self.assertContainsRe(result_content, 'this test errored')
1775
def test_skip_has_no_log(self):
1776
result = self._run_test('test_skip')
1777
self.assertEqual(['reason'], result.skip_reasons.keys())
1778
skips = result.skip_reasons['reason']
1779
self.assertEqual(1, len(skips))
1781
self.assertFalse('log' in test.getDetails())
1783
def test_missing_feature_has_no_log(self):
1784
# testtools doesn't know about addNotSupported, so it just gets
1785
# considered as a skip
1786
result = self._run_test('test_missing_feature')
1787
self.assertEqual([missing_feature], result.skip_reasons.keys())
1788
skips = result.skip_reasons[missing_feature]
1789
self.assertEqual(1, len(skips))
1791
self.assertFalse('log' in test.getDetails())
1793
def test_xfail_has_no_log(self):
1794
result = self._run_test('test_xfail')
1795
self.assertEqual(1, len(result.expectedFailures))
1796
result_content = result.expectedFailures[0][1]
1797
self.assertNotContainsRe(result_content, 'Text attachment: log')
1798
self.assertNotContainsRe(result_content, 'test with expected failure')
1800
def test_unexpected_success_has_log(self):
1801
result = self._run_test('test_unexpected_success')
1802
self.assertEqual(1, len(result.unexpectedSuccesses))
1803
# Inconsistency, unexpectedSuccesses is a list of tests,
1804
# expectedFailures is a list of reasons?
1805
test = result.unexpectedSuccesses[0]
1806
details = test.getDetails()
1807
self.assertTrue('log' in details)
1810
class TestTestCloning(tests.TestCase):
1811
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1813
def test_cloned_testcase_does_not_share_details(self):
1814
"""A TestCase cloned with clone_test does not share mutable attributes
1815
such as details or cleanups.
1817
class Test(tests.TestCase):
1819
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1820
orig_test = Test('test_foo')
1821
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1822
orig_test.run(unittest.TestResult())
1823
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1824
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1826
def test_double_apply_scenario_preserves_first_scenario(self):
1827
"""Applying two levels of scenarios to a test preserves the attributes
1828
added by both scenarios.
1830
class Test(tests.TestCase):
1833
test = Test('test_foo')
1834
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1835
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1836
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1837
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1838
all_tests = list(tests.iter_suite_tests(suite))
1839
self.assertLength(4, all_tests)
1840
all_xys = sorted((t.x, t.y) for t in all_tests)
1841
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1844
1661
# NB: Don't delete this; it's not actually from 0.11!
1845
1662
@deprecated_function(deprecated_in((0, 11, 0)))
2031
1810
test_suite_factory=factory)
2032
1811
self.assertEqual([True], factory_called)
2035
"""A test suite factory."""
2036
class Test(tests.TestCase):
2043
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2045
def test_list_only(self):
2046
output = self.run_selftest(test_suite_factory=self.factory,
2048
self.assertEqual(3, len(output.readlines()))
2050
def test_list_only_filtered(self):
2051
output = self.run_selftest(test_suite_factory=self.factory,
2052
list_only=True, pattern="Test.b")
2053
self.assertEndsWith(output.getvalue(), "Test.b\n")
2054
self.assertLength(1, output.readlines())
2056
def test_list_only_excludes(self):
2057
output = self.run_selftest(test_suite_factory=self.factory,
2058
list_only=True, exclude_pattern="Test.b")
2059
self.assertNotContainsRe("Test.b", output.getvalue())
2060
self.assertLength(2, output.readlines())
2062
def test_lsprof_tests(self):
2063
self.requireFeature(test_lsprof.LSProfFeature)
2066
def __call__(test, result):
2068
def run(test, result):
2069
self.assertIsInstance(result, tests.ForwardingResult)
2070
calls.append("called")
2071
def countTestCases(self):
2073
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2074
self.assertLength(1, calls)
2076
def test_random(self):
2077
# test randomising by listing a number of tests.
2078
output_123 = self.run_selftest(test_suite_factory=self.factory,
2079
list_only=True, random_seed="123")
2080
output_234 = self.run_selftest(test_suite_factory=self.factory,
2081
list_only=True, random_seed="234")
2082
self.assertNotEqual(output_123, output_234)
2083
# "Randominzing test order..\n\n
2084
self.assertLength(5, output_123.readlines())
2085
self.assertLength(5, output_234.readlines())
2087
def test_random_reuse_is_same_order(self):
2088
# test randomising by listing a number of tests.
2089
expected = self.run_selftest(test_suite_factory=self.factory,
2090
list_only=True, random_seed="123")
2091
repeated = self.run_selftest(test_suite_factory=self.factory,
2092
list_only=True, random_seed="123")
2093
self.assertEqual(expected.getvalue(), repeated.getvalue())
2095
def test_runner_class(self):
2096
self.requireFeature(features.subunit)
2097
from subunit import ProtocolTestCase
2098
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2099
test_suite_factory=self.factory)
2100
test = ProtocolTestCase(stream)
2101
result = unittest.TestResult()
2103
self.assertEqual(3, result.testsRun)
2105
def test_starting_with_single_argument(self):
2106
output = self.run_selftest(test_suite_factory=self.factory,
2107
starting_with=['bzrlib.tests.test_selftest.Test.a'],
2109
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
2112
def test_starting_with_multiple_argument(self):
2113
output = self.run_selftest(test_suite_factory=self.factory,
2114
starting_with=['bzrlib.tests.test_selftest.Test.a',
2115
'bzrlib.tests.test_selftest.Test.b'],
2117
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
2118
'bzrlib.tests.test_selftest.Test.b\n',
2121
def check_transport_set(self, transport_server):
2122
captured_transport = []
2123
def seen_transport(a_transport):
2124
captured_transport.append(a_transport)
2125
class Capture(tests.TestCase):
2127
seen_transport(bzrlib.tests.default_transport)
2129
return TestUtil.TestSuite([Capture("a")])
2130
self.run_selftest(transport=transport_server, test_suite_factory=factory)
2131
self.assertEqual(transport_server, captured_transport[0])
2133
def test_transport_sftp(self):
2134
self.requireFeature(features.paramiko)
2135
from bzrlib.tests import stub_sftp
2136
self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
2138
def test_transport_memory(self):
2139
self.check_transport_set(memory.MemoryServer)
2142
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
2143
# Does IO: reads test.list
2145
def test_load_list(self):
2146
# Provide a list with one test - this test.
2147
test_id_line = '%s\n' % self.id()
2148
self.build_tree_contents([('test.list', test_id_line)])
2149
# And generate a list of the tests in the suite.
2150
stream = self.run_selftest(load_list='test.list', list_only=True)
2151
self.assertEqual(test_id_line, stream.getvalue())
2153
def test_load_unknown(self):
2154
# Provide a list with one test - this test.
2155
# And generate a list of the tests in the suite.
2156
err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
2157
load_list='missing file name', list_only=True)
2160
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2162
_test_needs_features = [features.subunit]
2164
def run_subunit_stream(self, test_name):
2165
from subunit import ProtocolTestCase
2167
return TestUtil.TestSuite([_get_test(test_name)])
2168
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2169
test_suite_factory=factory)
2170
test = ProtocolTestCase(stream)
2171
result = testtools.TestResult()
2173
content = stream.getvalue()
2174
return content, result
2176
def test_fail_has_log(self):
2177
content, result = self.run_subunit_stream('test_fail')
2178
self.assertEqual(1, len(result.failures))
2179
self.assertContainsRe(content, '(?m)^log$')
2180
self.assertContainsRe(content, 'this test will fail')
2182
def test_error_has_log(self):
2183
content, result = self.run_subunit_stream('test_error')
2184
self.assertContainsRe(content, '(?m)^log$')
2185
self.assertContainsRe(content, 'this test errored')
2187
def test_skip_has_no_log(self):
2188
content, result = self.run_subunit_stream('test_skip')
2189
self.assertNotContainsRe(content, '(?m)^log$')
2190
self.assertNotContainsRe(content, 'this test will be skipped')
2191
self.assertEqual(['reason'], result.skip_reasons.keys())
2192
skips = result.skip_reasons['reason']
2193
self.assertEqual(1, len(skips))
2195
# RemotedTestCase doesn't preserve the "details"
2196
## self.assertFalse('log' in test.getDetails())
2198
def test_missing_feature_has_no_log(self):
2199
content, result = self.run_subunit_stream('test_missing_feature')
2200
self.assertNotContainsRe(content, '(?m)^log$')
2201
self.assertNotContainsRe(content, 'missing the feature')
2202
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2203
skips = result.skip_reasons['_MissingFeature\n']
2204
self.assertEqual(1, len(skips))
2206
# RemotedTestCase doesn't preserve the "details"
2207
## self.assertFalse('log' in test.getDetails())
2209
def test_xfail_has_no_log(self):
2210
content, result = self.run_subunit_stream('test_xfail')
2211
self.assertNotContainsRe(content, '(?m)^log$')
2212
self.assertNotContainsRe(content, 'test with expected failure')
2213
self.assertEqual(1, len(result.expectedFailures))
2214
result_content = result.expectedFailures[0][1]
2215
self.assertNotContainsRe(result_content, 'Text attachment: log')
2216
self.assertNotContainsRe(result_content, 'test with expected failure')
2218
def test_unexpected_success_has_log(self):
2219
content, result = self.run_subunit_stream('test_unexpected_success')
2220
self.assertContainsRe(content, '(?m)^log$')
2221
self.assertContainsRe(content, 'test with unexpected success')
2222
self.expectFailure('subunit treats "unexpectedSuccess"'
2223
' as a plain success',
2224
self.assertEqual, 1, len(result.unexpectedSuccesses))
2225
self.assertEqual(1, len(result.unexpectedSuccesses))
2226
test = result.unexpectedSuccesses[0]
2227
# RemotedTestCase doesn't preserve the "details"
2228
## self.assertTrue('log' in test.getDetails())
2230
def test_success_has_no_log(self):
2231
content, result = self.run_subunit_stream('test_success')
2232
self.assertEqual(1, result.testsRun)
2233
self.assertNotContainsRe(content, '(?m)^log$')
2234
self.assertNotContainsRe(content, 'this test succeeds')
2237
class TestRunBzr(tests.TestCase):
2242
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
2244
"""Override _run_bzr_core to test how it is invoked by run_bzr.
2246
Attempts to run bzr from inside this class don't actually run it.
2248
We test how run_bzr actually invokes bzr in another location. Here we
2249
only need to test that it passes the right parameters to run_bzr.
2251
self.argv = list(argv)
2252
self.retcode = retcode
2253
self.encoding = encoding
2255
self.working_dir = working_dir
2256
return self.retcode, self.out, self.err
2258
def test_run_bzr_error(self):
2259
self.out = "It sure does!\n"
2260
out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
2261
self.assertEqual(['rocks'], self.argv)
2262
self.assertEqual(34, self.retcode)
2263
self.assertEqual('It sure does!\n', out)
2264
self.assertEquals(out, self.out)
2265
self.assertEqual('', err)
2266
self.assertEquals(err, self.err)
2268
def test_run_bzr_error_regexes(self):
2270
self.err = "bzr: ERROR: foobarbaz is not versioned"
2271
out, err = self.run_bzr_error(
2272
["bzr: ERROR: foobarbaz is not versioned"],
2273
['file-id', 'foobarbaz'])
2275
def test_encoding(self):
2276
"""Test that run_bzr passes encoding to _run_bzr_core"""
2277
self.run_bzr('foo bar')
2278
self.assertEqual(None, self.encoding)
2279
self.assertEqual(['foo', 'bar'], self.argv)
2281
self.run_bzr('foo bar', encoding='baz')
2282
self.assertEqual('baz', self.encoding)
2283
self.assertEqual(['foo', 'bar'], self.argv)
2285
def test_retcode(self):
2286
"""Test that run_bzr passes retcode to _run_bzr_core"""
2287
# Default is retcode == 0
2288
self.run_bzr('foo bar')
2289
self.assertEqual(0, self.retcode)
2290
self.assertEqual(['foo', 'bar'], self.argv)
2292
self.run_bzr('foo bar', retcode=1)
2293
self.assertEqual(1, self.retcode)
2294
self.assertEqual(['foo', 'bar'], self.argv)
2296
self.run_bzr('foo bar', retcode=None)
2297
self.assertEqual(None, self.retcode)
2298
self.assertEqual(['foo', 'bar'], self.argv)
2300
self.run_bzr(['foo', 'bar'], retcode=3)
2301
self.assertEqual(3, self.retcode)
2302
self.assertEqual(['foo', 'bar'], self.argv)
2304
def test_stdin(self):
2305
# test that the stdin keyword to run_bzr is passed through to
2306
# _run_bzr_core as-is. We do this by overriding
2307
# _run_bzr_core in this class, and then calling run_bzr,
2308
# which is a convenience function for _run_bzr_core, so
2310
self.run_bzr('foo bar', stdin='gam')
2311
self.assertEqual('gam', self.stdin)
2312
self.assertEqual(['foo', 'bar'], self.argv)
2314
self.run_bzr('foo bar', stdin='zippy')
2315
self.assertEqual('zippy', self.stdin)
2316
self.assertEqual(['foo', 'bar'], self.argv)
2318
def test_working_dir(self):
2319
"""Test that run_bzr passes working_dir to _run_bzr_core"""
2320
self.run_bzr('foo bar')
2321
self.assertEqual(None, self.working_dir)
2322
self.assertEqual(['foo', 'bar'], self.argv)
2324
self.run_bzr('foo bar', working_dir='baz')
2325
self.assertEqual('baz', self.working_dir)
2326
self.assertEqual(['foo', 'bar'], self.argv)
2328
def test_reject_extra_keyword_arguments(self):
2329
self.assertRaises(TypeError, self.run_bzr, "foo bar",
2330
error_regex=['error message'])
2333
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2334
# Does IO when testing the working_dir parameter.
2336
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2337
a_callable=None, *args, **kwargs):
2339
self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
2340
self.factory = bzrlib.ui.ui_factory
2341
self.working_dir = osutils.getcwd()
2342
stdout.write('foo\n')
2343
stderr.write('bar\n')
2346
def test_stdin(self):
2347
# test that the stdin keyword to _run_bzr_core is passed through to
2348
# apply_redirected as a StringIO. We do this by overriding
2349
# apply_redirected in this class, and then calling _run_bzr_core,
2350
# which calls apply_redirected.
2351
self.run_bzr(['foo', 'bar'], stdin='gam')
2352
self.assertEqual('gam', self.stdin.read())
2353
self.assertTrue(self.stdin is self.factory_stdin)
2354
self.run_bzr(['foo', 'bar'], stdin='zippy')
2355
self.assertEqual('zippy', self.stdin.read())
2356
self.assertTrue(self.stdin is self.factory_stdin)
2358
def test_ui_factory(self):
2359
# each invocation of self.run_bzr should get its
2360
# own UI factory, which is an instance of TestUIFactory,
2361
# with stdin, stdout and stderr attached to the stdin,
2362
# stdout and stderr of the invoked run_bzr
2363
current_factory = bzrlib.ui.ui_factory
2364
self.run_bzr(['foo'])
2365
self.failIf(current_factory is self.factory)
2366
self.assertNotEqual(sys.stdout, self.factory.stdout)
2367
self.assertNotEqual(sys.stderr, self.factory.stderr)
2368
self.assertEqual('foo\n', self.factory.stdout.getvalue())
2369
self.assertEqual('bar\n', self.factory.stderr.getvalue())
2370
self.assertIsInstance(self.factory, tests.TestUIFactory)
2372
def test_working_dir(self):
2373
self.build_tree(['one/', 'two/'])
2374
cwd = osutils.getcwd()
2376
# Default is to work in the current directory
2377
self.run_bzr(['foo', 'bar'])
2378
self.assertEqual(cwd, self.working_dir)
2380
self.run_bzr(['foo', 'bar'], working_dir=None)
2381
self.assertEqual(cwd, self.working_dir)
2383
# The function should be run in the alternative directory
2384
# but afterwards the current working dir shouldn't be changed
2385
self.run_bzr(['foo', 'bar'], working_dir='one')
2386
self.assertNotEqual(cwd, self.working_dir)
2387
self.assertEndsWith(self.working_dir, 'one')
2388
self.assertEqual(cwd, osutils.getcwd())
2390
self.run_bzr(['foo', 'bar'], working_dir='two')
2391
self.assertNotEqual(cwd, self.working_dir)
2392
self.assertEndsWith(self.working_dir, 'two')
2393
self.assertEqual(cwd, osutils.getcwd())
2396
class StubProcess(object):
2397
"""A stub process for testing run_bzr_subprocess."""
2399
def __init__(self, out="", err="", retcode=0):
2402
self.returncode = retcode
2404
def communicate(self):
2405
return self.out, self.err
2408
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
2409
"""Base class for tests testing how we might run bzr."""
2412
tests.TestCaseWithTransport.setUp(self)
2413
self.subprocess_calls = []
2415
def start_bzr_subprocess(self, process_args, env_changes=None,
2416
skip_if_plan_to_signal=False,
2418
allow_plugins=False):
2419
"""capture what run_bzr_subprocess tries to do."""
2420
self.subprocess_calls.append({'process_args':process_args,
2421
'env_changes':env_changes,
2422
'skip_if_plan_to_signal':skip_if_plan_to_signal,
2423
'working_dir':working_dir, 'allow_plugins':allow_plugins})
2424
return self.next_subprocess
2427
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
2429
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2430
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2432
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2433
that will return static results. This assertion method populates those
2434
results and also checks the arguments run_bzr_subprocess generates.
2436
self.next_subprocess = process
2438
result = self.run_bzr_subprocess(*args, **kwargs)
2440
self.next_subprocess = None
2441
for key, expected in expected_args.iteritems():
2442
self.assertEqual(expected, self.subprocess_calls[-1][key])
1814
class TestKnownFailure(tests.TestCase):
1816
def test_known_failure(self):
1817
"""Check that KnownFailure is defined appropriately."""
1818
# a KnownFailure is an assertion error for compatability with unaware
1820
self.assertIsInstance(tests.KnownFailure(""), AssertionError)
1822
def test_expect_failure(self):
1824
self.expectFailure("Doomed to failure", self.assertTrue, False)
1825
except tests.KnownFailure, e:
1826
self.assertEqual('Doomed to failure', e.args[0])
1828
self.expectFailure("Doomed to failure", self.assertTrue, True)
1829
except AssertionError, e:
1830
self.assertEqual('Unexpected success. Should have failed:'
1831
' Doomed to failure', e.args[0])
2445
self.next_subprocess = None
2446
for key, expected in expected_args.iteritems():
2447
self.assertEqual(expected, self.subprocess_calls[-1][key])
2450
def test_run_bzr_subprocess(self):
2451
"""The run_bzr_helper_external command behaves nicely."""
2452
self.assertRunBzrSubprocess({'process_args':['--version']},
2453
StubProcess(), '--version')
2454
self.assertRunBzrSubprocess({'process_args':['--version']},
2455
StubProcess(), ['--version'])
2456
# retcode=None disables retcode checking
2457
result = self.assertRunBzrSubprocess({},
2458
StubProcess(retcode=3), '--version', retcode=None)
2459
result = self.assertRunBzrSubprocess({},
2460
StubProcess(out="is free software"), '--version')
2461
self.assertContainsRe(result[0], 'is free software')
2462
# Running a subcommand that is missing errors
2463
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2464
{'process_args':['--versionn']}, StubProcess(retcode=3),
2466
# Unless it is told to expect the error from the subprocess
2467
result = self.assertRunBzrSubprocess({},
2468
StubProcess(retcode=3), '--versionn', retcode=3)
2469
# Or to ignore retcode checking
2470
result = self.assertRunBzrSubprocess({},
2471
StubProcess(err="unknown command", retcode=3), '--versionn',
2473
self.assertContainsRe(result[1], 'unknown command')
2475
def test_env_change_passes_through(self):
2476
self.assertRunBzrSubprocess(
2477
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2479
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2481
def test_no_working_dir_passed_as_None(self):
2482
self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2484
def test_no_working_dir_passed_through(self):
2485
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2488
def test_run_bzr_subprocess_no_plugins(self):
2489
self.assertRunBzrSubprocess({'allow_plugins': False},
2492
def test_allow_plugins(self):
2493
self.assertRunBzrSubprocess({'allow_plugins': True},
2494
StubProcess(), '', allow_plugins=True)
2497
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2499
def test_finish_bzr_subprocess_with_error(self):
2500
"""finish_bzr_subprocess allows specification of the desired exit code.
2502
process = StubProcess(err="unknown command", retcode=3)
2503
result = self.finish_bzr_subprocess(process, retcode=3)
2504
self.assertEqual('', result[0])
2505
self.assertContainsRe(result[1], 'unknown command')
2507
def test_finish_bzr_subprocess_ignoring_retcode(self):
2508
"""finish_bzr_subprocess allows the exit code to be ignored."""
2509
process = StubProcess(err="unknown command", retcode=3)
2510
result = self.finish_bzr_subprocess(process, retcode=None)
2511
self.assertEqual('', result[0])
2512
self.assertContainsRe(result[1], 'unknown command')
2514
def test_finish_subprocess_with_unexpected_retcode(self):
2515
"""finish_bzr_subprocess raises self.failureException if the retcode is
2516
not the expected one.
2518
process = StubProcess(err="unknown command", retcode=3)
2519
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2523
class _DontSpawnProcess(Exception):
2524
"""A simple exception which just allows us to skip unnecessary steps"""
2527
class TestStartBzrSubProcess(tests.TestCase):
2529
def check_popen_state(self):
2530
"""Replace to make assertions when popen is called."""
2532
def _popen(self, *args, **kwargs):
2533
"""Record the command that is run, so that we can ensure it is correct"""
2534
self.check_popen_state()
2535
self._popen_args = args
2536
self._popen_kwargs = kwargs
2537
raise _DontSpawnProcess()
2539
def test_run_bzr_subprocess_no_plugins(self):
2540
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2541
command = self._popen_args[0]
2542
self.assertEqual(sys.executable, command[0])
2543
self.assertEqual(self.get_bzr_path(), command[1])
2544
self.assertEqual(['--no-plugins'], command[2:])
2546
def test_allow_plugins(self):
2547
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2549
command = self._popen_args[0]
2550
self.assertEqual([], command[2:])
2552
def test_set_env(self):
2553
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2555
def check_environment():
2556
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2557
self.check_popen_state = check_environment
2558
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2559
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2560
# not set in theparent
2561
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2563
def test_run_bzr_subprocess_env_del(self):
2564
"""run_bzr_subprocess can remove environment variables too."""
2565
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2566
def check_environment():
2567
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2568
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2569
self.check_popen_state = check_environment
2570
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2571
env_changes={'EXISTANT_ENV_VAR':None})
2572
# Still set in parent
2573
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2574
del os.environ['EXISTANT_ENV_VAR']
2576
def test_env_del_missing(self):
2577
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2578
def check_environment():
2579
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2580
self.check_popen_state = check_environment
2581
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2582
env_changes={'NON_EXISTANT_ENV_VAR':None})
2584
def test_working_dir(self):
2585
"""Test that we can specify the working dir for the child"""
2586
orig_getcwd = osutils.getcwd
2587
orig_chdir = os.chdir
2595
osutils.getcwd = getcwd
2597
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2600
osutils.getcwd = orig_getcwd
2602
os.chdir = orig_chdir
2603
self.assertEqual(['foo', 'current'], chdirs)
2605
def test_get_bzr_path_with_cwd_bzrlib(self):
2606
self.get_source_path = lambda: ""
2607
self.overrideAttr(os.path, "isfile", lambda path: True)
2608
self.assertEqual(self.get_bzr_path(), "bzr")
2611
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2612
"""Tests that really need to do things with an external bzr."""
2614
def test_start_and_stop_bzr_subprocess_send_signal(self):
2615
"""finish_bzr_subprocess raises self.failureException if the retcode is
2616
not the expected one.
2618
self.disable_missing_extensions_warning()
2619
process = self.start_bzr_subprocess(['wait-until-signalled'],
2620
skip_if_plan_to_signal=True)
2621
self.assertEqual('running\n', process.stdout.readline())
2622
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2624
self.assertEqual('', result[0])
2625
self.assertEqual('bzr: interrupted\n', result[1])
1833
self.fail('Assertion not raised')
2628
1836
class TestFeature(tests.TestCase):