1216
def _patch_get_bzr_source_tree(self):
1217
# Reading from the actual source tree breaks isolation, but we don't
1218
# want to assume that that's *all* that would happen.
1219
self._get_source_tree_calls = []
1221
self._get_source_tree_calls.append("called")
1223
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', new_get)
1225
1233
def test_bench_history(self):
1226
# tests that running the benchmark passes bench_history into
1227
# the test result object. We can tell that happens if
1228
# _get_bzr_source_tree is called.
1229
self._patch_get_bzr_source_tree()
1234
# tests that running the benchmark produces a history file
1235
# containing a timestamp and the revision id of the bzrlib source which
1237
workingtree = _get_bzr_source_tree()
1230
1238
test = TestRunner('dummy_test')
1231
1239
output = StringIO()
1232
runner = tests.TextTestRunner(stream=self._log_file,
1233
bench_history=output)
1240
runner = TextTestRunner(stream=self._log_file, bench_history=output)
1234
1241
result = self.run_test_runner(runner, test)
1235
1242
output_string = output.getvalue()
1236
1243
self.assertContainsRe(output_string, "--date [0-9.]+")
1237
self.assertLength(1, self._get_source_tree_calls)
1239
def test_verbose_test_count(self):
1240
"""A verbose test run reports the right test count at the start"""
1241
suite = TestUtil.TestSuite([
1242
unittest.FunctionTestCase(lambda:None),
1243
unittest.FunctionTestCase(lambda:None)])
1244
self.assertEqual(suite.countTestCases(), 2)
1246
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1247
# Need to use the CountingDecorator as that's what sets num_tests
1248
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1249
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1251
def test_startTestRun(self):
1252
"""run should call result.startTestRun()"""
1254
class LoggingDecorator(tests.ForwardingResult):
1255
def startTestRun(self):
1256
tests.ForwardingResult.startTestRun(self)
1257
calls.append('startTestRun')
1258
test = unittest.FunctionTestCase(lambda:None)
1260
runner = tests.TextTestRunner(stream=stream,
1261
result_decorators=[LoggingDecorator])
1262
result = self.run_test_runner(runner, test)
1263
self.assertLength(1, calls)
1265
def test_stopTestRun(self):
1266
"""run should call result.stopTestRun()"""
1268
class LoggingDecorator(tests.ForwardingResult):
1269
def stopTestRun(self):
1270
tests.ForwardingResult.stopTestRun(self)
1271
calls.append('stopTestRun')
1272
test = unittest.FunctionTestCase(lambda:None)
1274
runner = tests.TextTestRunner(stream=stream,
1275
result_decorators=[LoggingDecorator])
1276
result = self.run_test_runner(runner, test)
1277
self.assertLength(1, calls)
1280
class SampleTestCase(tests.TestCase):
1244
if workingtree is not None:
1245
revision_id = workingtree.get_parent_ids()[0]
1246
self.assertEndsWith(output_string.rstrip(), revision_id)
1248
def assertLogDeleted(self, test):
    """Assert that ``test`` no longer holds on to its log.

    The log must read back as the deletion placeholder, the cached log
    contents must be empty and no log file name may be recorded.
    """
    placeholder = "DELETED log file to reduce memory footprint"
    self.assertEqual(placeholder, test._get_log())
    self.assertEqual('', test._log_contents)
    self.assertIs(None, test._log_file_name)
1254
def test_success_log_deleted(self):
1255
"""Successful tests have their log deleted"""
1257
class LogTester(TestCase):
1259
def test_success(self):
1260
self.log('this will be removed\n')
1262
sio = cStringIO.StringIO()
1263
runner = TextTestRunner(stream=sio)
1264
test = LogTester('test_success')
1265
result = self.run_test_runner(runner, test)
1267
self.assertLogDeleted(test)
1269
def test_skipped_log_deleted(self):
1270
"""Skipped tests have their log deleted"""
1272
class LogTester(TestCase):
1274
def test_skipped(self):
1275
self.log('this will be removed\n')
1276
raise tests.TestSkipped()
1278
sio = cStringIO.StringIO()
1279
runner = TextTestRunner(stream=sio)
1280
test = LogTester('test_skipped')
1281
result = self.run_test_runner(runner, test)
1283
self.assertLogDeleted(test)
1285
def test_not_aplicable_log_deleted(self):
1286
"""Not applicable tests have their log deleted"""
1288
class LogTester(TestCase):
1290
def test_not_applicable(self):
1291
self.log('this will be removed\n')
1292
raise tests.TestNotApplicable()
1294
sio = cStringIO.StringIO()
1295
runner = TextTestRunner(stream=sio)
1296
test = LogTester('test_not_applicable')
1297
result = self.run_test_runner(runner, test)
1299
self.assertLogDeleted(test)
1301
def test_known_failure_log_deleted(self):
1302
"""Know failure tests have their log deleted"""
1304
class LogTester(TestCase):
1306
def test_known_failure(self):
1307
self.log('this will be removed\n')
1308
raise tests.KnownFailure()
1310
sio = cStringIO.StringIO()
1311
runner = TextTestRunner(stream=sio)
1312
test = LogTester('test_known_failure')
1313
result = self.run_test_runner(runner, test)
1315
self.assertLogDeleted(test)
1317
def test_fail_log_kept(self):
1318
"""Failed tests have their log kept"""
1320
class LogTester(TestCase):
1322
def test_fail(self):
1323
self.log('this will be kept\n')
1324
self.fail('this test fails')
1326
sio = cStringIO.StringIO()
1327
runner = TextTestRunner(stream=sio)
1328
test = LogTester('test_fail')
1329
result = self.run_test_runner(runner, test)
1331
text = sio.getvalue()
1332
self.assertContainsRe(text, 'this will be kept')
1333
self.assertContainsRe(text, 'this test fails')
1335
log = test._get_log()
1336
self.assertContainsRe(log, 'this will be kept')
1337
self.assertEqual(log, test._log_contents)
1339
def test_error_log_kept(self):
1340
"""Tests with errors have their log kept"""
1342
class LogTester(TestCase):
1344
def test_error(self):
1345
self.log('this will be kept\n')
1346
raise ValueError('random exception raised')
1348
sio = cStringIO.StringIO()
1349
runner = TextTestRunner(stream=sio)
1350
test = LogTester('test_error')
1351
result = self.run_test_runner(runner, test)
1353
text = sio.getvalue()
1354
self.assertContainsRe(text, 'this will be kept')
1355
self.assertContainsRe(text, 'random exception raised')
1357
log = test._get_log()
1358
self.assertContainsRe(log, 'this will be kept')
1359
self.assertEqual(log, test._log_contents)
1362
class SampleTestCase(TestCase):
1282
1364
def _test_pass(self):
1653
1670
self.assertRaises(AssertionError,
1654
1671
self.assertListRaises, _TestException, success_generator)
1656
def test_overrideAttr_without_value(self):
1657
self.test_attr = 'original' # Define a test attribute
1658
obj = self # Make 'obj' visible to the embedded test
1659
class Test(tests.TestCase):
1662
tests.TestCase.setUp(self)
1663
self.orig = self.overrideAttr(obj, 'test_attr')
1665
def test_value(self):
1666
self.assertEqual('original', self.orig)
1667
self.assertEqual('original', obj.test_attr)
1668
obj.test_attr = 'modified'
1669
self.assertEqual('modified', obj.test_attr)
1671
test = Test('test_value')
1672
test.run(unittest.TestResult())
1673
self.assertEqual('original', obj.test_attr)
1675
def test_overrideAttr_with_value(self):
1676
self.test_attr = 'original' # Define a test attribute
1677
obj = self # Make 'obj' visible to the embedded test
1678
class Test(tests.TestCase):
1681
tests.TestCase.setUp(self)
1682
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1684
def test_value(self):
1685
self.assertEqual('original', self.orig)
1686
self.assertEqual('modified', obj.test_attr)
1688
test = Test('test_value')
1689
test.run(unittest.TestResult())
1690
self.assertEqual('original', obj.test_attr)
1693
class _MissingFeature(tests.Feature):
1696
missing_feature = _MissingFeature()
1699
def _get_test(name):
1700
"""Get an instance of a specific example test.
1702
We protect this in a function so that they don't auto-run in the test
1706
class ExampleTests(tests.TestCase):
1708
def test_fail(self):
1709
mutter('this was a failing test')
1710
self.fail('this test will fail')
1712
def test_error(self):
1713
mutter('this test errored')
1714
raise RuntimeError('gotcha')
1716
def test_missing_feature(self):
1717
mutter('missing the feature')
1718
self.requireFeature(missing_feature)
1720
def test_skip(self):
1721
mutter('this test will be skipped')
1722
raise tests.TestSkipped('reason')
1724
def test_success(self):
1725
mutter('this test succeeds')
1727
def test_xfail(self):
1728
mutter('test with expected failure')
1729
self.knownFailure('this_fails')
1731
def test_unexpected_success(self):
1732
mutter('test with unexpected success')
1733
self.expectFailure('should_fail', lambda: None)
1735
return ExampleTests(name)
1738
class TestTestCaseLogDetails(tests.TestCase):
1740
def _run_test(self, test_name):
1741
test = _get_test(test_name)
1742
result = testtools.TestResult()
1746
def test_fail_has_log(self):
1747
result = self._run_test('test_fail')
1748
self.assertEqual(1, len(result.failures))
1749
result_content = result.failures[0][1]
1750
self.assertContainsRe(result_content, 'Text attachment: log')
1751
self.assertContainsRe(result_content, 'this was a failing test')
1753
def test_error_has_log(self):
1754
result = self._run_test('test_error')
1755
self.assertEqual(1, len(result.errors))
1756
result_content = result.errors[0][1]
1757
self.assertContainsRe(result_content, 'Text attachment: log')
1758
self.assertContainsRe(result_content, 'this test errored')
1760
def test_skip_has_no_log(self):
1761
result = self._run_test('test_skip')
1762
self.assertEqual(['reason'], result.skip_reasons.keys())
1763
skips = result.skip_reasons['reason']
1764
self.assertEqual(1, len(skips))
1766
self.assertFalse('log' in test.getDetails())
1768
def test_missing_feature_has_no_log(self):
1769
# testtools doesn't know about addNotSupported, so it just gets
1770
# considered as a skip
1771
result = self._run_test('test_missing_feature')
1772
self.assertEqual([missing_feature], result.skip_reasons.keys())
1773
skips = result.skip_reasons[missing_feature]
1774
self.assertEqual(1, len(skips))
1776
self.assertFalse('log' in test.getDetails())
1778
def test_xfail_has_no_log(self):
1779
result = self._run_test('test_xfail')
1780
self.assertEqual(1, len(result.expectedFailures))
1781
result_content = result.expectedFailures[0][1]
1782
self.assertNotContainsRe(result_content, 'Text attachment: log')
1783
self.assertNotContainsRe(result_content, 'test with expected failure')
1785
def test_unexpected_success_has_log(self):
1786
result = self._run_test('test_unexpected_success')
1787
self.assertEqual(1, len(result.unexpectedSuccesses))
1788
# Inconsistency, unexpectedSuccesses is a list of tests,
1789
# expectedFailures is a list of reasons?
1790
test = result.unexpectedSuccesses[0]
1791
details = test.getDetails()
1792
self.assertTrue('log' in details)
1795
class TestTestCloning(tests.TestCase):
1796
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1798
def test_cloned_testcase_does_not_share_details(self):
1799
"""A TestCase cloned with clone_test does not share mutable attributes
1800
such as details or cleanups.
1802
class Test(tests.TestCase):
1804
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1805
orig_test = Test('test_foo')
1806
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1807
orig_test.run(unittest.TestResult())
1808
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1809
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1811
def test_double_apply_scenario_preserves_first_scenario(self):
1812
"""Applying two levels of scenarios to a test preserves the attributes
1813
added by both scenarios.
1815
class Test(tests.TestCase):
1818
test = Test('test_foo')
1819
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1820
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1821
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1822
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1823
all_tests = list(tests.iter_suite_tests(suite))
1824
self.assertLength(4, all_tests)
1825
all_xys = sorted((t.x, t.y) for t in all_tests)
1826
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1829
1674
# NB: Don't delete this; it's not actually from 0.11!
1830
1675
@deprecated_function(deprecated_in((0, 11, 0)))
1970
1793
tree = self.make_branch_and_memory_tree('a')
1971
1794
self.assertIsInstance(tree, bzrlib.memorytree.MemoryTree)
1973
def test_make_tree_for_local_vfs_backed_transport(self):
1974
# make_branch_and_tree has to use local branch and repositories
1975
# when the vfs transport and local disk are colocated, even if
1976
# a different transport is in use for url generation.
1977
self.transport_server = test_server.FakeVFATServer
1978
self.assertFalse(self.get_url('t1').startswith('file://'))
1797
class TestSFTPMakeBranchAndTree(TestCaseWithSFTPServer):
1799
def test_make_tree_for_sftp_branch(self):
1800
"""Transports backed by local directories create local trees."""
1979
1802
tree = self.make_branch_and_tree('t1')
1980
1803
base = tree.bzrdir.root_transport.base
1981
self.assertStartsWith(base, 'file://')
1804
self.failIf(base.startswith('sftp'),
1805
'base %r is on sftp but should be local' % base)
1982
1806
self.assertEquals(tree.bzrdir.root_transport,
1983
1807
tree.branch.bzrdir.root_transport)
1984
1808
self.assertEquals(tree.bzrdir.root_transport,
1985
1809
tree.branch.repository.bzrdir.root_transport)
1988
class SelfTestHelper(object):
1990
def run_selftest(self, **kwargs):
1991
"""Run selftest returning its output."""
1993
old_transport = bzrlib.tests.default_transport
1994
old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
1995
tests.TestCaseWithMemoryTransport.TEST_ROOT = None
1997
self.assertEqual(True, tests.selftest(stream=output, **kwargs))
1999
bzrlib.tests.default_transport = old_transport
2000
tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
2005
class TestSelftest(tests.TestCase, SelfTestHelper):
1812
class TestSelftest(TestCase):
2006
1813
"""Tests of bzrlib.tests.selftest."""
2008
1815
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
2009
1816
factory_called = []
2011
1818
factory_called.append(True)
2012
return TestUtil.TestSuite()
2013
1820
out = StringIO()
2014
1821
err = StringIO()
2015
1822
self.apply_redirected(out, err, None, bzrlib.tests.selftest,
2016
1823
test_suite_factory=factory)
2017
1824
self.assertEqual([True], factory_called)
2020
"""A test suite factory."""
2021
class Test(tests.TestCase):
2028
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2030
def test_list_only(self):
2031
output = self.run_selftest(test_suite_factory=self.factory,
2033
self.assertEqual(3, len(output.readlines()))
2035
def test_list_only_filtered(self):
2036
output = self.run_selftest(test_suite_factory=self.factory,
2037
list_only=True, pattern="Test.b")
2038
self.assertEndsWith(output.getvalue(), "Test.b\n")
2039
self.assertLength(1, output.readlines())
2041
def test_list_only_excludes(self):
2042
output = self.run_selftest(test_suite_factory=self.factory,
2043
list_only=True, exclude_pattern="Test.b")
2044
self.assertNotContainsRe("Test.b", output.getvalue())
2045
self.assertLength(2, output.readlines())
2047
def test_lsprof_tests(self):
2048
self.requireFeature(test_lsprof.LSProfFeature)
2051
def __call__(test, result):
2053
def run(test, result):
2054
self.assertIsInstance(result, tests.ForwardingResult)
2055
calls.append("called")
2056
def countTestCases(self):
2058
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2059
self.assertLength(1, calls)
2061
def test_random(self):
2062
# test randomising by listing a number of tests.
2063
output_123 = self.run_selftest(test_suite_factory=self.factory,
2064
list_only=True, random_seed="123")
2065
output_234 = self.run_selftest(test_suite_factory=self.factory,
2066
list_only=True, random_seed="234")
2067
self.assertNotEqual(output_123, output_234)
2068
# "Randominzing test order..\n\n
2069
self.assertLength(5, output_123.readlines())
2070
self.assertLength(5, output_234.readlines())
2072
def test_random_reuse_is_same_order(self):
2073
# test randomising by listing a number of tests.
2074
expected = self.run_selftest(test_suite_factory=self.factory,
2075
list_only=True, random_seed="123")
2076
repeated = self.run_selftest(test_suite_factory=self.factory,
2077
list_only=True, random_seed="123")
2078
self.assertEqual(expected.getvalue(), repeated.getvalue())
2080
def test_runner_class(self):
2081
self.requireFeature(features.subunit)
2082
from subunit import ProtocolTestCase
2083
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2084
test_suite_factory=self.factory)
2085
test = ProtocolTestCase(stream)
2086
result = unittest.TestResult()
2088
self.assertEqual(3, result.testsRun)
2090
def test_starting_with_single_argument(self):
2091
output = self.run_selftest(test_suite_factory=self.factory,
2092
starting_with=['bzrlib.tests.test_selftest.Test.a'],
2094
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
2097
def test_starting_with_multiple_argument(self):
2098
output = self.run_selftest(test_suite_factory=self.factory,
2099
starting_with=['bzrlib.tests.test_selftest.Test.a',
2100
'bzrlib.tests.test_selftest.Test.b'],
2102
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
2103
'bzrlib.tests.test_selftest.Test.b\n',
2106
def check_transport_set(self, transport_server):
2107
captured_transport = []
2108
def seen_transport(a_transport):
2109
captured_transport.append(a_transport)
2110
class Capture(tests.TestCase):
2112
seen_transport(bzrlib.tests.default_transport)
2114
return TestUtil.TestSuite([Capture("a")])
2115
self.run_selftest(transport=transport_server, test_suite_factory=factory)
2116
self.assertEqual(transport_server, captured_transport[0])
2118
def test_transport_sftp(self):
    """check_transport_set is exercised with the stub SFTP absolute server."""
    self.requireFeature(features.paramiko)
    # stub_sftp is only imported once the paramiko feature check has run
    # (presumably because the module needs paramiko - confirm).
    from bzrlib.tests import stub_sftp
    sftp_server = stub_sftp.SFTPAbsoluteServer
    self.check_transport_set(sftp_server)
2123
def test_transport_memory(self):
    """check_transport_set is exercised with the memory server."""
    memory_server = memory.MemoryServer
    self.check_transport_set(memory_server)
2127
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
2128
# Does IO: reads test.list
2130
def test_load_list(self):
2131
# Provide a list with one test - this test.
2132
test_id_line = '%s\n' % self.id()
2133
self.build_tree_contents([('test.list', test_id_line)])
2134
# And generate a list of the tests in the suite.
2135
stream = self.run_selftest(load_list='test.list', list_only=True)
2136
self.assertEqual(test_id_line, stream.getvalue())
2138
def test_load_unknown(self):
    """Pointing load_list at a file that was never created raises NoSuchFile."""
    # No list file is built in this test, so the name below cannot be
    # loaded; only the raised exception type is checked.
    self.assertRaises(errors.NoSuchFile, self.run_selftest,
        load_list='missing file name', list_only=True)
2145
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2147
_test_needs_features = [features.subunit]
2149
def run_subunit_stream(self, test_name):
2150
from subunit import ProtocolTestCase
2152
return TestUtil.TestSuite([_get_test(test_name)])
2153
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2154
test_suite_factory=factory)
2155
test = ProtocolTestCase(stream)
2156
result = testtools.TestResult()
2158
content = stream.getvalue()
2159
return content, result
2161
def test_fail_has_log(self):
2162
content, result = self.run_subunit_stream('test_fail')
2163
self.assertEqual(1, len(result.failures))
2164
self.assertContainsRe(content, '(?m)^log$')
2165
self.assertContainsRe(content, 'this test will fail')
2167
def test_error_has_log(self):
2168
content, result = self.run_subunit_stream('test_error')
2169
self.assertContainsRe(content, '(?m)^log$')
2170
self.assertContainsRe(content, 'this test errored')
2172
def test_skip_has_no_log(self):
2173
content, result = self.run_subunit_stream('test_skip')
2174
self.assertNotContainsRe(content, '(?m)^log$')
2175
self.assertNotContainsRe(content, 'this test will be skipped')
2176
self.assertEqual(['reason'], result.skip_reasons.keys())
2177
skips = result.skip_reasons['reason']
2178
self.assertEqual(1, len(skips))
2180
# RemotedTestCase doesn't preserve the "details"
2181
## self.assertFalse('log' in test.getDetails())
2183
def test_missing_feature_has_no_log(self):
2184
content, result = self.run_subunit_stream('test_missing_feature')
2185
self.assertNotContainsRe(content, '(?m)^log$')
2186
self.assertNotContainsRe(content, 'missing the feature')
2187
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2188
skips = result.skip_reasons['_MissingFeature\n']
2189
self.assertEqual(1, len(skips))
2191
# RemotedTestCase doesn't preserve the "details"
2192
## self.assertFalse('log' in test.getDetails())
2194
def test_xfail_has_no_log(self):
2195
content, result = self.run_subunit_stream('test_xfail')
2196
self.assertNotContainsRe(content, '(?m)^log$')
2197
self.assertNotContainsRe(content, 'test with expected failure')
2198
self.assertEqual(1, len(result.expectedFailures))
2199
result_content = result.expectedFailures[0][1]
2200
self.assertNotContainsRe(result_content, 'Text attachment: log')
2201
self.assertNotContainsRe(result_content, 'test with expected failure')
2203
def test_unexpected_success_has_log(self):
2204
content, result = self.run_subunit_stream('test_unexpected_success')
2205
self.assertContainsRe(content, '(?m)^log$')
2206
self.assertContainsRe(content, 'test with unexpected success')
2207
self.expectFailure('subunit treats "unexpectedSuccess"'
2208
' as a plain success',
2209
self.assertEqual, 1, len(result.unexpectedSuccesses))
2210
self.assertEqual(1, len(result.unexpectedSuccesses))
2211
test = result.unexpectedSuccesses[0]
2212
# RemotedTestCase doesn't preserve the "details"
2213
## self.assertTrue('log' in test.getDetails())
2215
def test_success_has_no_log(self):
2216
content, result = self.run_subunit_stream('test_success')
2217
self.assertEqual(1, result.testsRun)
2218
self.assertNotContainsRe(content, '(?m)^log$')
2219
self.assertNotContainsRe(content, 'this test succeeds')
2222
class TestRunBzr(tests.TestCase):
2227
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
2229
"""Override _run_bzr_core to test how it is invoked by run_bzr.
2231
Attempts to run bzr from inside this class don't actually run it.
2233
We test how run_bzr actually invokes bzr in another location. Here we
2234
only need to test that it passes the right parameters to run_bzr.
2236
self.argv = list(argv)
2237
self.retcode = retcode
2238
self.encoding = encoding
2240
self.working_dir = working_dir
2241
return self.retcode, self.out, self.err
2243
def test_run_bzr_error(self):
2244
self.out = "It sure does!\n"
2245
out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
2246
self.assertEqual(['rocks'], self.argv)
2247
self.assertEqual(34, self.retcode)
2248
self.assertEqual('It sure does!\n', out)
2249
self.assertEquals(out, self.out)
2250
self.assertEqual('', err)
2251
self.assertEquals(err, self.err)
2253
def test_run_bzr_error_regexes(self):
2255
self.err = "bzr: ERROR: foobarbaz is not versioned"
2256
out, err = self.run_bzr_error(
2257
["bzr: ERROR: foobarbaz is not versioned"],
2258
['file-id', 'foobarbaz'])
2260
def test_encoding(self):
2261
"""Test that run_bzr passes encoding to _run_bzr_core"""
2262
self.run_bzr('foo bar')
2263
self.assertEqual(None, self.encoding)
2264
self.assertEqual(['foo', 'bar'], self.argv)
2266
self.run_bzr('foo bar', encoding='baz')
2267
self.assertEqual('baz', self.encoding)
2268
self.assertEqual(['foo', 'bar'], self.argv)
2270
def test_retcode(self):
2271
"""Test that run_bzr passes retcode to _run_bzr_core"""
2272
# Default is retcode == 0
2273
self.run_bzr('foo bar')
2274
self.assertEqual(0, self.retcode)
2275
self.assertEqual(['foo', 'bar'], self.argv)
2277
self.run_bzr('foo bar', retcode=1)
2278
self.assertEqual(1, self.retcode)
2279
self.assertEqual(['foo', 'bar'], self.argv)
2281
self.run_bzr('foo bar', retcode=None)
2282
self.assertEqual(None, self.retcode)
2283
self.assertEqual(['foo', 'bar'], self.argv)
2285
self.run_bzr(['foo', 'bar'], retcode=3)
2286
self.assertEqual(3, self.retcode)
2287
self.assertEqual(['foo', 'bar'], self.argv)
2289
def test_stdin(self):
2290
# test that the stdin keyword to run_bzr is passed through to
2291
# _run_bzr_core as-is. We do this by overriding
2292
# _run_bzr_core in this class, and then calling run_bzr,
2293
# which is a convenience function for _run_bzr_core, so
2295
self.run_bzr('foo bar', stdin='gam')
2296
self.assertEqual('gam', self.stdin)
2297
self.assertEqual(['foo', 'bar'], self.argv)
2299
self.run_bzr('foo bar', stdin='zippy')
2300
self.assertEqual('zippy', self.stdin)
2301
self.assertEqual(['foo', 'bar'], self.argv)
2303
def test_working_dir(self):
2304
"""Test that run_bzr passes working_dir to _run_bzr_core"""
2305
self.run_bzr('foo bar')
2306
self.assertEqual(None, self.working_dir)
2307
self.assertEqual(['foo', 'bar'], self.argv)
2309
self.run_bzr('foo bar', working_dir='baz')
2310
self.assertEqual('baz', self.working_dir)
2311
self.assertEqual(['foo', 'bar'], self.argv)
2313
def test_reject_extra_keyword_arguments(self):
2314
self.assertRaises(TypeError, self.run_bzr, "foo bar",
2315
error_regex=['error message'])
2318
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2319
# Does IO when testing the working_dir parameter.
2321
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2322
a_callable=None, *args, **kwargs):
2324
self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
2325
self.factory = bzrlib.ui.ui_factory
2326
self.working_dir = osutils.getcwd()
2327
stdout.write('foo\n')
2328
stderr.write('bar\n')
2331
def test_stdin(self):
2332
# test that the stdin keyword to _run_bzr_core is passed through to
2333
# apply_redirected as a StringIO. We do this by overriding
2334
# apply_redirected in this class, and then calling _run_bzr_core,
2335
# which calls apply_redirected.
2336
self.run_bzr(['foo', 'bar'], stdin='gam')
2337
self.assertEqual('gam', self.stdin.read())
2338
self.assertTrue(self.stdin is self.factory_stdin)
2339
self.run_bzr(['foo', 'bar'], stdin='zippy')
2340
self.assertEqual('zippy', self.stdin.read())
2341
self.assertTrue(self.stdin is self.factory_stdin)
2343
def test_ui_factory(self):
    """run_bzr installs a fresh TestUIFactory wired to captured streams."""
    # each invocation of self.run_bzr should get its
    # own UI factory, which is an instance of TestUIFactory,
    # with stdin, stdout and stderr attached to the stdin,
    # stdout and stderr of the invoked run_bzr
    current_factory = bzrlib.ui.ui_factory
    self.run_bzr(['foo'])
    # assertFalse instead of failIf, which is only a deprecated alias.
    self.assertFalse(current_factory is self.factory)
    self.assertNotEqual(sys.stdout, self.factory.stdout)
    self.assertNotEqual(sys.stderr, self.factory.stderr)
    # 'foo\n' and 'bar\n' are what this class's apply_redirected
    # override writes to the stdout and stderr it is given.
    self.assertEqual('foo\n', self.factory.stdout.getvalue())
    self.assertEqual('bar\n', self.factory.stderr.getvalue())
    self.assertIsInstance(self.factory, tests.TestUIFactory)
2357
def test_working_dir(self):
2358
self.build_tree(['one/', 'two/'])
2359
cwd = osutils.getcwd()
2361
# Default is to work in the current directory
2362
self.run_bzr(['foo', 'bar'])
2363
self.assertEqual(cwd, self.working_dir)
2365
self.run_bzr(['foo', 'bar'], working_dir=None)
2366
self.assertEqual(cwd, self.working_dir)
2368
# The function should be run in the alternative directory
2369
# but afterwards the current working dir shouldn't be changed
2370
self.run_bzr(['foo', 'bar'], working_dir='one')
2371
self.assertNotEqual(cwd, self.working_dir)
2372
self.assertEndsWith(self.working_dir, 'one')
2373
self.assertEqual(cwd, osutils.getcwd())
2375
self.run_bzr(['foo', 'bar'], working_dir='two')
2376
self.assertNotEqual(cwd, self.working_dir)
2377
self.assertEndsWith(self.working_dir, 'two')
2378
self.assertEqual(cwd, osutils.getcwd())
2381
class StubProcess(object):
2382
"""A stub process for testing run_bzr_subprocess."""
2384
def __init__(self, out="", err="", retcode=0):
2387
self.returncode = retcode
2389
def communicate(self):
2390
return self.out, self.err
2393
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
2394
"""Base class for tests testing how we might run bzr."""
2397
tests.TestCaseWithTransport.setUp(self)
2398
self.subprocess_calls = []
2400
def start_bzr_subprocess(self, process_args, env_changes=None,
2401
skip_if_plan_to_signal=False,
2403
allow_plugins=False):
2404
"""capture what run_bzr_subprocess tries to do."""
2405
self.subprocess_calls.append({'process_args':process_args,
2406
'env_changes':env_changes,
2407
'skip_if_plan_to_signal':skip_if_plan_to_signal,
2408
'working_dir':working_dir, 'allow_plugins':allow_plugins})
2409
return self.next_subprocess
2412
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
2414
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2415
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2417
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2418
that will return static results. This assertion method populates those
2419
results and also checks the arguments run_bzr_subprocess generates.
2421
self.next_subprocess = process
2423
result = self.run_bzr_subprocess(*args, **kwargs)
2425
self.next_subprocess = None
2426
for key, expected in expected_args.iteritems():
2427
self.assertEqual(expected, self.subprocess_calls[-1][key])
1827
class TestKnownFailure(TestCase):
1829
def test_known_failure(self):
1830
"""Check that KnownFailure is defined appropriately."""
1831
# a KnownFailure is an assertion error for compatibility with unaware
1833
self.assertIsInstance(KnownFailure(""), AssertionError)
1835
def test_expect_failure(self):
1837
self.expectFailure("Doomed to failure", self.assertTrue, False)
1838
except KnownFailure, e:
1839
self.assertEqual('Doomed to failure', e.args[0])
1841
self.expectFailure("Doomed to failure", self.assertTrue, True)
1842
except AssertionError, e:
1843
self.assertEqual('Unexpected success. Should have failed:'
1844
' Doomed to failure', e.args[0])
2430
self.next_subprocess = None
2431
for key, expected in expected_args.iteritems():
2432
self.assertEqual(expected, self.subprocess_calls[-1][key])
2435
def test_run_bzr_subprocess(self):
2436
"""The run_bzr_helper_external command behaves nicely."""
2437
self.assertRunBzrSubprocess({'process_args':['--version']},
2438
StubProcess(), '--version')
2439
self.assertRunBzrSubprocess({'process_args':['--version']},
2440
StubProcess(), ['--version'])
2441
# retcode=None disables retcode checking
2442
result = self.assertRunBzrSubprocess({},
2443
StubProcess(retcode=3), '--version', retcode=None)
2444
result = self.assertRunBzrSubprocess({},
2445
StubProcess(out="is free software"), '--version')
2446
self.assertContainsRe(result[0], 'is free software')
2447
# Running a subcommand that is missing errors
2448
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2449
{'process_args':['--versionn']}, StubProcess(retcode=3),
2451
# Unless it is told to expect the error from the subprocess
2452
result = self.assertRunBzrSubprocess({},
2453
StubProcess(retcode=3), '--versionn', retcode=3)
2454
# Or to ignore retcode checking
2455
result = self.assertRunBzrSubprocess({},
2456
StubProcess(err="unknown command", retcode=3), '--versionn',
2458
self.assertContainsRe(result[1], 'unknown command')
2460
def test_env_change_passes_through(self):
2461
self.assertRunBzrSubprocess(
2462
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2464
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2466
def test_no_working_dir_passed_as_None(self):
    """Without a working_dir argument the recorded call has it as None."""
    expected_call = {'working_dir': None}
    self.assertRunBzrSubprocess(expected_call, StubProcess(), '')
2469
def test_no_working_dir_passed_through(self):
2470
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2473
def test_run_bzr_subprocess_no_plugins(self):
2474
self.assertRunBzrSubprocess({'allow_plugins': False},
2477
def test_allow_plugins(self):
    """allow_plugins=True shows up unchanged in the recorded call."""
    expected_call = {'allow_plugins': True}
    stub = StubProcess()
    self.assertRunBzrSubprocess(expected_call, stub, '', allow_plugins=True)
2482
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2484
def test_finish_bzr_subprocess_with_error(self):
    """finish_bzr_subprocess allows specification of the desired exit code."""
    # The stub exits with 3 and writes only to stderr.
    process = StubProcess(err="unknown command", retcode=3)
    result = self.finish_bzr_subprocess(process, retcode=3)
    # result[0] is compared with the (empty) stdout, result[1] with stderr.
    self.assertEqual('', result[0])
    self.assertContainsRe(result[1], 'unknown command')
2492
def test_finish_bzr_subprocess_ignoring_retcode(self):
    """Passing retcode=None makes finish_bzr_subprocess ignore the exit code."""
    failing_process = StubProcess(err="unknown command", retcode=3)
    outcome = self.finish_bzr_subprocess(failing_process, retcode=None)
    # Nothing was written to stdout; the error text is in the second item.
    self.assertEqual('', outcome[0])
    self.assertContainsRe(outcome[1], 'unknown command')
2499
def test_finish_subprocess_with_unexpected_retcode(self):
2500
"""finish_bzr_subprocess raises self.failureException if the retcode is
2501
not the expected one.
2503
process = StubProcess(err="unknown command", retcode=3)
2504
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2508
class _DontSpawnProcess(Exception):
    """Marker exception raised in place of spawning a process.

    Raising it lets a test stop right after the command has been recorded
    and skip the steps that would otherwise follow.
    """
2512
class TestStartBzrSubProcess(tests.TestCase):
2514
def check_popen_state(self):
    """Hook run when popen is called; does nothing by default.

    Tests replace this attribute with a callable that makes assertions
    about the state in effect at the time of the popen call.
    """
2517
def _popen(self, *args, **kwargs):
    """Stand-in for popen that records its arguments and never spawns.

    check_popen_state() runs first, then the positional and keyword
    arguments are stored on the instance as _popen_args and _popen_kwargs.
    _DontSpawnProcess is always raised, so no process is ever started.
    """
    self.check_popen_state()
    self._popen_args, self._popen_kwargs = args, kwargs
    raise _DontSpawnProcess()
2524
def test_run_bzr_subprocess_no_plugins(self):
    """By default the command is: interpreter, bzr path, '--no-plugins'."""
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    # The first positional argument recorded by _popen is the command list.
    argv = self._popen_args[0]
    self.assertEqual(sys.executable, argv[0])
    self.assertEqual(self.get_bzr_path(), argv[1])
    self.assertEqual(['--no-plugins'], argv[2:])
2531
def test_allow_plugins(self):
    """With plugins allowed, no extra options follow the bzr script path."""
    # NOTE(review): the trailing keyword of this call was missing from the
    # reviewed text; allow_plugins=True is inferred from the test name and
    # from the empty option list asserted below -- confirm.
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
        allow_plugins=True)
    command = self._popen_args[0]
    # Nothing after the interpreter and script, i.e. no '--no-plugins'.
    self.assertEqual([], command[2:])
2537
def test_set_env(self):
    """env_changes can set a variable for the child only."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def check_environment():
        # Invoked through check_popen_state at popen time: the variable
        # must be set by then.
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
        env_changes={'EXISTANT_ENV_VAR': 'set variable'})
    # not set in the parent
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2548
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def check_environment():
        # Invoked through check_popen_state at popen time: the variable
        # must have been removed by then.
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    try:
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'EXISTANT_ENV_VAR': None})
        # Still set in parent
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    finally:
        # Always undo our own change so a failing assertion above cannot
        # leak the variable into later tests. pop() with a default avoids
        # masking that failure with a KeyError.
        os.environ.pop('EXISTANT_ENV_VAR', None)
2561
def test_env_del_missing(self):
    """Asking to delete a variable that is not set is not an error."""
    self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)

    def check_environment():
        # Invoked through check_popen_state at popen time: the variable
        # must still be absent.
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
        env_changes={'NON_EXISTANT_ENV_VAR': None})
2569
def test_working_dir(self):
2570
"""Test that we can specify the working dir for the child"""
2571
orig_getcwd = osutils.getcwd
2572
orig_chdir = os.chdir
2580
osutils.getcwd = getcwd
2582
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2585
osutils.getcwd = orig_getcwd
2587
os.chdir = orig_chdir
2588
self.assertEqual(['foo', 'current'], chdirs)
2590
def test_get_bzr_path_with_cwd_bzrlib(self):
2591
self.get_source_path = lambda: ""
2592
self.overrideAttr(os.path, "isfile", lambda path: True)
2593
self.assertEqual(self.get_bzr_path(), "bzr")
2596
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2597
"""Tests that really need to do things with an external bzr."""
2599
def test_start_and_stop_bzr_subprocess_send_signal(self):
2600
"""finish_bzr_subprocess raises self.failureException if the retcode is
2601
not the expected one.
2603
self.disable_missing_extensions_warning()
2604
process = self.start_bzr_subprocess(['wait-until-signalled'],
2605
skip_if_plan_to_signal=True)
2606
self.assertEqual('running\n', process.stdout.readline())
2607
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2609
self.assertEqual('', result[0])
2610
self.assertEqual('bzr: interrupted\n', result[1])
2613
class TestFeature(tests.TestCase):
1846
self.fail('Assertion not raised')
1849
class TestFeature(TestCase):
2615
1851
def test_caching(self):
2616
1852
"""Feature._probe is called by the feature at most once."""
2617
class InstrumentedFeature(tests.Feature):
1853
class InstrumentedFeature(Feature):
2618
1854
def __init__(self):
2619
super(InstrumentedFeature, self).__init__()
1855
Feature.__init__(self)
2620
1856
self.calls = []
2621
1857
def _probe(self):
2622
1858
self.calls.append('_probe')