1813
1791
test_suite_factory=factory)
1814
1792
self.assertEqual([True], factory_called)
1795
"""A test suite factory."""
1796
class Test(tests.TestCase):
1803
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
1805
def test_list_only(self):
1806
output = self.run_selftest(test_suite_factory=self.factory,
1808
self.assertEqual(3, len(output.readlines()))
1810
def test_list_only_filtered(self):
    """Listing with pattern="Test.b" prints exactly one line, for Test.b."""
    selftest_args = dict(test_suite_factory=self.factory, list_only=True,
        pattern="Test.b")
    listing = self.run_selftest(**selftest_args)
    # The single listed id is also the end of the captured text.
    self.assertEndsWith(listing.getvalue(), "Test.b\n")
    listed_lines = listing.readlines()
    self.assertLength(1, listed_lines)
1816
def test_list_only_excludes(self):
    """Listing with exclude_pattern="Test.b" omits Test.b from the output."""
    output = self.run_selftest(test_suite_factory=self.factory,
        list_only=True, exclude_pattern="Test.b")
    # The text being searched goes first and the pattern second, the same
    # order the assertContainsRe calls in this file use.  With the two
    # swapped the listing was used as the pattern and the check could not
    # detect a listed Test.b.
    self.assertNotContainsRe(output.getvalue(), "Test.b")
    # Two of the three tests built by the factory remain.
    self.assertLength(2, output.readlines())
1822
def test_random(self):
    """Listings made with different random seeds differ from each other."""
    # test randomising by listing a number of tests.
    output_123 = self.run_selftest(test_suite_factory=self.factory,
        list_only=True, random_seed="123")
    output_234 = self.run_selftest(test_suite_factory=self.factory,
        list_only=True, random_seed="234")
    # Compare the captured text.  The two streams are separate objects, so
    # comparing the objects themselves says nothing about what was listed.
    self.assertNotEqual(output_123.getvalue(), output_234.getvalue())
    # "Randomizing test order..\n\n" accounts for the two extra lines on
    # top of the three listed tests.
    self.assertLength(5, output_123.readlines())
    self.assertLength(5, output_234.readlines())
1833
def test_random_reuse_is_same_order(self):
    """Listing twice with the same random seed gives identical output."""
    def list_with_seed(seed):
        # One list-only selftest run over the factory's suite.
        return self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed=seed)
    first_run = list_with_seed("123")
    second_run = list_with_seed("123")
    self.assertEqual(first_run.getvalue(), second_run.getvalue())
1841
def test_runner_class(self):
    """selftest run with SubUnitBzrRunner emits a replayable subunit stream.

    Requires SubUnitFeature; the subunit import is local so the module
    still loads without that package.
    """
    self.requireFeature(SubUnitFeature)
    from subunit import ProtocolTestCase
    stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
        test_suite_factory=self.factory)
    test = ProtocolTestCase(stream)
    result = unittest.TestResult()
    # Replay the captured stream into the result; without this the result
    # never sees any test and testsRun stays at 0.
    test.run(result)
    # The factory's suite holds three tests.
    self.assertEqual(3, result.testsRun)
1851
def test_starting_with_single_argument(self):
1852
output = self.run_selftest(test_suite_factory=self.factory,
1853
starting_with=['bzrlib.tests.test_selftest.Test.a'],
1855
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
1858
def test_starting_with_multiple_argument(self):
1859
output = self.run_selftest(test_suite_factory=self.factory,
1860
starting_with=['bzrlib.tests.test_selftest.Test.a',
1861
'bzrlib.tests.test_selftest.Test.b'],
1863
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
1864
'bzrlib.tests.test_selftest.Test.b\n',
1867
def check_transport_set(self, transport_server):
1868
captured_transport = []
1869
def seen_transport(a_transport):
1870
captured_transport.append(a_transport)
1871
class Capture(tests.TestCase):
1873
seen_transport(bzrlib.tests.default_transport)
1875
return TestUtil.TestSuite([Capture("a")])
1876
self.run_selftest(transport=transport_server, test_suite_factory=factory)
1877
self.assertEqual(transport_server, captured_transport[0])
1879
def test_transport_sftp(self):
    """Run the transport check with the SFTP absolute server.

    Skipped when importing bzrlib.transport.sftp raises
    ParamikoNotPresent.
    """
    # The except clause needs its try; importing the sftp transport is the
    # statement that can raise ParamikoNotPresent.
    try:
        import bzrlib.transport.sftp
    except errors.ParamikoNotPresent:
        raise tests.TestSkipped("Paramiko not present")
    self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
1886
def test_transport_memory(self):
    """Run the transport check with the in-memory transport server."""
    memory_server = bzrlib.transport.memory.MemoryServer
    self.check_transport_set(memory_server)
1890
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    """Tests for the load_list option of selftest."""
    # Does IO: reads test.list

    def test_load_list(self):
        """A list file naming one test makes selftest list just that test."""
        # Provide a list with one test - this test.
        test_id_line = '%s\n' % self.id()
        self.build_tree_contents([('test.list', test_id_line)])
        # And generate a list of the tests in the suite.
        stream = self.run_selftest(load_list='test.list', list_only=True)
        self.assertEqual(test_id_line, stream.getvalue())

    def test_load_unknown(self):
        """Pointing load_list at a missing file raises NoSuchFile."""
        # No list file is created here, so the name given cannot be found.
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
            load_list='missing file name', list_only=True)
1908
class TestRunBzr(tests.TestCase):
1913
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
1915
"""Override _run_bzr_core to test how it is invoked by run_bzr.
1917
Attempts to run bzr from inside this class don't actually run it.
1919
We test how run_bzr actually invokes bzr in another location.
1920
Here we only need to test that it is run_bzr passes the right
1921
parameters to run_bzr.
1923
self.argv = list(argv)
1924
self.retcode = retcode
1925
self.encoding = encoding
1927
self.working_dir = working_dir
1928
return self.out, self.err
1930
def test_run_bzr_error(self):
    """run_bzr_error hands argv and retcode on and returns the output."""
    canned_output = "It sure does!\n"
    # The overridden _run_bzr_core returns self.out as the command output.
    self.out = canned_output
    result = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
    out, err = result
    self.assertEqual(['rocks'], self.argv)
    self.assertEqual(34, self.retcode)
    self.assertEqual(out, canned_output)
1937
def test_run_bzr_error_regexes(self):
1939
self.err = "bzr: ERROR: foobarbaz is not versioned"
1940
out, err = self.run_bzr_error(
1941
["bzr: ERROR: foobarbaz is not versioned"],
1942
['file-id', 'foobarbaz'])
1944
def test_encoding(self):
    """Test that run_bzr passes encoding to _run_bzr_core"""
    expected_argv = ['foo', 'bar']
    # (keyword arguments for run_bzr, encoding _run_bzr_core should record)
    scenarios = [
        ({}, None),
        ({'encoding': 'baz'}, 'baz'),
        ]
    for run_kwargs, seen_encoding in scenarios:
        self.run_bzr('foo bar', **run_kwargs)
        self.assertEqual(seen_encoding, self.encoding)
        self.assertEqual(expected_argv, self.argv)
1954
def test_retcode(self):
    """Test that run_bzr passes retcode to _run_bzr_core"""
    # (command, keyword arguments, retcode _run_bzr_core should record).
    # The first row leaves the keyword out: the default is retcode == 0.
    scenarios = [
        ('foo bar', {}, 0),
        ('foo bar', {'retcode': 1}, 1),
        ('foo bar', {'retcode': None}, None),
        (['foo', 'bar'], {'retcode': 3}, 3),
        ]
    for command, run_kwargs, seen_retcode in scenarios:
        self.run_bzr(command, **run_kwargs)
        self.assertEqual(seen_retcode, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)
1973
def test_stdin(self):
    # run_bzr is a convenience function for _run_bzr_core, which this
    # class overrides.  Whatever is given as the stdin keyword must
    # reach the override as-is.
    for stdin_text in ('gam', 'zippy'):
        self.run_bzr('foo bar', stdin=stdin_text)
        self.assertEqual(stdin_text, self.stdin)
        self.assertEqual(['foo', 'bar'], self.argv)
1987
def test_working_dir(self):
    """Test that run_bzr passes working_dir to _run_bzr_core"""
    # (keyword arguments for run_bzr, working_dir the override should see)
    scenarios = (
        ({}, None),
        ({'working_dir': 'baz'}, 'baz'),
        )
    for run_kwargs, seen_dir in scenarios:
        self.run_bzr('foo bar', **run_kwargs)
        self.assertEqual(seen_dir, self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)
1997
def test_reject_extra_keyword_arguments(self):
    """run_bzr raises TypeError when given the error_regex keyword."""
    unsupported = {'error_regex': ['error message']}
    self.assertRaises(TypeError, self.run_bzr, "foo bar", **unsupported)
2002
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2003
# Does IO when testing the working_dir parameter.
2005
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2006
a_callable=None, *args, **kwargs):
2008
self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
2009
self.factory = bzrlib.ui.ui_factory
2010
self.working_dir = osutils.getcwd()
2011
stdout.write('foo\n')
2012
stderr.write('bar\n')
2015
def test_stdin(self):
    # apply_redirected is overridden in this class, so running a command
    # shows what it receives: the stdin text must arrive as a readable
    # stream, and that same stream must be the UI factory's stdin.
    for stdin_text in ('gam', 'zippy'):
        self.run_bzr(['foo', 'bar'], stdin=stdin_text)
        self.assertEqual(stdin_text, self.stdin.read())
        same_stream = self.stdin is self.factory_stdin
        self.assertTrue(same_stream)
2027
def test_ui_factory(self):
    """Each run_bzr call gets its own TestUIFactory with captured output."""
    # each invocation of self.run_bzr should get its
    # own UI factory, which is an instance of TestUIFactory,
    # with stdin, stdout and stderr attached to the stdin,
    # stdout and stderr of the invoked run_bzr
    current_factory = bzrlib.ui.ui_factory
    self.run_bzr(['foo'])
    # self.factory is the ui_factory seen inside apply_redirected.
    # assertFalse replaces the deprecated failIf alias.
    self.assertFalse(current_factory is self.factory)
    self.assertNotEqual(sys.stdout, self.factory.stdout)
    self.assertNotEqual(sys.stderr, self.factory.stderr)
    # The overridden apply_redirected writes these two lines.
    self.assertEqual('foo\n', self.factory.stdout.getvalue())
    self.assertEqual('bar\n', self.factory.stderr.getvalue())
    self.assertIsInstance(self.factory, tests.TestUIFactory)
2041
def test_working_dir(self):
    """run_bzr runs in working_dir and leaves the caller's cwd untouched."""
    self.build_tree(['one/', 'two/'])
    start_dir = osutils.getcwd()

    # Default is to work in the current directory
    self.run_bzr(['foo', 'bar'])
    self.assertEqual(start_dir, self.working_dir)

    self.run_bzr(['foo', 'bar'], working_dir=None)
    self.assertEqual(start_dir, self.working_dir)

    # The function should be run in the alternative directory
    # but afterwards the current working dir shouldn't be changed
    for subdir in ('one', 'two'):
        self.run_bzr(['foo', 'bar'], working_dir=subdir)
        self.assertNotEqual(start_dir, self.working_dir)
        self.assertEndsWith(self.working_dir, subdir)
        self.assertEqual(start_dir, osutils.getcwd())
2065
class StubProcess(object):
    """A stub process for testing run_bzr_subprocess.

    :param out: text that communicate() reports as standard output.
    :param err: text that communicate() reports as standard error.
    :param retcode: value exposed as the returncode attribute.
    """

    def __init__(self, out="", err="", retcode=0):
        # communicate() reads these two back, so they must be stored.
        self.out = out
        self.err = err
        self.returncode = retcode

    def communicate(self):
        """Return the canned (out, err) pair given to the constructor."""
        return self.out, self.err
2077
class TestRunBzrSubprocess(tests.TestCaseWithTransport):
2080
tests.TestCaseWithTransport.setUp(self)
2081
self.subprocess_calls = []
2083
def start_bzr_subprocess(self, process_args, env_changes=None,
2084
skip_if_plan_to_signal=False,
2086
allow_plugins=False):
2087
"""capture what run_bzr_subprocess tries to do."""
2088
self.subprocess_calls.append({'process_args':process_args,
2089
'env_changes':env_changes,
2090
'skip_if_plan_to_signal':skip_if_plan_to_signal,
2091
'working_dir':working_dir, 'allow_plugins':allow_plugins})
2092
return self.next_subprocess
2094
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2095
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2097
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2098
that will return static results. This assertion method populates those
2099
results and also checks the arguments run_bzr_subprocess generates.
2101
self.next_subprocess = process
2103
result = self.run_bzr_subprocess(*args, **kwargs)
2105
self.next_subprocess = None
2106
for key, expected in expected_args.iteritems():
2107
self.assertEqual(expected, self.subprocess_calls[-1][key])
2110
self.next_subprocess = None
2111
for key, expected in expected_args.iteritems():
2112
self.assertEqual(expected, self.subprocess_calls[-1][key])
2115
def test_run_bzr_subprocess(self):
2116
"""The run_bzr_helper_external command behaves nicely."""
2117
self.assertRunBzrSubprocess({'process_args':['--version']},
2118
StubProcess(), '--version')
2119
self.assertRunBzrSubprocess({'process_args':['--version']},
2120
StubProcess(), ['--version'])
2121
# retcode=None disables retcode checking
2122
result = self.assertRunBzrSubprocess({},
2123
StubProcess(retcode=3), '--version', retcode=None)
2124
result = self.assertRunBzrSubprocess({},
2125
StubProcess(out="is free software"), '--version')
2126
self.assertContainsRe(result[0], 'is free software')
2127
# Running a subcommand that is missing errors
2128
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2129
{'process_args':['--versionn']}, StubProcess(retcode=3),
2131
# Unless it is told to expect the error from the subprocess
2132
result = self.assertRunBzrSubprocess({},
2133
StubProcess(retcode=3), '--versionn', retcode=3)
2134
# Or to ignore retcode checking
2135
result = self.assertRunBzrSubprocess({},
2136
StubProcess(err="unknown command", retcode=3), '--versionn',
2138
self.assertContainsRe(result[1], 'unknown command')
2140
def test_env_change_passes_through(self):
2141
self.assertRunBzrSubprocess(
2142
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2144
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2146
def test_no_working_dir_passed_as_None(self):
    """With no working_dir given, start_bzr_subprocess records None."""
    expected_call = {'working_dir': None}
    stub = StubProcess()
    self.assertRunBzrSubprocess(expected_call, stub, '')
2149
def test_no_working_dir_passed_through(self):
2150
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2153
def test_run_bzr_subprocess_no_plugins(self):
2154
self.assertRunBzrSubprocess({'allow_plugins': False},
2157
def test_allow_plugins(self):
    """allow_plugins=True is handed on to start_bzr_subprocess."""
    expected_call = {'allow_plugins': True}
    stub = StubProcess()
    self.assertRunBzrSubprocess(expected_call, stub, '', allow_plugins=True)
2162
class _DontSpawnProcess(Exception):
2163
"""A simple exception which just allows us to skip unnecessary steps"""
2166
class TestStartBzrSubProcess(tests.TestCase):
2168
def check_popen_state(self):
    """Hook run by _popen; tests replace it to assert on process state.

    The default does nothing.
    """
    return None
2171
def _popen(self, *args, **kwargs):
    """Record the call instead of spawning, then raise _DontSpawnProcess.

    The check_popen_state hook runs first; the positional and keyword
    arguments are then kept as _popen_args and _popen_kwargs so a test
    can check the command that would have been run.
    """
    self.check_popen_state()
    self._popen_args, self._popen_kwargs = args, kwargs
    raise _DontSpawnProcess()
2178
def test_run_bzr_subprocess_no_plugins(self):
    """By default the command is the interpreter, bzr, and --no-plugins."""
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    # First positional argument recorded by the stand-in _popen.
    argv = self._popen_args[0]
    self.assertEqual(sys.executable, argv[0])
    self.assertEqual(self.get_bzr_path(), argv[1])
    self.assertEqual(['--no-plugins'], argv[2:])
2185
def test_allow_plugins(self):
2186
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2188
command = self._popen_args[0]
2189
self.assertEqual([], command[2:])
2191
def test_set_env(self):
    """env_changes are in effect when popen is called and undone after."""
    # assertFalse replaces the deprecated failIf alias, matching the last
    # assertion of this test.
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    # _popen calls this hook before it records the command.
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
        env_changes={'EXISTANT_ENV_VAR':'set variable'})
    # not set in the parent
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2202
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    try:
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'EXISTANT_ENV_VAR':None})
        # Still set in parent
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    finally:
        # Clean up even when an assertion above fails, so the variable
        # set by this test cannot leak into later tests.
        if 'EXISTANT_ENV_VAR' in os.environ:
            del os.environ['EXISTANT_ENV_VAR']
2215
def test_env_del_missing(self):
    """Asking to delete a variable that is not set is accepted."""
    # assertFalse replaces the deprecated failIf alias, matching the
    # check inside check_environment.
    self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
        env_changes={'NON_EXISTANT_ENV_VAR':None})
2223
def test_working_dir(self):
2224
"""Test that we can specify the working dir for the child"""
2225
orig_getcwd = osutils.getcwd
2226
orig_chdir = os.chdir
2234
osutils.getcwd = getcwd
2236
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2239
osutils.getcwd = orig_getcwd
2241
os.chdir = orig_chdir
2242
self.assertEqual(['foo', 'current'], chdirs)
2245
class TestBzrSubprocess(tests.TestCaseWithTransport):
2247
def test_start_and_stop_bzr_subprocess(self):
2248
"""We can start and perform other test actions while that process is
2251
process = self.start_bzr_subprocess(['--version'])
2252
result = self.finish_bzr_subprocess(process)
2253
self.assertContainsRe(result[0], 'is free software')
2254
self.assertEqual('', result[1])
2256
def test_start_and_stop_bzr_subprocess_with_error(self):
2257
"""finish_bzr_subprocess allows specification of the desired exit code.
2259
process = self.start_bzr_subprocess(['--versionn'])
2260
result = self.finish_bzr_subprocess(process, retcode=3)
2261
self.assertEqual('', result[0])
2262
self.assertContainsRe(result[1], 'unknown command')
2264
def test_start_and_stop_bzr_subprocess_ignoring_retcode(self):
    """finish_bzr_subprocess allows the exit code to be ignored."""
    child = self.start_bzr_subprocess(['--versionn'])
    # retcode=None: the misspelt command's exit code is not checked.
    outcome = self.finish_bzr_subprocess(child, retcode=None)
    self.assertEqual('', outcome[0])
    self.assertContainsRe(outcome[1], 'unknown command')
2271
def test_start_and_stop_bzr_subprocess_with_unexpected_retcode(self):
2272
"""finish_bzr_subprocess raises self.failureException if the retcode is
2273
not the expected one.
2275
process = self.start_bzr_subprocess(['--versionn'])
2276
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2279
def test_start_and_stop_bzr_subprocess_send_signal(self):
2280
"""finish_bzr_subprocess raises self.failureException if the retcode is
2281
not the expected one.
2283
process = self.start_bzr_subprocess(['wait-until-signalled'],
2284
skip_if_plan_to_signal=True)
2285
self.assertEqual('running\n', process.stdout.readline())
2286
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2288
self.assertEqual('', result[0])
2289
self.assertEqual('bzr: interrupted\n', result[1])
2291
def test_start_and_stop_working_dir(self):
    """A subprocess started with working_dir='one' reports that tree."""
    # The unused local holding the starting directory has been dropped.
    self.make_branch_and_tree('one')
    process = self.start_bzr_subprocess(['root'], working_dir='one')
    result = self.finish_bzr_subprocess(process, universal_newlines=True)
    # First element of the result must end with the tree's directory name;
    # the second must be empty.
    self.assertEndsWith(result[0], 'one\n')
    self.assertEqual('', result[1])
1817
2300
class TestKnownFailure(tests.TestCase):