2639
2620
list_only=list_only,
2641
2622
runner.stop_on_failure=stop_on_failure
2642
# built in decorator factories:
2644
random_order(random_seed, runner),
2645
exclude_tests(exclude_pattern),
2647
if matching_tests_first:
2648
decorators.append(tests_first(pattern))
2623
# Initialise the random number generator and display the seed used.
2624
# We convert the seed to a long to make it reuseable across invocations.
2625
random_order = False
2626
if random_seed is not None:
2628
if random_seed == "now":
2629
random_seed = long(time.time())
2631
# Convert the seed to a long if we can
2633
random_seed = long(random_seed)
2636
runner.stream.writeln("Randomizing test order using seed %s\n" %
2638
random.seed(random_seed)
2639
# Customise the list of tests if requested
2640
if exclude_pattern is not None:
2641
suite = exclude_tests_by_re(suite, exclude_pattern)
2643
order_changer = randomize_suite
2650
decorators.append(filter_tests(pattern))
2651
if suite_decorators:
2652
decorators.extend(suite_decorators)
2653
for decorator in decorators:
2654
suite = decorator(suite)
2645
order_changer = preserve_input
2646
if pattern != '.*' or random_order:
2647
if matching_tests_first:
2648
suites = map(order_changer, split_suite_by_re(suite, pattern))
2649
suite = TestUtil.TestSuite(suites)
2651
suite = order_changer(filter_suite_by_re(suite, pattern))
2655
2653
result = runner.run(suite)
2657
2656
return result.wasStrictlySuccessful()
2659
return result.wasSuccessful()
2662
# A registry where get() returns a suite decorator.
2663
parallel_registry = registry.Registry()
2666
def fork_decorator(suite):
2667
concurrency = local_concurrency()
2668
if concurrency == 1:
2670
from testtools import ConcurrentTestSuite
2671
return ConcurrentTestSuite(suite, fork_for_tests)
2672
parallel_registry.register('fork', fork_decorator)
2675
def subprocess_decorator(suite):
2676
concurrency = local_concurrency()
2677
if concurrency == 1:
2679
from testtools import ConcurrentTestSuite
2680
return ConcurrentTestSuite(suite, reinvoke_for_tests)
2681
parallel_registry.register('subprocess', subprocess_decorator)
2684
def exclude_tests(exclude_pattern):
2685
"""Return a test suite decorator that excludes tests."""
2686
if exclude_pattern is None:
2687
return identity_decorator
2688
def decorator(suite):
2689
return ExcludeDecorator(suite, exclude_pattern)
2693
def filter_tests(pattern):
2695
return identity_decorator
2696
def decorator(suite):
2697
return FilterTestsDecorator(suite, pattern)
2701
def random_order(random_seed, runner):
2702
"""Return a test suite decorator factory for randomising tests order.
2704
:param random_seed: now, a string which casts to a long, or a long.
2705
:param runner: A test runner with a stream attribute to report on.
2707
if random_seed is None:
2708
return identity_decorator
2709
def decorator(suite):
2710
return RandomDecorator(suite, random_seed, runner.stream)
2714
def tests_first(pattern):
2716
return identity_decorator
2717
def decorator(suite):
2718
return TestFirstDecorator(suite, pattern)
2722
def identity_decorator(suite):
2727
class TestDecorator(TestSuite):
2728
"""A decorator for TestCase/TestSuite objects.
2730
Usually, subclasses should override __iter__(used when flattening test
2731
suites), which we do to filter, reorder, parallelise and so on, run() and
2735
def __init__(self, suite):
2736
TestSuite.__init__(self)
2739
def countTestCases(self):
2742
cases += test.countTestCases()
2749
def run(self, result):
2750
# Use iteration on self, not self._tests, to allow subclasses to hook
2753
if result.shouldStop:
2759
class ExcludeDecorator(TestDecorator):
2760
"""A decorator which excludes test matching an exclude pattern."""
2762
def __init__(self, suite, exclude_pattern):
2763
TestDecorator.__init__(self, suite)
2764
self.exclude_pattern = exclude_pattern
2765
self.excluded = False
2769
return iter(self._tests)
2770
self.excluded = True
2771
suite = exclude_tests_by_re(self, self.exclude_pattern)
2773
self.addTests(suite)
2774
return iter(self._tests)
2777
class FilterTestsDecorator(TestDecorator):
2778
"""A decorator which filters tests to those matching a pattern."""
2780
def __init__(self, suite, pattern):
2781
TestDecorator.__init__(self, suite)
2782
self.pattern = pattern
2783
self.filtered = False
2787
return iter(self._tests)
2788
self.filtered = True
2789
suite = filter_suite_by_re(self, self.pattern)
2791
self.addTests(suite)
2792
return iter(self._tests)
2795
class RandomDecorator(TestDecorator):
2796
"""A decorator which randomises the order of its tests."""
2798
def __init__(self, suite, random_seed, stream):
2799
TestDecorator.__init__(self, suite)
2800
self.random_seed = random_seed
2801
self.randomised = False
2802
self.stream = stream
2806
return iter(self._tests)
2807
self.randomised = True
2808
self.stream.writeln("Randomizing test order using seed %s\n" %
2809
(self.actual_seed()))
2810
# Initialise the random number generator.
2811
random.seed(self.actual_seed())
2812
suite = randomize_suite(self)
2814
self.addTests(suite)
2815
return iter(self._tests)
2817
def actual_seed(self):
2818
if self.random_seed == "now":
2819
# We convert the seed to a long to make it reuseable across
2820
# invocations (because the user can reenter it).
2821
self.random_seed = long(time.time())
2823
# Convert the seed to a long if we can
2825
self.random_seed = long(self.random_seed)
2828
return self.random_seed
2831
class TestFirstDecorator(TestDecorator):
2832
"""A decorator which moves named tests to the front."""
2834
def __init__(self, suite, pattern):
2835
TestDecorator.__init__(self, suite)
2836
self.pattern = pattern
2837
self.filtered = False
2841
return iter(self._tests)
2842
self.filtered = True
2843
suites = split_suite_by_re(self, self.pattern)
2845
self.addTests(suites)
2846
return iter(self._tests)
2849
def partition_tests(suite, count):
2850
"""Partition suite into count lists of tests."""
2852
tests = list(iter_suite_tests(suite))
2853
tests_per_process = int(math.ceil(float(len(tests)) / count))
2854
for block in range(count):
2855
low_test = block * tests_per_process
2856
high_test = low_test + tests_per_process
2857
process_tests = tests[low_test:high_test]
2858
result.append(process_tests)
2862
def fork_for_tests(suite):
2863
"""Take suite and start up one runner per CPU by forking()
2865
:return: An iterable of TestCase-like objects which can each have
2866
run(result) called on them to feed tests to result.
2868
concurrency = local_concurrency()
2870
from subunit import TestProtocolClient, ProtocolTestCase
2871
class TestInOtherProcess(ProtocolTestCase):
2872
# Should be in subunit, I think. RBC.
2873
def __init__(self, stream, pid):
2874
ProtocolTestCase.__init__(self, stream)
2877
def run(self, result):
2879
ProtocolTestCase.run(self, result)
2881
os.waitpid(self.pid, os.WNOHANG)
2883
test_blocks = partition_tests(suite, concurrency)
2884
for process_tests in test_blocks:
2885
process_suite = TestSuite()
2886
process_suite.addTests(process_tests)
2887
c2pread, c2pwrite = os.pipe()
2892
# Leave stderr and stdout open so we can see test noise
2893
# Close stdin so that the child goes away if it decides to
2894
# read from stdin (otherwise its a roulette to see what
2895
# child actually gets keystrokes for pdb etc).
2898
stream = os.fdopen(c2pwrite, 'wb', 1)
2899
subunit_result = TestProtocolClient(stream)
2900
process_suite.run(subunit_result)
2905
stream = os.fdopen(c2pread, 'rb', 1)
2906
test = TestInOtherProcess(stream, pid)
2911
def reinvoke_for_tests(suite):
2912
"""Take suite and start up one runner per CPU using subprocess().
2914
:return: An iterable of TestCase-like objects which can each have
2915
run(result) called on them to feed tests to result.
2917
concurrency = local_concurrency()
2919
from subunit import TestProtocolClient, ProtocolTestCase
2920
class TestInSubprocess(ProtocolTestCase):
2921
def __init__(self, process, name):
2922
ProtocolTestCase.__init__(self, process.stdout)
2923
self.process = process
2924
self.process.stdin.close()
2927
def run(self, result):
2929
ProtocolTestCase.run(self, result)
2932
os.unlink(self.name)
2933
# print "pid %d finished" % finished_process
2934
test_blocks = partition_tests(suite, concurrency)
2935
for process_tests in test_blocks:
2936
# ugly; currently reimplement rather than reuses TestCase methods.
2937
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
2938
if not os.path.isfile(bzr_path):
2939
# We are probably installed. Assume sys.argv is the right file
2940
bzr_path = sys.argv[0]
2941
fd, test_list_file_name = tempfile.mkstemp()
2942
test_list_file = os.fdopen(fd, 'wb', 1)
2943
for test in process_tests:
2944
test_list_file.write(test.id() + '\n')
2945
test_list_file.close()
2947
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
2949
if '--no-plugins' in sys.argv:
2950
argv.append('--no-plugins')
2951
# stderr=STDOUT would be ideal, but until we prevent noise on
2952
# stderr it can interrupt the subunit protocol.
2953
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
2955
test = TestInSubprocess(process, test_list_file_name)
2958
os.unlink(test_list_file_name)
2963
def cpucount(content):
2964
lines = content.splitlines()
2965
prefix = 'processor'
2967
if line.startswith(prefix):
2968
concurrency = int(line[line.find(':')+1:]) + 1
2972
def local_concurrency():
2974
content = file('/proc/cpuinfo', 'rb').read()
2975
concurrency = cpucount(content)
2976
except Exception, e:
2981
class BZRTransformingResult(unittest.TestResult):
2983
def __init__(self, target):
2984
unittest.TestResult.__init__(self)
2985
self.result = target
2987
def startTest(self, test):
2988
self.result.startTest(test)
2990
def stopTest(self, test):
2991
self.result.stopTest(test)
2993
def addError(self, test, err):
2994
feature = self._error_looks_like('UnavailableFeature: ', err)
2995
if feature is not None:
2996
self.result.addNotSupported(test, feature)
2998
self.result.addError(test, err)
3000
def addFailure(self, test, err):
3001
known = self._error_looks_like('KnownFailure: ', err)
3002
if known is not None:
3003
self.result._addKnownFailure(test, [KnownFailure,
3004
KnownFailure(known), None])
3006
self.result.addFailure(test, err)
3008
def addSkip(self, test, reason):
3009
self.result.addSkip(test, reason)
3011
def addSuccess(self, test):
3012
self.result.addSuccess(test)
3014
def _error_looks_like(self, prefix, err):
3015
"""Deserialize exception and returns the stringify value."""
3019
if isinstance(exc, subunit.RemoteException):
3020
# stringify the exception gives access to the remote traceback
3021
# We search the last line for 'prefix'
3022
lines = str(exc).split('\n')
3024
last = lines[-2] # -1 is empty, final \n
3027
if last.startswith(prefix):
3028
value = last[len(prefix):]
2658
return result.wasSuccessful()
3032
2661
# Controlled by "bzr selftest -E=..." option
3559
3221
for right_name, right_dict in scenarios_right]
3562
def multiply_tests(tests, scenarios, result):
3563
"""Multiply tests_list by scenarios into result.
3565
This is the core workhorse for test parameterisation.
3567
Typically the load_tests() method for a per-implementation test suite will
3568
call multiply_tests and return the result.
3570
:param tests: The tests to parameterise.
3571
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3572
scenario_param_dict).
3573
:param result: A TestSuite to add created tests to.
3575
This returns the passed in result TestSuite with the cross product of all
3576
the tests repeated once for each scenario. Each test is adapted by adding
3577
the scenario name at the end of its id(), and updating the test object's
3578
__dict__ with the scenario_param_dict.
3580
>>> import bzrlib.tests.test_sampler
3581
>>> r = multiply_tests(
3582
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3583
... [('one', dict(param=1)),
3584
... ('two', dict(param=2))],
3586
>>> tests = list(iter_suite_tests(r))
3590
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3596
for test in iter_suite_tests(tests):
3597
apply_scenarios(test, scenarios, result)
3601
def apply_scenarios(test, scenarios, result):
3602
"""Apply the scenarios in scenarios to test and add to result.
3604
:param test: The test to apply scenarios to.
3605
:param scenarios: An iterable of scenarios to apply to test.
3607
:seealso: apply_scenario
3609
for scenario in scenarios:
3610
result.addTest(apply_scenario(test, scenario))
3614
def apply_scenario(test, scenario):
3615
"""Copy test and apply scenario to it.
3617
:param test: A test to adapt.
3618
:param scenario: A tuple describing the scenarion.
3619
The first element of the tuple is the new test id.
3620
The second element is a dict containing attributes to set on the
3622
:return: The adapted test.
3624
new_id = "%s(%s)" % (test.id(), scenario[0])
3625
new_test = clone_test(test, new_id)
3626
for name, value in scenario[1].items():
3627
setattr(new_test, name, value)
3631
def clone_test(test, new_id):
3632
"""Clone a test giving it a new id.
3634
:param test: The test to clone.
3635
:param new_id: The id to assign to it.
3636
:return: The new test.
3638
from copy import deepcopy
3639
new_test = deepcopy(test)
3640
new_test.id = lambda: new_id
3225
def adapt_modules(mods_list, adapter, loader, suite):
3226
"""Adapt the modules in mods_list using adapter and add to suite."""
3227
tests = loader.loadTestsFromModuleNames(mods_list)
3228
adapt_tests(tests, adapter, suite)
3231
def adapt_tests(tests_list, adapter, suite):
3232
"""Adapt the tests in tests_list using adapter and add to suite."""
3233
for test in iter_suite_tests(tests_list):
3234
suite.addTests(adapter.adapt(test))
3644
3237
def _rmtree_temp_dir(dirname):