167
163
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
168
164
self._bench_history = bench_history
169
self.ui = bzrlib.ui.ui_factory
170
self.num_tests = num_tests
165
self.ui = ui.ui_factory
171
167
self.error_count = 0
172
168
self.failure_count = 0
169
self.known_failure_count = 0
173
170
self.skip_count = 0
171
self.not_applicable_count = 0
172
self.unsupported = {}
175
174
self._overall_start_time = time.time()
177
def extractBenchmarkTime(self, testCase):
175
self._strict = strict
178
# nb: called stopTestRun in the version of this that Python merged
179
# upstream, according to lifeless 20090803
181
ok = self.wasStrictlySuccessful()
183
ok = self.wasSuccessful()
185
self.stream.write('tests passed\n')
187
self.stream.write('tests failed\n')
188
if TestCase._first_thread_leaker_id:
190
'%s is leaking threads among %d leaking tests.\n' % (
191
TestCase._first_thread_leaker_id,
192
TestCase._leaking_threads_tests))
194
def _extractBenchmarkTime(self, testCase):
178
195
"""Add a benchmark time for the current test case."""
179
self._benchmarkTime = getattr(testCase, "_benchtime", None)
196
return getattr(testCase, "_benchtime", None)
181
198
def _elapsedTestTimeString(self):
    """Return the formatted time the current test has taken so far.

    The elapsed time is measured from ``self._start_time`` to now and is
    rendered by ``self._formatTime``.
    """
    elapsed = time.time() - self._start_time
    return self._formatTime(elapsed)
185
def _testTimeString(self):
186
if self._benchmarkTime is not None:
188
self._formatTime(self._benchmarkTime),
189
self._elapsedTestTimeString())
202
def _testTimeString(self, testCase):
203
benchmark_time = self._extractBenchmarkTime(testCase)
204
if benchmark_time is not None:
205
return self._formatTime(benchmark_time) + "*"
191
return " %s" % self._elapsedTestTimeString()
207
return self._elapsedTestTimeString()
193
209
def _formatTime(self, seconds):
194
210
"""Format seconds as milliseconds with leading spaces."""
195
return "%5dms" % (1000 * seconds)
211
# some benchmarks can take thousands of seconds to run, so we need 8
213
return "%8dms" % (1000 * seconds)
197
215
def _shortened_test_description(self, test):
199
what = re.sub(r'^bzrlib\.(tests|benchmark)\.', '', what)
217
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
202
220
def startTest(self, test):
203
221
unittest.TestResult.startTest(self, test)
204
224
self.report_test_start(test)
225
test.number = self.count
205
226
self._recordTestStartTime()
228
def startTests(self):
230
if getattr(sys, 'frozen', None) is None:
231
bzr_path = osutils.realpath(sys.argv[0])
233
bzr_path = sys.executable
235
'testing: %s\n' % (bzr_path,))
238
bzrlib.__path__[0],))
240
' bzr-%s python-%s %s\n' % (
241
bzrlib.version_string,
242
bzrlib._format_version_tuple(sys.version_info),
243
platform.platform(aliased=1),
245
self.stream.write('\n')
207
247
def _recordTestStartTime(self):
    """Remember the current wall-clock time as the start of the test.

    The value is stored on ``self._start_time``.
    """
    started_at = time.time()
    self._start_time = started_at
251
def _cleanupLogFile(self, test):
252
# We can only do this if we have one of our TestCases, not if
254
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
255
if setKeepLogfile is not None:
211
258
def addError(self, test, err):
212
if isinstance(err[1], TestSkipped):
213
return self.addSkipped(test, err)
214
unittest.TestResult.addError(self, test, err)
215
# We can only do this if we have one of our TestCases, not if
217
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
218
if setKeepLogfile is not None:
220
self.extractBenchmarkTime(test)
221
self.report_error(test, err)
259
"""Tell result that test finished with an error.
261
Called from the TestCase run() method when the test
262
fails with an unexpected error.
264
self._testConcluded(test)
265
if isinstance(err[1], TestNotApplicable):
266
return self._addNotApplicable(test, err)
267
elif isinstance(err[1], UnavailableFeature):
268
return self.addNotSupported(test, err[1].args[0])
270
unittest.TestResult.addError(self, test, err)
271
self.error_count += 1
272
self.report_error(test, err)
275
self._cleanupLogFile(test)
225
277
def addFailure(self, test, err):
226
unittest.TestResult.addFailure(self, test, err)
227
# We can only do this if we have one of our TestCases, not if
229
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
230
if setKeepLogfile is not None:
232
self.extractBenchmarkTime(test)
233
self.report_failure(test, err)
278
"""Tell result that test failed.
280
Called from the TestCase run() method when the test
281
fails because e.g. an assert() method failed.
283
self._testConcluded(test)
284
if isinstance(err[1], KnownFailure):
285
return self._addKnownFailure(test, err)
287
unittest.TestResult.addFailure(self, test, err)
288
self.failure_count += 1
289
self.report_failure(test, err)
292
self._cleanupLogFile(test)
237
294
def addSuccess(self, test):
238
self.extractBenchmarkTime(test)
295
"""Tell result that test completed successfully.
297
Called from the TestCase run()
299
self._testConcluded(test)
239
300
if self._bench_history is not None:
240
if self._benchmarkTime is not None:
301
benchmark_time = self._extractBenchmarkTime(test)
302
if benchmark_time is not None:
241
303
self._bench_history.write("%s %s\n" % (
242
self._formatTime(self._benchmarkTime),
304
self._formatTime(benchmark_time),
244
306
self.report_success(test)
307
self._cleanupLogFile(test)
245
308
unittest.TestResult.addSuccess(self, test)
247
def addSkipped(self, test, skip_excinfo):
248
self.extractBenchmarkTime(test)
249
self.report_skip(test, skip_excinfo)
250
# seems best to treat this as success from point-of-view of unittest
251
# -- it actually does nothing so it barely matters :)
309
test._log_contents = ''
311
def _testConcluded(self, test):
312
"""Common code when a test has finished.
314
Called regardless of whether it succeded, failed, etc.
318
def _addKnownFailure(self, test, err):
    """Count one more known failure and report it.

    :param test: The test that failed in a known way.
    :param err: The error information, passed through unchanged to
        report_known_failure.
    """
    self.known_failure_count = self.known_failure_count + 1
    self.report_known_failure(test, err)
322
def addNotSupported(self, test, feature):
323
"""The test will not be run because of a missing feature.
325
# this can be called in two different ways: it may be that the
326
# test started running, and then raised (through addError)
327
# UnavailableFeature. Alternatively this method can be called
328
# while probing for features before running the tests; in that
329
# case we will see startTest and stopTest, but the test will never
331
self.unsupported.setdefault(str(feature), 0)
332
self.unsupported[str(feature)] += 1
333
self.report_unsupported(test, feature)
335
def addSkip(self, test, reason):
336
"""A test has not run for 'reason'."""
338
self.report_skip(test, reason)
340
def _addNotApplicable(self, test, skip_excinfo):
341
if isinstance(skip_excinfo[1], TestNotApplicable):
342
self.not_applicable_count += 1
343
self.report_not_applicable(test, skip_excinfo)
254
346
except KeyboardInterrupt:
257
self.addError(test, test.__exc_info())
349
self.addError(test, test.exc_info())
351
# seems best to treat this as success from point-of-view of unittest
352
# -- it actually does nothing so it barely matters :)
259
353
unittest.TestResult.addSuccess(self, test)
354
test._log_contents = ''
261
356
def printErrorList(self, flavour, errors):
262
357
for test, err in errors:
263
358
self.stream.writeln(self.separator1)
264
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
359
self.stream.write("%s: " % flavour)
360
self.stream.writeln(self.getDescription(test))
265
361
if getattr(test, '_get_log', None) is not None:
267
print >>self.stream, \
268
('vvvv[log from %s]' % test.id()).ljust(78,'-')
269
print >>self.stream, test._get_log()
270
print >>self.stream, \
271
('^^^^[log from %s]' % test.id()).ljust(78,'-')
362
log_contents = test._get_log()
364
self.stream.write('\n')
366
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
367
self.stream.write('\n')
368
self.stream.write(log_contents)
369
self.stream.write('\n')
371
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
372
self.stream.write('\n')
272
373
self.stream.writeln(self.separator2)
273
374
self.stream.writeln("%s" % err)
376
def progress(self, offset, whence):
377
"""The test is adjusting the count of tests to run."""
378
if whence == SUBUNIT_SEEK_SET:
379
self.num_tests = offset
380
elif whence == SUBUNIT_SEEK_CUR:
381
self.num_tests += offset
383
raise errors.BzrError("Unknown whence %r" % whence)
275
385
def finished(self):
1584
2552
for readonly urls.
1586
2554
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1587
be used without needed to redo it when a different
2555
be used without needed to redo it when a different
1588
2556
subclass is in use ?
1591
2559
def setUp(self):
1592
2560
super(ChrootedTestCase, self).setUp()
1593
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1594
self.transport_readonly_server = bzrlib.transport.http.HttpServer
2561
if not self.vfs_transport_factory == MemoryServer:
2562
self.transport_readonly_server = HttpServer
2565
def condition_id_re(pattern):
2566
"""Create a condition filter which performs a re check on a test's id.
2568
:param pattern: A regular expression string.
2569
:return: A callable that returns True if the re matches.
2571
filter_re = osutils.re_compile_checked(pattern, 0,
2573
def condition(test):
2575
return filter_re.search(test_id)
2579
def condition_isinstance(klass_or_klass_list):
2580
"""Create a condition filter which returns isinstance(param, klass).
2582
:return: A callable which when called with one parameter obj return the
2583
result of isinstance(obj, klass_or_klass_list).
2586
return isinstance(obj, klass_or_klass_list)
2590
def condition_id_in_list(id_list):
2591
"""Create a condition filter which verify that test's id in a list.
2593
:param id_list: A TestIdList object.
2594
:return: A callable that returns True if the test's id appears in the list.
2596
def condition(test):
2597
return id_list.includes(test.id())
2601
def condition_id_startswith(starts):
2602
"""Create a condition filter verifying that test's id starts with a string.
2604
:param starts: A list of string.
2605
:return: A callable that returns True if the test's id starts with one of
2608
def condition(test):
2609
for start in starts:
2610
if test.id().startswith(start):
2616
def exclude_tests_by_condition(suite, condition):
2617
"""Create a test suite which excludes some tests from suite.
2619
:param suite: The suite to get tests from.
2620
:param condition: A callable whose result evaluates True when called with a
2621
test case which should be excluded from the result.
2622
:return: A suite which contains the tests found in suite that fail
2626
for test in iter_suite_tests(suite):
2627
if not condition(test):
2629
return TestUtil.TestSuite(result)
2632
def filter_suite_by_condition(suite, condition):
2633
"""Create a test suite by filtering another one.
2635
:param suite: The source suite.
2636
:param condition: A callable whose result evaluates True when called with a
2637
test case which should be included in the result.
2638
:return: A suite which contains the tests found in suite that pass
2642
for test in iter_suite_tests(suite):
2645
return TestUtil.TestSuite(result)
1597
2648
def filter_suite_by_re(suite, pattern):
1598
result = TestUtil.TestSuite()
1599
filter_re = re.compile(pattern)
2649
"""Create a test suite by filtering another one.
2651
:param suite: the source suite
2652
:param pattern: pattern that names must match
2653
:returns: the newly created suite
2655
condition = condition_id_re(pattern)
2656
result_suite = filter_suite_by_condition(suite, condition)
2660
def filter_suite_by_id_list(suite, test_id_list):
2661
"""Create a test suite by filtering another one.
2663
:param suite: The source suite.
2664
:param test_id_list: A list of the test ids to keep as strings.
2665
:returns: the newly created suite
2667
condition = condition_id_in_list(test_id_list)
2668
result_suite = filter_suite_by_condition(suite, condition)
2672
def filter_suite_by_id_startswith(suite, start):
2673
"""Create a test suite by filtering another one.
2675
:param suite: The source suite.
2676
:param start: A list of string the test id must start with one of.
2677
:returns: the newly created suite
2679
condition = condition_id_startswith(start)
2680
result_suite = filter_suite_by_condition(suite, condition)
2684
def exclude_tests_by_re(suite, pattern):
    """Create a test suite which excludes some tests from suite.

    :param suite: The suite to get tests from.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be excluded from the result.
    :return: A TestSuite that contains all the tests from suite without the
        tests that matched pattern. The order of tests is the same as it was
        in suite.
    """
    # Build the id-matching predicate once, then let the generic
    # condition-based excluder do the work.
    id_matches_pattern = condition_id_re(pattern)
    return exclude_tests_by_condition(suite, id_matches_pattern)
2697
def preserve_input(something):
2698
"""A helper for performing test suite transformation chains.
2700
:param something: Anything you want to preserve.
2706
def randomize_suite(suite):
    """Return a new TestSuite with suite's tests in random order.

    The tests of the input suite are first flattened into a single list, so
    any nested TestSuites are removed; the shuffle is therefore global rather
    than per nested suite.  The module-level ``random`` generator is used.
    """
    shuffled = list(iter_suite_tests(suite))
    random.shuffle(shuffled)
    return TestUtil.TestSuite(shuffled)
2718
def split_suite_by_condition(suite, condition):
2719
"""Split a test suite into two by a condition.
2721
:param suite: The suite to split.
2722
:param condition: The condition to match on. Tests that match this
2723
condition are returned in the first test suite, ones that do not match
2724
are in the second suite.
2725
:return: A tuple of two test suites, where the first contains tests from
2726
suite matching the condition, and the second contains the remainder
2727
from suite. The order within each output suite is the same as it was in
1600
2732
for test in iter_suite_tests(suite):
1601
if filter_re.search(test.id()):
1602
result.addTest(test)
2734
matched.append(test)
2736
did_not_match.append(test)
2737
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2740
def split_suite_by_re(suite, pattern):
    """Split a test suite into two by a regular expression.

    :param suite: The suite to split.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be in the first test suite returned, and the others in
        the second test suite returned.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching pattern, and the second contains the remainder from
        suite. The order within each output suite is the same as it was in
        suite.
    """
    # Turn the pattern into a predicate on test ids and delegate the split.
    id_matches_pattern = condition_id_re(pattern)
    return split_suite_by_condition(suite, id_matches_pattern)
1606
2755
def run_suite(suite, name='test', verbose=False, pattern=".*",
1607
stop_on_failure=False, keep_output=False,
1608
transport=None, lsprof_timed=None, bench_history=None):
2756
stop_on_failure=False,
2757
transport=None, lsprof_timed=None, bench_history=None,
2758
matching_tests_first=None,
2761
exclude_pattern=None,
2764
suite_decorators=None,
2766
"""Run a test suite for bzr selftest.
2768
:param runner_class: The class of runner to use. Must support the
2769
constructor arguments passed by run_suite which are more than standard
2771
:return: A boolean indicating success.
1609
2773
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1614
runner = TextTestRunner(stream=sys.stdout,
2778
if runner_class is None:
2779
runner_class = TextTestRunner
2782
runner = runner_class(stream=stream,
1615
2783
descriptions=0,
1616
2784
verbosity=verbosity,
1617
keep_output=keep_output,
1618
bench_history=bench_history)
2785
bench_history=bench_history,
2786
list_only=list_only,
1619
2789
runner.stop_on_failure=stop_on_failure
1621
suite = filter_suite_by_re(suite, pattern)
2790
# built in decorator factories:
2792
random_order(random_seed, runner),
2793
exclude_tests(exclude_pattern),
2795
if matching_tests_first:
2796
decorators.append(tests_first(pattern))
2798
decorators.append(filter_tests(pattern))
2799
if suite_decorators:
2800
decorators.extend(suite_decorators)
2801
# tell the result object how many tests will be running: (except if
2802
# --parallel=fork is being used. Robert said he will provide a better
2803
# progress design later -- vila 20090817)
2804
if fork_decorator not in decorators:
2805
decorators.append(CountingDecorator)
2806
for decorator in decorators:
2807
suite = decorator(suite)
1622
2808
result = runner.run(suite)
1623
return result.wasSuccessful()
2813
return result.wasStrictlySuccessful()
2815
return result.wasSuccessful()
2818
# A registry where get() returns a suite decorator.
2819
parallel_registry = registry.Registry()
2822
def fork_decorator(suite):
2823
concurrency = osutils.local_concurrency()
2824
if concurrency == 1:
2826
from testtools import ConcurrentTestSuite
2827
return ConcurrentTestSuite(suite, fork_for_tests)
2828
parallel_registry.register('fork', fork_decorator)
2831
def subprocess_decorator(suite):
2832
concurrency = osutils.local_concurrency()
2833
if concurrency == 1:
2835
from testtools import ConcurrentTestSuite
2836
return ConcurrentTestSuite(suite, reinvoke_for_tests)
2837
parallel_registry.register('subprocess', subprocess_decorator)
2840
def exclude_tests(exclude_pattern):
2841
"""Return a test suite decorator that excludes tests."""
2842
if exclude_pattern is None:
2843
return identity_decorator
2844
def decorator(suite):
2845
return ExcludeDecorator(suite, exclude_pattern)
2849
def filter_tests(pattern):
2851
return identity_decorator
2852
def decorator(suite):
2853
return FilterTestsDecorator(suite, pattern)
2857
def random_order(random_seed, runner):
2858
"""Return a test suite decorator factory for randomising tests order.
2860
:param random_seed: now, a string which casts to a long, or a long.
2861
:param runner: A test runner with a stream attribute to report on.
2863
if random_seed is None:
2864
return identity_decorator
2865
def decorator(suite):
2866
return RandomDecorator(suite, random_seed, runner.stream)
2870
def tests_first(pattern):
2872
return identity_decorator
2873
def decorator(suite):
2874
return TestFirstDecorator(suite, pattern)
2878
def identity_decorator(suite):
2883
class TestDecorator(TestSuite):
2884
"""A decorator for TestCase/TestSuite objects.
2886
Usually, subclasses should override __iter__(used when flattening test
2887
suites), which we do to filter, reorder, parallelise and so on, run() and
2891
def __init__(self, suite):
2892
TestSuite.__init__(self)
2895
def countTestCases(self):
2898
cases += test.countTestCases()
2905
def run(self, result):
2906
# Use iteration on self, not self._tests, to allow subclasses to hook
2909
if result.shouldStop:
2915
class CountingDecorator(TestDecorator):
    """A decorator which calls result.progress(self.countTestCases)."""

    def run(self, result):
        """Announce the number of tests to result, then run them.

        The count is only reported when result exposes a callable
        ``progress`` attribute; other results are run without it.
        """
        report_progress = getattr(result, 'progress', None)
        if callable(report_progress):
            total = self.countTestCases()
            # SUBUNIT_SEEK_SET: the value is an absolute count, not a delta.
            report_progress(total, SUBUNIT_SEEK_SET)
        parent = super(CountingDecorator, self)
        return parent.run(result)
2925
class ExcludeDecorator(TestDecorator):
2926
"""A decorator which excludes test matching an exclude pattern."""
2928
def __init__(self, suite, exclude_pattern):
2929
TestDecorator.__init__(self, suite)
2930
self.exclude_pattern = exclude_pattern
2931
self.excluded = False
2935
return iter(self._tests)
2936
self.excluded = True
2937
suite = exclude_tests_by_re(self, self.exclude_pattern)
2939
self.addTests(suite)
2940
return iter(self._tests)
2943
class FilterTestsDecorator(TestDecorator):
2944
"""A decorator which filters tests to those matching a pattern."""
2946
def __init__(self, suite, pattern):
2947
TestDecorator.__init__(self, suite)
2948
self.pattern = pattern
2949
self.filtered = False
2953
return iter(self._tests)
2954
self.filtered = True
2955
suite = filter_suite_by_re(self, self.pattern)
2957
self.addTests(suite)
2958
return iter(self._tests)
2961
class RandomDecorator(TestDecorator):
2962
"""A decorator which randomises the order of its tests."""
2964
def __init__(self, suite, random_seed, stream):
2965
TestDecorator.__init__(self, suite)
2966
self.random_seed = random_seed
2967
self.randomised = False
2968
self.stream = stream
2972
return iter(self._tests)
2973
self.randomised = True
2974
self.stream.writeln("Randomizing test order using seed %s\n" %
2975
(self.actual_seed()))
2976
# Initialise the random number generator.
2977
random.seed(self.actual_seed())
2978
suite = randomize_suite(self)
2980
self.addTests(suite)
2981
return iter(self._tests)
2983
def actual_seed(self):
2984
if self.random_seed == "now":
2985
# We convert the seed to a long to make it reuseable across
2986
# invocations (because the user can reenter it).
2987
self.random_seed = long(time.time())
2989
# Convert the seed to a long if we can
2991
self.random_seed = long(self.random_seed)
2994
return self.random_seed
2997
class TestFirstDecorator(TestDecorator):
2998
"""A decorator which moves named tests to the front."""
3000
def __init__(self, suite, pattern):
3001
TestDecorator.__init__(self, suite)
3002
self.pattern = pattern
3003
self.filtered = False
3007
return iter(self._tests)
3008
self.filtered = True
3009
suites = split_suite_by_re(self, self.pattern)
3011
self.addTests(suites)
3012
return iter(self._tests)
3015
def partition_tests(suite, count):
3016
"""Partition suite into count lists of tests."""
3018
tests = list(iter_suite_tests(suite))
3019
tests_per_process = int(math.ceil(float(len(tests)) / count))
3020
for block in range(count):
3021
low_test = block * tests_per_process
3022
high_test = low_test + tests_per_process
3023
process_tests = tests[low_test:high_test]
3024
result.append(process_tests)
3028
def fork_for_tests(suite):
3029
"""Take suite and start up one runner per CPU by forking()
3031
:return: An iterable of TestCase-like objects which can each have
3032
run(result) called on them to feed tests to result.
3034
concurrency = osutils.local_concurrency()
3036
from subunit import TestProtocolClient, ProtocolTestCase
3038
from subunit.test_results import AutoTimingTestResultDecorator
3040
AutoTimingTestResultDecorator = lambda x:x
3041
class TestInOtherProcess(ProtocolTestCase):
3042
# Should be in subunit, I think. RBC.
3043
def __init__(self, stream, pid):
3044
ProtocolTestCase.__init__(self, stream)
3047
def run(self, result):
3049
ProtocolTestCase.run(self, result)
3051
os.waitpid(self.pid, os.WNOHANG)
3053
test_blocks = partition_tests(suite, concurrency)
3054
for process_tests in test_blocks:
3055
process_suite = TestSuite()
3056
process_suite.addTests(process_tests)
3057
c2pread, c2pwrite = os.pipe()
3062
# Leave stderr and stdout open so we can see test noise
3063
# Close stdin so that the child goes away if it decides to
3064
# read from stdin (otherwise its a roulette to see what
3065
# child actually gets keystrokes for pdb etc).
3068
stream = os.fdopen(c2pwrite, 'wb', 1)
3069
subunit_result = AutoTimingTestResultDecorator(
3070
TestProtocolClient(stream))
3071
process_suite.run(subunit_result)
3076
stream = os.fdopen(c2pread, 'rb', 1)
3077
test = TestInOtherProcess(stream, pid)
3082
def reinvoke_for_tests(suite):
3083
"""Take suite and start up one runner per CPU using subprocess().
3085
:return: An iterable of TestCase-like objects which can each have
3086
run(result) called on them to feed tests to result.
3088
concurrency = osutils.local_concurrency()
3090
from subunit import ProtocolTestCase
3091
class TestInSubprocess(ProtocolTestCase):
3092
def __init__(self, process, name):
3093
ProtocolTestCase.__init__(self, process.stdout)
3094
self.process = process
3095
self.process.stdin.close()
3098
def run(self, result):
3100
ProtocolTestCase.run(self, result)
3103
os.unlink(self.name)
3104
# print "pid %d finished" % finished_process
3105
test_blocks = partition_tests(suite, concurrency)
3106
for process_tests in test_blocks:
3107
# ugly; currently reimplement rather than reuses TestCase methods.
3108
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3109
if not os.path.isfile(bzr_path):
3110
# We are probably installed. Assume sys.argv is the right file
3111
bzr_path = sys.argv[0]
3112
fd, test_list_file_name = tempfile.mkstemp()
3113
test_list_file = os.fdopen(fd, 'wb', 1)
3114
for test in process_tests:
3115
test_list_file.write(test.id() + '\n')
3116
test_list_file.close()
3118
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3120
if '--no-plugins' in sys.argv:
3121
argv.append('--no-plugins')
3122
# stderr=STDOUT would be ideal, but until we prevent noise on
3123
# stderr it can interrupt the subunit protocol.
3124
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3126
test = TestInSubprocess(process, test_list_file_name)
3129
os.unlink(test_list_file_name)
3134
class BZRTransformingResult(unittest.TestResult):
3136
def __init__(self, target):
3137
unittest.TestResult.__init__(self)
3138
self.result = target
3140
def startTest(self, test):
3141
self.result.startTest(test)
3143
def stopTest(self, test):
3144
self.result.stopTest(test)
3146
def addError(self, test, err):
3147
feature = self._error_looks_like('UnavailableFeature: ', err)
3148
if feature is not None:
3149
self.result.addNotSupported(test, feature)
3151
self.result.addError(test, err)
3153
def addFailure(self, test, err):
3154
known = self._error_looks_like('KnownFailure: ', err)
3155
if known is not None:
3156
self.result._addKnownFailure(test, [KnownFailure,
3157
KnownFailure(known), None])
3159
self.result.addFailure(test, err)
3161
def addSkip(self, test, reason):
3162
self.result.addSkip(test, reason)
3164
def addSuccess(self, test):
3165
self.result.addSuccess(test)
3167
def _error_looks_like(self, prefix, err):
3168
"""Deserialize exception and returns the stringify value."""
3172
if isinstance(exc, subunit.RemoteException):
3173
# stringify the exception gives access to the remote traceback
3174
# We search the last line for 'prefix'
3175
lines = str(exc).split('\n')
3176
while lines and not lines[-1]:
3179
if lines[-1].startswith(prefix):
3180
value = lines[-1][len(prefix):]
3184
# Controlled by "bzr selftest -E=..." option
3185
# Currently supported:
3186
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3187
# preserves any flags supplied at the command line.
3188
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3189
# rather than failing tests. And no longer raise
3190
# LockContention when fcntl locks are not being used
3191
# with proper exclusion rules.
3192
selftest_debug_flags = set()
1626
3195
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1628
3196
transport=None,
1629
3197
test_suite_factory=None,
1630
3198
lsprof_timed=None,
1631
bench_history=None):
3200
matching_tests_first=None,
3203
exclude_pattern=None,
3209
suite_decorators=None,
1632
3212
"""Run the whole test suite under the enhanced runner"""
1633
3213
# XXX: Very ugly way to do this...
1634
3214
# Disable warning about old formats because we don't want it to disturb
1641
3221
transport = default_transport
1642
3222
old_transport = default_transport
1643
3223
default_transport = transport
3224
global selftest_debug_flags
3225
old_debug_flags = selftest_debug_flags
3226
if debug_flags is not None:
3227
selftest_debug_flags = set(debug_flags)
3229
if load_list is None:
3232
keep_only = load_test_id_list(load_list)
3234
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3235
for start in starting_with]
1645
3236
if test_suite_factory is None:
1646
suite = test_suite()
3237
# Reduce loading time by loading modules based on the starting_with
3239
suite = test_suite(keep_only, starting_with)
1648
3241
suite = test_suite_factory()
3243
# But always filter as requested.
3244
suite = filter_suite_by_id_startswith(suite, starting_with)
1649
3245
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1650
stop_on_failure=stop_on_failure, keep_output=keep_output,
3246
stop_on_failure=stop_on_failure,
1651
3247
transport=transport,
1652
3248
lsprof_timed=lsprof_timed,
1653
bench_history=bench_history)
3249
bench_history=bench_history,
3250
matching_tests_first=matching_tests_first,
3251
list_only=list_only,
3252
random_seed=random_seed,
3253
exclude_pattern=exclude_pattern,
3255
runner_class=runner_class,
3256
suite_decorators=suite_decorators,
1655
3260
default_transport = old_transport
3261
selftest_debug_flags = old_debug_flags
3264
def load_test_id_list(file_name):
3265
"""Load a test id list from a text file.
3267
The format is one test id by line. No special care is taken to impose
3268
strict rules, these test ids are used to filter the test suite so a test id
3269
that do not match an existing test will do no harm. This allows user to add
3270
comments, leave blank lines, etc.
3274
ftest = open(file_name, 'rt')
3276
if e.errno != errno.ENOENT:
3279
raise errors.NoSuchFile(file_name)
3281
for test_name in ftest.readlines():
3282
test_list.append(test_name.strip())
3287
def suite_matches_id_list(test_suite, id_list):
3288
"""Warns about tests not appearing or appearing more than once.
3290
:param test_suite: A TestSuite object.
3291
:param test_id_list: The list of test ids that should be found in
3294
:return: (absents, duplicates) absents is a list containing the test found
3295
in id_list but not in test_suite, duplicates is a list containing the
3296
test found multiple times in test_suite.
3298
When using a prefined test id list, it may occurs that some tests do not
3299
exist anymore or that some tests use the same id. This function warns the
3300
tester about potential problems in his workflow (test lists are volatile)
3301
or in the test suite itself (using the same id for several tests does not
3302
help to localize defects).
3304
# Build a dict counting id occurrences
3306
for test in iter_suite_tests(test_suite):
3308
tests[id] = tests.get(id, 0) + 1
3313
occurs = tests.get(id, 0)
3315
not_found.append(id)
3317
duplicates.append(id)
3319
return not_found, duplicates
3322
class TestIdList(object):
3323
"""Test id list to filter a test suite.
3325
Relying on the assumption that test ids are built as:
3326
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3327
notation, this class offers methods to :
3328
- avoid building a test suite for modules not refered to in the test list,
3329
- keep only the tests listed from the module test suite.
3332
def __init__(self, test_id_list):
3333
# When a test suite needs to be filtered against us we compare test ids
3334
# for equality, so a simple dict offers a quick and simple solution.
3335
self.tests = dict().fromkeys(test_id_list, True)
3337
# While unittest.TestCase have ids like:
3338
# <module>.<class>.<method>[(<param+)],
3339
# doctest.DocTestCase can have ids like:
3342
# <module>.<function>
3343
# <module>.<class>.<method>
3345
# Since we can't predict a test class from its name only, we settle on
3346
# a simple constraint: a test id always begins with its module name.
3349
for test_id in test_id_list:
3350
parts = test_id.split('.')
3351
mod_name = parts.pop(0)
3352
modules[mod_name] = True
3354
mod_name += '.' + part
3355
modules[mod_name] = True
3356
self.modules = modules
3358
def refers_to(self, module_name):
    """Is there tests for the module or one of its sub modules.

    :param module_name: A module name in python dotted notation.
    :return: True if module_name is a key of ``self.modules``.
    """
    # The ``in`` operator replaces the deprecated dict.has_key().
    return module_name in self.modules
3362
def includes(self, test_id):
    """Tell whether test_id is one of the ids held by this list.

    :param test_id: A test id string, compared for equality.
    :return: True if test_id is a key of ``self.tests``.
    """
    # The ``in`` operator replaces the deprecated dict.has_key().
    return test_id in self.tests
3366
class TestPrefixAliasRegistry(registry.Registry):
3367
"""A registry for test prefix aliases.
3369
This helps implement shorcuts for the --starting-with selftest
3370
option. Overriding existing prefixes is not allowed but not fatal (a
3371
warning will be emitted).
3374
def register(self, key, obj, help=None, info=None,
3375
override_existing=False):
3376
"""See Registry.register.
3378
Trying to override an existing alias causes a warning to be emitted,
3379
not a fatal execption.
3382
super(TestPrefixAliasRegistry, self).register(
3383
key, obj, help=help, info=info, override_existing=False)
3385
actual = self.get(key)
3386
note('Test prefix alias %s is already used for %s, ignoring %s'
3387
% (key, actual, obj))
3389
def resolve_alias(self, id_start):
3390
"""Replace the alias by the prefix in the given string.
3392
Using an unknown prefix is an error to help catching typos.
3394
parts = id_start.split('.')
3396
parts[0] = self.get(parts[0])
3398
raise errors.BzrCommandError(
3399
'%s is not a known test prefix alias' % parts[0])
3400
return '.'.join(parts)
3403
test_prefix_alias_registry = TestPrefixAliasRegistry()
3404
"""Registry of test prefix aliases."""
3407
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3408
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3409
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3411
# Obvious highest level prefixes, feel free to add your own via a plugin
3412
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3413
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3414
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3415
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3416
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3419
def test_suite(keep_only=None, starting_with=None):
1659
3420
"""Build and return TestSuite for the whole of bzrlib.
3422
:param keep_only: A list of test ids limiting the suite returned.
3424
:param starting_with: An id limiting the suite returned to the tests
1661
3427
This function can be replaced if you need to change the default test
1662
3428
suite on a global basis, but it is not encouraged.
1664
3430
testmod_names = [
3432
'bzrlib.tests.blackbox',
3433
'bzrlib.tests.commands',
3434
'bzrlib.tests.per_branch',
3435
'bzrlib.tests.per_bzrdir',
3436
'bzrlib.tests.per_interrepository',
3437
'bzrlib.tests.per_intertree',
3438
'bzrlib.tests.per_inventory',
3439
'bzrlib.tests.per_interbranch',
3440
'bzrlib.tests.per_lock',
3441
'bzrlib.tests.per_transport',
3442
'bzrlib.tests.per_tree',
3443
'bzrlib.tests.per_pack_repository',
3444
'bzrlib.tests.per_repository',
3445
'bzrlib.tests.per_repository_chk',
3446
'bzrlib.tests.per_repository_reference',
3447
'bzrlib.tests.per_versionedfile',
3448
'bzrlib.tests.per_workingtree',
3449
'bzrlib.tests.test__annotator',
3450
'bzrlib.tests.test__chk_map',
3451
'bzrlib.tests.test__dirstate_helpers',
3452
'bzrlib.tests.test__groupcompress',
3453
'bzrlib.tests.test__known_graph',
3454
'bzrlib.tests.test__rio',
3455
'bzrlib.tests.test__walkdirs_win32',
1665
3456
'bzrlib.tests.test_ancestry',
3457
'bzrlib.tests.test_annotate',
1666
3458
'bzrlib.tests.test_api',
1667
3459
'bzrlib.tests.test_atomicfile',
1668
3460
'bzrlib.tests.test_bad_files',
3461
'bzrlib.tests.test_bencode',
3462
'bzrlib.tests.test_bisect_multi',
1669
3463
'bzrlib.tests.test_branch',
3464
'bzrlib.tests.test_branchbuilder',
3465
'bzrlib.tests.test_btree_index',
3466
'bzrlib.tests.test_bugtracker',
1670
3467
'bzrlib.tests.test_bundle',
1671
3468
'bzrlib.tests.test_bzrdir',
3469
'bzrlib.tests.test__chunks_to_lines',
1672
3470
'bzrlib.tests.test_cache_utf8',
1673
'bzrlib.tests.test_command',
3471
'bzrlib.tests.test_chk_map',
3472
'bzrlib.tests.test_chk_serializer',
3473
'bzrlib.tests.test_chunk_writer',
3474
'bzrlib.tests.test_clean_tree',
3475
'bzrlib.tests.test_commands',
1674
3476
'bzrlib.tests.test_commit',
1675
3477
'bzrlib.tests.test_commit_merge',
1676
3478
'bzrlib.tests.test_config',
1677
3479
'bzrlib.tests.test_conflicts',
3480
'bzrlib.tests.test_counted_lock',
3481
'bzrlib.tests.test_crash',
1678
3482
'bzrlib.tests.test_decorators',
3483
'bzrlib.tests.test_delta',
3484
'bzrlib.tests.test_debug',
3485
'bzrlib.tests.test_deprecated_graph',
1679
3486
'bzrlib.tests.test_diff',
1680
'bzrlib.tests.test_doc_generate',
3487
'bzrlib.tests.test_directory_service',
3488
'bzrlib.tests.test_dirstate',
3489
'bzrlib.tests.test_email_message',
3490
'bzrlib.tests.test_eol_filters',
1681
3491
'bzrlib.tests.test_errors',
1682
'bzrlib.tests.test_escaped_store',
3492
'bzrlib.tests.test_export',
3493
'bzrlib.tests.test_extract',
1683
3494
'bzrlib.tests.test_fetch',
3495
'bzrlib.tests.test_fifo_cache',
3496
'bzrlib.tests.test_filters',
1684
3497
'bzrlib.tests.test_ftp_transport',
3498
'bzrlib.tests.test_foreign',
3499
'bzrlib.tests.test_generate_docs',
3500
'bzrlib.tests.test_generate_ids',
3501
'bzrlib.tests.test_globbing',
1685
3502
'bzrlib.tests.test_gpg',
1686
3503
'bzrlib.tests.test_graph',
3504
'bzrlib.tests.test_groupcompress',
1687
3505
'bzrlib.tests.test_hashcache',
3506
'bzrlib.tests.test_help',
3507
'bzrlib.tests.test_hooks',
1688
3508
'bzrlib.tests.test_http',
1689
3509
'bzrlib.tests.test_http_response',
3510
'bzrlib.tests.test_https_ca_bundle',
1690
3511
'bzrlib.tests.test_identitymap',
1691
3512
'bzrlib.tests.test_ignores',
3513
'bzrlib.tests.test_index',
3514
'bzrlib.tests.test_info',
1692
3515
'bzrlib.tests.test_inv',
3516
'bzrlib.tests.test_inventory_delta',
1693
3517
'bzrlib.tests.test_knit',
1694
3518
'bzrlib.tests.test_lazy_import',
1695
3519
'bzrlib.tests.test_lazy_regex',
3520
'bzrlib.tests.test_lock',
3521
'bzrlib.tests.test_lockable_files',
1696
3522
'bzrlib.tests.test_lockdir',
1697
'bzrlib.tests.test_lockable_files',
1698
3523
'bzrlib.tests.test_log',
3524
'bzrlib.tests.test_lru_cache',
3525
'bzrlib.tests.test_lsprof',
3526
'bzrlib.tests.test_mail_client',
1699
3527
'bzrlib.tests.test_memorytree',
1700
3528
'bzrlib.tests.test_merge',
1701
3529
'bzrlib.tests.test_merge3',
1702
3530
'bzrlib.tests.test_merge_core',
3531
'bzrlib.tests.test_merge_directive',
1703
3532
'bzrlib.tests.test_missing',
1704
3533
'bzrlib.tests.test_msgeditor',
3534
'bzrlib.tests.test_multiparent',
3535
'bzrlib.tests.test_mutabletree',
1705
3536
'bzrlib.tests.test_nonascii',
1706
3537
'bzrlib.tests.test_options',
1707
3538
'bzrlib.tests.test_osutils',
3539
'bzrlib.tests.test_osutils_encodings',
3540
'bzrlib.tests.test_pack',
1708
3541
'bzrlib.tests.test_patch',
1709
3542
'bzrlib.tests.test_patches',
1710
3543
'bzrlib.tests.test_permissions',
1711
3544
'bzrlib.tests.test_plugins',
1712
3545
'bzrlib.tests.test_progress',
3546
'bzrlib.tests.test_read_bundle',
1713
3547
'bzrlib.tests.test_reconcile',
3548
'bzrlib.tests.test_reconfigure',
1714
3549
'bzrlib.tests.test_registry',
3550
'bzrlib.tests.test_remote',
3551
'bzrlib.tests.test_rename_map',
1715
3552
'bzrlib.tests.test_repository',
1716
3553
'bzrlib.tests.test_revert',
1717
3554
'bzrlib.tests.test_revision',
1718
'bzrlib.tests.test_revisionnamespaces',
3555
'bzrlib.tests.test_revisionspec',
1719
3556
'bzrlib.tests.test_revisiontree',
1720
3557
'bzrlib.tests.test_rio',
3558
'bzrlib.tests.test_rules',
1721
3559
'bzrlib.tests.test_sampler',
1722
3560
'bzrlib.tests.test_selftest',
3561
'bzrlib.tests.test_serializer',
1723
3562
'bzrlib.tests.test_setup',
1724
3563
'bzrlib.tests.test_sftp_transport',
3564
'bzrlib.tests.test_shelf',
3565
'bzrlib.tests.test_shelf_ui',
3566
'bzrlib.tests.test_smart',
1725
3567
'bzrlib.tests.test_smart_add',
3568
'bzrlib.tests.test_smart_request',
1726
3569
'bzrlib.tests.test_smart_transport',
3570
'bzrlib.tests.test_smtp_connection',
1727
3571
'bzrlib.tests.test_source',
3572
'bzrlib.tests.test_ssh_transport',
1728
3573
'bzrlib.tests.test_status',
1729
3574
'bzrlib.tests.test_store',
3575
'bzrlib.tests.test_strace',
3576
'bzrlib.tests.test_subsume',
3577
'bzrlib.tests.test_switch',
1730
3578
'bzrlib.tests.test_symbol_versioning',
3579
'bzrlib.tests.test_tag',
1731
3580
'bzrlib.tests.test_testament',
1732
3581
'bzrlib.tests.test_textfile',
1733
3582
'bzrlib.tests.test_textmerge',
3583
'bzrlib.tests.test_timestamp',
1734
3584
'bzrlib.tests.test_trace',
1735
3585
'bzrlib.tests.test_transactions',
1736
3586
'bzrlib.tests.test_transform',
1737
3587
'bzrlib.tests.test_transport',
3588
'bzrlib.tests.test_transport_log',
1738
3589
'bzrlib.tests.test_tree',
1739
3590
'bzrlib.tests.test_treebuilder',
1740
3591
'bzrlib.tests.test_tsort',
1741
3592
'bzrlib.tests.test_tuned_gzip',
1742
3593
'bzrlib.tests.test_ui',
3594
'bzrlib.tests.test_uncommit',
1743
3595
'bzrlib.tests.test_upgrade',
3596
'bzrlib.tests.test_upgrade_stacked',
1744
3597
'bzrlib.tests.test_urlutils',
1745
'bzrlib.tests.test_versionedfile',
1746
3598
'bzrlib.tests.test_version',
1747
3599
'bzrlib.tests.test_version_info',
1748
3600
'bzrlib.tests.test_weave',
1749
3601
'bzrlib.tests.test_whitebox',
3602
'bzrlib.tests.test_win32utils',
1750
3603
'bzrlib.tests.test_workingtree',
3604
'bzrlib.tests.test_workingtree_4',
3605
'bzrlib.tests.test_wsgi',
1751
3606
'bzrlib.tests.test_xml',
1753
test_transport_implementations = [
1754
'bzrlib.tests.test_transport_implementations',
1755
'bzrlib.tests.test_read_bundle',
1757
suite = TestUtil.TestSuite()
1758
3609
loader = TestUtil.TestLoader()
3611
if keep_only is not None:
3612
id_filter = TestIdList(keep_only)
3614
# We take precedence over keep_only because *at loading time* using
3615
# both options means we will load fewer tests for the same final result.
3616
def interesting_module(name):
3617
for start in starting_with:
3619
# Either the module name starts with the specified string
3620
name.startswith(start)
3621
# or it may contain tests starting with the specified string
3622
or start.startswith(name)
3626
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3628
elif keep_only is not None:
3629
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3630
def interesting_module(name):
3631
return id_filter.refers_to(name)
3634
loader = TestUtil.TestLoader()
3635
def interesting_module(name):
3636
# No filtering, all modules are interesting
3639
suite = loader.suiteClass()
3641
# modules building their suite with loadTestsFromModuleNames
1759
3642
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1760
from bzrlib.transport import TransportTestProviderAdapter
1761
adapter = TransportTestProviderAdapter()
1762
adapt_modules(test_transport_implementations, adapter, loader, suite)
1763
for package in packages_to_test():
1764
suite.addTest(package.test_suite())
1765
for m in MODULES_TO_TEST:
1766
suite.addTest(loader.loadTestsFromModule(m))
1767
for m in MODULES_TO_DOCTEST:
3644
modules_to_doctest = [
3646
'bzrlib.branchbuilder',
3649
'bzrlib.iterablefile',
3653
'bzrlib.symbol_versioning',
3656
'bzrlib.version_info_formats.format_custom',
3659
for mod in modules_to_doctest:
3660
if not interesting_module(mod):
3661
# No tests to keep here, move along
1769
suite.addTest(doctest.DocTestSuite(m))
3664
# note that this really does mean "report only" -- doctest
3665
# still runs the rest of the examples
3666
doc_suite = doctest.DocTestSuite(mod,
3667
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
1770
3668
except ValueError, e:
1771
print '**failed to get doctest for: %s\n%s' %(m,e)
3669
print '**failed to get doctest for: %s\n%s' % (mod, e)
1773
for name, plugin in bzrlib.plugin.all_plugins().items():
1774
if getattr(plugin, 'test_suite', None) is not None:
1775
suite.addTest(plugin.test_suite())
3671
if len(doc_suite._tests) == 0:
3672
raise errors.BzrError("no doctests found in %s" % (mod,))
3673
suite.addTest(doc_suite)
3675
default_encoding = sys.getdefaultencoding()
3676
for name, plugin in bzrlib.plugin.plugins().items():
3677
if not interesting_module(plugin.module.__name__):
3679
plugin_suite = plugin.test_suite()
3680
# We used to catch ImportError here and turn it into just a warning,
3681
# but really if you don't have --no-plugins this should be a failure.
3682
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3683
if plugin_suite is None:
3684
plugin_suite = plugin.load_plugin_tests(loader)
3685
if plugin_suite is not None:
3686
suite.addTest(plugin_suite)
3687
if default_encoding != sys.getdefaultencoding():
3688
bzrlib.trace.warning(
3689
'Plugin "%s" tried to reset default encoding to: %s', name,
3690
sys.getdefaultencoding())
3692
sys.setdefaultencoding(default_encoding)
3694
if keep_only is not None:
3695
# Now that the referred modules have loaded their tests, keep only the
3697
suite = filter_suite_by_id_list(suite, id_filter)
3698
# Do some sanity checks on the id_list filtering
3699
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3701
# The tester has used both keep_only and starting_with, so he is
3702
# already aware that some tests are excluded from the list, there
3703
# is no need to tell him which.
3706
# Some tests mentioned in the list are not in the test suite. The
3707
# list may be out of date, report to the tester.
3708
for id in not_found:
3709
bzrlib.trace.warning('"%s" not found in the test suite', id)
3710
for id in duplicates:
3711
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
1779
def adapt_modules(mods_list, adapter, loader, suite):
    """Load the modules named in mods_list and add their adapted tests to suite.

    :param mods_list: Module names handed to loader.loadTestsFromModuleNames.
    :param adapter: Object whose adapt(test) result is passed to
        suite.addTests.
    :param loader: The test loader used to load the modules.
    :param suite: The suite receiving the adapted tests.
    """
    loaded = loader.loadTestsFromModuleNames(mods_list)
    for single_test in iter_suite_tests(loaded):
        adapted = adapter.adapt(single_test)
        suite.addTests(adapted)
3716
def multiply_scenarios(scenarios_left, scenarios_right):
3717
"""Multiply two sets of scenarios.
3719
:returns: the cartesian product of the two sets of scenarios, that is
3720
a scenario for every possible combination of a left scenario and a
3724
('%s,%s' % (left_name, right_name),
3725
dict(left_dict.items() + right_dict.items()))
3726
for left_name, left_dict in scenarios_left
3727
for right_name, right_dict in scenarios_right]
3730
def multiply_tests(tests, scenarios, result):
3731
"""Multiply tests_list by scenarios into result.
3733
This is the core workhorse for test parameterisation.
3735
Typically the load_tests() method for a per-implementation test suite will
3736
call multiply_tests and return the result.
3738
:param tests: The tests to parameterise.
3739
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3740
scenario_param_dict).
3741
:param result: A TestSuite to add created tests to.
3743
This returns the passed in result TestSuite with the cross product of all
3744
the tests repeated once for each scenario. Each test is adapted by adding
3745
the scenario name at the end of its id(), and updating the test object's
3746
__dict__ with the scenario_param_dict.
3748
>>> import bzrlib.tests.test_sampler
3749
>>> r = multiply_tests(
3750
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3751
... [('one', dict(param=1)),
3752
... ('two', dict(param=2))],
3754
>>> tests = list(iter_suite_tests(r))
3758
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3764
for test in iter_suite_tests(tests):
3765
apply_scenarios(test, scenarios, result)
3769
def apply_scenarios(test, scenarios, result):
3770
"""Apply the scenarios in scenarios to test and add to result.
3772
:param test: The test to apply scenarios to.
3773
:param scenarios: An iterable of scenarios to apply to test.
3775
:seealso: apply_scenario
3777
for scenario in scenarios:
3778
result.addTest(apply_scenario(test, scenario))
3782
def apply_scenario(test, scenario):
3783
"""Copy test and apply scenario to it.
3785
:param test: A test to adapt.
3786
:param scenario: A tuple describing the scenario.
3787
The first element of the tuple is the new test id.
3788
The second element is a dict containing attributes to set on the
3790
:return: The adapted test.
3792
new_id = "%s(%s)" % (test.id(), scenario[0])
3793
new_test = clone_test(test, new_id)
3794
for name, value in scenario[1].items():
3795
setattr(new_test, name, value)
3799
def clone_test(test, new_id):
3800
"""Clone a test giving it a new id.
3802
:param test: The test to clone.
3803
:param new_id: The id to assign to it.
3804
:return: The new test.
3806
from copy import deepcopy
3807
new_test = deepcopy(test)
3808
new_test.id = lambda: new_id
3812
def _rmtree_temp_dir(dirname):
3813
# If LANG=C we probably have created some bogus paths
3814
# which rmtree(unicode) will fail to delete
3815
# so make sure we are using rmtree(str) to delete everything
3816
# except on win32, where rmtree(str) will fail
3817
# since it doesn't have the property of byte-stream paths
3818
# (they are either ascii or mbcs)
3819
if sys.platform == 'win32':
3820
# make sure we are using the unicode win32 api
3821
dirname = unicode(dirname)
3823
dirname = dirname.encode(sys.getfilesystemencoding())
3825
osutils.rmtree(dirname)
3827
# We don't want to fail here because some useful display will be lost
3828
# otherwise. Polluting the tmp dir is bad, but not giving all the
3829
# possible info to the test runner is even worse.
3830
sys.stderr.write('Unable to remove testing dir %s\n%s'
3831
% (os.path.basename(dirname), e))
3834
class Feature(object):
3835
"""An operating system Feature."""
3838
self._available = None
3840
def available(self):
3841
"""Is the feature available?
3843
:return: True if the feature is available.
3845
if self._available is None:
3846
self._available = self._probe()
3847
return self._available
3850
"""Implement this method in concrete features.
3852
:return: True if the feature is available.
3854
raise NotImplementedError
3857
if getattr(self, 'feature_name', None):
3858
return self.feature_name()
3859
return self.__class__.__name__
3862
class _SymlinkFeature(Feature):
3865
return osutils.has_symlinks()
3867
def feature_name(self):
3870
SymlinkFeature = _SymlinkFeature()
3873
class _HardlinkFeature(Feature):
3876
return osutils.has_hardlinks()
3878
def feature_name(self):
3881
HardlinkFeature = _HardlinkFeature()
3884
class _OsFifoFeature(Feature):
3887
return getattr(os, 'mkfifo', None)
3889
def feature_name(self):
3890
return 'filesystem fifos'
3892
OsFifoFeature = _OsFifoFeature()
3895
class _UnicodeFilenameFeature(Feature):
3896
"""Does the filesystem support Unicode filenames?"""
3900
# Check for character combinations unlikely to be covered by any
3901
# single non-unicode encoding. We use the characters
3902
# - greek small letter alpha (U+03B1) and
3903
# - braille pattern dots-123456 (U+283F).
3904
os.stat(u'\u03b1\u283f')
3905
except UnicodeEncodeError:
3907
except (IOError, OSError):
3908
# The filesystem allows the Unicode filename but the file doesn't
3912
# The filesystem allows the Unicode filename and the file exists,
3916
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3919
def probe_unicode_in_user_encoding():
3920
"""Try to encode several unicode strings to use in unicode-aware tests.
3921
Return the first successful match.
3923
:return: (unicode value, encoded plain string value) or (None, None)
3925
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3926
for uni_val in possible_vals:
3928
str_val = uni_val.encode(osutils.get_user_encoding())
3929
except UnicodeEncodeError:
3930
# Try a different character
3933
return uni_val, str_val
3937
def probe_bad_non_ascii(encoding):
3938
"""Try to find [bad] character with code [128..255]
3939
that cannot be decoded to unicode in some encoding.
3940
Return None if all non-ascii characters are valid
3943
for i in xrange(128, 256):
3946
char.decode(encoding)
3947
except UnicodeDecodeError:
3952
class _HTTPSServerFeature(Feature):
3953
"""Some tests want an https Server, check if one is available.
3955
Right now, the only way this is available is under python2.6 which provides
3966
def feature_name(self):
3967
return 'HTTPSServer'
3970
HTTPSServerFeature = _HTTPSServerFeature()
3973
class _UnicodeFilename(Feature):
3974
"""Does the filesystem support Unicode filenames?"""
3979
except UnicodeEncodeError:
3981
except (IOError, OSError):
3982
# The filesystem allows the Unicode filename but the file doesn't
3986
# The filesystem allows the Unicode filename and the file exists,
3990
UnicodeFilename = _UnicodeFilename()
3993
class _UTF8Filesystem(Feature):
3994
"""Is the filesystem UTF-8?"""
3997
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4001
UTF8Filesystem = _UTF8Filesystem()
4004
class _CaseInsCasePresFilenameFeature(Feature):
4005
"""Is the file-system case insensitive, but case-preserving?"""
4008
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4010
# first check truly case-preserving for created files, then check
4011
# case insensitive when opening existing files.
4012
name = osutils.normpath(name)
4013
base, rel = osutils.split(name)
4014
found_rel = osutils.canonical_relpath(base, name)
4015
return (found_rel == rel
4016
and os.path.isfile(name.upper())
4017
and os.path.isfile(name.lower()))
4022
def feature_name(self):
4023
return "case-insensitive case-preserving filesystem"
4025
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4028
class _CaseInsensitiveFilesystemFeature(Feature):
4029
"""Check if underlying filesystem is case-insensitive but *not* case
4032
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4033
# more likely to be case preserving, so this case is rare.
4036
if CaseInsCasePresFilenameFeature.available():
4039
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4040
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4041
TestCaseWithMemoryTransport.TEST_ROOT = root
4043
root = TestCaseWithMemoryTransport.TEST_ROOT
4044
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4046
name_a = osutils.pathjoin(tdir, 'a')
4047
name_A = osutils.pathjoin(tdir, 'A')
4049
result = osutils.isdir(name_A)
4050
_rmtree_temp_dir(tdir)
4053
def feature_name(self):
4054
return 'case-insensitive filesystem'
4056
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4059
class _SubUnitFeature(Feature):
4060
"""Check if subunit is available."""
4069
def feature_name(self):
4072
SubUnitFeature = _SubUnitFeature()
4073
# Only define SubUnitBzrRunner if subunit is available.
4075
from subunit import TestProtocolClient
4077
from subunit.test_results import AutoTimingTestResultDecorator
4079
AutoTimingTestResultDecorator = lambda x:x
4080
class SubUnitBzrRunner(TextTestRunner):
4081
def run(self, test):
4082
result = AutoTimingTestResultDecorator(
4083
TestProtocolClient(self.stream))