1770
1789
test_suite_factory=factory)
1771
1790
self.assertEqual([True], factory_called)
1793
"""A test suite factory."""
1794
class Test(tests.TestCase):
1801
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
1803
def test_list_only(self):
1804
output = self.run_selftest(test_suite_factory=self.factory,
1806
self.assertEqual(3, len(output.readlines()))
1808
def test_list_only_filtered(self):
1809
output = self.run_selftest(test_suite_factory=self.factory,
1810
list_only=True, pattern="Test.b")
1811
self.assertEndsWith(output.getvalue(), "Test.b\n")
1812
self.assertLength(1, output.readlines())
1814
def test_list_only_excludes(self):
1815
output = self.run_selftest(test_suite_factory=self.factory,
1816
list_only=True, exclude_pattern="Test.b")
1817
self.assertNotContainsRe("Test.b", output.getvalue())
1818
self.assertLength(2, output.readlines())
1820
def test_random(self):
1821
# test randomising by listing a number of tests.
1822
output_123 = self.run_selftest(test_suite_factory=self.factory,
1823
list_only=True, random_seed="123")
1824
output_234 = self.run_selftest(test_suite_factory=self.factory,
1825
list_only=True, random_seed="234")
1826
self.assertNotEqual(output_123, output_234)
1827
# "Randominzing test order..\n\n
1828
self.assertLength(5, output_123.readlines())
1829
self.assertLength(5, output_234.readlines())
1831
def test_random_reuse_is_same_order(self):
1832
# test randomising by listing a number of tests.
1833
expected = self.run_selftest(test_suite_factory=self.factory,
1834
list_only=True, random_seed="123")
1835
repeated = self.run_selftest(test_suite_factory=self.factory,
1836
list_only=True, random_seed="123")
1837
self.assertEqual(expected.getvalue(), repeated.getvalue())
1839
def test_runner_class(self):
1840
self.requireFeature(SubUnitFeature)
1841
from subunit import ProtocolTestCase
1842
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
1843
test_suite_factory=self.factory)
1844
test = ProtocolTestCase(stream)
1845
result = unittest.TestResult()
1847
self.assertEqual(3, result.testsRun)
1849
def test_starting_with_single_argument(self):
1850
output = self.run_selftest(test_suite_factory=self.factory,
1851
starting_with=['bzrlib.tests.test_selftest.Test.a'],
1853
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
1856
def test_starting_with_multiple_argument(self):
1857
output = self.run_selftest(test_suite_factory=self.factory,
1858
starting_with=['bzrlib.tests.test_selftest.Test.a',
1859
'bzrlib.tests.test_selftest.Test.b'],
1861
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
1862
'bzrlib.tests.test_selftest.Test.b\n',
1865
def check_transport_set(self, transport_server):
1866
captured_transport = []
1867
def seen_transport(a_transport):
1868
captured_transport.append(a_transport)
1869
class Capture(tests.TestCase):
1871
seen_transport(bzrlib.tests.default_transport)
1873
return TestUtil.TestSuite([Capture("a")])
1874
self.run_selftest(transport=transport_server, test_suite_factory=factory)
1875
self.assertEqual(transport_server, captured_transport[0])
1877
def test_transport_sftp(self):
1879
import bzrlib.transport.sftp
1880
except ParamikoNotPresent:
1881
raise TestSkipped("Paramiko not present")
1882
self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
1884
def test_transport_memory(self):
1885
self.check_transport_set(bzrlib.transport.memory.MemoryServer)
1888
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
1889
# Does IO: reads test.list
1891
def test_load_list(self):
1892
# Provide a list with one test - this test.
1893
test_id_line = '%s\n' % self.id()
1894
self.build_tree_contents([('test.list', test_id_line)])
1895
# And generate a list of the tests in the suite.
1896
stream = self.run_selftest(load_list='test.list', list_only=True)
1897
self.assertEqual(test_id_line, stream.getvalue())
1899
def test_load_unknown(self):
1900
# Provide a list with one test - this test.
1901
# And generate a list of the tests in the suite.
1902
err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
1903
load_list='missing file name', list_only=True)
1906
class TestRunBzr(tests.TestCase):
1911
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
1913
"""Override _run_bzr_core to test how it is invoked by run_bzr.
1915
Attempts to run bzr from inside this class don't actually run it.
1917
We test how run_bzr actually invokes bzr in another location.
1918
Here we only need to test that it is run_bzr passes the right
1919
parameters to run_bzr.
1921
self.argv = list(argv)
1922
self.retcode = retcode
1923
self.encoding = encoding
1925
self.working_dir = working_dir
1926
return self.out, self.err
1928
def test_run_bzr_error(self):
1929
self.out = "It sure does!\n"
1930
out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
1931
self.assertEqual(['rocks'], self.argv)
1932
self.assertEqual(34, self.retcode)
1933
self.assertEqual(out, 'It sure does!\n')
1935
def test_run_bzr_error_regexes(self):
1937
self.err = "bzr: ERROR: foobarbaz is not versioned"
1938
out, err = self.run_bzr_error(
1939
["bzr: ERROR: foobarbaz is not versioned"],
1940
['file-id', 'foobarbaz'])
1942
def test_encoding(self):
1943
"""Test that run_bzr passes encoding to _run_bzr_core"""
1944
self.run_bzr('foo bar')
1945
self.assertEqual(None, self.encoding)
1946
self.assertEqual(['foo', 'bar'], self.argv)
1948
self.run_bzr('foo bar', encoding='baz')
1949
self.assertEqual('baz', self.encoding)
1950
self.assertEqual(['foo', 'bar'], self.argv)
1952
def test_retcode(self):
1953
"""Test that run_bzr passes retcode to _run_bzr_core"""
1954
# Default is retcode == 0
1955
self.run_bzr('foo bar')
1956
self.assertEqual(0, self.retcode)
1957
self.assertEqual(['foo', 'bar'], self.argv)
1959
self.run_bzr('foo bar', retcode=1)
1960
self.assertEqual(1, self.retcode)
1961
self.assertEqual(['foo', 'bar'], self.argv)
1963
self.run_bzr('foo bar', retcode=None)
1964
self.assertEqual(None, self.retcode)
1965
self.assertEqual(['foo', 'bar'], self.argv)
1967
self.run_bzr(['foo', 'bar'], retcode=3)
1968
self.assertEqual(3, self.retcode)
1969
self.assertEqual(['foo', 'bar'], self.argv)
1971
def test_stdin(self):
1972
# test that the stdin keyword to run_bzr is passed through to
1973
# _run_bzr_core as-is. We do this by overriding
1974
# _run_bzr_core in this class, and then calling run_bzr,
1975
# which is a convenience function for _run_bzr_core, so
1977
self.run_bzr('foo bar', stdin='gam')
1978
self.assertEqual('gam', self.stdin)
1979
self.assertEqual(['foo', 'bar'], self.argv)
1981
self.run_bzr('foo bar', stdin='zippy')
1982
self.assertEqual('zippy', self.stdin)
1983
self.assertEqual(['foo', 'bar'], self.argv)
1985
def test_working_dir(self):
1986
"""Test that run_bzr passes working_dir to _run_bzr_core"""
1987
self.run_bzr('foo bar')
1988
self.assertEqual(None, self.working_dir)
1989
self.assertEqual(['foo', 'bar'], self.argv)
1991
self.run_bzr('foo bar', working_dir='baz')
1992
self.assertEqual('baz', self.working_dir)
1993
self.assertEqual(['foo', 'bar'], self.argv)
1995
def test_reject_extra_keyword_arguments(self):
1996
self.assertRaises(TypeError, self.run_bzr, "foo bar",
1997
error_regex=['error message'])
2000
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2001
# Does IO when testing the working_dir parameter.
2003
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2004
a_callable=None, *args, **kwargs):
2006
self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
2007
self.factory = bzrlib.ui.ui_factory
2008
self.working_dir = osutils.getcwd()
2009
stdout.write('foo\n')
2010
stderr.write('bar\n')
2013
def test_stdin(self):
2014
# test that the stdin keyword to _run_bzr_core is passed through to
2015
# apply_redirected as a StringIO. We do this by overriding
2016
# apply_redirected in this class, and then calling _run_bzr_core,
2017
# which calls apply_redirected.
2018
self.run_bzr(['foo', 'bar'], stdin='gam')
2019
self.assertEqual('gam', self.stdin.read())
2020
self.assertTrue(self.stdin is self.factory_stdin)
2021
self.run_bzr(['foo', 'bar'], stdin='zippy')
2022
self.assertEqual('zippy', self.stdin.read())
2023
self.assertTrue(self.stdin is self.factory_stdin)
2025
def test_ui_factory(self):
2026
# each invocation of self.run_bzr should get its
2027
# own UI factory, which is an instance of TestUIFactory,
2028
# with stdin, stdout and stderr attached to the stdin,
2029
# stdout and stderr of the invoked run_bzr
2030
current_factory = bzrlib.ui.ui_factory
2031
self.run_bzr(['foo'])
2032
self.failIf(current_factory is self.factory)
2033
self.assertNotEqual(sys.stdout, self.factory.stdout)
2034
self.assertNotEqual(sys.stderr, self.factory.stderr)
2035
self.assertEqual('foo\n', self.factory.stdout.getvalue())
2036
self.assertEqual('bar\n', self.factory.stderr.getvalue())
2037
self.assertIsInstance(self.factory, tests.TestUIFactory)
2039
def test_working_dir(self):
2040
self.build_tree(['one/', 'two/'])
2041
cwd = osutils.getcwd()
2043
# Default is to work in the current directory
2044
self.run_bzr(['foo', 'bar'])
2045
self.assertEqual(cwd, self.working_dir)
2047
self.run_bzr(['foo', 'bar'], working_dir=None)
2048
self.assertEqual(cwd, self.working_dir)
2050
# The function should be run in the alternative directory
2051
# but afterwards the current working dir shouldn't be changed
2052
self.run_bzr(['foo', 'bar'], working_dir='one')
2053
self.assertNotEqual(cwd, self.working_dir)
2054
self.assertEndsWith(self.working_dir, 'one')
2055
self.assertEqual(cwd, osutils.getcwd())
2057
self.run_bzr(['foo', 'bar'], working_dir='two')
2058
self.assertNotEqual(cwd, self.working_dir)
2059
self.assertEndsWith(self.working_dir, 'two')
2060
self.assertEqual(cwd, osutils.getcwd())
2063
class StubProcess(object):
2064
"""A stub process for testing run_bzr_subprocess."""
2066
def __init__(self, out="", err="", retcode=0):
2069
self.returncode = retcode
2071
def communicate(self):
2072
return self.out, self.err
2075
class TestRunBzrSubprocess(tests.TestCaseWithTransport):
2078
tests.TestCaseWithTransport.setUp(self)
2079
self.subprocess_calls = []
2081
def start_bzr_subprocess(self, process_args, env_changes=None,
2082
skip_if_plan_to_signal=False,
2084
allow_plugins=False):
2085
"""capture what run_bzr_subprocess tries to do."""
2086
self.subprocess_calls.append({'process_args':process_args,
2087
'env_changes':env_changes,
2088
'skip_if_plan_to_signal':skip_if_plan_to_signal,
2089
'working_dir':working_dir, 'allow_plugins':allow_plugins})
2090
return self.next_subprocess
2092
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2093
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2095
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2096
that will return static results. This assertion method populates those
2097
results and also checks the arguments run_bzr_subprocess generates.
2099
self.next_subprocess = process
2101
result = self.run_bzr_subprocess(*args, **kwargs)
2103
self.next_subprocess = None
2104
for key, expected in expected_args.iteritems():
2105
self.assertEqual(expected, self.subprocess_calls[-1][key])
2108
self.next_subprocess = None
2109
for key, expected in expected_args.iteritems():
2110
self.assertEqual(expected, self.subprocess_calls[-1][key])
2113
def test_run_bzr_subprocess(self):
2114
"""The run_bzr_helper_external command behaves nicely."""
2115
self.assertRunBzrSubprocess({'process_args':['--version']},
2116
StubProcess(), '--version')
2117
self.assertRunBzrSubprocess({'process_args':['--version']},
2118
StubProcess(), ['--version'])
2119
# retcode=None disables retcode checking
2120
result = self.assertRunBzrSubprocess({},
2121
StubProcess(retcode=3), '--version', retcode=None)
2122
result = self.assertRunBzrSubprocess({},
2123
StubProcess(out="is free software"), '--version')
2124
self.assertContainsRe(result[0], 'is free software')
2125
# Running a subcommand that is missing errors
2126
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2127
{'process_args':['--versionn']}, StubProcess(retcode=3),
2129
# Unless it is told to expect the error from the subprocess
2130
result = self.assertRunBzrSubprocess({},
2131
StubProcess(retcode=3), '--versionn', retcode=3)
2132
# Or to ignore retcode checking
2133
result = self.assertRunBzrSubprocess({},
2134
StubProcess(err="unknown command", retcode=3), '--versionn',
2136
self.assertContainsRe(result[1], 'unknown command')
2138
def test_env_change_passes_through(self):
2139
self.assertRunBzrSubprocess(
2140
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2142
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2144
def test_no_working_dir_passed_as_None(self):
2145
self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2147
def test_no_working_dir_passed_through(self):
2148
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2151
def test_run_bzr_subprocess_no_plugins(self):
2152
self.assertRunBzrSubprocess({'allow_plugins': False},
2155
def test_allow_plugins(self):
2156
self.assertRunBzrSubprocess({'allow_plugins': True},
2157
StubProcess(), '', allow_plugins=True)
2160
class _DontSpawnProcess(Exception):
2161
"""A simple exception which just allows us to skip unnecessary steps"""
2164
class TestStartBzrSubProcess(tests.TestCase):
2166
def check_popen_state(self):
2167
"""Replace to make assertions when popen is called."""
2169
def _popen(self, *args, **kwargs):
2170
"""Record the command that is run, so that we can ensure it is correct"""
2171
self.check_popen_state()
2172
self._popen_args = args
2173
self._popen_kwargs = kwargs
2174
raise _DontSpawnProcess()
2176
def test_run_bzr_subprocess_no_plugins(self):
2177
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2178
command = self._popen_args[0]
2179
self.assertEqual(sys.executable, command[0])
2180
self.assertEqual(self.get_bzr_path(), command[1])
2181
self.assertEqual(['--no-plugins'], command[2:])
2183
def test_allow_plugins(self):
2184
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2186
command = self._popen_args[0]
2187
self.assertEqual([], command[2:])
2189
def test_set_env(self):
2190
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2192
def check_environment():
2193
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2194
self.check_popen_state = check_environment
2195
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2196
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2197
# not set in theparent
2198
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2200
def test_run_bzr_subprocess_env_del(self):
2201
"""run_bzr_subprocess can remove environment variables too."""
2202
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2203
def check_environment():
2204
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2205
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2206
self.check_popen_state = check_environment
2207
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2208
env_changes={'EXISTANT_ENV_VAR':None})
2209
# Still set in parent
2210
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2211
del os.environ['EXISTANT_ENV_VAR']
2213
def test_env_del_missing(self):
2214
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2215
def check_environment():
2216
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2217
self.check_popen_state = check_environment
2218
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2219
env_changes={'NON_EXISTANT_ENV_VAR':None})
2221
def test_working_dir(self):
2222
"""Test that we can specify the working dir for the child"""
2223
orig_getcwd = osutils.getcwd
2224
orig_chdir = os.chdir
2232
osutils.getcwd = getcwd
2234
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2237
osutils.getcwd = orig_getcwd
2239
os.chdir = orig_chdir
2240
self.assertEqual(['foo', 'current'], chdirs)
2243
class TestBzrSubprocess(tests.TestCaseWithTransport):
2245
def test_start_and_stop_bzr_subprocess(self):
2246
"""We can start and perform other test actions while that process is
2249
process = self.start_bzr_subprocess(['--version'])
2250
result = self.finish_bzr_subprocess(process)
2251
self.assertContainsRe(result[0], 'is free software')
2252
self.assertEqual('', result[1])
2254
def test_start_and_stop_bzr_subprocess_with_error(self):
2255
"""finish_bzr_subprocess allows specification of the desired exit code.
2257
process = self.start_bzr_subprocess(['--versionn'])
2258
result = self.finish_bzr_subprocess(process, retcode=3)
2259
self.assertEqual('', result[0])
2260
self.assertContainsRe(result[1], 'unknown command')
2262
def test_start_and_stop_bzr_subprocess_ignoring_retcode(self):
2263
"""finish_bzr_subprocess allows the exit code to be ignored."""
2264
process = self.start_bzr_subprocess(['--versionn'])
2265
result = self.finish_bzr_subprocess(process, retcode=None)
2266
self.assertEqual('', result[0])
2267
self.assertContainsRe(result[1], 'unknown command')
2269
def test_start_and_stop_bzr_subprocess_with_unexpected_retcode(self):
2270
"""finish_bzr_subprocess raises self.failureException if the retcode is
2271
not the expected one.
2273
process = self.start_bzr_subprocess(['--versionn'])
2274
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2277
def test_start_and_stop_bzr_subprocess_send_signal(self):
2278
"""finish_bzr_subprocess raises self.failureException if the retcode is
2279
not the expected one.
2281
process = self.start_bzr_subprocess(['wait-until-signalled'],
2282
skip_if_plan_to_signal=True)
2283
self.assertEqual('running\n', process.stdout.readline())
2284
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2286
self.assertEqual('', result[0])
2287
self.assertEqual('bzr: interrupted\n', result[1])
2289
def test_start_and_stop_working_dir(self):
2290
cwd = osutils.getcwd()
2291
self.make_branch_and_tree('one')
2292
process = self.start_bzr_subprocess(['root'], working_dir='one')
2293
result = self.finish_bzr_subprocess(process, universal_newlines=True)
2294
self.assertEndsWith(result[0], 'one\n')
2295
self.assertEqual('', result[1])
1774
2298
class TestKnownFailure(tests.TestCase):