239
258
def addError(self, test, err):
240
self.extractBenchmarkTime(test)
241
self._cleanupLogFile(test)
242
if isinstance(err[1], TestSkipped):
243
return self.addSkipped(test, err)
259
"""Tell result that test finished with an error.
261
Called from the TestCase run() method when the test
262
fails with an unexpected error.
264
self._testConcluded(test)
265
if isinstance(err[1], TestNotApplicable):
266
return self._addNotApplicable(test, err)
244
267
elif isinstance(err[1], UnavailableFeature):
245
268
return self.addNotSupported(test, err[1].args[0])
246
unittest.TestResult.addError(self, test, err)
247
self.error_count += 1
248
self.report_error(test, err)
270
unittest.TestResult.addError(self, test, err)
271
self.error_count += 1
272
self.report_error(test, err)
275
self._cleanupLogFile(test)
252
277
def addFailure(self, test, err):
253
self._cleanupLogFile(test)
254
self.extractBenchmarkTime(test)
278
"""Tell result that test failed.
280
Called from the TestCase run() method when the test
281
fails because e.g. an assert() method failed.
283
self._testConcluded(test)
255
284
if isinstance(err[1], KnownFailure):
256
return self.addKnownFailure(test, err)
257
unittest.TestResult.addFailure(self, test, err)
258
self.failure_count += 1
259
self.report_failure(test, err)
263
def addKnownFailure(self, test, err):
285
return self._addKnownFailure(test, err)
287
unittest.TestResult.addFailure(self, test, err)
288
self.failure_count += 1
289
self.report_failure(test, err)
292
self._cleanupLogFile(test)
294
def addSuccess(self, test):
295
"""Tell result that test completed successfully.
297
Called from the TestCase run()
299
self._testConcluded(test)
300
if self._bench_history is not None:
301
benchmark_time = self._extractBenchmarkTime(test)
302
if benchmark_time is not None:
303
self._bench_history.write("%s %s\n" % (
304
self._formatTime(benchmark_time),
306
self.report_success(test)
307
self._cleanupLogFile(test)
308
unittest.TestResult.addSuccess(self, test)
309
test._log_contents = ''
311
def _testConcluded(self, test):
312
"""Common code when a test has finished.
314
Called regardless of whether it succeded, failed, etc.
318
def _addKnownFailure(self, test, err):
264
319
self.known_failure_count += 1
265
320
self.report_known_failure(test, err)
267
322
def addNotSupported(self, test, feature):
323
"""The test will not be run because of a missing feature.
325
# this can be called in two different ways: it may be that the
326
# test started running, and then raised (through addError)
327
# UnavailableFeature. Alternatively this method can be called
328
# while probing for features before running the tests; in that
329
# case we will see startTest and stopTest, but the test will never
268
331
self.unsupported.setdefault(str(feature), 0)
269
332
self.unsupported[str(feature)] += 1
270
333
self.report_unsupported(test, feature)
272
def addSuccess(self, test):
273
self.extractBenchmarkTime(test)
274
if self._bench_history is not None:
275
if self._benchmarkTime is not None:
276
self._bench_history.write("%s %s\n" % (
277
self._formatTime(self._benchmarkTime),
279
self.report_success(test)
280
unittest.TestResult.addSuccess(self, test)
335
def addSkip(self, test, reason):
336
"""A test has not run for 'reason'."""
338
self.report_skip(test, reason)
282
def addSkipped(self, test, skip_excinfo):
283
self.report_skip(test, skip_excinfo)
284
# seems best to treat this as success from point-of-view of unittest
285
# -- it actually does nothing so it barely matters :)
340
def _addNotApplicable(self, test, skip_excinfo):
341
if isinstance(skip_excinfo[1], TestNotApplicable):
342
self.not_applicable_count += 1
343
self.report_not_applicable(test, skip_excinfo)
288
346
except KeyboardInterrupt:
291
self.addError(test, test.__exc_info())
349
self.addError(test, test.exc_info())
351
# seems best to treat this as success from point-of-view of unittest
352
# -- it actually does nothing so it barely matters :)
293
353
unittest.TestResult.addSuccess(self, test)
354
test._log_contents = ''
295
356
def printErrorList(self, flavour, errors):
296
357
for test, err in errors:
297
358
self.stream.writeln(self.separator1)
298
359
self.stream.write("%s: " % flavour)
299
if self.use_numbered_dirs:
300
self.stream.write('#%d ' % test.number)
301
360
self.stream.writeln(self.getDescription(test))
302
361
if getattr(test, '_get_log', None) is not None:
304
print >>self.stream, \
305
('vvvv[log from %s]' % test.id()).ljust(78,'-')
306
print >>self.stream, test._get_log()
307
print >>self.stream, \
308
('^^^^[log from %s]' % test.id()).ljust(78,'-')
362
log_contents = test._get_log()
364
self.stream.write('\n')
366
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
367
self.stream.write('\n')
368
self.stream.write(log_contents)
369
self.stream.write('\n')
371
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
372
self.stream.write('\n')
309
373
self.stream.writeln(self.separator2)
310
374
self.stream.writeln("%s" % err)
376
def progress(self, offset, whence):
377
"""The test is adjusting the count of tests to run."""
378
if whence == SUBUNIT_SEEK_SET:
379
self.num_tests = offset
380
elif whence == SUBUNIT_SEEK_CUR:
381
self.num_tests += offset
383
raise errors.BzrError("Unknown whence %r" % whence)
312
385
def finished(self):
1310
1686
This sends the stdout/stderr results into the test's log,
1311
1687
where it may be useful for debugging. See also run_captured.
1313
:param stdin: A string to be used as stdin for the command.
1314
:param retcode: The status code the command should return
1315
:param working_dir: The directory to run the command in
1689
:keyword stdin: A string to be used as stdin for the command.
1690
:keyword retcode: The status code the command should return;
1692
:keyword working_dir: The directory to run the command in
1693
:keyword error_regexes: A list of expected error messages. If
1694
specified they must be seen in the error output of the command.
1317
retcode = kwargs.pop('retcode', 0)
1318
encoding = kwargs.pop('encoding', None)
1319
stdin = kwargs.pop('stdin', None)
1320
working_dir = kwargs.pop('working_dir', None)
1321
error_regexes = kwargs.pop('error_regexes', [])
1323
out, err = self.run_bzr_captured(args, retcode=retcode,
1324
encoding=encoding, stdin=stdin, working_dir=working_dir)
1696
out, err = self._run_bzr_autosplit(
1701
working_dir=working_dir,
1703
self.assertIsInstance(error_regexes, (list, tuple))
1326
1704
for regex in error_regexes:
1327
1705
self.assertContainsRe(err, regex)
1328
1706
return out, err
1331
def run_bzr_decode(self, *args, **kwargs):
1332
if 'encoding' in kwargs:
1333
encoding = kwargs['encoding']
1335
encoding = bzrlib.user_encoding
1336
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1338
1708
def run_bzr_error(self, error_regexes, *args, **kwargs):
1339
1709
"""Run bzr, and check that stderr contains the supplied regexes
1341
:param error_regexes: Sequence of regular expressions which
1711
:param error_regexes: Sequence of regular expressions which
1342
1712
must each be found in the error output. The relative ordering
1343
1713
is not enforced.
1344
1714
:param args: command-line arguments for bzr
1345
1715
:param kwargs: Keyword arguments which are interpreted by run_bzr
1346
1716
This function changes the default value of retcode to be 3,
1347
1717
since in most cases this is run when you expect bzr to fail.
1348
:return: (out, err) The actual output of running the command (in case you
1349
want to do more inspection)
1719
:return: (out, err) The actual output of running the command (in case
1720
you want to do more inspection)
1352
1724
# Make sure that commit is failing because there is nothing to do
1353
1725
self.run_bzr_error(['no changes to commit'],
1354
'commit', '-m', 'my commit comment')
1726
['commit', '-m', 'my commit comment'])
1355
1727
# Make sure --strict is handling an unknown file, rather than
1356
1728
# giving us the 'nothing to do' error
1357
1729
self.build_tree(['unknown'])
1358
1730
self.run_bzr_error(['Commit refused because there are unknown files'],
1359
'commit', '--strict', '-m', 'my commit comment')
1731
['commit', --strict', '-m', 'my commit comment'])
1361
1733
kwargs.setdefault('retcode', 3)
1362
out, err = self.run_bzr(error_regexes=error_regexes, *args, **kwargs)
1734
kwargs['error_regexes'] = error_regexes
1735
out, err = self.run_bzr(*args, **kwargs)
1363
1736
return out, err
1365
1738
def run_bzr_subprocess(self, *args, **kwargs):
1366
1739
"""Run bzr in a subprocess for testing.
1368
This starts a new Python interpreter and runs bzr in there.
1741
This starts a new Python interpreter and runs bzr in there.
1369
1742
This should only be used for tests that have a justifiable need for
1370
1743
this isolation: e.g. they are testing startup time, or signal
1371
handling, or early startup code, etc. Subprocess code can't be
1744
handling, or early startup code, etc. Subprocess code can't be
1372
1745
profiled or debugged so easily.
1374
:param retcode: The status code that is expected. Defaults to 0. If
1747
:keyword retcode: The status code that is expected. Defaults to 0. If
1375
1748
None is supplied, the status code is not checked.
1376
:param env_changes: A dictionary which lists changes to environment
1749
:keyword env_changes: A dictionary which lists changes to environment
1377
1750
variables. A value of None will unset the env variable.
1378
1751
The values must be strings. The change will only occur in the
1379
1752
child, so you don't need to fix the environment after running.
1380
:param universal_newlines: Convert CRLF => LF
1381
:param allow_plugins: By default the subprocess is run with
1753
:keyword universal_newlines: Convert CRLF => LF
1754
:keyword allow_plugins: By default the subprocess is run with
1382
1755
--no-plugins to ensure test reproducibility. Also, it is possible
1383
1756
for system-wide plugins to create unexpected output on stderr,
1384
1757
which can cause unnecessary test failures.
2112
2562
self.transport_readonly_server = HttpServer
2115
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2116
random_order=False):
2117
"""Create a test suite by filtering another one.
2565
def condition_id_re(pattern):
2566
"""Create a condition filter which performs a re check on a test's id.
2568
:param pattern: A regular expression string.
2569
:return: A callable that returns True if the re matches.
2571
filter_re = osutils.re_compile_checked(pattern, 0,
2573
def condition(test):
2575
return filter_re.search(test_id)
2579
def condition_isinstance(klass_or_klass_list):
2580
"""Create a condition filter which returns isinstance(param, klass).
2582
:return: A callable which when called with one parameter obj return the
2583
result of isinstance(obj, klass_or_klass_list).
2586
return isinstance(obj, klass_or_klass_list)
2590
def condition_id_in_list(id_list):
2591
"""Create a condition filter which verify that test's id in a list.
2593
:param id_list: A TestIdList object.
2594
:return: A callable that returns True if the test's id appears in the list.
2596
def condition(test):
2597
return id_list.includes(test.id())
2601
def condition_id_startswith(starts):
2602
"""Create a condition filter verifying that test's id starts with a string.
2604
:param starts: A list of string.
2605
:return: A callable that returns True if the test's id starts with one of
2608
def condition(test):
2609
for start in starts:
2610
if test.id().startswith(start):
2616
def exclude_tests_by_condition(suite, condition):
2617
"""Create a test suite which excludes some tests from suite.
2619
:param suite: The suite to get tests from.
2620
:param condition: A callable whose result evaluates True when called with a
2621
test case which should be excluded from the result.
2622
:return: A suite which contains the tests found in suite that fail
2626
for test in iter_suite_tests(suite):
2627
if not condition(test):
2629
return TestUtil.TestSuite(result)
2632
def filter_suite_by_condition(suite, condition):
2633
"""Create a test suite by filtering another one.
2635
:param suite: The source suite.
2636
:param condition: A callable whose result evaluates True when called with a
2637
test case which should be included in the result.
2638
:return: A suite which contains the tests found in suite that pass
2642
for test in iter_suite_tests(suite):
2645
return TestUtil.TestSuite(result)
2648
def filter_suite_by_re(suite, pattern):
2649
"""Create a test suite by filtering another one.
2119
2651
:param suite: the source suite
2120
2652
:param pattern: pattern that names must match
2121
:param exclude_pattern: pattern that names must not match, if any
2122
:param random_order: if True, tests in the new suite will be put in
2124
:returns: the newly created suite
2126
return sort_suite_by_re(suite, pattern, exclude_pattern,
2127
random_order, False)
2130
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2131
random_order=False, append_rest=True):
2132
"""Create a test suite by sorting another one.
2134
:param suite: the source suite
2135
:param pattern: pattern that names must match in order to go
2136
first in the new suite
2137
:param exclude_pattern: pattern that names must not match, if any
2138
:param random_order: if True, tests in the new suite will be put in
2140
:param append_rest: if False, pattern is a strict filter and not
2141
just an ordering directive
2142
:returns: the newly created suite
2146
filter_re = re.compile(pattern)
2147
if exclude_pattern is not None:
2148
exclude_re = re.compile(exclude_pattern)
2653
:returns: the newly created suite
2655
condition = condition_id_re(pattern)
2656
result_suite = filter_suite_by_condition(suite, condition)
2660
def filter_suite_by_id_list(suite, test_id_list):
2661
"""Create a test suite by filtering another one.
2663
:param suite: The source suite.
2664
:param test_id_list: A list of the test ids to keep as strings.
2665
:returns: the newly created suite
2667
condition = condition_id_in_list(test_id_list)
2668
result_suite = filter_suite_by_condition(suite, condition)
2672
def filter_suite_by_id_startswith(suite, start):
2673
"""Create a test suite by filtering another one.
2675
:param suite: The source suite.
2676
:param start: A list of string the test id must start with one of.
2677
:returns: the newly created suite
2679
condition = condition_id_startswith(start)
2680
result_suite = filter_suite_by_condition(suite, condition)
2684
def exclude_tests_by_re(suite, pattern):
2685
"""Create a test suite which excludes some tests from suite.
2687
:param suite: The suite to get tests from.
2688
:param pattern: A regular expression string. Test ids that match this
2689
pattern will be excluded from the result.
2690
:return: A TestSuite that contains all the tests from suite without the
2691
tests that matched pattern. The order of tests is the same as it was in
2694
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2697
def preserve_input(something):
2698
"""A helper for performing test suite transformation chains.
2700
:param something: Anything you want to preserve.
2706
def randomize_suite(suite):
2707
"""Return a new TestSuite with suite's tests in random order.
2709
The tests in the input suite are flattened into a single suite in order to
2710
accomplish this. Any nested TestSuites are removed to provide global
2713
tests = list(iter_suite_tests(suite))
2714
random.shuffle(tests)
2715
return TestUtil.TestSuite(tests)
2718
def split_suite_by_condition(suite, condition):
2719
"""Split a test suite into two by a condition.
2721
:param suite: The suite to split.
2722
:param condition: The condition to match on. Tests that match this
2723
condition are returned in the first test suite, ones that do not match
2724
are in the second suite.
2725
:return: A tuple of two test suites, where the first contains tests from
2726
suite matching the condition, and the second contains the remainder
2727
from suite. The order within each output suite is the same as it was in
2149
2732
for test in iter_suite_tests(suite):
2151
if exclude_pattern is None or not exclude_re.search(test_id):
2152
if filter_re.search(test_id):
2157
random.shuffle(first)
2158
random.shuffle(second)
2159
return TestUtil.TestSuite(first + second)
2734
matched.append(test)
2736
did_not_match.append(test)
2737
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2740
def split_suite_by_re(suite, pattern):
2741
"""Split a test suite into two by a regular expression.
2743
:param suite: The suite to split.
2744
:param pattern: A regular expression string. Test ids that match this
2745
pattern will be in the first test suite returned, and the others in the
2746
second test suite returned.
2747
:return: A tuple of two test suites, where the first contains tests from
2748
suite matching pattern, and the second contains the remainder from
2749
suite. The order within each output suite is the same as it was in
2752
return split_suite_by_condition(suite, condition_id_re(pattern))
2162
2755
def run_suite(suite, name='test', verbose=False, pattern=".*",
2163
stop_on_failure=False, keep_output=False,
2756
stop_on_failure=False,
2164
2757
transport=None, lsprof_timed=None, bench_history=None,
2165
2758
matching_tests_first=None,
2167
2759
list_only=False,
2168
2760
random_seed=None,
2169
2761
exclude_pattern=None,
2171
use_numbered_dirs = bool(numbered_dirs)
2764
suite_decorators=None,
2766
"""Run a test suite for bzr selftest.
2768
:param runner_class: The class of runner to use. Must support the
2769
constructor arguments passed by run_suite which are more than standard
2771
:return: A boolean indicating success.
2173
2773
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2174
if numbered_dirs is not None:
2175
TestCaseInTempDir.use_numbered_dirs = use_numbered_dirs
2180
runner = TextTestRunner(stream=sys.stdout,
2778
if runner_class is None:
2779
runner_class = TextTestRunner
2782
runner = runner_class(stream=stream,
2181
2783
descriptions=0,
2182
2784
verbosity=verbosity,
2183
keep_output=keep_output,
2184
2785
bench_history=bench_history,
2185
use_numbered_dirs=use_numbered_dirs,
2186
2786
list_only=list_only,
2188
2789
runner.stop_on_failure=stop_on_failure
2189
# Initialise the random number generator and display the seed used.
2190
# We convert the seed to a long to make it reuseable across invocations.
2191
random_order = False
2192
if random_seed is not None:
2194
if random_seed == "now":
2195
random_seed = long(time.time())
2790
# built in decorator factories:
2792
random_order(random_seed, runner),
2793
exclude_tests(exclude_pattern),
2795
if matching_tests_first:
2796
decorators.append(tests_first(pattern))
2798
decorators.append(filter_tests(pattern))
2799
if suite_decorators:
2800
decorators.extend(suite_decorators)
2801
# tell the result object how many tests will be running: (except if
2802
# --parallel=fork is being used. Robert said he will provide a better
2803
# progress design later -- vila 20090817)
2804
if fork_decorator not in decorators:
2805
decorators.append(CountingDecorator)
2806
for decorator in decorators:
2807
suite = decorator(suite)
2808
result = runner.run(suite)
2813
return result.wasStrictlySuccessful()
2815
return result.wasSuccessful()
2818
# A registry where get() returns a suite decorator.
2819
parallel_registry = registry.Registry()
2822
def fork_decorator(suite):
2823
concurrency = osutils.local_concurrency()
2824
if concurrency == 1:
2826
from testtools import ConcurrentTestSuite
2827
return ConcurrentTestSuite(suite, fork_for_tests)
2828
parallel_registry.register('fork', fork_decorator)
2831
def subprocess_decorator(suite):
2832
concurrency = osutils.local_concurrency()
2833
if concurrency == 1:
2835
from testtools import ConcurrentTestSuite
2836
return ConcurrentTestSuite(suite, reinvoke_for_tests)
2837
parallel_registry.register('subprocess', subprocess_decorator)
2840
def exclude_tests(exclude_pattern):
2841
"""Return a test suite decorator that excludes tests."""
2842
if exclude_pattern is None:
2843
return identity_decorator
2844
def decorator(suite):
2845
return ExcludeDecorator(suite, exclude_pattern)
2849
def filter_tests(pattern):
2851
return identity_decorator
2852
def decorator(suite):
2853
return FilterTestsDecorator(suite, pattern)
2857
def random_order(random_seed, runner):
2858
"""Return a test suite decorator factory for randomising tests order.
2860
:param random_seed: now, a string which casts to a long, or a long.
2861
:param runner: A test runner with a stream attribute to report on.
2863
if random_seed is None:
2864
return identity_decorator
2865
def decorator(suite):
2866
return RandomDecorator(suite, random_seed, runner.stream)
2870
def tests_first(pattern):
2872
return identity_decorator
2873
def decorator(suite):
2874
return TestFirstDecorator(suite, pattern)
2878
def identity_decorator(suite):
2883
class TestDecorator(TestSuite):
2884
"""A decorator for TestCase/TestSuite objects.
2886
Usually, subclasses should override __iter__(used when flattening test
2887
suites), which we do to filter, reorder, parallelise and so on, run() and
2891
def __init__(self, suite):
2892
TestSuite.__init__(self)
2895
def countTestCases(self):
2898
cases += test.countTestCases()
2905
def run(self, result):
2906
# Use iteration on self, not self._tests, to allow subclasses to hook
2909
if result.shouldStop:
2915
class CountingDecorator(TestDecorator):
2916
"""A decorator which calls result.progress(self.countTestCases)."""
2918
def run(self, result):
2919
progress_method = getattr(result, 'progress', None)
2920
if callable(progress_method):
2921
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
2922
return super(CountingDecorator, self).run(result)
2925
class ExcludeDecorator(TestDecorator):
2926
"""A decorator which excludes test matching an exclude pattern."""
2928
def __init__(self, suite, exclude_pattern):
2929
TestDecorator.__init__(self, suite)
2930
self.exclude_pattern = exclude_pattern
2931
self.excluded = False
2935
return iter(self._tests)
2936
self.excluded = True
2937
suite = exclude_tests_by_re(self, self.exclude_pattern)
2939
self.addTests(suite)
2940
return iter(self._tests)
2943
class FilterTestsDecorator(TestDecorator):
2944
"""A decorator which filters tests to those matching a pattern."""
2946
def __init__(self, suite, pattern):
2947
TestDecorator.__init__(self, suite)
2948
self.pattern = pattern
2949
self.filtered = False
2953
return iter(self._tests)
2954
self.filtered = True
2955
suite = filter_suite_by_re(self, self.pattern)
2957
self.addTests(suite)
2958
return iter(self._tests)
2961
class RandomDecorator(TestDecorator):
2962
"""A decorator which randomises the order of its tests."""
2964
def __init__(self, suite, random_seed, stream):
2965
TestDecorator.__init__(self, suite)
2966
self.random_seed = random_seed
2967
self.randomised = False
2968
self.stream = stream
2972
return iter(self._tests)
2973
self.randomised = True
2974
self.stream.writeln("Randomizing test order using seed %s\n" %
2975
(self.actual_seed()))
2976
# Initialise the random number generator.
2977
random.seed(self.actual_seed())
2978
suite = randomize_suite(self)
2980
self.addTests(suite)
2981
return iter(self._tests)
2983
def actual_seed(self):
2984
if self.random_seed == "now":
2985
# We convert the seed to a long to make it reuseable across
2986
# invocations (because the user can reenter it).
2987
self.random_seed = long(time.time())
2197
2989
# Convert the seed to a long if we can
2199
random_seed = long(random_seed)
2991
self.random_seed = long(self.random_seed)
2202
runner.stream.writeln("Randomizing test order using seed %s\n" %
2204
random.seed(random_seed)
2205
# Customise the list of tests if requested
2206
if pattern != '.*' or exclude_pattern is not None or random_order:
2207
if matching_tests_first:
2208
suite = sort_suite_by_re(suite, pattern, exclude_pattern,
2211
suite = filter_suite_by_re(suite, pattern, exclude_pattern,
2213
result = runner.run(suite)
2214
return result.wasSuccessful()
2994
return self.random_seed
2997
class TestFirstDecorator(TestDecorator):
2998
"""A decorator which moves named tests to the front."""
3000
def __init__(self, suite, pattern):
3001
TestDecorator.__init__(self, suite)
3002
self.pattern = pattern
3003
self.filtered = False
3007
return iter(self._tests)
3008
self.filtered = True
3009
suites = split_suite_by_re(self, self.pattern)
3011
self.addTests(suites)
3012
return iter(self._tests)
3015
def partition_tests(suite, count):
3016
"""Partition suite into count lists of tests."""
3018
tests = list(iter_suite_tests(suite))
3019
tests_per_process = int(math.ceil(float(len(tests)) / count))
3020
for block in range(count):
3021
low_test = block * tests_per_process
3022
high_test = low_test + tests_per_process
3023
process_tests = tests[low_test:high_test]
3024
result.append(process_tests)
3028
def fork_for_tests(suite):
3029
"""Take suite and start up one runner per CPU by forking()
3031
:return: An iterable of TestCase-like objects which can each have
3032
run(result) called on them to feed tests to result.
3034
concurrency = osutils.local_concurrency()
3036
from subunit import TestProtocolClient, ProtocolTestCase
3038
from subunit.test_results import AutoTimingTestResultDecorator
3040
AutoTimingTestResultDecorator = lambda x:x
3041
class TestInOtherProcess(ProtocolTestCase):
3042
# Should be in subunit, I think. RBC.
3043
def __init__(self, stream, pid):
3044
ProtocolTestCase.__init__(self, stream)
3047
def run(self, result):
3049
ProtocolTestCase.run(self, result)
3051
os.waitpid(self.pid, os.WNOHANG)
3053
test_blocks = partition_tests(suite, concurrency)
3054
for process_tests in test_blocks:
3055
process_suite = TestSuite()
3056
process_suite.addTests(process_tests)
3057
c2pread, c2pwrite = os.pipe()
3062
# Leave stderr and stdout open so we can see test noise
3063
# Close stdin so that the child goes away if it decides to
3064
# read from stdin (otherwise its a roulette to see what
3065
# child actually gets keystrokes for pdb etc).
3068
stream = os.fdopen(c2pwrite, 'wb', 1)
3069
subunit_result = AutoTimingTestResultDecorator(
3070
TestProtocolClient(stream))
3071
process_suite.run(subunit_result)
3076
stream = os.fdopen(c2pread, 'rb', 1)
3077
test = TestInOtherProcess(stream, pid)
3082
def reinvoke_for_tests(suite):
3083
"""Take suite and start up one runner per CPU using subprocess().
3085
:return: An iterable of TestCase-like objects which can each have
3086
run(result) called on them to feed tests to result.
3088
concurrency = osutils.local_concurrency()
3090
from subunit import ProtocolTestCase
3091
class TestInSubprocess(ProtocolTestCase):
3092
def __init__(self, process, name):
3093
ProtocolTestCase.__init__(self, process.stdout)
3094
self.process = process
3095
self.process.stdin.close()
3098
def run(self, result):
3100
ProtocolTestCase.run(self, result)
3103
os.unlink(self.name)
3104
# print "pid %d finished" % finished_process
3105
test_blocks = partition_tests(suite, concurrency)
3106
for process_tests in test_blocks:
3107
# ugly; currently reimplement rather than reuses TestCase methods.
3108
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3109
if not os.path.isfile(bzr_path):
3110
# We are probably installed. Assume sys.argv is the right file
3111
bzr_path = sys.argv[0]
3112
fd, test_list_file_name = tempfile.mkstemp()
3113
test_list_file = os.fdopen(fd, 'wb', 1)
3114
for test in process_tests:
3115
test_list_file.write(test.id() + '\n')
3116
test_list_file.close()
3118
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3120
if '--no-plugins' in sys.argv:
3121
argv.append('--no-plugins')
3122
# stderr=STDOUT would be ideal, but until we prevent noise on
3123
# stderr it can interrupt the subunit protocol.
3124
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3126
test = TestInSubprocess(process, test_list_file_name)
3129
os.unlink(test_list_file_name)
3134
class BZRTransformingResult(unittest.TestResult):
3136
def __init__(self, target):
3137
unittest.TestResult.__init__(self)
3138
self.result = target
3140
def startTest(self, test):
3141
self.result.startTest(test)
3143
def stopTest(self, test):
3144
self.result.stopTest(test)
3146
def addError(self, test, err):
3147
feature = self._error_looks_like('UnavailableFeature: ', err)
3148
if feature is not None:
3149
self.result.addNotSupported(test, feature)
3151
self.result.addError(test, err)
3153
def addFailure(self, test, err):
3154
known = self._error_looks_like('KnownFailure: ', err)
3155
if known is not None:
3156
self.result._addKnownFailure(test, [KnownFailure,
3157
KnownFailure(known), None])
3159
self.result.addFailure(test, err)
3161
def addSkip(self, test, reason):
3162
self.result.addSkip(test, reason)
3164
def addSuccess(self, test):
3165
self.result.addSuccess(test)
3167
def _error_looks_like(self, prefix, err):
3168
"""Deserialize exception and returns the stringify value."""
3172
if isinstance(exc, subunit.RemoteException):
3173
# stringify the exception gives access to the remote traceback
3174
# We search the last line for 'prefix'
3175
lines = str(exc).split('\n')
3176
while lines and not lines[-1]:
3179
if lines[-1].startswith(prefix):
3180
value = lines[-1][len(prefix):]
3184
# Controlled by "bzr selftest -E=..." option
3185
# Currently supported:
3186
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3187
# preserves any flags supplied at the command line.
3188
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3189
# rather than failing tests. And no longer raise
3190
# LockContention when fctnl locks are not being used
3191
# with proper exclusion rules.
3192
selftest_debug_flags = set()
2217
3195
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2219
3196
transport=None,
2220
3197
test_suite_factory=None,
2221
3198
lsprof_timed=None,
2222
3199
bench_history=None,
2223
3200
matching_tests_first=None,
2225
3201
list_only=False,
2226
3202
random_seed=None,
2227
exclude_pattern=None):
3203
exclude_pattern=None,
3209
suite_decorators=None,
2228
3212
"""Run the whole test suite under the enhanced runner"""
2229
3213
# XXX: Very ugly way to do this...
2230
3214
# Disable warning about old formats because we don't want it to disturb
2237
3221
transport = default_transport
2238
3222
old_transport = default_transport
2239
3223
default_transport = transport
3224
global selftest_debug_flags
3225
old_debug_flags = selftest_debug_flags
3226
if debug_flags is not None:
3227
selftest_debug_flags = set(debug_flags)
3229
if load_list is None:
3232
keep_only = load_test_id_list(load_list)
3234
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3235
for start in starting_with]
2241
3236
if test_suite_factory is None:
2242
suite = test_suite()
3237
# Reduce loading time by loading modules based on the starting_with
3239
suite = test_suite(keep_only, starting_with)
2244
3241
suite = test_suite_factory()
3243
# But always filter as requested.
3244
suite = filter_suite_by_id_startswith(suite, starting_with)
2245
3245
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2246
stop_on_failure=stop_on_failure, keep_output=keep_output,
3246
stop_on_failure=stop_on_failure,
2247
3247
transport=transport,
2248
3248
lsprof_timed=lsprof_timed,
2249
3249
bench_history=bench_history,
2250
3250
matching_tests_first=matching_tests_first,
2251
numbered_dirs=numbered_dirs,
2252
3251
list_only=list_only,
2253
3252
random_seed=random_seed,
2254
exclude_pattern=exclude_pattern)
3253
exclude_pattern=exclude_pattern,
3255
runner_class=runner_class,
3256
suite_decorators=suite_decorators,
2256
3260
default_transport = old_transport
3261
selftest_debug_flags = old_debug_flags
3264
def load_test_id_list(file_name):
3265
"""Load a test id list from a text file.
3267
The format is one test id by line. No special care is taken to impose
3268
strict rules, these test ids are used to filter the test suite so a test id
3269
that do not match an existing test will do no harm. This allows user to add
3270
comments, leave blank lines, etc.
3274
ftest = open(file_name, 'rt')
3276
if e.errno != errno.ENOENT:
3279
raise errors.NoSuchFile(file_name)
3281
for test_name in ftest.readlines():
3282
test_list.append(test_name.strip())
3287
def suite_matches_id_list(test_suite, id_list):
3288
"""Warns about tests not appearing or appearing more than once.
3290
:param test_suite: A TestSuite object.
3291
:param test_id_list: The list of test ids that should be found in
3294
:return: (absents, duplicates) absents is a list containing the test found
3295
in id_list but not in test_suite, duplicates is a list containing the
3296
test found multiple times in test_suite.
3298
When using a prefined test id list, it may occurs that some tests do not
3299
exist anymore or that some tests use the same id. This function warns the
3300
tester about potential problems in his workflow (test lists are volatile)
3301
or in the test suite itself (using the same id for several tests does not
3302
help to localize defects).
3304
# Build a dict counting id occurrences
3306
for test in iter_suite_tests(test_suite):
3308
tests[id] = tests.get(id, 0) + 1
3313
occurs = tests.get(id, 0)
3315
not_found.append(id)
3317
duplicates.append(id)
3319
return not_found, duplicates
3322
class TestIdList(object):
3323
"""Test id list to filter a test suite.
3325
Relying on the assumption that test ids are built as:
3326
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3327
notation, this class offers methods to :
3328
- avoid building a test suite for modules not refered to in the test list,
3329
- keep only the tests listed from the module test suite.
3332
def __init__(self, test_id_list):
3333
# When a test suite needs to be filtered against us we compare test ids
3334
# for equality, so a simple dict offers a quick and simple solution.
3335
self.tests = dict().fromkeys(test_id_list, True)
3337
# While unittest.TestCase have ids like:
3338
# <module>.<class>.<method>[(<param+)],
3339
# doctest.DocTestCase can have ids like:
3342
# <module>.<function>
3343
# <module>.<class>.<method>
3345
# Since we can't predict a test class from its name only, we settle on
3346
# a simple constraint: a test id always begins with its module name.
3349
for test_id in test_id_list:
3350
parts = test_id.split('.')
3351
mod_name = parts.pop(0)
3352
modules[mod_name] = True
3354
mod_name += '.' + part
3355
modules[mod_name] = True
3356
self.modules = modules
3358
def refers_to(self, module_name):
3359
"""Is there tests for the module or one of its sub modules."""
3360
return self.modules.has_key(module_name)
3362
def includes(self, test_id):
3363
return self.tests.has_key(test_id)
3366
class TestPrefixAliasRegistry(registry.Registry):
3367
"""A registry for test prefix aliases.
3369
This helps implement shorcuts for the --starting-with selftest
3370
option. Overriding existing prefixes is not allowed but not fatal (a
3371
warning will be emitted).
3374
def register(self, key, obj, help=None, info=None,
3375
override_existing=False):
3376
"""See Registry.register.
3378
Trying to override an existing alias causes a warning to be emitted,
3379
not a fatal execption.
3382
super(TestPrefixAliasRegistry, self).register(
3383
key, obj, help=help, info=info, override_existing=False)
3385
actual = self.get(key)
3386
note('Test prefix alias %s is already used for %s, ignoring %s'
3387
% (key, actual, obj))
3389
def resolve_alias(self, id_start):
3390
"""Replace the alias by the prefix in the given string.
3392
Using an unknown prefix is an error to help catching typos.
3394
parts = id_start.split('.')
3396
parts[0] = self.get(parts[0])
3398
raise errors.BzrCommandError(
3399
'%s is not a known test prefix alias' % parts[0])
3400
return '.'.join(parts)
3403
test_prefix_alias_registry = TestPrefixAliasRegistry()
3404
"""Registry of test prefix aliases."""
3407
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3408
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3409
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3411
# Obvious higest levels prefixes, feel free to add your own via a plugin
3412
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3413
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3414
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3415
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3416
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3419
def test_suite(keep_only=None, starting_with=None):
2260
3420
"""Build and return TestSuite for the whole of bzrlib.
3422
:param keep_only: A list of test ids limiting the suite returned.
3424
:param starting_with: An id limiting the suite returned to the tests
2262
3427
This function can be replaced if you need to change the default test
2263
3428
suite on a global basis, but it is not encouraged.
2265
3430
testmod_names = [
3432
'bzrlib.tests.blackbox',
3433
'bzrlib.tests.commands',
3434
'bzrlib.tests.per_branch',
3435
'bzrlib.tests.per_bzrdir',
3436
'bzrlib.tests.per_interrepository',
3437
'bzrlib.tests.per_intertree',
3438
'bzrlib.tests.per_inventory',
3439
'bzrlib.tests.per_interbranch',
3440
'bzrlib.tests.per_lock',
3441
'bzrlib.tests.per_transport',
3442
'bzrlib.tests.per_tree',
3443
'bzrlib.tests.per_pack_repository',
3444
'bzrlib.tests.per_repository',
3445
'bzrlib.tests.per_repository_chk',
3446
'bzrlib.tests.per_repository_reference',
3447
'bzrlib.tests.per_versionedfile',
3448
'bzrlib.tests.per_workingtree',
3449
'bzrlib.tests.test__annotator',
3450
'bzrlib.tests.test__chk_map',
3451
'bzrlib.tests.test__dirstate_helpers',
3452
'bzrlib.tests.test__groupcompress',
3453
'bzrlib.tests.test__known_graph',
3454
'bzrlib.tests.test__rio',
3455
'bzrlib.tests.test__walkdirs_win32',
2266
3456
'bzrlib.tests.test_ancestry',
2267
3457
'bzrlib.tests.test_annotate',
2268
3458
'bzrlib.tests.test_api',
2269
3459
'bzrlib.tests.test_atomicfile',
2270
3460
'bzrlib.tests.test_bad_files',
3461
'bzrlib.tests.test_bencode',
3462
'bzrlib.tests.test_bisect_multi',
2271
3463
'bzrlib.tests.test_branch',
2272
3464
'bzrlib.tests.test_branchbuilder',
3465
'bzrlib.tests.test_btree_index',
2273
3466
'bzrlib.tests.test_bugtracker',
2274
3467
'bzrlib.tests.test_bundle',
2275
3468
'bzrlib.tests.test_bzrdir',
3469
'bzrlib.tests.test__chunks_to_lines',
2276
3470
'bzrlib.tests.test_cache_utf8',
3471
'bzrlib.tests.test_chk_map',
3472
'bzrlib.tests.test_chk_serializer',
3473
'bzrlib.tests.test_chunk_writer',
3474
'bzrlib.tests.test_clean_tree',
2277
3475
'bzrlib.tests.test_commands',
2278
3476
'bzrlib.tests.test_commit',
2279
3477
'bzrlib.tests.test_commit_merge',
2280
3478
'bzrlib.tests.test_config',
2281
3479
'bzrlib.tests.test_conflicts',
2282
3480
'bzrlib.tests.test_counted_lock',
3481
'bzrlib.tests.test_crash',
2283
3482
'bzrlib.tests.test_decorators',
2284
3483
'bzrlib.tests.test_delta',
3484
'bzrlib.tests.test_debug',
3485
'bzrlib.tests.test_deprecated_graph',
2285
3486
'bzrlib.tests.test_diff',
3487
'bzrlib.tests.test_directory_service',
2286
3488
'bzrlib.tests.test_dirstate',
3489
'bzrlib.tests.test_email_message',
3490
'bzrlib.tests.test_eol_filters',
2287
3491
'bzrlib.tests.test_errors',
2288
'bzrlib.tests.test_escaped_store',
3492
'bzrlib.tests.test_export',
2289
3493
'bzrlib.tests.test_extract',
2290
3494
'bzrlib.tests.test_fetch',
3495
'bzrlib.tests.test_fifo_cache',
3496
'bzrlib.tests.test_filters',
2291
3497
'bzrlib.tests.test_ftp_transport',
3498
'bzrlib.tests.test_foreign',
2292
3499
'bzrlib.tests.test_generate_docs',
2293
3500
'bzrlib.tests.test_generate_ids',
2294
3501
'bzrlib.tests.test_globbing',
2295
3502
'bzrlib.tests.test_gpg',
2296
3503
'bzrlib.tests.test_graph',
3504
'bzrlib.tests.test_groupcompress',
2297
3505
'bzrlib.tests.test_hashcache',
2298
3506
'bzrlib.tests.test_help',
3507
'bzrlib.tests.test_hooks',
2299
3508
'bzrlib.tests.test_http',
2300
3509
'bzrlib.tests.test_http_response',
2301
3510
'bzrlib.tests.test_https_ca_bundle',
2302
3511
'bzrlib.tests.test_identitymap',
2303
3512
'bzrlib.tests.test_ignores',
3513
'bzrlib.tests.test_index',
3514
'bzrlib.tests.test_info',
2304
3515
'bzrlib.tests.test_inv',
3516
'bzrlib.tests.test_inventory_delta',
2305
3517
'bzrlib.tests.test_knit',
2306
3518
'bzrlib.tests.test_lazy_import',
2307
3519
'bzrlib.tests.test_lazy_regex',
3520
'bzrlib.tests.test_lock',
3521
'bzrlib.tests.test_lockable_files',
2308
3522
'bzrlib.tests.test_lockdir',
2309
'bzrlib.tests.test_lockable_files',
2310
3523
'bzrlib.tests.test_log',
3524
'bzrlib.tests.test_lru_cache',
3525
'bzrlib.tests.test_lsprof',
3526
'bzrlib.tests.test_mail_client',
2311
3527
'bzrlib.tests.test_memorytree',
2312
3528
'bzrlib.tests.test_merge',
2313
3529
'bzrlib.tests.test_merge3',
2356
3585
'bzrlib.tests.test_transactions',
2357
3586
'bzrlib.tests.test_transform',
2358
3587
'bzrlib.tests.test_transport',
3588
'bzrlib.tests.test_transport_log',
2359
3589
'bzrlib.tests.test_tree',
2360
3590
'bzrlib.tests.test_treebuilder',
2361
3591
'bzrlib.tests.test_tsort',
2362
3592
'bzrlib.tests.test_tuned_gzip',
2363
3593
'bzrlib.tests.test_ui',
3594
'bzrlib.tests.test_uncommit',
2364
3595
'bzrlib.tests.test_upgrade',
3596
'bzrlib.tests.test_upgrade_stacked',
2365
3597
'bzrlib.tests.test_urlutils',
2366
'bzrlib.tests.test_versionedfile',
2367
3598
'bzrlib.tests.test_version',
2368
3599
'bzrlib.tests.test_version_info',
2369
3600
'bzrlib.tests.test_weave',
2370
3601
'bzrlib.tests.test_whitebox',
3602
'bzrlib.tests.test_win32utils',
2371
3603
'bzrlib.tests.test_workingtree',
2372
3604
'bzrlib.tests.test_workingtree_4',
2373
3605
'bzrlib.tests.test_wsgi',
2374
3606
'bzrlib.tests.test_xml',
2376
test_transport_implementations = [
2377
'bzrlib.tests.test_transport_implementations',
2378
'bzrlib.tests.test_read_bundle',
2380
suite = TestUtil.TestSuite()
2381
3609
loader = TestUtil.TestLoader()
3611
if keep_only is not None:
3612
id_filter = TestIdList(keep_only)
3614
# We take precedence over keep_only because *at loading time* using
3615
# both options means we will load less tests for the same final result.
3616
def interesting_module(name):
3617
for start in starting_with:
3619
# Either the module name starts with the specified string
3620
name.startswith(start)
3621
# or it may contain tests starting with the specified string
3622
or start.startswith(name)
3626
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3628
elif keep_only is not None:
3629
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3630
def interesting_module(name):
3631
return id_filter.refers_to(name)
3634
loader = TestUtil.TestLoader()
3635
def interesting_module(name):
3636
# No filtering, all modules are interesting
3639
suite = loader.suiteClass()
3641
# modules building their suite with loadTestsFromModuleNames
2382
3642
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2383
from bzrlib.transport import TransportTestProviderAdapter
2384
adapter = TransportTestProviderAdapter()
2385
adapt_modules(test_transport_implementations, adapter, loader, suite)
2386
for package in packages_to_test():
2387
suite.addTest(package.test_suite())
2388
for m in MODULES_TO_TEST:
2389
suite.addTest(loader.loadTestsFromModule(m))
2390
for m in MODULES_TO_DOCTEST:
3644
modules_to_doctest = [
3646
'bzrlib.branchbuilder',
3649
'bzrlib.iterablefile',
3653
'bzrlib.symbol_versioning',
3656
'bzrlib.version_info_formats.format_custom',
3659
for mod in modules_to_doctest:
3660
if not interesting_module(mod):
3661
# No tests to keep here, move along
2392
suite.addTest(doctest.DocTestSuite(m))
3664
# note that this really does mean "report only" -- doctest
3665
# still runs the rest of the examples
3666
doc_suite = doctest.DocTestSuite(mod,
3667
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
2393
3668
except ValueError, e:
2394
print '**failed to get doctest for: %s\n%s' %(m,e)
3669
print '**failed to get doctest for: %s\n%s' % (mod, e)
2396
for name, plugin in bzrlib.plugin.all_plugins().items():
2397
if getattr(plugin, 'test_suite', None) is not None:
2398
default_encoding = sys.getdefaultencoding()
2400
plugin_suite = plugin.test_suite()
2401
except ImportError, e:
2402
bzrlib.trace.warning(
2403
'Unable to test plugin "%s": %s', name, e)
2405
suite.addTest(plugin_suite)
2406
if default_encoding != sys.getdefaultencoding():
2407
bzrlib.trace.warning(
2408
'Plugin "%s" tried to reset default encoding to: %s', name,
2409
sys.getdefaultencoding())
2411
sys.setdefaultencoding(default_encoding)
3671
if len(doc_suite._tests) == 0:
3672
raise errors.BzrError("no doctests found in %s" % (mod,))
3673
suite.addTest(doc_suite)
3675
default_encoding = sys.getdefaultencoding()
3676
for name, plugin in bzrlib.plugin.plugins().items():
3677
if not interesting_module(plugin.module.__name__):
3679
plugin_suite = plugin.test_suite()
3680
# We used to catch ImportError here and turn it into just a warning,
3681
# but really if you don't have --no-plugins this should be a failure.
3682
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3683
if plugin_suite is None:
3684
plugin_suite = plugin.load_plugin_tests(loader)
3685
if plugin_suite is not None:
3686
suite.addTest(plugin_suite)
3687
if default_encoding != sys.getdefaultencoding():
3688
bzrlib.trace.warning(
3689
'Plugin "%s" tried to reset default encoding to: %s', name,
3690
sys.getdefaultencoding())
3692
sys.setdefaultencoding(default_encoding)
3694
if keep_only is not None:
3695
# Now that the referred modules have loaded their tests, keep only the
3697
suite = filter_suite_by_id_list(suite, id_filter)
3698
# Do some sanity checks on the id_list filtering
3699
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3701
# The tester has used both keep_only and starting_with, so he is
3702
# already aware that some tests are excluded from the list, there
3703
# is no need to tell him which.
3706
# Some tests mentioned in the list are not in the test suite. The
3707
# list may be out of date, report to the tester.
3708
for id in not_found:
3709
bzrlib.trace.warning('"%s" not found in the test suite', id)
3710
for id in duplicates:
3711
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
2415
def adapt_modules(mods_list, adapter, loader, suite):
2416
"""Adapt the modules in mods_list using adapter and add to suite."""
2417
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
2418
suite.addTests(adapter.adapt(test))
3716
def multiply_scenarios(scenarios_left, scenarios_right):
3717
"""Multiply two sets of scenarios.
3719
:returns: the cartesian product of the two sets of scenarios, that is
3720
a scenario for every possible combination of a left scenario and a
3724
('%s,%s' % (left_name, right_name),
3725
dict(left_dict.items() + right_dict.items()))
3726
for left_name, left_dict in scenarios_left
3727
for right_name, right_dict in scenarios_right]
3730
def multiply_tests(tests, scenarios, result):
3731
"""Multiply tests_list by scenarios into result.
3733
This is the core workhorse for test parameterisation.
3735
Typically the load_tests() method for a per-implementation test suite will
3736
call multiply_tests and return the result.
3738
:param tests: The tests to parameterise.
3739
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3740
scenario_param_dict).
3741
:param result: A TestSuite to add created tests to.
3743
This returns the passed in result TestSuite with the cross product of all
3744
the tests repeated once for each scenario. Each test is adapted by adding
3745
the scenario name at the end of its id(), and updating the test object's
3746
__dict__ with the scenario_param_dict.
3748
>>> import bzrlib.tests.test_sampler
3749
>>> r = multiply_tests(
3750
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3751
... [('one', dict(param=1)),
3752
... ('two', dict(param=2))],
3754
>>> tests = list(iter_suite_tests(r))
3758
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3764
for test in iter_suite_tests(tests):
3765
apply_scenarios(test, scenarios, result)
3769
def apply_scenarios(test, scenarios, result):
3770
"""Apply the scenarios in scenarios to test and add to result.
3772
:param test: The test to apply scenarios to.
3773
:param scenarios: An iterable of scenarios to apply to test.
3775
:seealso: apply_scenario
3777
for scenario in scenarios:
3778
result.addTest(apply_scenario(test, scenario))
3782
def apply_scenario(test, scenario):
3783
"""Copy test and apply scenario to it.
3785
:param test: A test to adapt.
3786
:param scenario: A tuple describing the scenarion.
3787
The first element of the tuple is the new test id.
3788
The second element is a dict containing attributes to set on the
3790
:return: The adapted test.
3792
new_id = "%s(%s)" % (test.id(), scenario[0])
3793
new_test = clone_test(test, new_id)
3794
for name, value in scenario[1].items():
3795
setattr(new_test, name, value)
3799
def clone_test(test, new_id):
3800
"""Clone a test giving it a new id.
3802
:param test: The test to clone.
3803
:param new_id: The id to assign to it.
3804
:return: The new test.
3806
from copy import deepcopy
3807
new_test = deepcopy(test)
3808
new_test.id = lambda: new_id
2421
3812
def _rmtree_temp_dir(dirname):
3813
# If LANG=C we probably have created some bogus paths
3814
# which rmtree(unicode) will fail to delete
3815
# so make sure we are using rmtree(str) to delete everything
3816
# except on win32, where rmtree(str) will fail
3817
# since it doesn't have the property of byte-stream paths
3818
# (they are either ascii or mbcs)
3819
if sys.platform == 'win32':
3820
# make sure we are using the unicode win32 api
3821
dirname = unicode(dirname)
3823
dirname = dirname.encode(sys.getfilesystemencoding())
2423
3825
osutils.rmtree(dirname)
2424
3826
except OSError, e:
2425
if sys.platform == 'win32' and e.errno == errno.EACCES:
2426
print >>sys.stderr, ('Permission denied: '
2427
'unable to remove testing dir '
2428
'%s' % os.path.basename(dirname))
2433
def clean_selftest_output(root=None, quiet=False):
2434
"""Remove all selftest output directories from root directory.
2436
:param root: root directory for clean
2437
(if ommitted or None then clean current directory).
2438
:param quiet: suppress report about deleting directories
2441
re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
2444
for i in os.listdir(root):
2445
if os.path.isdir(i) and re_dir.match(i):
2447
print 'delete directory:', i
3827
# We don't want to fail here because some useful display will be lost
3828
# otherwise. Polluting the tmp dir is bad, but not giving all the
3829
# possible info to the test runner is even worse.
3830
sys.stderr.write('Unable to remove testing dir %s\n%s'
3831
% (os.path.basename(dirname), e))
2451
3834
class Feature(object):
2474
3857
if getattr(self, 'feature_name', None):
2475
3858
return self.feature_name()
2476
3859
return self.__class__.__name__
3862
class _SymlinkFeature(Feature):
3865
return osutils.has_symlinks()
3867
def feature_name(self):
3870
SymlinkFeature = _SymlinkFeature()
3873
class _HardlinkFeature(Feature):
3876
return osutils.has_hardlinks()
3878
def feature_name(self):
3881
HardlinkFeature = _HardlinkFeature()
3884
class _OsFifoFeature(Feature):
3887
return getattr(os, 'mkfifo', None)
3889
def feature_name(self):
3890
return 'filesystem fifos'
3892
OsFifoFeature = _OsFifoFeature()
3895
class _UnicodeFilenameFeature(Feature):
3896
"""Does the filesystem support Unicode filenames?"""
3900
# Check for character combinations unlikely to be covered by any
3901
# single non-unicode encoding. We use the characters
3902
# - greek small letter alpha (U+03B1) and
3903
# - braille pattern dots-123456 (U+283F).
3904
os.stat(u'\u03b1\u283f')
3905
except UnicodeEncodeError:
3907
except (IOError, OSError):
3908
# The filesystem allows the Unicode filename but the file doesn't
3912
# The filesystem allows the Unicode filename and the file exists,
3916
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3919
def probe_unicode_in_user_encoding():
3920
"""Try to encode several unicode strings to use in unicode-aware tests.
3921
Return first successfull match.
3923
:return: (unicode value, encoded plain string value) or (None, None)
3925
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3926
for uni_val in possible_vals:
3928
str_val = uni_val.encode(osutils.get_user_encoding())
3929
except UnicodeEncodeError:
3930
# Try a different character
3933
return uni_val, str_val
3937
def probe_bad_non_ascii(encoding):
3938
"""Try to find [bad] character with code [128..255]
3939
that cannot be decoded to unicode in some encoding.
3940
Return None if all non-ascii characters is valid
3943
for i in xrange(128, 256):
3946
char.decode(encoding)
3947
except UnicodeDecodeError:
3952
class _HTTPSServerFeature(Feature):
3953
"""Some tests want an https Server, check if one is available.
3955
Right now, the only way this is available is under python2.6 which provides
3966
def feature_name(self):
3967
return 'HTTPSServer'
3970
HTTPSServerFeature = _HTTPSServerFeature()
3973
class _UnicodeFilename(Feature):
3974
"""Does the filesystem support Unicode filenames?"""
3979
except UnicodeEncodeError:
3981
except (IOError, OSError):
3982
# The filesystem allows the Unicode filename but the file doesn't
3986
# The filesystem allows the Unicode filename and the file exists,
3990
UnicodeFilename = _UnicodeFilename()
3993
class _UTF8Filesystem(Feature):
3994
"""Is the filesystem UTF-8?"""
3997
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4001
UTF8Filesystem = _UTF8Filesystem()
4004
class _CaseInsCasePresFilenameFeature(Feature):
4005
"""Is the file-system case insensitive, but case-preserving?"""
4008
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4010
# first check truly case-preserving for created files, then check
4011
# case insensitive when opening existing files.
4012
name = osutils.normpath(name)
4013
base, rel = osutils.split(name)
4014
found_rel = osutils.canonical_relpath(base, name)
4015
return (found_rel == rel
4016
and os.path.isfile(name.upper())
4017
and os.path.isfile(name.lower()))
4022
def feature_name(self):
4023
return "case-insensitive case-preserving filesystem"
4025
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4028
class _CaseInsensitiveFilesystemFeature(Feature):
4029
"""Check if underlying filesystem is case-insensitive but *not* case
4032
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4033
# more likely to be case preserving, so this case is rare.
4036
if CaseInsCasePresFilenameFeature.available():
4039
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4040
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4041
TestCaseWithMemoryTransport.TEST_ROOT = root
4043
root = TestCaseWithMemoryTransport.TEST_ROOT
4044
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4046
name_a = osutils.pathjoin(tdir, 'a')
4047
name_A = osutils.pathjoin(tdir, 'A')
4049
result = osutils.isdir(name_A)
4050
_rmtree_temp_dir(tdir)
4053
def feature_name(self):
4054
return 'case-insensitive filesystem'
4056
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4059
class _SubUnitFeature(Feature):
4060
"""Check if subunit is available."""
4069
def feature_name(self):
4072
SubUnitFeature = _SubUnitFeature()
4073
# Only define SubUnitBzrRunner if subunit is available.
4075
from subunit import TestProtocolClient
4077
from subunit.test_results import AutoTimingTestResultDecorator
4079
AutoTimingTestResultDecorator = lambda x:x
4080
class SubUnitBzrRunner(TextTestRunner):
4081
def run(self, test):
4082
result = AutoTimingTestResultDecorator(
4083
TestProtocolClient(self.stream))