835
856
ui.ui_factory = ui.SilentUIFactory()
836
857
self.addCleanup(_restore)
859
def _check_locks(self):
860
"""Check that all lock take/release actions have been paired."""
861
# once we have fixed all the current lock problems, we can change the
862
# following code to always check for mismatched locks, but only do
863
# traceback showing with -Dlock (self._lock_check_thorough is True).
864
# For now, because the test suite will fail, we only assert that lock
865
# matching has occurred with -Dlock.
867
acquired_locks = [lock for action, lock in self._lock_actions
868
if action == 'acquired']
869
released_locks = [lock for action, lock in self._lock_actions
870
if action == 'released']
871
broken_locks = [lock for action, lock in self._lock_actions
872
if action == 'broken']
873
# trivially, given the tests for lock acquisition and release, if we
874
# have as many in each list, it should be ok. Some lock tests also
875
# break some locks on purpose and should be taken into account by
876
# considering that breaking a lock is just a dirty way of releasing it.
877
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
878
message = ('Different number of acquired and '
879
'released or broken locks. (%s, %s + %s)' %
880
(acquired_locks, released_locks, broken_locks))
881
if not self._lock_check_thorough:
882
# Rather than fail, just warn
883
print "Broken test %s: %s" % (self, message)
887
def _track_locks(self):
    """Track lock activity during tests.

    Resets the list of recorded lock actions, schedules _check_locks to run
    as a cleanup, and installs hooks on _mod_lock.Lock so that every
    'lock_acquired', 'lock_released' and 'lock_broken' event is recorded
    through the matching _lock_* callback.
    """
    self._lock_actions = []
    # Thorough checking is on whenever 'lock' is absent from the debug flags.
    self._lock_check_thorough = 'lock' not in debug.debug_flags
    self.addCleanup(self._check_locks)
    _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
                                            self._lock_acquired, None)
    _mod_lock.Lock.hooks.install_named_hook('lock_released',
                                            self._lock_released, None)
    _mod_lock.Lock.hooks.install_named_hook('lock_broken',
                                            self._lock_broken, None)
899
def _lock_acquired(self, result):
    """Hook callback: record that a lock was acquired.

    :param result: The value passed by the 'lock_acquired' hook; it is
        stored as-is alongside the 'acquired' tag in self._lock_actions.
    """
    self._lock_actions.append(('acquired', result))
902
def _lock_released(self, result):
    """Hook callback: record that a lock was released.

    :param result: The value passed by the 'lock_released' hook; it is
        stored as-is alongside the 'released' tag in self._lock_actions.
    """
    self._lock_actions.append(('released', result))
905
def _lock_broken(self, result):
    """Hook callback: record that a lock was broken.

    :param result: The value passed by the 'lock_broken' hook; it is
        stored as-is alongside the 'broken' tag in self._lock_actions.
    """
    self._lock_actions.append(('broken', result))
838
908
def _ndiff_strings(self, a, b):
839
909
"""Return ndiff between two strings containing lines.
2614
2732
if runner_class is None:
2615
2733
runner_class = TextTestRunner
2616
runner = runner_class(stream=sys.stdout,
2736
runner = runner_class(stream=stream,
2617
2737
descriptions=0,
2618
2738
verbosity=verbosity,
2619
2739
bench_history=bench_history,
2620
2740
list_only=list_only,
2622
2743
runner.stop_on_failure=stop_on_failure
2623
# Initialise the random number generator and display the seed used.
2624
# We convert the seed to a long to make it reusable across invocations.
2625
random_order = False
2626
if random_seed is not None:
2628
if random_seed == "now":
2629
random_seed = long(time.time())
2744
# built in decorator factories:
2746
random_order(random_seed, runner),
2747
exclude_tests(exclude_pattern),
2749
if matching_tests_first:
2750
decorators.append(tests_first(pattern))
2752
decorators.append(filter_tests(pattern))
2753
if suite_decorators:
2754
decorators.extend(suite_decorators)
2755
for decorator in decorators:
2756
suite = decorator(suite)
2757
result = runner.run(suite)
2762
return result.wasStrictlySuccessful()
2764
return result.wasSuccessful()
2767
# A registry where get() returns a suite decorator.
# Entries are added by name with parallel_registry.register(); this file
# registers 'fork' and 'subprocess'.
2768
parallel_registry = registry.Registry()
2771
def fork_decorator(suite):
2772
concurrency = osutils.local_concurrency()
2773
if concurrency == 1:
2775
from testtools import ConcurrentTestSuite
2776
return ConcurrentTestSuite(suite, fork_for_tests)
2777
parallel_registry.register('fork', fork_decorator)
2780
def subprocess_decorator(suite):
2781
concurrency = osutils.local_concurrency()
2782
if concurrency == 1:
2784
from testtools import ConcurrentTestSuite
2785
return ConcurrentTestSuite(suite, reinvoke_for_tests)
2786
parallel_registry.register('subprocess', subprocess_decorator)
2789
def exclude_tests(exclude_pattern):
2790
"""Return a test suite decorator that excludes tests."""
2791
if exclude_pattern is None:
2792
return identity_decorator
2793
def decorator(suite):
2794
return ExcludeDecorator(suite, exclude_pattern)
2798
def filter_tests(pattern):
2800
return identity_decorator
2801
def decorator(suite):
2802
return FilterTestsDecorator(suite, pattern)
2806
def random_order(random_seed, runner):
2807
"""Return a test suite decorator factory for randomising tests order.
2809
:param random_seed: now, a string which casts to a long, or a long.
2810
:param runner: A test runner with a stream attribute to report on.
2812
if random_seed is None:
2813
return identity_decorator
2814
def decorator(suite):
2815
return RandomDecorator(suite, random_seed, runner.stream)
2819
def tests_first(pattern):
2821
return identity_decorator
2822
def decorator(suite):
2823
return TestFirstDecorator(suite, pattern)
2827
def identity_decorator(suite):
2832
class TestDecorator(TestSuite):
2833
"""A decorator for TestCase/TestSuite objects.
2835
Usually, subclasses should override __iter__(used when flattening test
2836
suites), which we do to filter, reorder, parallelise and so on, run() and
2840
def __init__(self, suite):
2841
TestSuite.__init__(self)
2844
def countTestCases(self):
2847
cases += test.countTestCases()
2854
def run(self, result):
2855
# Use iteration on self, not self._tests, to allow subclasses to hook
2858
if result.shouldStop:
2864
class ExcludeDecorator(TestDecorator):
2865
"""A decorator which excludes test matching an exclude pattern."""
2867
def __init__(self, suite, exclude_pattern):
2868
TestDecorator.__init__(self, suite)
2869
self.exclude_pattern = exclude_pattern
2870
self.excluded = False
2874
return iter(self._tests)
2875
self.excluded = True
2876
suite = exclude_tests_by_re(self, self.exclude_pattern)
2878
self.addTests(suite)
2879
return iter(self._tests)
2882
class FilterTestsDecorator(TestDecorator):
2883
"""A decorator which filters tests to those matching a pattern."""
2885
def __init__(self, suite, pattern):
2886
TestDecorator.__init__(self, suite)
2887
self.pattern = pattern
2888
self.filtered = False
2892
return iter(self._tests)
2893
self.filtered = True
2894
suite = filter_suite_by_re(self, self.pattern)
2896
self.addTests(suite)
2897
return iter(self._tests)
2900
class RandomDecorator(TestDecorator):
2901
"""A decorator which randomises the order of its tests."""
2903
def __init__(self, suite, random_seed, stream):
2904
TestDecorator.__init__(self, suite)
2905
self.random_seed = random_seed
2906
self.randomised = False
2907
self.stream = stream
2911
return iter(self._tests)
2912
self.randomised = True
2913
self.stream.writeln("Randomizing test order using seed %s\n" %
2914
(self.actual_seed()))
2915
# Initialise the random number generator.
2916
random.seed(self.actual_seed())
2917
suite = randomize_suite(self)
2919
self.addTests(suite)
2920
return iter(self._tests)
2922
def actual_seed(self):
2923
if self.random_seed == "now":
2924
# We convert the seed to a long to make it reusable across
2925
# invocations (because the user can reenter it).
2926
self.random_seed = long(time.time())
2631
2928
# Convert the seed to a long if we can
2633
random_seed = long(random_seed)
2930
self.random_seed = long(self.random_seed)
2636
runner.stream.writeln("Randomizing test order using seed %s\n" %
2638
random.seed(random_seed)
2639
# Customise the list of tests if requested
2640
if exclude_pattern is not None:
2641
suite = exclude_tests_by_re(suite, exclude_pattern)
2643
order_changer = randomize_suite
2645
order_changer = preserve_input
2646
if pattern != '.*' or random_order:
2647
if matching_tests_first:
2648
suites = map(order_changer, split_suite_by_re(suite, pattern))
2649
suite = TestUtil.TestSuite(suites)
2651
suite = order_changer(filter_suite_by_re(suite, pattern))
2653
result = runner.run(suite)
2656
return result.wasStrictlySuccessful()
2658
return result.wasSuccessful()
2933
return self.random_seed
2936
class TestFirstDecorator(TestDecorator):
2937
"""A decorator which moves named tests to the front."""
2939
def __init__(self, suite, pattern):
2940
TestDecorator.__init__(self, suite)
2941
self.pattern = pattern
2942
self.filtered = False
2946
return iter(self._tests)
2947
self.filtered = True
2948
suites = split_suite_by_re(self, self.pattern)
2950
self.addTests(suites)
2951
return iter(self._tests)
2954
def partition_tests(suite, count):
2955
"""Partition suite into count lists of tests."""
2957
tests = list(iter_suite_tests(suite))
2958
tests_per_process = int(math.ceil(float(len(tests)) / count))
2959
for block in range(count):
2960
low_test = block * tests_per_process
2961
high_test = low_test + tests_per_process
2962
process_tests = tests[low_test:high_test]
2963
result.append(process_tests)
2967
def fork_for_tests(suite):
2968
"""Take suite and start up one runner per CPU by forking()
2970
:return: An iterable of TestCase-like objects which can each have
2971
run(result) called on them to feed tests to result.
2973
concurrency = osutils.local_concurrency()
2975
from subunit import TestProtocolClient, ProtocolTestCase
2977
from subunit.test_results import AutoTimingTestResultDecorator
2979
AutoTimingTestResultDecorator = lambda x:x
2980
class TestInOtherProcess(ProtocolTestCase):
2981
# Should be in subunit, I think. RBC.
2982
def __init__(self, stream, pid):
2983
ProtocolTestCase.__init__(self, stream)
2986
def run(self, result):
2988
ProtocolTestCase.run(self, result)
2990
os.waitpid(self.pid, os.WNOHANG)
2992
test_blocks = partition_tests(suite, concurrency)
2993
for process_tests in test_blocks:
2994
process_suite = TestSuite()
2995
process_suite.addTests(process_tests)
2996
c2pread, c2pwrite = os.pipe()
3001
# Leave stderr and stdout open so we can see test noise
3002
# Close stdin so that the child goes away if it decides to
3003
# read from stdin (otherwise it's a roulette to see what
3004
# child actually gets keystrokes for pdb etc).
3007
stream = os.fdopen(c2pwrite, 'wb', 1)
3008
subunit_result = AutoTimingTestResultDecorator(
3009
TestProtocolClient(stream))
3010
process_suite.run(subunit_result)
3015
stream = os.fdopen(c2pread, 'rb', 1)
3016
test = TestInOtherProcess(stream, pid)
3021
def reinvoke_for_tests(suite):
3022
"""Take suite and start up one runner per CPU using subprocess().
3024
:return: An iterable of TestCase-like objects which can each have
3025
run(result) called on them to feed tests to result.
3027
concurrency = osutils.local_concurrency()
3029
from subunit import ProtocolTestCase
3030
class TestInSubprocess(ProtocolTestCase):
3031
def __init__(self, process, name):
3032
ProtocolTestCase.__init__(self, process.stdout)
3033
self.process = process
3034
self.process.stdin.close()
3037
def run(self, result):
3039
ProtocolTestCase.run(self, result)
3042
os.unlink(self.name)
3043
# print "pid %d finished" % finished_process
3044
test_blocks = partition_tests(suite, concurrency)
3045
for process_tests in test_blocks:
3046
# ugly; currently reimplements rather than reuses TestCase methods.
3047
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3048
if not os.path.isfile(bzr_path):
3049
# We are probably installed. Assume sys.argv is the right file
3050
bzr_path = sys.argv[0]
3051
fd, test_list_file_name = tempfile.mkstemp()
3052
test_list_file = os.fdopen(fd, 'wb', 1)
3053
for test in process_tests:
3054
test_list_file.write(test.id() + '\n')
3055
test_list_file.close()
3057
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3059
if '--no-plugins' in sys.argv:
3060
argv.append('--no-plugins')
3061
# stderr=STDOUT would be ideal, but until we prevent noise on
3062
# stderr it can interrupt the subunit protocol.
3063
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3065
test = TestInSubprocess(process, test_list_file_name)
3068
os.unlink(test_list_file_name)
3073
class BZRTransformingResult(unittest.TestResult):
3075
def __init__(self, target):
    """Wrap target, the result object that events are forwarded to.

    :param target: The result object stored as self.result; the other
        methods of this class delegate to it.
    """
    unittest.TestResult.__init__(self)
    self.result = target
3079
def startTest(self, test):
    """Forward the start-of-test notification to the wrapped result."""
    self.result.startTest(test)
3082
def stopTest(self, test):
    """Forward the end-of-test notification to the wrapped result."""
    self.result.stopTest(test)
3085
def addError(self, test, err):
3086
feature = self._error_looks_like('UnavailableFeature: ', err)
3087
if feature is not None:
3088
self.result.addNotSupported(test, feature)
3090
self.result.addError(test, err)
3092
def addFailure(self, test, err):
3093
known = self._error_looks_like('KnownFailure: ', err)
3094
if known is not None:
3095
self.result._addKnownFailure(test, [KnownFailure,
3096
KnownFailure(known), None])
3098
self.result.addFailure(test, err)
3100
def addSkip(self, test, reason):
    """Forward a skipped test and its reason to the wrapped result."""
    self.result.addSkip(test, reason)
3103
def addSuccess(self, test):
    """Forward a successful test to the wrapped result."""
    self.result.addSuccess(test)
3106
def _error_looks_like(self, prefix, err):
3107
"""Deserialize exception and return the stringified value."""
3111
if isinstance(exc, subunit.RemoteException):
3112
# stringify the exception gives access to the remote traceback
3113
# We search the last line for 'prefix'
3114
lines = str(exc).split('\n')
3115
while lines and not lines[-1]:
3118
if lines[-1].startswith(prefix):
3119
value = lines[-1][len(prefix):]
2661
3123
# Controlled by "bzr selftest -E=..." option
3220
3652
for right_name, right_dict in scenarios_right]
3224
def adapt_modules(mods_list, adapter, loader, suite):
    """Adapt the modules in mods_list using adapter and add to suite.

    :param mods_list: Module names handed to
        loader.loadTestsFromModuleNames().
    :param adapter: The adapter passed on to adapt_tests().
    :param loader: The loader used to load tests from the named modules.
    :param suite: The suite passed on to adapt_tests() to receive the
        adapted tests.
    """
    tests = loader.loadTestsFromModuleNames(mods_list)
    adapt_tests(tests, adapter, suite)
3230
def adapt_tests(tests_list, adapter, suite):
    """Adapt the tests in tests_list using adapter and add to suite.

    :param tests_list: The tests to adapt; walked with iter_suite_tests().
    :param adapter: An object whose adapt(test) result is passed to
        suite.addTests().
    :param suite: The suite that receives the adapted tests.
    """
    for test in iter_suite_tests(tests_list):
        suite.addTests(adapter.adapt(test))
3655
def multiply_tests(tests, scenarios, result):
3656
"""Multiply tests_list by scenarios into result.
3658
This is the core workhorse for test parameterisation.
3660
Typically the load_tests() method for a per-implementation test suite will
3661
call multiply_tests and return the result.
3663
:param tests: The tests to parameterise.
3664
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3665
scenario_param_dict).
3666
:param result: A TestSuite to add created tests to.
3668
This returns the passed in result TestSuite with the cross product of all
3669
the tests repeated once for each scenario. Each test is adapted by adding
3670
the scenario name at the end of its id(), and updating the test object's
3671
__dict__ with the scenario_param_dict.
3673
>>> import bzrlib.tests.test_sampler
3674
>>> r = multiply_tests(
3675
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3676
... [('one', dict(param=1)),
3677
... ('two', dict(param=2))],
3679
>>> tests = list(iter_suite_tests(r))
3683
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3689
for test in iter_suite_tests(tests):
3690
apply_scenarios(test, scenarios, result)
3694
def apply_scenarios(test, scenarios, result):
3695
"""Apply the scenarios in scenarios to test and add to result.
3697
:param test: The test to apply scenarios to.
3698
:param scenarios: An iterable of scenarios to apply to test.
3700
:seealso: apply_scenario
3702
for scenario in scenarios:
3703
result.addTest(apply_scenario(test, scenario))
3707
def apply_scenario(test, scenario):
3708
"""Copy test and apply scenario to it.
3710
:param test: A test to adapt.
3711
:param scenario: A tuple describing the scenario.
3712
The first element of the tuple is the new test id.
3713
The second element is a dict containing attributes to set on the
3715
:return: The adapted test.
3717
new_id = "%s(%s)" % (test.id(), scenario[0])
3718
new_test = clone_test(test, new_id)
3719
for name, value in scenario[1].items():
3720
setattr(new_test, name, value)
3724
def clone_test(test, new_id):
3725
"""Clone a test giving it a new id.
3727
:param test: The test to clone.
3728
:param new_id: The id to assign to it.
3729
:return: The new test.
3731
from copy import deepcopy
3732
new_test = deepcopy(test)
3733
new_test.id = lambda: new_id
3236
3737
def _rmtree_temp_dir(dirname):