250
241
def addError(self, test, err):
251
"""Tell result that test finished with an error.
253
Called from the TestCase run() method when the test
254
fails with an unexpected error.
256
self._testConcluded(test)
242
self.extractBenchmarkTime(test)
243
self._cleanupLogFile(test)
257
244
if isinstance(err[1], TestSkipped):
258
return self._addSkipped(test, err)
245
return self.addSkipped(test, err)
259
246
elif isinstance(err[1], UnavailableFeature):
260
247
return self.addNotSupported(test, err[1].args[0])
262
self._cleanupLogFile(test)
263
unittest.TestResult.addError(self, test, err)
264
self.error_count += 1
265
self.report_error(test, err)
248
unittest.TestResult.addError(self, test, err)
249
self.error_count += 1
250
self.report_error(test, err)
269
254
def addFailure(self, test, err):
270
"""Tell result that test failed.
272
Called from the TestCase run() method when the test
273
fails because e.g. an assert() method failed.
275
self._testConcluded(test)
255
self._cleanupLogFile(test)
256
self.extractBenchmarkTime(test)
276
257
if isinstance(err[1], KnownFailure):
277
return self._addKnownFailure(test, err)
279
self._cleanupLogFile(test)
280
unittest.TestResult.addFailure(self, test, err)
281
self.failure_count += 1
282
self.report_failure(test, err)
258
return self.addKnownFailure(test, err)
259
unittest.TestResult.addFailure(self, test, err)
260
self.failure_count += 1
261
self.report_failure(test, err)
265
def addKnownFailure(self, test, err):
266
self.known_failure_count += 1
267
self.report_known_failure(test, err)
269
def addNotSupported(self, test, feature):
270
self.unsupported.setdefault(str(feature), 0)
271
self.unsupported[str(feature)] += 1
272
self.report_unsupported(test, feature)
286
274
def addSuccess(self, test):
287
"""Tell result that test completed successfully.
289
Called from the TestCase run()
291
self._testConcluded(test)
275
self.extractBenchmarkTime(test)
292
276
if self._bench_history is not None:
293
benchmark_time = self._extractBenchmarkTime(test)
294
if benchmark_time is not None:
277
if self._benchmarkTime is not None:
295
278
self._bench_history.write("%s %s\n" % (
296
self._formatTime(benchmark_time),
279
self._formatTime(self._benchmarkTime),
298
281
self.report_success(test)
299
self._cleanupLogFile(test)
300
282
unittest.TestResult.addSuccess(self, test)
301
test._log_contents = ''
303
def _testConcluded(self, test):
304
"""Common code when a test has finished.
306
Called regardless of whether it succeded, failed, etc.
310
def _addKnownFailure(self, test, err):
311
self.known_failure_count += 1
312
self.report_known_failure(test, err)
314
def addNotSupported(self, test, feature):
315
"""The test will not be run because of a missing feature.
317
# this can be called in two different ways: it may be that the
318
# test started running, and then raised (through addError)
319
# UnavailableFeature. Alternatively this method can be called
320
# while probing for features before running the tests; in that
321
# case we will see startTest and stopTest, but the test will never
323
self.unsupported.setdefault(str(feature), 0)
324
self.unsupported[str(feature)] += 1
325
self.report_unsupported(test, feature)
327
def _addSkipped(self, test, skip_excinfo):
328
if isinstance(skip_excinfo[1], TestNotApplicable):
329
self.not_applicable_count += 1
330
self.report_not_applicable(test, skip_excinfo)
333
self.report_skip(test, skip_excinfo)
284
def addSkipped(self, test, skip_excinfo):
285
self.report_skip(test, skip_excinfo)
286
# seems best to treat this as success from point-of-view of unittest
287
# -- it actually does nothing so it barely matters :)
336
290
except KeyboardInterrupt:
339
self.addError(test, test._exc_info())
293
self.addError(test, test.__exc_info())
341
# seems best to treat this as success from point-of-view of unittest
342
# -- it actually does nothing so it barely matters :)
343
295
unittest.TestResult.addSuccess(self, test)
344
test._log_contents = ''
346
297
def printErrorList(self, flavour, errors):
347
298
for test, err in errors:
348
299
self.stream.writeln(self.separator1)
349
300
self.stream.write("%s: " % flavour)
301
if self.use_numbered_dirs:
302
self.stream.write('#%d ' % test.number)
350
303
self.stream.writeln(self.getDescription(test))
351
304
if getattr(test, '_get_log', None) is not None:
352
self.stream.write('\n')
354
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
355
self.stream.write('\n')
356
self.stream.write(test._get_log())
357
self.stream.write('\n')
359
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
360
self.stream.write('\n')
306
print >>self.stream, \
307
('vvvv[log from %s]' % test.id()).ljust(78,'-')
308
print >>self.stream, test._get_log()
309
print >>self.stream, \
310
('^^^^[log from %s]' % test.id()).ljust(78,'-')
361
311
self.stream.writeln(self.separator2)
362
312
self.stream.writeln("%s" % err)
491
442
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
492
443
# numbers, plus a trailing blank
493
444
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
494
self.stream.write(self._ellipsize_to_right(name,
495
osutils.terminal_width()-30))
445
if self.use_numbered_dirs:
446
self.stream.write('%5d ' % self.count)
447
self.stream.write(self._ellipsize_to_right(name,
448
osutils.terminal_width()-36))
450
self.stream.write(self._ellipsize_to_right(name,
451
osutils.terminal_width()-30))
496
452
self.stream.flush()
498
454
def _error_summary(self, err):
456
if self.use_numbered_dirs:
500
458
return '%s%s' % (indent, err[1])
502
460
def report_error(self, test, err):
503
461
self.stream.writeln('ERROR %s\n%s'
504
% (self._testTimeString(test),
462
% (self._testTimeString(),
505
463
self._error_summary(err)))
507
465
def report_failure(self, test, err):
508
466
self.stream.writeln(' FAIL %s\n%s'
509
% (self._testTimeString(test),
467
% (self._testTimeString(),
510
468
self._error_summary(err)))
512
470
def report_known_failure(self, test, err):
513
471
self.stream.writeln('XFAIL %s\n%s'
514
% (self._testTimeString(test),
472
% (self._testTimeString(),
515
473
self._error_summary(err)))
517
475
def report_success(self, test):
518
self.stream.writeln(' OK %s' % self._testTimeString(test))
476
self.stream.writeln(' OK %s' % self._testTimeString())
519
477
for bench_called, stats in getattr(test, '_benchcalls', []):
520
478
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
521
479
stats.pprint(file=self.stream)
1041
984
self.fail('Unexpected success. Should have failed: %s' % reason)
1043
def assertFileEqual(self, content, path):
1044
"""Fail if path does not contain 'content'."""
1045
self.failUnlessExists(path)
1046
f = file(path, 'rb')
1051
self.assertEqualDiff(content, s)
1053
def failUnlessExists(self, path):
1054
"""Fail unless path or paths, which may be abs or relative, exist."""
1055
if not isinstance(path, basestring):
1057
self.failUnlessExists(p)
1059
self.failUnless(osutils.lexists(path),path+" does not exist")
1061
def failIfExists(self, path):
1062
"""Fail if path or paths, which may be abs or relative, exist."""
1063
if not isinstance(path, basestring):
1065
self.failIfExists(p)
1067
self.failIf(osutils.lexists(path),path+" exists")
1069
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
986
def _capture_warnings(self, a_callable, *args, **kwargs):
1070
987
"""A helper for callDeprecated and applyDeprecated.
1072
989
:param a_callable: A callable to call.
1073
990
:param args: The positional arguments for the callable
1074
991
:param kwargs: The keyword arguments for the callable
1075
992
:return: A tuple (warnings, result). result is the result of calling
1076
a_callable(``*args``, ``**kwargs``).
993
a_callable(*args, **kwargs).
1078
995
local_warnings = []
1079
996
def capture_warnings(msg, cls=None, stacklevel=None):
1092
1009
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1093
1010
"""Call a deprecated callable without warning the user.
1095
Note that this only captures warnings raised by symbol_versioning.warn,
1096
not other callers that go direct to the warning module.
1098
To test that a deprecated method raises an error, do something like
1101
self.assertRaises(errors.ReservedId,
1102
self.applyDeprecated, zero_ninetyone,
1103
br.append_revision, 'current:')
1105
1012
:param deprecation_format: The deprecation format that the callable
1106
should have been deprecated with. This is the same type as the
1107
parameter to deprecated_method/deprecated_function. If the
1013
should have been deprecated with. This is the same type as the
1014
parameter to deprecated_method/deprecated_function. If the
1108
1015
callable is not deprecated with this format, an assertion error
1109
1016
will be raised.
1110
1017
:param a_callable: A callable to call. This may be a bound method or
1111
a regular function. It will be called with ``*args`` and
1018
a regular function. It will be called with *args and **kwargs.
1113
1019
:param args: The positional arguments for the callable
1114
1020
:param kwargs: The keyword arguments for the callable
1115
:return: The result of a_callable(``*args``, ``**kwargs``)
1021
:return: The result of a_callable(*args, **kwargs)
1117
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1023
call_warnings, result = self._capture_warnings(a_callable,
1118
1024
*args, **kwargs)
1119
1025
expected_first_warning = symbol_versioning.deprecation_string(
1120
1026
a_callable, deprecation_format)
1124
1030
self.assertEqual(expected_first_warning, call_warnings[0])
1127
def callCatchWarnings(self, fn, *args, **kw):
1128
"""Call a callable that raises python warnings.
1130
The caller's responsible for examining the returned warnings.
1132
If the callable raises an exception, the exception is not
1133
caught and propagates up to the caller. In that case, the list
1134
of warnings is not available.
1136
:returns: ([warning_object, ...], fn_result)
1138
# XXX: This is not perfect, because it completely overrides the
1139
# warnings filters, and some code may depend on suppressing particular
1140
# warnings. It's the easiest way to insulate ourselves from -Werror,
1141
# though. -- Andrew, 20071062
1143
def _catcher(message, category, filename, lineno, file=None):
1144
# despite the name, 'message' is normally(?) a Warning subclass
1146
wlist.append(message)
1147
saved_showwarning = warnings.showwarning
1148
saved_filters = warnings.filters
1150
warnings.showwarning = _catcher
1151
warnings.filters = []
1152
result = fn(*args, **kw)
1154
warnings.showwarning = saved_showwarning
1155
warnings.filters = saved_filters
1156
return wlist, result
1158
1033
def callDeprecated(self, expected, callable, *args, **kwargs):
1159
1034
"""Assert that a callable is deprecated in a particular way.
1379
1229
if not feature.available():
1380
1230
raise UnavailableFeature(feature)
1382
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1384
"""Run bazaar command line, splitting up a string command line."""
1385
if isinstance(args, basestring):
1386
# shlex don't understand unicode strings,
1387
# so args should be plain string (bialix 20070906)
1388
args = list(shlex.split(str(args)))
1389
return self._run_bzr_core(args, retcode=retcode,
1390
encoding=encoding, stdin=stdin, working_dir=working_dir,
1393
def _run_bzr_core(self, args, retcode, encoding, stdin,
1232
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
1234
"""Invoke bzr and return (stdout, stderr).
1236
Useful for code that wants to check the contents of the
1237
output, the way error messages are presented, etc.
1239
This should be the main method for tests that want to exercise the
1240
overall behavior of the bzr application (rather than a unit test
1241
or a functional test of the library.)
1243
Much of the old code runs bzr by forking a new copy of Python, but
1244
that is slower, harder to debug, and generally not necessary.
1246
This runs bzr through the interface that catches and reports
1247
errors, and with logging set to something approximating the
1248
default, so that error reporting can be checked.
1250
:param argv: arguments to invoke bzr
1251
:param retcode: expected return code, or None for don't-care.
1252
:param encoding: encoding for sys.stdout and sys.stderr
1253
:param stdin: A string to be used as stdin for the command.
1254
:param working_dir: Change to this directory before running
1395
1256
if encoding is None:
1396
1257
encoding = bzrlib.user_encoding
1397
1258
stdout = StringIOWrapper()
1461
1311
This sends the stdout/stderr results into the test's log,
1462
1312
where it may be useful for debugging. See also run_captured.
1464
:keyword stdin: A string to be used as stdin for the command.
1465
:keyword retcode: The status code the command should return;
1467
:keyword working_dir: The directory to run the command in
1468
:keyword error_regexes: A list of expected error messages. If
1469
specified they must be seen in the error output of the command.
1314
:param stdin: A string to be used as stdin for the command.
1315
:param retcode: The status code the command should return
1316
:param working_dir: The directory to run the command in
1471
out, err = self._run_bzr_autosplit(
1476
working_dir=working_dir,
1318
retcode = kwargs.pop('retcode', 0)
1319
encoding = kwargs.pop('encoding', None)
1320
stdin = kwargs.pop('stdin', None)
1321
working_dir = kwargs.pop('working_dir', None)
1322
error_regexes = kwargs.pop('error_regexes', [])
1324
out, err = self.run_bzr_captured(args, retcode=retcode,
1325
encoding=encoding, stdin=stdin, working_dir=working_dir)
1478
1327
for regex in error_regexes:
1479
1328
self.assertContainsRe(err, regex)
1480
1329
return out, err
1332
def run_bzr_decode(self, *args, **kwargs):
1333
if 'encoding' in kwargs:
1334
encoding = kwargs['encoding']
1336
encoding = bzrlib.user_encoding
1337
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1482
1339
def run_bzr_error(self, error_regexes, *args, **kwargs):
1483
1340
"""Run bzr, and check that stderr contains the supplied regexes
1485
:param error_regexes: Sequence of regular expressions which
1342
:param error_regexes: Sequence of regular expressions which
1486
1343
must each be found in the error output. The relative ordering
1487
1344
is not enforced.
1488
1345
:param args: command-line arguments for bzr
1489
1346
:param kwargs: Keyword arguments which are interpreted by run_bzr
1490
1347
This function changes the default value of retcode to be 3,
1491
1348
since in most cases this is run when you expect bzr to fail.
1493
:return: (out, err) The actual output of running the command (in case
1494
you want to do more inspection)
1349
:return: (out, err) The actual output of running the command (in case you
1350
want to do more inspection)
1498
1353
# Make sure that commit is failing because there is nothing to do
1499
1354
self.run_bzr_error(['no changes to commit'],
1500
['commit', '-m', 'my commit comment'])
1355
'commit', '-m', 'my commit comment')
1501
1356
# Make sure --strict is handling an unknown file, rather than
1502
1357
# giving us the 'nothing to do' error
1503
1358
self.build_tree(['unknown'])
1504
1359
self.run_bzr_error(['Commit refused because there are unknown files'],
1505
['commit', --strict', '-m', 'my commit comment'])
1360
'commit', '--strict', '-m', 'my commit comment')
1507
1362
kwargs.setdefault('retcode', 3)
1508
kwargs['error_regexes'] = error_regexes
1509
out, err = self.run_bzr(*args, **kwargs)
1363
out, err = self.run_bzr(error_regexes=error_regexes, *args, **kwargs)
1510
1364
return out, err
1512
1366
def run_bzr_subprocess(self, *args, **kwargs):
1906
1752
capabilities of the local filesystem, but it might actually be a
1907
1753
MemoryTransport or some other similar virtual filesystem.
1909
This is the backing transport (if any) of the server returned by
1755
This is the backing transport (if any) of the server returned by
1910
1756
get_url and get_readonly_url.
1912
1758
:param relpath: provides for clients to get a path relative to the base
1913
1759
url. These should only be downwards relative, not upwards.
1916
1761
base = self.get_vfs_only_server().get_url()
1917
1762
return self._adjust_url(base, relpath)
1919
def _create_safety_net(self):
1920
"""Make a fake bzr directory.
1922
This prevents any tests propagating up onto the TEST_ROOT directory's
1925
root = TestCaseWithMemoryTransport.TEST_ROOT
1926
bzrdir.BzrDir.create_standalone_workingtree(root)
1928
def _check_safety_net(self):
1929
"""Check that the safety .bzr directory have not been touched.
1931
_make_test_root have created a .bzr directory to prevent tests from
1932
propagating. This method ensures than a test did not leaked.
1934
root = TestCaseWithMemoryTransport.TEST_ROOT
1935
wt = workingtree.WorkingTree.open(root)
1936
last_rev = wt.last_revision()
1937
if last_rev != 'null:':
1938
# The current test have modified the /bzr directory, we need to
1939
# recreate a new one or all the followng tests will fail.
1940
# If you need to inspect its content uncomment the following line
1941
# import pdb; pdb.set_trace()
1942
_rmtree_temp_dir(root + '/.bzr')
1943
self._create_safety_net()
1944
raise AssertionError('%s/.bzr should not be modified' % root)
1946
1764
def _make_test_root(self):
1947
if TestCaseWithMemoryTransport.TEST_ROOT is None:
1948
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
1949
TestCaseWithMemoryTransport.TEST_ROOT = root
1951
self._create_safety_net()
1953
# The same directory is used by all tests, and we're not
1954
# specifically told when all tests are finished. This will do.
1955
atexit.register(_rmtree_temp_dir, root)
1957
self.addCleanup(self._check_safety_net)
1765
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1769
root = u'test%04d.tmp' % i
1773
if e.errno == errno.EEXIST:
1778
# successfully created
1779
TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
1781
# make a fake bzr directory there to prevent any tests propagating
1782
# up onto the source directory's real branch
1783
bzrdir.BzrDir.create_standalone_workingtree(
1784
TestCaseWithMemoryTransport.TEST_ROOT)
1959
1786
def makeAndChdirToTestDir(self):
1960
1787
"""Create a temporary directories for this one test.
2060
1884
For TestCaseInTempDir we create a temporary directory based on the test
2061
1885
name and then create two subdirs - test and home under it.
2063
# create a directory within the top level test directory
2064
candidate_dir = osutils.mkdtemp(dir=self.TEST_ROOT)
2065
# now create test and home directories within this dir
2066
self.test_base_dir = candidate_dir
2067
self.test_home_dir = self.test_base_dir + '/home'
2068
os.mkdir(self.test_home_dir)
2069
self.test_dir = self.test_base_dir + '/work'
2070
os.mkdir(self.test_dir)
2071
os.chdir(self.test_dir)
2072
# put name of test inside
2073
f = file(self.test_base_dir + '/name', 'w')
1887
if self.use_numbered_dirs: # strongly recommended on Windows
1888
# due the path length limitation (260 ch.)
1889
candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
1890
int(self.number/1000),
1892
os.makedirs(candidate_dir)
1893
self.test_home_dir = candidate_dir + '/home'
1894
os.mkdir(self.test_home_dir)
1895
self.test_dir = candidate_dir + '/work'
1896
os.mkdir(self.test_dir)
1897
os.chdir(self.test_dir)
1898
# put name of test inside
1899
f = file(candidate_dir + '/name', 'w')
2075
1900
f.write(self.id())
2078
self.addCleanup(self.deleteTestDir)
2080
def deleteTestDir(self):
2081
os.chdir(self.TEST_ROOT)
2082
_rmtree_temp_dir(self.test_base_dir)
1904
# shorten the name, to avoid test failures due to path length
1905
short_id = self.id().replace('bzrlib.tests.', '') \
1906
.replace('__main__.', '')[-100:]
1907
# it's possible the same test class is run several times for
1908
# parameterized tests, so make sure the names don't collide.
1912
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1914
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1915
if os.path.exists(candidate_dir):
1919
os.mkdir(candidate_dir)
1920
self.test_home_dir = candidate_dir + '/home'
1921
os.mkdir(self.test_home_dir)
1922
self.test_dir = candidate_dir + '/work'
1923
os.mkdir(self.test_dir)
1924
os.chdir(self.test_dir)
2084
1927
def build_tree(self, shape, line_endings='binary', transport=None):
2085
1928
"""Build a test tree according to a pattern.
2250
2117
self.transport_readonly_server = HttpServer
2253
def condition_id_re(pattern):
2254
"""Create a condition filter which performs a re check on a test's id.
2256
:param pattern: A regular expression string.
2257
:return: A callable that returns True if the re matches.
2259
filter_re = re.compile(pattern)
2260
def condition(test):
2262
return filter_re.search(test_id)
2266
def condition_isinstance(klass_or_klass_list):
2267
"""Create a condition filter which returns isinstance(param, klass).
2269
:return: A callable which when called with one parameter obj return the
2270
result of isinstance(obj, klass_or_klass_list).
2273
return isinstance(obj, klass_or_klass_list)
2277
def condition_id_in_list(id_list):
2278
"""Create a condition filter which verify that test's id in a list.
2280
:param id_list: A TestIdList object.
2281
:return: A callable that returns True if the test's id appears in the list.
2283
def condition(test):
2284
return id_list.includes(test.id())
2288
def exclude_tests_by_condition(suite, condition):
2289
"""Create a test suite which excludes some tests from suite.
2291
:param suite: The suite to get tests from.
2292
:param condition: A callable whose result evaluates True when called with a
2293
test case which should be excluded from the result.
2294
:return: A suite which contains the tests found in suite that fail
2298
for test in iter_suite_tests(suite):
2299
if not condition(test):
2301
return TestUtil.TestSuite(result)
2304
def filter_suite_by_condition(suite, condition):
2305
"""Create a test suite by filtering another one.
2307
:param suite: The source suite.
2308
:param condition: A callable whose result evaluates True when called with a
2309
test case which should be included in the result.
2310
:return: A suite which contains the tests found in suite that pass
2314
for test in iter_suite_tests(suite):
2317
return TestUtil.TestSuite(result)
2320
def filter_suite_by_re(suite, pattern, exclude_pattern=DEPRECATED_PARAMETER,
2321
random_order=DEPRECATED_PARAMETER):
2120
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2121
random_order=False):
2322
2122
"""Create a test suite by filtering another one.
2324
2124
:param suite: the source suite
2325
2125
:param pattern: pattern that names must match
2326
:param exclude_pattern: A pattern that names must not match. This parameter
2327
is deprecated as of bzrlib 1.0. Please use the separate function
2328
exclude_tests_by_re instead.
2329
:param random_order: If True, tests in the new suite will be put in
2330
random order. This parameter is deprecated as of bzrlib 1.0. Please
2331
use the separate function randomize_suite instead.
2126
:param exclude_pattern: pattern that names must not match, if any
2127
:param random_order: if True, tests in the new suite will be put in
2332
2129
:returns: the newly created suite
2334
if deprecated_passed(exclude_pattern):
2335
symbol_versioning.warn(
2336
one_zero % "passing exclude_pattern to filter_suite_by_re",
2337
DeprecationWarning, stacklevel=2)
2338
if exclude_pattern is not None:
2339
suite = exclude_tests_by_re(suite, exclude_pattern)
2340
condition = condition_id_re(pattern)
2341
result_suite = filter_suite_by_condition(suite, condition)
2342
if deprecated_passed(random_order):
2343
symbol_versioning.warn(
2344
one_zero % "passing random_order to filter_suite_by_re",
2345
DeprecationWarning, stacklevel=2)
2347
result_suite = randomize_suite(result_suite)
2351
def filter_suite_by_id_list(suite, test_id_list):
2352
"""Create a test suite by filtering another one.
2354
:param suite: The source suite.
2355
:param test_id_list: A list of the test ids to keep as strings.
2356
:returns: the newly created suite
2358
condition = condition_id_in_list(test_id_list)
2359
result_suite = filter_suite_by_condition(suite, condition)
2363
def exclude_tests_by_re(suite, pattern):
2364
"""Create a test suite which excludes some tests from suite.
2366
:param suite: The suite to get tests from.
2367
:param pattern: A regular expression string. Test ids that match this
2368
pattern will be excluded from the result.
2369
:return: A TestSuite that contains all the tests from suite without the
2370
tests that matched pattern. The order of tests is the same as it was in
2373
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2376
def preserve_input(something):
2377
"""A helper for performing test suite transformation chains.
2379
:param something: Anything you want to preserve.
2385
def randomize_suite(suite):
2386
"""Return a new TestSuite with suite's tests in random order.
2388
The tests in the input suite are flattened into a single suite in order to
2389
accomplish this. Any nested TestSuites are removed to provide global
2392
tests = list(iter_suite_tests(suite))
2393
random.shuffle(tests)
2394
return TestUtil.TestSuite(tests)
2397
@deprecated_function(one_zero)
2131
return sort_suite_by_re(suite, pattern, exclude_pattern,
2132
random_order, False)
2398
2135
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2399
2136
random_order=False, append_rest=True):
2400
"""DEPRECATED: Create a test suite by sorting another one.
2402
This method has been decomposed into separate helper methods that should be
2404
- filter_suite_by_re
2405
- exclude_tests_by_re
2137
"""Create a test suite by sorting another one.
2409
2139
:param suite: the source suite
2410
2140
:param pattern: pattern that names must match in order to go
2411
2141
first in the new suite
2412
2142
:param exclude_pattern: pattern that names must not match, if any
2413
2143
:param random_order: if True, tests in the new suite will be put in
2414
random order (with all tests matching pattern
2416
2145
:param append_rest: if False, pattern is a strict filter and not
2417
2146
just an ordering directive
2418
2147
:returns: the newly created suite
2151
filter_re = re.compile(pattern)
2420
2152
if exclude_pattern is not None:
2421
suite = exclude_tests_by_re(suite, exclude_pattern)
2423
order_changer = randomize_suite
2425
order_changer = preserve_input
2427
suites = map(order_changer, split_suite_by_re(suite, pattern))
2428
return TestUtil.TestSuite(suites)
2430
return order_changer(filter_suite_by_re(suite, pattern))
2433
def split_suite_by_re(suite, pattern):
2434
"""Split a test suite into two by a regular expression.
2436
:param suite: The suite to split.
2437
:param pattern: A regular expression string. Test ids that match this
2438
pattern will be in the first test suite returned, and the others in the
2439
second test suite returned.
2440
:return: A tuple of two test suites, where the first contains tests from
2441
suite matching pattern, and the second contains the remainder from
2442
suite. The order within each output suite is the same as it was in
2447
filter_re = re.compile(pattern)
2153
exclude_re = re.compile(exclude_pattern)
2448
2154
for test in iter_suite_tests(suite):
2449
2155
test_id = test.id()
2450
if filter_re.search(test_id):
2451
matched.append(test)
2453
did_not_match.append(test)
2454
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2156
if exclude_pattern is None or not exclude_re.search(test_id):
2157
if filter_re.search(test_id):
2162
random.shuffle(first)
2163
random.shuffle(second)
2164
return TestUtil.TestSuite(first + second)
2457
2167
def run_suite(suite, name='test', verbose=False, pattern=".*",
2458
stop_on_failure=False,
2168
stop_on_failure=False, keep_output=False,
2459
2169
transport=None, lsprof_timed=None, bench_history=None,
2460
2170
matching_tests_first=None,
2461
2172
list_only=False,
2462
2173
random_seed=None,
2463
2174
exclude_pattern=None,
2176
use_numbered_dirs = bool(numbered_dirs)
2465
2178
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2179
if numbered_dirs is not None:
2180
TestCaseInTempDir.use_numbered_dirs = use_numbered_dirs
2537
2243
old_transport = default_transport
2538
2244
default_transport = transport
2540
if load_list is None:
2543
keep_only = load_test_id_list(load_list)
2544
2246
if test_suite_factory is None:
2545
suite = test_suite(keep_only)
2247
suite = test_suite()
2547
2249
suite = test_suite_factory()
2548
2250
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2549
stop_on_failure=stop_on_failure,
2251
stop_on_failure=stop_on_failure, keep_output=keep_output,
2550
2252
transport=transport,
2551
2253
lsprof_timed=lsprof_timed,
2552
2254
bench_history=bench_history,
2553
2255
matching_tests_first=matching_tests_first,
2256
numbered_dirs=numbered_dirs,
2554
2257
list_only=list_only,
2555
2258
random_seed=random_seed,
2556
exclude_pattern=exclude_pattern,
2259
exclude_pattern=exclude_pattern)
2559
2261
default_transport = old_transport
2562
def load_test_id_list(file_name):
2563
"""Load a test id list from a text file.
2565
The format is one test id by line. No special care is taken to impose
2566
strict rules, these test ids are used to filter the test suite so a test id
2567
that do not match an existing test will do no harm. This allows user to add
2568
comments, leave blank lines, etc.
2572
ftest = open(file_name, 'rt')
2574
if e.errno != errno.ENOENT:
2577
raise errors.NoSuchFile(file_name)
2579
for test_name in ftest.readlines():
2580
test_list.append(test_name.strip())
2585
def suite_matches_id_list(test_suite, id_list):
2586
"""Warns about tests not appearing or appearing more than once.
2588
:param test_suite: A TestSuite object.
2589
:param test_id_list: The list of test ids that should be found in
2592
:return: (absents, duplicates) absents is a list containing the test found
2593
in id_list but not in test_suite, duplicates is a list containing the
2594
test found multiple times in test_suite.
2596
When using a prefined test id list, it may occurs that some tests do not
2597
exist anymore or that some tests use the same id. This function warns the
2598
tester about potential problems in his workflow (test lists are volatile)
2599
or in the test suite itself (using the same id for several tests does not
2600
help to localize defects).
2602
# Build a dict counting id occurrences
2604
for test in iter_suite_tests(test_suite):
2606
tests[id] = tests.get(id, 0) + 1
2611
occurs = tests.get(id, 0)
2613
not_found.append(id)
2615
duplicates.append(id)
2617
return not_found, duplicates
2620
class TestIdList(object):
2621
"""Test id list to filter a test suite.
2623
Relying on the assumption that test ids are built as:
2624
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
2625
notation, this class offers methods to :
2626
- avoid building a test suite for modules not refered to in the test list,
2627
- keep only the tests listed from the module test suite.
2630
def __init__(self, test_id_list):
2631
# When a test suite needs to be filtered against us we compare test ids
2632
# for equality, so a simple dict offers a quick and simple solution.
2633
self.tests = dict().fromkeys(test_id_list, True)
2635
# While unittest.TestCase have ids like:
2636
# <module>.<class>.<method>[(<param+)],
2637
# doctest.DocTestCase can have ids like:
2640
# <module>.<function>
2641
# <module>.<class>.<method>
2643
# Since we can't predict a test class from its name only, we settle on
2644
# a simple constraint: a test id always begins with its module name.
2647
for test_id in test_id_list:
2648
parts = test_id.split('.')
2649
mod_name = parts.pop(0)
2650
modules[mod_name] = True
2652
mod_name += '.' + part
2653
modules[mod_name] = True
2654
self.modules = modules
2656
def refers_to(self, module_name):
2657
"""Is there tests for the module or one of its sub modules."""
2658
return self.modules.has_key(module_name)
2660
def includes(self, test_id):
    """Is test_id one of the ids recorded in self.tests.

    :param test_id: The test id to look up; ids are compared for equality.
    :return: True if test_id is a key of self.tests, False otherwise.
    """
    # Use the 'in' operator: dict.has_key() is deprecated and does not exist
    # on Python 3 dicts; 'in' performs the same key lookup.
    return test_id in self.tests
2664
def test_suite(keep_only=None):
2665
2265
"""Build and return TestSuite for the whole of bzrlib.
2667
:param keep_only: A list of test ids limiting the suite returned.
2669
2267
This function can be replaced if you need to change the default test
2670
2268
suite on a global basis, but it is not encouraged.
2672
2270
testmod_names = [
2673
'bzrlib.util.tests.test_bencode',
2674
'bzrlib.tests.test__dirstate_helpers',
2675
2271
'bzrlib.tests.test_ancestry',
2676
2272
'bzrlib.tests.test_annotate',
2677
2273
'bzrlib.tests.test_api',
2678
2274
'bzrlib.tests.test_atomicfile',
2679
2275
'bzrlib.tests.test_bad_files',
2680
'bzrlib.tests.test_bisect_multi',
2681
2276
'bzrlib.tests.test_branch',
2682
'bzrlib.tests.test_branchbuilder',
2683
2277
'bzrlib.tests.test_bugtracker',
2684
2278
'bzrlib.tests.test_bundle',
2685
2279
'bzrlib.tests.test_bzrdir',
2735
2318
'bzrlib.tests.test_merge_directive',
2736
2319
'bzrlib.tests.test_missing',
2737
2320
'bzrlib.tests.test_msgeditor',
2738
'bzrlib.tests.test_multiparent',
2739
'bzrlib.tests.test_mutabletree',
2740
2321
'bzrlib.tests.test_nonascii',
2741
2322
'bzrlib.tests.test_options',
2742
2323
'bzrlib.tests.test_osutils',
2743
2324
'bzrlib.tests.test_osutils_encodings',
2744
'bzrlib.tests.test_pack',
2745
2325
'bzrlib.tests.test_patch',
2746
2326
'bzrlib.tests.test_patches',
2747
2327
'bzrlib.tests.test_permissions',
2748
2328
'bzrlib.tests.test_plugins',
2749
2329
'bzrlib.tests.test_progress',
2750
'bzrlib.tests.test_reconfigure',
2751
2330
'bzrlib.tests.test_reconcile',
2752
2331
'bzrlib.tests.test_registry',
2753
2332
'bzrlib.tests.test_remote',
2754
2333
'bzrlib.tests.test_repository',
2755
2334
'bzrlib.tests.test_revert',
2756
2335
'bzrlib.tests.test_revision',
2757
'bzrlib.tests.test_revisionspec',
2336
'bzrlib.tests.test_revisionnamespaces',
2758
2337
'bzrlib.tests.test_revisiontree',
2759
2338
'bzrlib.tests.test_rio',
2760
2339
'bzrlib.tests.test_sampler',
2805
2380
'bzrlib.tests.test_transport_implementations',
2806
2381
'bzrlib.tests.test_read_bundle',
2383
suite = TestUtil.TestSuite()
2808
2384
loader = TestUtil.TestLoader()
2810
if keep_only is None:
2811
loader = TestUtil.TestLoader()
2813
id_filter = TestIdList(keep_only)
2814
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
2815
suite = loader.suiteClass()
2817
# modules building their suite with loadTestsFromModuleNames
2818
2385
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2820
# modules adapted for transport implementations
2821
from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
2386
from bzrlib.transport import TransportTestProviderAdapter
2822
2387
adapter = TransportTestProviderAdapter()
2823
2388
adapt_modules(test_transport_implementations, adapter, loader, suite)
2825
# modules defining their own test_suite()
2826
for package in [p for p in packages_to_test()
2827
if (keep_only is None
2828
or id_filter.refers_to(p.__name__))]:
2829
pack_suite = package.test_suite()
2830
suite.addTest(pack_suite)
2832
modules_to_doctest = [
2837
'bzrlib.iterablefile',
2844
'bzrlib.version_info_formats.format_custom',
2847
for mod in modules_to_doctest:
2848
if not (keep_only is None or id_filter.refers_to(mod)):
2849
# No tests to keep here, move along
2389
for package in packages_to_test():
2390
suite.addTest(package.test_suite())
2391
for m in MODULES_TO_TEST:
2392
suite.addTest(loader.loadTestsFromModule(m))
2393
for m in MODULES_TO_DOCTEST:
2852
doc_suite = doctest.DocTestSuite(mod)
2395
suite.addTest(doctest.DocTestSuite(m))
2853
2396
except ValueError, e:
2854
print '**failed to get doctest for: %s\n%s' % (mod, e)
2397
print '**failed to get doctest for: %s\n%s' %(m,e)
2856
suite.addTest(doc_suite)
2858
default_encoding = sys.getdefaultencoding()
2859
for name, plugin in bzrlib.plugin.plugins().items():
2860
if keep_only is not None:
2861
if not id_filter.refers_to(plugin.module.__name__):
2863
plugin_suite = plugin.test_suite()
2864
# We used to catch ImportError here and turn it into just a warning,
2865
# but really if you don't have --no-plugins this should be a failure.
2866
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
2867
if plugin_suite is None:
2868
plugin_suite = plugin.load_plugin_tests(loader)
2869
if plugin_suite is not None:
2870
suite.addTest(plugin_suite)
2871
if default_encoding != sys.getdefaultencoding():
2872
bzrlib.trace.warning(
2873
'Plugin "%s" tried to reset default encoding to: %s', name,
2874
sys.getdefaultencoding())
2876
sys.setdefaultencoding(default_encoding)
2878
if keep_only is not None:
2879
# Now that the referred modules have loaded their tests, keep only the
2881
suite = filter_suite_by_id_list(suite, id_filter)
2882
# Do some sanity checks on the id_list filtering
2883
not_found, duplicates = suite_matches_id_list(suite, keep_only)
2884
for id in not_found:
2885
bzrlib.trace.warning('"%s" not found in the test suite', id)
2886
for id in duplicates:
2887
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
2892
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
2893
"""Adapt all tests in some given modules to given scenarios.
2895
This is the recommended public interface for test parameterization.
2896
Typically the test_suite() method for a per-implementation test
2897
suite will call multiply_tests_from_modules and return the
2900
:param module_name_list: List of fully-qualified names of test
2902
:param scenario_iter: Iterable of pairs of (scenario_name,
2903
scenario_param_dict).
2904
:param loader: If provided, will be used instead of a new
2905
bzrlib.tests.TestLoader() instance.
2907
This returns a new TestSuite containing the cross product of
2908
all the tests in all the modules, each repeated for each scenario.
2909
Each test is adapted by adding the scenario name at the end
2910
of its name, and updating the test object's __dict__ with the
2911
scenario_param_dict.
2913
>>> r = multiply_tests_from_modules(
2914
... ['bzrlib.tests.test_sampler'],
2915
... [('one', dict(param=1)),
2916
... ('two', dict(param=2))])
2917
>>> tests = list(iter_suite_tests(r))
2921
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
2927
# XXX: Isn't load_tests() a better way to provide the same functionality
2928
# without forcing a predefined TestScenarioApplier ? --vila 080215
2930
loader = TestUtil.TestLoader()
2932
suite = loader.suiteClass()
2934
adapter = TestScenarioApplier()
2935
adapter.scenarios = list(scenario_iter)
2936
adapt_modules(module_name_list, adapter, loader, suite)
2940
def multiply_scenarios(scenarios_left, scenarios_right):
2941
"""Multiply two sets of scenarios.
2943
:returns: the cartesian product of the two sets of scenarios, that is
2944
a scenario for every possible combination of a left scenario and a
2948
('%s,%s' % (left_name, right_name),
2949
dict(left_dict.items() + right_dict.items()))
2950
for left_name, left_dict in scenarios_left
2951
for right_name, right_dict in scenarios_right]
2399
for name, plugin in bzrlib.plugin.all_plugins().items():
2400
if getattr(plugin, 'test_suite', None) is not None:
2401
default_encoding = sys.getdefaultencoding()
2403
plugin_suite = plugin.test_suite()
2404
except ImportError, e:
2405
bzrlib.trace.warning(
2406
'Unable to test plugin "%s": %s', name, e)
2408
suite.addTest(plugin_suite)
2409
if default_encoding != sys.getdefaultencoding():
2410
bzrlib.trace.warning(
2411
'Plugin "%s" tried to reset default encoding to: %s', name,
2412
sys.getdefaultencoding())
2414
sys.setdefaultencoding(default_encoding)
2955
2418
def adapt_modules(mods_list, adapter, loader, suite):
2958
2421
suite.addTests(adapter.adapt(test))
2961
def adapt_tests(tests_list, adapter, loader, suite):
    """Load each named test, adapt it, and add the result to suite.

    :param tests_list: Iterable of test names, each handed to
        loader.loadTestsFromName().
    :param adapter: Object whose adapt() is applied to what the loader
        returned.
    :param loader: Object providing loadTestsFromName().
    :param suite: Object whose addTests() receives each adapted result.
    """
    for test_name in tests_list:
        loaded = loader.loadTestsFromName(test_name)
        adapted = adapter.adapt(loaded)
        suite.addTests(adapted)
2967
2424
def _rmtree_temp_dir(dirname):
2968
# If LANG=C we probably have created some bogus paths
2969
# which rmtree(unicode) will fail to delete
2970
# so make sure we are using rmtree(str) to delete everything
2971
# except on win32, where rmtree(str) will fail
2972
# since it doesn't have the property of byte-stream paths
2973
# (they are either ascii or mbcs)
2974
if sys.platform == 'win32':
2975
# make sure we are using the unicode win32 api
2976
dirname = unicode(dirname)
2978
dirname = dirname.encode(sys.getfilesystemencoding())
2980
2426
osutils.rmtree(dirname)
2981
2427
except OSError, e:
2982
2428
if sys.platform == 'win32' and e.errno == errno.EACCES:
2983
sys.stderr.write(('Permission denied: '
2429
print >>sys.stderr, ('Permission denied: '
2984
2430
'unable to remove testing dir '
2985
'%s\n' % os.path.basename(dirname)))
2431
'%s' % os.path.basename(dirname))
2436
def clean_selftest_output(root=None, quiet=False):
2437
"""Remove all selftest output directories from root directory.
2439
:param root: root directory for clean
2440
(if omitted or None then clean current directory).
2441
:param quiet: suppress report about deleting directories
2444
re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
2447
for i in os.listdir(root):
2448
if os.path.isdir(i) and re_dir.match(i):
2450
print 'delete directory:', i
2990
2454
class Feature(object):
2991
2455
"""An operating system Feature."""
3013
2477
if getattr(self, 'feature_name', None):
3014
2478
return self.feature_name()
3015
2479
return self.__class__.__name__
3018
class _SymlinkFeature(Feature):
3021
return osutils.has_symlinks()
3023
def feature_name(self):
3026
SymlinkFeature = _SymlinkFeature()
3029
class _HardlinkFeature(Feature):
3032
return osutils.has_hardlinks()
3034
def feature_name(self):
3037
HardlinkFeature = _HardlinkFeature()
3040
class _OsFifoFeature(Feature):
3043
return getattr(os, 'mkfifo', None)
3045
def feature_name(self):
3046
return 'filesystem fifos'
3048
OsFifoFeature = _OsFifoFeature()
3051
class TestScenarioApplier(object):
3052
"""A tool to apply scenarios to tests."""
3054
def adapt(self, test):
3055
"""Return a TestSuite containing a copy of test for each scenario."""
3056
result = unittest.TestSuite()
3057
for scenario in self.scenarios:
3058
result.addTest(self.adapt_test_to_scenario(test, scenario))
3061
def adapt_test_to_scenario(self, test, scenario):
3062
"""Copy test and apply scenario to it.
3064
:param test: A test to adapt.
3065
:param scenario: A tuple describing the scenario.
3066
The first element of the tuple is the new test id.
3067
The second element is a dict containing attributes to set on the
3069
:return: The adapted test.
3071
from copy import deepcopy
3072
new_test = deepcopy(test)
3073
for name, value in scenario[1].items():
3074
setattr(new_test, name, value)
3075
new_id = "%s(%s)" % (new_test.id(), scenario[0])
3076
new_test.id = lambda: new_id
3080
def probe_unicode_in_user_encoding():
3081
"""Try to encode several unicode strings to use in unicode-aware tests.
3082
Return first successful match.
3084
:return: (unicode value, encoded plain string value) or (None, None)
3086
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3087
for uni_val in possible_vals:
3089
str_val = uni_val.encode(bzrlib.user_encoding)
3090
except UnicodeEncodeError:
3091
# Try a different character
3094
return uni_val, str_val
3098
def probe_bad_non_ascii(encoding):
3099
"""Try to find [bad] character with code [128..255]
3100
that cannot be decoded to unicode in some encoding.
3101
Return None if all non-ascii characters are valid
3104
for i in xrange(128, 256):
3107
char.decode(encoding)
3108
except UnicodeDecodeError:
3113
class _FTPServerFeature(Feature):
3114
"""Some tests want an FTP Server, check if one is available.
3116
Right now, the only way this is available is if 'medusa' is installed.
3117
http://www.amk.ca/python/code/medusa.html
3122
import bzrlib.tests.ftp_server
3127
def feature_name(self):
3130
FTPServerFeature = _FTPServerFeature()
3133
class _CaseInsensitiveFilesystemFeature(Feature):
3134
"""Check if underlined filesystem is case-insensitive
3135
(e.g. on Windows, Cygwin, MacOS)
3139
if TestCaseWithMemoryTransport.TEST_ROOT is None:
3140
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3141
TestCaseWithMemoryTransport.TEST_ROOT = root
3143
root = TestCaseWithMemoryTransport.TEST_ROOT
3144
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3146
name_a = osutils.pathjoin(tdir, 'a')
3147
name_A = osutils.pathjoin(tdir, 'A')
3149
result = osutils.isdir(name_A)
3150
_rmtree_temp_dir(tdir)
3153
def feature_name(self):
3154
return 'case-insensitive filesystem'
3156
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()