393
350
'transport_readonly_server': 'b',
394
351
'transport_server': 'a',
395
352
'workingtree_format': formats[1]})],
399
class TestTreeProviderAdapter(TestCase):
400
"""Test the setup of tree_implementation tests."""
402
def test_adapted_tests(self):
403
# the tree implementation adapter is meant to setup one instance for
404
# each working tree format, and one additional instance that will
405
# use the default wt format, but create a revision tree for the tests.
406
# this means that the wt ones should have the workingtree_to_test_tree
407
# attribute set to 'return_parameter' and the revision one set to
408
# revision_tree_from_workingtree.
410
from bzrlib.tests.tree_implementations import (
411
TreeTestProviderAdapter,
356
class TestTreeScenarios(tests.TestCase):
358
def test_scenarios(self):
359
# the tree implementation scenario generator is meant to setup one
360
# instance for each working tree format, and one additional instance
361
# that will use the default wt format, but create a revision tree for
362
# the tests. this means that the wt ones should have the
363
# workingtree_to_test_tree attribute set to 'return_parameter' and the
364
# revision one set to revision_tree_from_workingtree.
366
from bzrlib.tests.per_tree import (
367
_dirstate_tree_from_workingtree,
412
371
return_parameter,
413
372
revision_tree_from_workingtree
415
input_test = TestTreeProviderAdapter(
416
"test_adapted_tests")
419
376
formats = [workingtree.WorkingTreeFormat2(),
420
377
workingtree.WorkingTreeFormat3(),]
421
adapter = TreeTestProviderAdapter(server1, server2, formats)
422
suite = adapter.adapt(input_test)
423
tests = list(iter(suite))
424
# XXX We should not have tests fail as we add more scenarios
426
self.assertEqual(7, len(tests))
427
# this must match the default format setp up in
428
# TreeTestProviderAdapter.adapt
429
default_format = workingtree.WorkingTreeFormat3
430
self.assertEqual(tests[0].workingtree_format, formats[0])
431
self.assertEqual(tests[0].bzrdir_format, formats[0]._matchingbzrdir)
432
self.assertEqual(tests[0].transport_server, server1)
433
self.assertEqual(tests[0].transport_readonly_server, server2)
434
self.assertEqual(tests[0]._workingtree_to_test_tree, return_parameter)
435
self.assertEqual(tests[1].workingtree_format, formats[1])
436
self.assertEqual(tests[1].bzrdir_format, formats[1]._matchingbzrdir)
437
self.assertEqual(tests[1].transport_server, server1)
438
self.assertEqual(tests[1].transport_readonly_server, server2)
439
self.assertEqual(tests[1]._workingtree_to_test_tree, return_parameter)
440
self.assertIsInstance(tests[2].workingtree_format, default_format)
441
#self.assertEqual(tests[2].bzrdir_format,
442
# default_format._matchingbzrdir)
443
self.assertEqual(tests[2].transport_server, server1)
444
self.assertEqual(tests[2].transport_readonly_server, server2)
445
self.assertEqual(tests[2]._workingtree_to_test_tree,
446
revision_tree_from_workingtree)
449
class TestInterTreeProviderAdapter(TestCase):
378
scenarios = make_scenarios(server1, server2, formats)
379
self.assertEqual(7, len(scenarios))
380
default_wt_format = workingtree.WorkingTreeFormat4._default_format
381
wt4_format = workingtree.WorkingTreeFormat4()
382
wt5_format = workingtree.WorkingTreeFormat5()
383
expected_scenarios = [
384
('WorkingTreeFormat2',
385
{'bzrdir_format': formats[0]._matchingbzrdir,
386
'transport_readonly_server': 'b',
387
'transport_server': 'a',
388
'workingtree_format': formats[0],
389
'_workingtree_to_test_tree': return_parameter,
391
('WorkingTreeFormat3',
392
{'bzrdir_format': formats[1]._matchingbzrdir,
393
'transport_readonly_server': 'b',
394
'transport_server': 'a',
395
'workingtree_format': formats[1],
396
'_workingtree_to_test_tree': return_parameter,
399
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
400
'bzrdir_format': default_wt_format._matchingbzrdir,
401
'transport_readonly_server': 'b',
402
'transport_server': 'a',
403
'workingtree_format': default_wt_format,
405
('DirStateRevisionTree,WT4',
406
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
407
'bzrdir_format': wt4_format._matchingbzrdir,
408
'transport_readonly_server': 'b',
409
'transport_server': 'a',
410
'workingtree_format': wt4_format,
412
('DirStateRevisionTree,WT5',
413
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
414
'bzrdir_format': wt5_format._matchingbzrdir,
415
'transport_readonly_server': 'b',
416
'transport_server': 'a',
417
'workingtree_format': wt5_format,
420
{'_workingtree_to_test_tree': preview_tree_pre,
421
'bzrdir_format': default_wt_format._matchingbzrdir,
422
'transport_readonly_server': 'b',
423
'transport_server': 'a',
424
'workingtree_format': default_wt_format}),
426
{'_workingtree_to_test_tree': preview_tree_post,
427
'bzrdir_format': default_wt_format._matchingbzrdir,
428
'transport_readonly_server': 'b',
429
'transport_server': 'a',
430
'workingtree_format': default_wt_format}),
432
self.assertEqual(expected_scenarios, scenarios)
435
class TestInterTreeScenarios(tests.TestCase):
450
436
"""A group of tests that test the InterTreeTestAdapter."""
452
def test_adapted_tests(self):
438
def test_scenarios(self):
453
439
# check that constructor parameters are passed through to the adapted
455
441
# for InterTree tests we want the machinery to bring up two trees in
1191
def _patch_get_bzr_source_tree(self):
1192
# Reading from the actual source tree breaks isolation, but we don't
1193
# want to assume that thats *all* that would happen.
1194
self._get_source_tree_calls = []
1196
self._get_source_tree_calls.append("called")
1198
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', new_get)
1208
1200
def test_bench_history(self):
1209
# tests that the running the benchmark produces a history file
1210
# containing a timestamp and the revision id of the bzrlib source which
1212
workingtree = _get_bzr_source_tree()
1201
# tests that the running the benchmark passes bench_history into
1202
# the test result object. We can tell that happens if
1203
# _get_bzr_source_tree is called.
1204
self._patch_get_bzr_source_tree()
1213
1205
test = TestRunner('dummy_test')
1214
1206
output = StringIO()
1215
runner = TextTestRunner(stream=self._log_file, bench_history=output)
1207
runner = tests.TextTestRunner(stream=self._log_file,
1208
bench_history=output)
1216
1209
result = self.run_test_runner(runner, test)
1217
1210
output_string = output.getvalue()
1218
1211
self.assertContainsRe(output_string, "--date [0-9.]+")
1219
if workingtree is not None:
1220
revision_id = workingtree.get_parent_ids()[0]
1221
self.assertEndsWith(output_string.rstrip(), revision_id)
1223
def assertLogDeleted(self, test):
1224
log = test._get_log()
1225
self.assertEqual("DELETED log file to reduce memory footprint", log)
1226
self.assertEqual('', test._log_contents)
1227
self.assertIs(None, test._log_file_name)
1229
def test_success_log_deleted(self):
1230
"""Successful tests have their log deleted"""
1232
class LogTester(TestCase):
1234
def test_success(self):
1235
self.log('this will be removed\n')
1237
sio = cStringIO.StringIO()
1238
runner = TextTestRunner(stream=sio)
1239
test = LogTester('test_success')
1240
result = self.run_test_runner(runner, test)
1242
self.assertLogDeleted(test)
1244
def test_skipped_log_deleted(self):
1245
"""Skipped tests have their log deleted"""
1247
class LogTester(TestCase):
1249
def test_skipped(self):
1250
self.log('this will be removed\n')
1251
raise tests.TestSkipped()
1253
sio = cStringIO.StringIO()
1254
runner = TextTestRunner(stream=sio)
1255
test = LogTester('test_skipped')
1256
result = self.run_test_runner(runner, test)
1258
self.assertLogDeleted(test)
1260
def test_not_aplicable_log_deleted(self):
1261
"""Not applicable tests have their log deleted"""
1263
class LogTester(TestCase):
1265
def test_not_applicable(self):
1266
self.log('this will be removed\n')
1267
raise tests.TestNotApplicable()
1269
sio = cStringIO.StringIO()
1270
runner = TextTestRunner(stream=sio)
1271
test = LogTester('test_not_applicable')
1272
result = self.run_test_runner(runner, test)
1274
self.assertLogDeleted(test)
1276
def test_known_failure_log_deleted(self):
1277
"""Know failure tests have their log deleted"""
1279
class LogTester(TestCase):
1281
def test_known_failure(self):
1282
self.log('this will be removed\n')
1283
raise tests.KnownFailure()
1285
sio = cStringIO.StringIO()
1286
runner = TextTestRunner(stream=sio)
1287
test = LogTester('test_known_failure')
1288
result = self.run_test_runner(runner, test)
1290
self.assertLogDeleted(test)
1292
def test_fail_log_kept(self):
1293
"""Failed tests have their log kept"""
1295
class LogTester(TestCase):
1297
def test_fail(self):
1298
self.log('this will be kept\n')
1299
self.fail('this test fails')
1301
sio = cStringIO.StringIO()
1302
runner = TextTestRunner(stream=sio)
1303
test = LogTester('test_fail')
1304
result = self.run_test_runner(runner, test)
1306
text = sio.getvalue()
1307
self.assertContainsRe(text, 'this will be kept')
1308
self.assertContainsRe(text, 'this test fails')
1310
log = test._get_log()
1311
self.assertContainsRe(log, 'this will be kept')
1312
self.assertEqual(log, test._log_contents)
1314
def test_error_log_kept(self):
1315
"""Tests with errors have their log kept"""
1317
class LogTester(TestCase):
1319
def test_error(self):
1320
self.log('this will be kept\n')
1321
raise ValueError('random exception raised')
1323
sio = cStringIO.StringIO()
1324
runner = TextTestRunner(stream=sio)
1325
test = LogTester('test_error')
1326
result = self.run_test_runner(runner, test)
1328
text = sio.getvalue()
1329
self.assertContainsRe(text, 'this will be kept')
1330
self.assertContainsRe(text, 'random exception raised')
1332
log = test._get_log()
1333
self.assertContainsRe(log, 'this will be kept')
1334
self.assertEqual(log, test._log_contents)
1337
class SampleTestCase(TestCase):
1212
self.assertLength(1, self._get_source_tree_calls)
1214
def test_startTestRun(self):
1215
"""run should call result.startTestRun()"""
1217
class LoggingDecorator(tests.ForwardingResult):
1218
def startTestRun(self):
1219
tests.ForwardingResult.startTestRun(self)
1220
calls.append('startTestRun')
1221
test = unittest.FunctionTestCase(lambda:None)
1223
runner = tests.TextTestRunner(stream=stream,
1224
result_decorators=[LoggingDecorator])
1225
result = self.run_test_runner(runner, test)
1226
self.assertLength(1, calls)
1228
def test_stopTestRun(self):
1229
"""run should call result.stopTestRun()"""
1231
class LoggingDecorator(tests.ForwardingResult):
1232
def stopTestRun(self):
1233
tests.ForwardingResult.stopTestRun(self)
1234
calls.append('stopTestRun')
1235
test = unittest.FunctionTestCase(lambda:None)
1237
runner = tests.TextTestRunner(stream=stream,
1238
result_decorators=[LoggingDecorator])
1239
result = self.run_test_runner(runner, test)
1240
self.assertLength(1, calls)
1243
class SampleTestCase(tests.TestCase):
1339
1245
def _test_pass(self):
1624
1675
def sample_normal_method(self):
1625
1676
"""A undeprecated method."""
1627
@symbol_versioning.deprecated_method(zero_ten)
1678
@deprecated_method(deprecated_in((0, 10, 0)))
1628
1679
def sample_nested_deprecation(self):
1629
1680
return sample_deprecated_function()
1632
class TestExtraAssertions(TestCase):
1683
class TestExtraAssertions(tests.TestCase):
1633
1684
"""Tests for new test assertions in bzrlib test suite"""
1635
1686
def test_assert_isinstance(self):
1636
1687
self.assertIsInstance(2, int)
1637
1688
self.assertIsInstance(u'', basestring)
1638
self.assertRaises(AssertionError, self.assertIsInstance, None, int)
1689
e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
1690
self.assertEquals(str(e),
1691
"None is an instance of <type 'NoneType'> rather than <type 'int'>")
1639
1692
self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
1693
e = self.assertRaises(AssertionError,
1694
self.assertIsInstance, None, int, "it's just not")
1695
self.assertEquals(str(e),
1696
"None is an instance of <type 'NoneType'> rather than <type 'int'>"
1641
1699
def test_assertEndsWith(self):
1642
1700
self.assertEndsWith('foo', 'oo')
1643
1701
self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')
1703
def test_assertEqualDiff(self):
1704
e = self.assertRaises(AssertionError,
1705
self.assertEqualDiff, '', '\n')
1706
self.assertEquals(str(e),
1707
# Don't blink ! The '+' applies to the second string
1708
'first string is missing a final newline.\n+ \n')
1709
e = self.assertRaises(AssertionError,
1710
self.assertEqualDiff, '\n', '')
1711
self.assertEquals(str(e),
1712
# Don't blink ! The '-' applies to the second string
1713
'second string is missing a final newline.\n- \n')
1716
class TestDeprecations(tests.TestCase):
1645
1718
def test_applyDeprecated_not_deprecated(self):
1646
1719
sample_object = ApplyDeprecatedHelper()
1647
1720
# calling an undeprecated callable raises an assertion
1648
self.assertRaises(AssertionError, self.applyDeprecated, zero_eleven,
1721
self.assertRaises(AssertionError, self.applyDeprecated,
1722
deprecated_in((0, 11, 0)),
1649
1723
sample_object.sample_normal_method)
1650
self.assertRaises(AssertionError, self.applyDeprecated, zero_eleven,
1724
self.assertRaises(AssertionError, self.applyDeprecated,
1725
deprecated_in((0, 11, 0)),
1651
1726
sample_undeprecated_function, "a param value")
1652
1727
# calling a deprecated callable (function or method) with the wrong
1653
1728
# expected deprecation fails.
1654
self.assertRaises(AssertionError, self.applyDeprecated, zero_ten,
1729
self.assertRaises(AssertionError, self.applyDeprecated,
1730
deprecated_in((0, 10, 0)),
1655
1731
sample_object.sample_deprecated_method, "a param value")
1656
self.assertRaises(AssertionError, self.applyDeprecated, zero_ten,
1732
self.assertRaises(AssertionError, self.applyDeprecated,
1733
deprecated_in((0, 10, 0)),
1657
1734
sample_deprecated_function)
1658
1735
# calling a deprecated callable (function or method) with the right
1659
1736
# expected deprecation returns the functions result.
1660
self.assertEqual("a param value", self.applyDeprecated(zero_eleven,
1737
self.assertEqual("a param value",
1738
self.applyDeprecated(deprecated_in((0, 11, 0)),
1661
1739
sample_object.sample_deprecated_method, "a param value"))
1662
self.assertEqual(2, self.applyDeprecated(zero_eleven,
1740
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
1663
1741
sample_deprecated_function))
1664
1742
# calling a nested deprecation with the wrong deprecation version
1665
# fails even if a deeper nested function was deprecated with the
1743
# fails even if a deeper nested function was deprecated with the
1666
1744
# supplied version.
1667
1745
self.assertRaises(AssertionError, self.applyDeprecated,
1668
zero_eleven, sample_object.sample_nested_deprecation)
1746
deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
1669
1747
# calling a nested deprecation with the right deprecation value
1670
1748
# returns the calls result.
1671
self.assertEqual(2, self.applyDeprecated(zero_ten,
1749
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 10, 0)),
1672
1750
sample_object.sample_nested_deprecation))
1674
1752
def test_callDeprecated(self):
1675
1753
def testfunc(be_deprecated, result=None):
1676
1754
if be_deprecated is True:
1677
symbol_versioning.warn('i am deprecated', DeprecationWarning,
1755
symbol_versioning.warn('i am deprecated', DeprecationWarning,
1680
1758
result = self.callDeprecated(['i am deprecated'], testfunc, True)
1719
1797
tree = self.make_branch_and_memory_tree('a')
1720
1798
self.assertIsInstance(tree, bzrlib.memorytree.MemoryTree)
1723
class TestSFTPMakeBranchAndTree(TestCaseWithSFTPServer):
1725
def test_make_tree_for_sftp_branch(self):
1726
"""Transports backed by local directories create local trees."""
1800
def test_make_tree_for_local_vfs_backed_transport(self):
1801
# make_branch_and_tree has to use local branch and repositories
1802
# when the vfs transport and local disk are colocated, even if
1803
# a different transport is in use for url generation.
1804
self.transport_server = test_server.FakeVFATServer
1805
self.assertFalse(self.get_url('t1').startswith('file://'))
1728
1806
tree = self.make_branch_and_tree('t1')
1729
1807
base = tree.bzrdir.root_transport.base
1730
self.failIf(base.startswith('sftp'),
1731
'base %r is on sftp but should be local' % base)
1808
self.assertStartsWith(base, 'file://')
1732
1809
self.assertEquals(tree.bzrdir.root_transport,
1733
1810
tree.branch.bzrdir.root_transport)
1734
1811
self.assertEquals(tree.bzrdir.root_transport,
1735
1812
tree.branch.repository.bzrdir.root_transport)
1738
class TestSelftest(TestCase):
1815
class SelfTestHelper:
1817
def run_selftest(self, **kwargs):
1818
"""Run selftest returning its output."""
1820
old_transport = bzrlib.tests.default_transport
1821
old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
1822
tests.TestCaseWithMemoryTransport.TEST_ROOT = None
1824
self.assertEqual(True, tests.selftest(stream=output, **kwargs))
1826
bzrlib.tests.default_transport = old_transport
1827
tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
1832
class TestSelftest(tests.TestCase, SelfTestHelper):
1739
1833
"""Tests of bzrlib.tests.selftest."""
1741
1835
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
1742
1836
factory_called = []
1744
1838
factory_called.append(True)
1839
return TestUtil.TestSuite()
1746
1840
out = StringIO()
1747
1841
err = StringIO()
1748
self.apply_redirected(out, err, None, bzrlib.tests.selftest,
1842
self.apply_redirected(out, err, None, bzrlib.tests.selftest,
1749
1843
test_suite_factory=factory)
1750
1844
self.assertEqual([True], factory_called)
1753
class TestKnownFailure(TestCase):
1755
def test_known_failure(self):
1756
"""Check that KnownFailure is defined appropriately."""
1757
# a KnownFailure is an assertion error for compatability with unaware
1759
self.assertIsInstance(KnownFailure(""), AssertionError)
1761
def test_expect_failure(self):
1763
self.expectFailure("Doomed to failure", self.assertTrue, False)
1764
except KnownFailure, e:
1765
self.assertEqual('Doomed to failure', e.args[0])
1767
self.expectFailure("Doomed to failure", self.assertTrue, True)
1768
except AssertionError, e:
1769
self.assertEqual('Unexpected success. Should have failed:'
1770
' Doomed to failure', e.args[0])
1847
"""A test suite factory."""
1848
class Test(tests.TestCase):
1855
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
1857
def test_list_only(self):
1858
output = self.run_selftest(test_suite_factory=self.factory,
1860
self.assertEqual(3, len(output.readlines()))
1862
def test_list_only_filtered(self):
1863
output = self.run_selftest(test_suite_factory=self.factory,
1864
list_only=True, pattern="Test.b")
1865
self.assertEndsWith(output.getvalue(), "Test.b\n")
1866
self.assertLength(1, output.readlines())
1868
def test_list_only_excludes(self):
1869
output = self.run_selftest(test_suite_factory=self.factory,
1870
list_only=True, exclude_pattern="Test.b")
1871
self.assertNotContainsRe("Test.b", output.getvalue())
1872
self.assertLength(2, output.readlines())
1874
def test_lsprof_tests(self):
1875
self.requireFeature(test_lsprof.LSProfFeature)
1878
def __call__(test, result):
1880
def run(test, result):
1881
self.assertIsInstance(result, tests.ForwardingResult)
1882
calls.append("called")
1883
def countTestCases(self):
1885
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
1886
self.assertLength(1, calls)
1888
def test_random(self):
1889
# test randomising by listing a number of tests.
1890
output_123 = self.run_selftest(test_suite_factory=self.factory,
1891
list_only=True, random_seed="123")
1892
output_234 = self.run_selftest(test_suite_factory=self.factory,
1893
list_only=True, random_seed="234")
1894
self.assertNotEqual(output_123, output_234)
1895
# "Randominzing test order..\n\n
1896
self.assertLength(5, output_123.readlines())
1897
self.assertLength(5, output_234.readlines())
1899
def test_random_reuse_is_same_order(self):
1900
# test randomising by listing a number of tests.
1901
expected = self.run_selftest(test_suite_factory=self.factory,
1902
list_only=True, random_seed="123")
1903
repeated = self.run_selftest(test_suite_factory=self.factory,
1904
list_only=True, random_seed="123")
1905
self.assertEqual(expected.getvalue(), repeated.getvalue())
1907
def test_runner_class(self):
1908
self.requireFeature(features.subunit)
1909
from subunit import ProtocolTestCase
1910
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
1911
test_suite_factory=self.factory)
1912
test = ProtocolTestCase(stream)
1913
result = unittest.TestResult()
1915
self.assertEqual(3, result.testsRun)
1917
def test_starting_with_single_argument(self):
1918
output = self.run_selftest(test_suite_factory=self.factory,
1919
starting_with=['bzrlib.tests.test_selftest.Test.a'],
1921
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
1924
def test_starting_with_multiple_argument(self):
1925
output = self.run_selftest(test_suite_factory=self.factory,
1926
starting_with=['bzrlib.tests.test_selftest.Test.a',
1927
'bzrlib.tests.test_selftest.Test.b'],
1929
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
1930
'bzrlib.tests.test_selftest.Test.b\n',
1933
def check_transport_set(self, transport_server):
1934
captured_transport = []
1935
def seen_transport(a_transport):
1936
captured_transport.append(a_transport)
1937
class Capture(tests.TestCase):
1939
seen_transport(bzrlib.tests.default_transport)
1941
return TestUtil.TestSuite([Capture("a")])
1942
self.run_selftest(transport=transport_server, test_suite_factory=factory)
1943
self.assertEqual(transport_server, captured_transport[0])
1945
def test_transport_sftp(self):
1946
self.requireFeature(features.paramiko)
1947
from bzrlib.tests import stub_sftp
1948
self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
1950
def test_transport_memory(self):
1951
self.check_transport_set(memory.MemoryServer)
1954
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
1955
# Does IO: reads test.list
1957
def test_load_list(self):
1958
# Provide a list with one test - this test.
1959
test_id_line = '%s\n' % self.id()
1960
self.build_tree_contents([('test.list', test_id_line)])
1961
# And generate a list of the tests in the suite.
1962
stream = self.run_selftest(load_list='test.list', list_only=True)
1963
self.assertEqual(test_id_line, stream.getvalue())
1965
def test_load_unknown(self):
1966
# Provide a list with one test - this test.
1967
# And generate a list of the tests in the suite.
1968
err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
1969
load_list='missing file name', list_only=True)
1972
class TestRunBzr(tests.TestCase):
1977
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
1979
"""Override _run_bzr_core to test how it is invoked by run_bzr.
1981
Attempts to run bzr from inside this class don't actually run it.
1983
We test how run_bzr actually invokes bzr in another location. Here we
1984
only need to test that it passes the right parameters to run_bzr.
1986
self.argv = list(argv)
1987
self.retcode = retcode
1988
self.encoding = encoding
1990
self.working_dir = working_dir
1991
return self.retcode, self.out, self.err
1993
def test_run_bzr_error(self):
1994
self.out = "It sure does!\n"
1995
out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
1996
self.assertEqual(['rocks'], self.argv)
1997
self.assertEqual(34, self.retcode)
1998
self.assertEqual('It sure does!\n', out)
1999
self.assertEquals(out, self.out)
2000
self.assertEqual('', err)
2001
self.assertEquals(err, self.err)
2003
def test_run_bzr_error_regexes(self):
2005
self.err = "bzr: ERROR: foobarbaz is not versioned"
2006
out, err = self.run_bzr_error(
2007
["bzr: ERROR: foobarbaz is not versioned"],
2008
['file-id', 'foobarbaz'])
2010
def test_encoding(self):
2011
"""Test that run_bzr passes encoding to _run_bzr_core"""
2012
self.run_bzr('foo bar')
2013
self.assertEqual(None, self.encoding)
2014
self.assertEqual(['foo', 'bar'], self.argv)
2016
self.run_bzr('foo bar', encoding='baz')
2017
self.assertEqual('baz', self.encoding)
2018
self.assertEqual(['foo', 'bar'], self.argv)
2020
def test_retcode(self):
2021
"""Test that run_bzr passes retcode to _run_bzr_core"""
2022
# Default is retcode == 0
2023
self.run_bzr('foo bar')
2024
self.assertEqual(0, self.retcode)
2025
self.assertEqual(['foo', 'bar'], self.argv)
2027
self.run_bzr('foo bar', retcode=1)
2028
self.assertEqual(1, self.retcode)
2029
self.assertEqual(['foo', 'bar'], self.argv)
2031
self.run_bzr('foo bar', retcode=None)
2032
self.assertEqual(None, self.retcode)
2033
self.assertEqual(['foo', 'bar'], self.argv)
2035
self.run_bzr(['foo', 'bar'], retcode=3)
2036
self.assertEqual(3, self.retcode)
2037
self.assertEqual(['foo', 'bar'], self.argv)
2039
def test_stdin(self):
2040
# test that the stdin keyword to run_bzr is passed through to
2041
# _run_bzr_core as-is. We do this by overriding
2042
# _run_bzr_core in this class, and then calling run_bzr,
2043
# which is a convenience function for _run_bzr_core, so
2045
self.run_bzr('foo bar', stdin='gam')
2046
self.assertEqual('gam', self.stdin)
2047
self.assertEqual(['foo', 'bar'], self.argv)
2049
self.run_bzr('foo bar', stdin='zippy')
2050
self.assertEqual('zippy', self.stdin)
2051
self.assertEqual(['foo', 'bar'], self.argv)
2053
def test_working_dir(self):
2054
"""Test that run_bzr passes working_dir to _run_bzr_core"""
2055
self.run_bzr('foo bar')
2056
self.assertEqual(None, self.working_dir)
2057
self.assertEqual(['foo', 'bar'], self.argv)
2059
self.run_bzr('foo bar', working_dir='baz')
2060
self.assertEqual('baz', self.working_dir)
2061
self.assertEqual(['foo', 'bar'], self.argv)
2063
def test_reject_extra_keyword_arguments(self):
2064
self.assertRaises(TypeError, self.run_bzr, "foo bar",
2065
error_regex=['error message'])
2068
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2069
# Does IO when testing the working_dir parameter.
2071
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2072
a_callable=None, *args, **kwargs):
2074
self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
2075
self.factory = bzrlib.ui.ui_factory
2076
self.working_dir = osutils.getcwd()
2077
stdout.write('foo\n')
2078
stderr.write('bar\n')
2081
def test_stdin(self):
2082
# test that the stdin keyword to _run_bzr_core is passed through to
2083
# apply_redirected as a StringIO. We do this by overriding
2084
# apply_redirected in this class, and then calling _run_bzr_core,
2085
# which calls apply_redirected.
2086
self.run_bzr(['foo', 'bar'], stdin='gam')
2087
self.assertEqual('gam', self.stdin.read())
2088
self.assertTrue(self.stdin is self.factory_stdin)
2089
self.run_bzr(['foo', 'bar'], stdin='zippy')
2090
self.assertEqual('zippy', self.stdin.read())
2091
self.assertTrue(self.stdin is self.factory_stdin)
2093
def test_ui_factory(self):
2094
# each invocation of self.run_bzr should get its
2095
# own UI factory, which is an instance of TestUIFactory,
2096
# with stdin, stdout and stderr attached to the stdin,
2097
# stdout and stderr of the invoked run_bzr
2098
current_factory = bzrlib.ui.ui_factory
2099
self.run_bzr(['foo'])
2100
self.failIf(current_factory is self.factory)
2101
self.assertNotEqual(sys.stdout, self.factory.stdout)
2102
self.assertNotEqual(sys.stderr, self.factory.stderr)
2103
self.assertEqual('foo\n', self.factory.stdout.getvalue())
2104
self.assertEqual('bar\n', self.factory.stderr.getvalue())
2105
self.assertIsInstance(self.factory, tests.TestUIFactory)
2107
def test_working_dir(self):
2108
self.build_tree(['one/', 'two/'])
2109
cwd = osutils.getcwd()
2111
# Default is to work in the current directory
2112
self.run_bzr(['foo', 'bar'])
2113
self.assertEqual(cwd, self.working_dir)
2115
self.run_bzr(['foo', 'bar'], working_dir=None)
2116
self.assertEqual(cwd, self.working_dir)
2118
# The function should be run in the alternative directory
2119
# but afterwards the current working dir shouldn't be changed
2120
self.run_bzr(['foo', 'bar'], working_dir='one')
2121
self.assertNotEqual(cwd, self.working_dir)
2122
self.assertEndsWith(self.working_dir, 'one')
2123
self.assertEqual(cwd, osutils.getcwd())
2125
self.run_bzr(['foo', 'bar'], working_dir='two')
2126
self.assertNotEqual(cwd, self.working_dir)
2127
self.assertEndsWith(self.working_dir, 'two')
2128
self.assertEqual(cwd, osutils.getcwd())
2131
class StubProcess(object):
2132
"""A stub process for testing run_bzr_subprocess."""
2134
def __init__(self, out="", err="", retcode=0):
2137
self.returncode = retcode
2139
def communicate(self):
2140
return self.out, self.err
2143
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
2144
"""Base class for tests testing how we might run bzr."""
2147
tests.TestCaseWithTransport.setUp(self)
2148
self.subprocess_calls = []
2150
def start_bzr_subprocess(self, process_args, env_changes=None,
2151
skip_if_plan_to_signal=False,
2153
allow_plugins=False):
2154
"""capture what run_bzr_subprocess tries to do."""
2155
self.subprocess_calls.append({'process_args':process_args,
2156
'env_changes':env_changes,
2157
'skip_if_plan_to_signal':skip_if_plan_to_signal,
2158
'working_dir':working_dir, 'allow_plugins':allow_plugins})
2159
return self.next_subprocess
2162
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
2164
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2165
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2167
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2168
that will return static results. This assertion method populates those
2169
results and also checks the arguments run_bzr_subprocess generates.
2171
self.next_subprocess = process
2173
result = self.run_bzr_subprocess(*args, **kwargs)
2175
self.next_subprocess = None
2176
for key, expected in expected_args.iteritems():
2177
self.assertEqual(expected, self.subprocess_calls[-1][key])
1772
self.fail('Assertion not raised')
1775
class TestFeature(TestCase):
2180
self.next_subprocess = None
2181
for key, expected in expected_args.iteritems():
2182
self.assertEqual(expected, self.subprocess_calls[-1][key])
2185
def test_run_bzr_subprocess(self):
2186
"""The run_bzr_helper_external command behaves nicely."""
2187
self.assertRunBzrSubprocess({'process_args':['--version']},
2188
StubProcess(), '--version')
2189
self.assertRunBzrSubprocess({'process_args':['--version']},
2190
StubProcess(), ['--version'])
2191
# retcode=None disables retcode checking
2192
result = self.assertRunBzrSubprocess({},
2193
StubProcess(retcode=3), '--version', retcode=None)
2194
result = self.assertRunBzrSubprocess({},
2195
StubProcess(out="is free software"), '--version')
2196
self.assertContainsRe(result[0], 'is free software')
2197
# Running a subcommand that is missing errors
2198
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2199
{'process_args':['--versionn']}, StubProcess(retcode=3),
2201
# Unless it is told to expect the error from the subprocess
2202
result = self.assertRunBzrSubprocess({},
2203
StubProcess(retcode=3), '--versionn', retcode=3)
2204
# Or to ignore retcode checking
2205
result = self.assertRunBzrSubprocess({},
2206
StubProcess(err="unknown command", retcode=3), '--versionn',
2208
self.assertContainsRe(result[1], 'unknown command')
2210
def test_env_change_passes_through(self):
2211
self.assertRunBzrSubprocess(
2212
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2214
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2216
def test_no_working_dir_passed_as_None(self):
2217
self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2219
def test_no_working_dir_passed_through(self):
2220
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2223
def test_run_bzr_subprocess_no_plugins(self):
2224
self.assertRunBzrSubprocess({'allow_plugins': False},
2227
def test_allow_plugins(self):
    """allow_plugins=True shows up unchanged in the recorded call."""
    expected_call = {'allow_plugins': True}
    self.assertRunBzrSubprocess(
        expected_call, StubProcess(), '', allow_plugins=True)
2232
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2234
def test_finish_bzr_subprocess_with_error(self):
2235
"""finish_bzr_subprocess allows specification of the desired exit code.
2237
process = StubProcess(err="unknown command", retcode=3)
2238
result = self.finish_bzr_subprocess(process, retcode=3)
2239
self.assertEqual('', result[0])
2240
self.assertContainsRe(result[1], 'unknown command')
2242
def test_finish_bzr_subprocess_ignoring_retcode(self):
    """retcode=None makes finish_bzr_subprocess accept any exit code.

    The stub process exits with 3; the call must still return, with an
    empty first element and the stub's error text in the second.
    """
    failing_stub = StubProcess(err="unknown command", retcode=3)
    finished = self.finish_bzr_subprocess(failing_stub, retcode=None)
    self.assertEqual('', finished[0])
    self.assertContainsRe(finished[1], 'unknown command')
2249
def test_finish_subprocess_with_unexpected_retcode(self):
2250
"""finish_bzr_subprocess raises self.failureException if the retcode is
2251
not the expected one.
2253
process = StubProcess(err="unknown command", retcode=3)
2254
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2258
class _DontSpawnProcess(Exception):
2259
"""A simple exception which just allows us to skip unnecessary steps"""
2262
class TestStartBzrSubProcess(tests.TestCase):
2264
def check_popen_state(self):
    """Hook called by _popen before it records its arguments.

    The default does nothing; a test replaces it with a callable holding
    the assertions to run at the moment popen is invoked.
    """
    pass
2267
def _popen(self, *args, **kwargs):
    """Record how popen was called, then abort with _DontSpawnProcess.

    The check_popen_state hook runs first, so assertions about the state
    at call time happen before anything is stored.  The positional and
    keyword arguments are then kept on the test case for inspection.
    """
    self.check_popen_state()
    self._popen_args, self._popen_kwargs = args, kwargs
    raise _DontSpawnProcess()
2274
def test_run_bzr_subprocess_no_plugins(self):
    """By default the recorded command is python, bzr, then --no-plugins."""
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    recorded = self._popen_args[0]
    # Element 0 is the interpreter, element 1 the bzr script, and the
    # remainder the options added by start_bzr_subprocess.
    self.assertEqual(sys.executable, recorded[0])
    self.assertEqual(self.get_bzr_path(), recorded[1])
    self.assertEqual(['--no-plugins'], recorded[2:])
2281
def test_allow_plugins(self):
2282
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2284
command = self._popen_args[0]
2285
self.assertEqual([], command[2:])
2287
def test_set_env(self):
    """env_changes values are set while popen runs, but not afterwards."""
    # Precondition: the variable must not already exist in this process.
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def check_environment():
        # Runs through the check_popen_state hook, i.e. at popen time.
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])

    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'EXISTANT_ENV_VAR': 'set variable'})
    # not set in the parent
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2298
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too."""
    # Precondition: the variable must not already exist in this process.
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def check_environment():
        # Runs through the check_popen_state hook, i.e. at popen time.
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    try:
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR': None})
        # Still set in parent
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    finally:
        # Always undo the change to os.environ, even when an assertion
        # above fails, so the variable cannot leak into other tests.
        os.environ.pop('EXISTANT_ENV_VAR', None)
2311
def test_env_del_missing(self):
    """Asking to delete a variable that is not set is harmless."""
    # Precondition: the variable must not exist in this process.
    self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)

    def check_environment():
        # Runs through the check_popen_state hook, i.e. at popen time.
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)

    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'NON_EXISTANT_ENV_VAR': None})
2319
def test_working_dir(self):
2320
"""Test that we can specify the working dir for the child"""
2321
orig_getcwd = osutils.getcwd
2322
orig_chdir = os.chdir
2330
osutils.getcwd = getcwd
2332
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2335
osutils.getcwd = orig_getcwd
2337
os.chdir = orig_chdir
2338
self.assertEqual(['foo', 'current'], chdirs)
2341
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2342
"""Tests that really need to do things with an external bzr."""
2344
def test_start_and_stop_bzr_subprocess_send_signal(self):
2345
"""finish_bzr_subprocess raises self.failureException if the retcode is
2346
not the expected one.
2348
self.disable_missing_extensions_warning()
2349
process = self.start_bzr_subprocess(['wait-until-signalled'],
2350
skip_if_plan_to_signal=True)
2351
self.assertEqual('running\n', process.stdout.readline())
2352
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2354
self.assertEqual('', result[0])
2355
self.assertEqual('bzr: interrupted\n', result[1])
2358
class TestFeature(tests.TestCase):
1777
2360
def test_caching(self):
1778
2361
"""Feature._probe is called by the feature at most once."""
1779
class InstrumentedFeature(Feature):
2362
class InstrumentedFeature(tests.Feature):
1780
2363
def __init__(self):
1781
Feature.__init__(self)
2364
super(InstrumentedFeature, self).__init__()
1782
2365
self.calls = []
1783
2366
def _probe(self):
1784
2367
self.calls.append('_probe')