384
403
def __init__(self, stream, descriptions, verbosity,
385
404
bench_history=None,
389
408
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
390
bench_history, num_tests)
392
self.pb = self.ui.nested_progress_bar()
393
self._supplied_pb = False
396
self._supplied_pb = True
409
bench_history, strict)
410
# We no longer pass them around, but just rely on the UIFactory stack
413
warnings.warn("Passing pb to TextTestResult is deprecated")
414
self.pb = self.ui.nested_progress_bar()
397
415
self.pb.show_pct = False
398
416
self.pb.show_spinner = False
399
417
# Plain bool, like the sibling show_* flags; `False,` would instead be the
# one-element tuple (False,), which is truthy.
self.pb.show_eta = False
400
418
self.pb.show_count = False
401
419
self.pb.show_bar = False
420
self.pb.update_latency = 0
421
self.pb.show_transport_activity = False
424
# called when the tests that are going to run have run
426
super(TextTestResult, self).done()
403
431
def report_starting(self):
404
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
432
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
434
def printErrors(self):
435
# clear the pb to make room for the error listing
437
super(TextTestResult, self).printErrors()
406
439
def _progress_prefix_text(self):
407
a = '[%d' % self.count
408
if self.num_tests is not None:
440
# the longer this text, the less space we have to show the test
442
a = '[%d' % self.count # total that have been run
443
# tests skipped as known not to be relevant are not important enough
445
## if self.skip_count:
446
## a += ', %d skip' % self.skip_count
447
## if self.known_failure_count:
448
## a += '+%dX' % self.known_failure_count
409
450
a +='/%d' % self.num_tests
410
a += ' in %ds' % (time.time() - self._overall_start_time)
452
runtime = time.time() - self._overall_start_time
454
a += '%dm%ds' % (runtime / 60, runtime % 60)
411
457
if self.error_count:
412
a += ', %d errors' % self.error_count
458
a += ', %d err' % self.error_count
413
459
if self.failure_count:
414
a += ', %d failed' % self.failure_count
415
if self.known_failure_count:
416
a += ', %d known failures' % self.known_failure_count
418
a += ', %d skipped' % self.skip_count
460
a += ', %d fail' % self.failure_count
419
461
if self.unsupported:
420
a += ', %d missing features' % len(self.unsupported)
462
a += ', %d missing' % len(self.unsupported)
762
785
retrieved by _get_log(). We use a real OS file, not an in-memory object,
763
786
so that it can also capture file IO. When the test completes this file
764
787
is read into memory and removed from disk.
766
789
There are also convenience functions to invoke bzr's command-line
767
790
routine, and to build and check bzr trees.
769
792
In addition to the usual method of overriding tearDown(), this class also
770
793
allows subclasses to register functions into the _cleanups list, which is
771
794
run in order as the object is torn down. It's less likely this will be
772
795
accidentally overlooked.
798
_active_threads = None
799
_leaking_threads_tests = 0
800
_first_thread_leaker_id = None
775
801
_log_file_name = None
776
802
_log_contents = ''
777
803
_keep_log_file = False
778
804
# record lsprof data when performing benchmark calls.
779
805
_gather_lsprof_in_benchmarks = False
806
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
807
'_log_contents', '_log_file_name', '_benchtime',
808
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
781
810
def __init__(self, methodName='testMethod'):
782
811
super(TestCase, self).__init__(methodName)
783
812
self._cleanups = []
813
self._bzr_test_setUp_run = False
814
self._bzr_test_tearDown_run = False
786
817
unittest.TestCase.setUp(self)
818
self._bzr_test_setUp_run = True
787
819
self._cleanEnvironment()
788
bzrlib.trace.disable_default_logging()
789
820
self._silenceUI()
790
821
self._startLogFile()
791
822
self._benchcalls = []
792
823
self._benchtime = None
793
824
self._clear_hooks()
794
826
self._clear_debug_flags()
827
TestCase._active_threads = threading.activeCount()
828
self.addCleanup(self._check_leaked_threads)
833
pdb.Pdb().set_trace(sys._getframe().f_back)
835
def _check_leaked_threads(self):
836
active = threading.activeCount()
837
leaked_threads = active - TestCase._active_threads
838
TestCase._active_threads = active
840
TestCase._leaking_threads_tests += 1
841
if TestCase._first_thread_leaker_id is None:
842
TestCase._first_thread_leaker_id = self.id()
796
844
def _clear_debug_flags(self):
797
845
"""Prevent externally set debug flags affecting tests.
799
847
Tests that want to use debug flags can just set them in the
800
848
debug_flags set during setup/teardown.
802
850
self._preserved_debug_flags = set(debug.debug_flags)
803
debug.debug_flags.clear()
851
if 'allow_debug' not in selftest_debug_flags:
852
debug.debug_flags.clear()
853
if 'disable_lock_checks' not in selftest_debug_flags:
854
debug.debug_flags.add('strict_locks')
804
855
self.addCleanup(self._restore_debug_flags)
806
857
def _clear_hooks(self):
807
858
# prevent hooks affecting tests
809
import bzrlib.smart.server
810
self._preserved_hooks = {
811
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
812
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
859
self._preserved_hooks = {}
860
for key, factory in hooks.known_hooks.items():
861
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
862
current_hooks = hooks.known_hooks_key_to_object(key)
863
self._preserved_hooks[parent] = (name, current_hooks)
814
864
self.addCleanup(self._restoreHooks)
815
# reset all hooks to an empty instance of the appropriate type
816
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
817
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
865
for key, factory in hooks.known_hooks.items():
866
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
867
setattr(parent, name, factory())
868
# this hook should always be installed
869
request._install_hook()
819
871
def _silenceUI(self):
820
872
"""Turn off UI for duration of test"""
825
877
ui.ui_factory = ui.SilentUIFactory()
826
878
self.addCleanup(_restore)
880
def _check_locks(self):
881
"""Check that all lock take/release actions have been paired."""
882
# We always check for mismatched locks. If a mismatch is found, we
883
# fail unless -Edisable_lock_checks is supplied to selftest, in which
884
# case we just print a warning.
886
acquired_locks = [lock for action, lock in self._lock_actions
887
if action == 'acquired']
888
released_locks = [lock for action, lock in self._lock_actions
889
if action == 'released']
890
broken_locks = [lock for action, lock in self._lock_actions
891
if action == 'broken']
892
# trivially, given the tests for lock acquisition and release, if we
893
# have as many in each list, it should be ok. Some lock tests also
894
# break some locks on purpose and should be taken into account by
895
# considering that breaking a lock is just a dirty way of releasing it.
896
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
897
message = ('Different number of acquired and '
898
'released or broken locks. (%s, %s + %s)' %
899
(acquired_locks, released_locks, broken_locks))
900
if not self._lock_check_thorough:
901
# Rather than fail, just warn
902
print "Broken test %s: %s" % (self, message)
906
def _track_locks(self):
907
"""Track lock activity during tests."""
908
self._lock_actions = []
909
if 'disable_lock_checks' in selftest_debug_flags:
910
self._lock_check_thorough = False
912
self._lock_check_thorough = True
914
self.addCleanup(self._check_locks)
915
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
916
self._lock_acquired, None)
917
_mod_lock.Lock.hooks.install_named_hook('lock_released',
918
self._lock_released, None)
919
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
920
self._lock_broken, None)
922
def _lock_acquired(self, result):
923
self._lock_actions.append(('acquired', result))
925
def _lock_released(self, result):
926
self._lock_actions.append(('released', result))
928
def _lock_broken(self, result):
929
self._lock_actions.append(('broken', result))
828
931
def _ndiff_strings(self, a, b):
829
932
"""Return ndiff between two strings containing lines.
831
934
A trailing newline is added if missing to make the strings
832
935
print properly."""
833
936
if b and b[-1] != '\n':
1081
1250
self.assertEqual(expected_first_warning, call_warnings[0])
1253
def callCatchWarnings(self, fn, *args, **kw):
1254
"""Call a callable that raises python warnings.
1256
The caller's responsible for examining the returned warnings.
1258
If the callable raises an exception, the exception is not
1259
caught and propagates up to the caller. In that case, the list
1260
of warnings is not available.
1262
:returns: ([warning_object, ...], fn_result)
1264
# XXX: This is not perfect, because it completely overrides the
1265
# warnings filters, and some code may depend on suppressing particular
1266
# warnings. It's the easiest way to insulate ourselves from -Werror,
1267
# though. -- Andrew, 20071062
1269
def _catcher(message, category, filename, lineno, file=None, line=None):
1270
# despite the name, 'message' is normally(?) a Warning subclass
1272
wlist.append(message)
1273
saved_showwarning = warnings.showwarning
1274
saved_filters = warnings.filters
1276
warnings.showwarning = _catcher
1277
warnings.filters = []
1278
result = fn(*args, **kw)
1280
warnings.showwarning = saved_showwarning
1281
warnings.filters = saved_filters
1282
return wlist, result
1084
1284
def callDeprecated(self, expected, callable, *args, **kwargs):
1085
1285
"""Assert that a callable is deprecated in a particular way.
1087
This is a very precise test for unusual requirements. The
1287
This is a very precise test for unusual requirements. The
1088
1288
applyDeprecated helper function is probably more suited for most tests
1089
1289
as it allows you to simply specify the deprecation format being used
1090
1290
and will ensure that that is issued for the function being called.
1092
1292
Note that this only captures warnings raised by symbol_versioning.warn,
1093
not other callers that go direct to the warning module.
1293
not other callers that go direct to the warning module. To catch
1294
general warnings, use callCatchWarnings.
1095
1296
:param expected: a list of the deprecation warnings expected, in order
1096
1297
:param callable: The callable to call
1097
1298
:param args: The positional arguments for the callable
1098
1299
:param kwargs: The keyword arguments for the callable
1100
call_warnings, result = self._capture_warnings(callable,
1301
call_warnings, result = self._capture_deprecation_warnings(callable,
1101
1302
*args, **kwargs)
1102
1303
self.assertEqual(expected, call_warnings)
1874
2176
base = self.get_vfs_only_server().get_url()
1875
2177
return self._adjust_url(base, relpath)
2179
def _create_safety_net(self):
2180
"""Make a fake bzr directory.
2182
This prevents any tests propagating up onto the TEST_ROOT directory's
2185
root = TestCaseWithMemoryTransport.TEST_ROOT
2186
bzrdir.BzrDir.create_standalone_workingtree(root)
2188
def _check_safety_net(self):
2189
"""Check that the safety .bzr directory have not been touched.
2191
_make_test_root has created a .bzr directory to prevent tests from
2192
propagating. This method ensures that a test did not leak.
2194
root = TestCaseWithMemoryTransport.TEST_ROOT
2195
wt = workingtree.WorkingTree.open(root)
2196
last_rev = wt.last_revision()
2197
if last_rev != 'null:':
2198
# The current test has modified the .bzr directory, we need to
2199
# recreate a new one or all the following tests will fail.
2200
# If you need to inspect its content uncomment the following line
2201
# import pdb; pdb.set_trace()
2202
_rmtree_temp_dir(root + '/.bzr')
2203
self._create_safety_net()
2204
raise AssertionError('%s/.bzr should not be modified' % root)
1877
2206
def _make_test_root(self):
1878
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1880
root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
1881
TestCaseWithMemoryTransport.TEST_ROOT = root
1883
# make a fake bzr directory there to prevent any tests propagating
1884
# up onto the source directory's real branch
1885
bzrdir.BzrDir.create_standalone_workingtree(root)
1887
# The same directory is used by all tests, and we're not specifically
1888
# told when all tests are finished. This will do.
1889
atexit.register(_rmtree_temp_dir, root)
2207
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2208
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
2209
TestCaseWithMemoryTransport.TEST_ROOT = root
2211
self._create_safety_net()
2213
# The same directory is used by all tests, and we're not
2214
# specifically told when all tests are finished. This will do.
2215
atexit.register(_rmtree_temp_dir, root)
2217
self.addCleanup(self._check_safety_net)
1891
2219
def makeAndChdirToTestDir(self):
1892
2220
"""Create a temporary directories for this one test.
1894
2222
This must set self.test_home_dir and self.test_dir and chdir to
1897
2225
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1899
2227
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1900
2228
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1901
2229
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1903
2231
def make_branch(self, relpath, format=None):
1904
2232
"""Create a branch on the transport at relpath."""
1905
2233
repo = self.make_repository(relpath, format=format)
2204
2562
self.transport_readonly_server = HttpServer
2207
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2208
random_order=False):
2209
"""Create a test suite by filtering another one.
2565
def condition_id_re(pattern):
2566
"""Create a condition filter which performs a re check on a test's id.
2568
:param pattern: A regular expression string.
2569
:return: A callable that returns True if the re matches.
2571
filter_re = osutils.re_compile_checked(pattern, 0,
2573
def condition(test):
2575
return filter_re.search(test_id)
2579
def condition_isinstance(klass_or_klass_list):
2580
"""Create a condition filter which returns isinstance(param, klass).
2582
:return: A callable which when called with one parameter obj returns the
2583
result of isinstance(obj, klass_or_klass_list).
2586
return isinstance(obj, klass_or_klass_list)
2590
def condition_id_in_list(id_list):
2591
"""Create a condition filter which verify that test's id in a list.
2593
:param id_list: A TestIdList object.
2594
:return: A callable that returns True if the test's id appears in the list.
2596
def condition(test):
2597
return id_list.includes(test.id())
2601
def condition_id_startswith(starts):
2602
"""Create a condition filter verifying that test's id starts with a string.
2604
:param starts: A list of strings.
2605
:return: A callable that returns True if the test's id starts with one of
2608
def condition(test):
2609
for start in starts:
2610
if test.id().startswith(start):
2616
def exclude_tests_by_condition(suite, condition):
2617
"""Create a test suite which excludes some tests from suite.
2619
:param suite: The suite to get tests from.
2620
:param condition: A callable whose result evaluates True when called with a
2621
test case which should be excluded from the result.
2622
:return: A suite which contains the tests found in suite that fail
2626
for test in iter_suite_tests(suite):
2627
if not condition(test):
2629
return TestUtil.TestSuite(result)
2632
def filter_suite_by_condition(suite, condition):
2633
"""Create a test suite by filtering another one.
2635
:param suite: The source suite.
2636
:param condition: A callable whose result evaluates True when called with a
2637
test case which should be included in the result.
2638
:return: A suite which contains the tests found in suite that pass
2642
for test in iter_suite_tests(suite):
2645
return TestUtil.TestSuite(result)
2648
def filter_suite_by_re(suite, pattern):
2649
"""Create a test suite by filtering another one.
2211
2651
:param suite: the source suite
2212
2652
:param pattern: pattern that names must match
2213
:param exclude_pattern: pattern that names must not match, if any
2214
:param random_order: if True, tests in the new suite will be put in
2216
:returns: the newly created suite
2218
return sort_suite_by_re(suite, pattern, exclude_pattern,
2219
random_order, False)
2222
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2223
random_order=False, append_rest=True):
2224
"""Create a test suite by sorting another one.
2226
:param suite: the source suite
2227
:param pattern: pattern that names must match in order to go
2228
first in the new suite
2229
:param exclude_pattern: pattern that names must not match, if any
2230
:param random_order: if True, tests in the new suite will be put in
2232
:param append_rest: if False, pattern is a strict filter and not
2233
just an ordering directive
2234
:returns: the newly created suite
2238
filter_re = re.compile(pattern)
2239
if exclude_pattern is not None:
2240
exclude_re = re.compile(exclude_pattern)
2653
:returns: the newly created suite
2655
condition = condition_id_re(pattern)
2656
result_suite = filter_suite_by_condition(suite, condition)
2660
def filter_suite_by_id_list(suite, test_id_list):
2661
"""Create a test suite by filtering another one.
2663
:param suite: The source suite.
2664
:param test_id_list: A list of the test ids to keep as strings.
2665
:returns: the newly created suite
2667
condition = condition_id_in_list(test_id_list)
2668
result_suite = filter_suite_by_condition(suite, condition)
2672
def filter_suite_by_id_startswith(suite, start):
2673
"""Create a test suite by filtering another one.
2675
:param suite: The source suite.
2676
:param start: A list of strings; the test id must start with one of them.
2677
:returns: the newly created suite
2679
condition = condition_id_startswith(start)
2680
result_suite = filter_suite_by_condition(suite, condition)
2684
def exclude_tests_by_re(suite, pattern):
2685
"""Create a test suite which excludes some tests from suite.
2687
:param suite: The suite to get tests from.
2688
:param pattern: A regular expression string. Test ids that match this
2689
pattern will be excluded from the result.
2690
:return: A TestSuite that contains all the tests from suite without the
2691
tests that matched pattern. The order of tests is the same as it was in
2694
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2697
def preserve_input(something):
2698
"""A helper for performing test suite transformation chains.
2700
:param something: Anything you want to preserve.
2706
def randomize_suite(suite):
2707
"""Return a new TestSuite with suite's tests in random order.
2709
The tests in the input suite are flattened into a single suite in order to
2710
accomplish this. Any nested TestSuites are removed to provide global
2713
tests = list(iter_suite_tests(suite))
2714
random.shuffle(tests)
2715
return TestUtil.TestSuite(tests)
2718
def split_suite_by_condition(suite, condition):
2719
"""Split a test suite into two by a condition.
2721
:param suite: The suite to split.
2722
:param condition: The condition to match on. Tests that match this
2723
condition are returned in the first test suite, ones that do not match
2724
are in the second suite.
2725
:return: A tuple of two test suites, where the first contains tests from
2726
suite matching the condition, and the second contains the remainder
2727
from suite. The order within each output suite is the same as it was in
2241
2732
for test in iter_suite_tests(suite):
2243
if exclude_pattern is None or not exclude_re.search(test_id):
2244
if filter_re.search(test_id):
2249
random.shuffle(first)
2250
random.shuffle(second)
2251
return TestUtil.TestSuite(first + second)
2734
matched.append(test)
2736
did_not_match.append(test)
2737
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2740
def split_suite_by_re(suite, pattern):
2741
"""Split a test suite into two by a regular expression.
2743
:param suite: The suite to split.
2744
:param pattern: A regular expression string. Test ids that match this
2745
pattern will be in the first test suite returned, and the others in the
2746
second test suite returned.
2747
:return: A tuple of two test suites, where the first contains tests from
2748
suite matching pattern, and the second contains the remainder from
2749
suite. The order within each output suite is the same as it was in
2752
return split_suite_by_condition(suite, condition_id_re(pattern))
2254
2755
def run_suite(suite, name='test', verbose=False, pattern=".*",
2259
2760
random_seed=None,
2260
2761
exclude_pattern=None,
2764
suite_decorators=None,
2766
"""Run a test suite for bzr selftest.
2768
:param runner_class: The class of runner to use. Must support the
2769
constructor arguments passed by run_suite which are more than standard
2771
:return: A boolean indicating success.
2263
2773
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2268
runner = TextTestRunner(stream=sys.stdout,
2778
if runner_class is None:
2779
runner_class = TextTestRunner
2782
runner = runner_class(stream=stream,
2269
2783
descriptions=0,
2270
2784
verbosity=verbosity,
2271
2785
bench_history=bench_history,
2272
2786
list_only=list_only,
2274
2789
runner.stop_on_failure=stop_on_failure
2275
# Initialise the random number generator and display the seed used.
2276
# We convert the seed to a long to make it reuseable across invocations.
2277
random_order = False
2278
if random_seed is not None:
2280
if random_seed == "now":
2281
random_seed = long(time.time())
2790
# built in decorator factories:
2792
random_order(random_seed, runner),
2793
exclude_tests(exclude_pattern),
2795
if matching_tests_first:
2796
decorators.append(tests_first(pattern))
2798
decorators.append(filter_tests(pattern))
2799
if suite_decorators:
2800
decorators.extend(suite_decorators)
2801
# tell the result object how many tests will be running: (except if
2802
# --parallel=fork is being used. Robert said he will provide a better
2803
# progress design later -- vila 20090817)
2804
if fork_decorator not in decorators:
2805
decorators.append(CountingDecorator)
2806
for decorator in decorators:
2807
suite = decorator(suite)
2808
result = runner.run(suite)
2813
return result.wasStrictlySuccessful()
2815
return result.wasSuccessful()
2818
# A registry where get() returns a suite decorator.
2819
parallel_registry = registry.Registry()
2822
def fork_decorator(suite):
2823
concurrency = osutils.local_concurrency()
2824
if concurrency == 1:
2826
from testtools import ConcurrentTestSuite
2827
return ConcurrentTestSuite(suite, fork_for_tests)
2828
parallel_registry.register('fork', fork_decorator)
2831
def subprocess_decorator(suite):
2832
concurrency = osutils.local_concurrency()
2833
if concurrency == 1:
2835
from testtools import ConcurrentTestSuite
2836
return ConcurrentTestSuite(suite, reinvoke_for_tests)
2837
parallel_registry.register('subprocess', subprocess_decorator)
2840
def exclude_tests(exclude_pattern):
2841
"""Return a test suite decorator that excludes tests."""
2842
if exclude_pattern is None:
2843
return identity_decorator
2844
def decorator(suite):
2845
return ExcludeDecorator(suite, exclude_pattern)
2849
def filter_tests(pattern):
2851
return identity_decorator
2852
def decorator(suite):
2853
return FilterTestsDecorator(suite, pattern)
2857
def random_order(random_seed, runner):
2858
"""Return a test suite decorator factory for randomising tests order.
2860
:param random_seed: now, a string which casts to a long, or a long.
2861
:param runner: A test runner with a stream attribute to report on.
2863
if random_seed is None:
2864
return identity_decorator
2865
def decorator(suite):
2866
return RandomDecorator(suite, random_seed, runner.stream)
2870
def tests_first(pattern):
2872
return identity_decorator
2873
def decorator(suite):
2874
return TestFirstDecorator(suite, pattern)
2878
def identity_decorator(suite):
2883
class TestDecorator(TestSuite):
2884
"""A decorator for TestCase/TestSuite objects.
2886
Usually, subclasses should override __iter__(used when flattening test
2887
suites), which we do to filter, reorder, parallelise and so on, run() and
2891
def __init__(self, suite):
2892
TestSuite.__init__(self)
2895
def countTestCases(self):
2898
cases += test.countTestCases()
2905
def run(self, result):
2906
# Use iteration on self, not self._tests, to allow subclasses to hook
2909
if result.shouldStop:
2915
class CountingDecorator(TestDecorator):
2916
"""A decorator which calls result.progress(self.countTestCases)."""
2918
def run(self, result):
2919
progress_method = getattr(result, 'progress', None)
2920
if callable(progress_method):
2921
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
2922
return super(CountingDecorator, self).run(result)
2925
class ExcludeDecorator(TestDecorator):
2926
"""A decorator which excludes test matching an exclude pattern."""
2928
def __init__(self, suite, exclude_pattern):
2929
TestDecorator.__init__(self, suite)
2930
self.exclude_pattern = exclude_pattern
2931
self.excluded = False
2935
return iter(self._tests)
2936
self.excluded = True
2937
suite = exclude_tests_by_re(self, self.exclude_pattern)
2939
self.addTests(suite)
2940
return iter(self._tests)
2943
class FilterTestsDecorator(TestDecorator):
2944
"""A decorator which filters tests to those matching a pattern."""
2946
def __init__(self, suite, pattern):
2947
TestDecorator.__init__(self, suite)
2948
self.pattern = pattern
2949
self.filtered = False
2953
return iter(self._tests)
2954
self.filtered = True
2955
suite = filter_suite_by_re(self, self.pattern)
2957
self.addTests(suite)
2958
return iter(self._tests)
2961
class RandomDecorator(TestDecorator):
2962
"""A decorator which randomises the order of its tests."""
2964
def __init__(self, suite, random_seed, stream):
2965
TestDecorator.__init__(self, suite)
2966
self.random_seed = random_seed
2967
self.randomised = False
2968
self.stream = stream
2972
return iter(self._tests)
2973
self.randomised = True
2974
self.stream.writeln("Randomizing test order using seed %s\n" %
2975
(self.actual_seed()))
2976
# Initialise the random number generator.
2977
random.seed(self.actual_seed())
2978
suite = randomize_suite(self)
2980
self.addTests(suite)
2981
return iter(self._tests)
2983
def actual_seed(self):
2984
if self.random_seed == "now":
2985
# We convert the seed to a long to make it reusable across
2986
# invocations (because the user can reenter it).
2987
self.random_seed = long(time.time())
2283
2989
# Convert the seed to a long if we can
2285
random_seed = long(random_seed)
2991
self.random_seed = long(self.random_seed)
2288
runner.stream.writeln("Randomizing test order using seed %s\n" %
2290
random.seed(random_seed)
2291
# Customise the list of tests if requested
2292
if pattern != '.*' or exclude_pattern is not None or random_order:
2293
if matching_tests_first:
2294
suite = sort_suite_by_re(suite, pattern, exclude_pattern,
2297
suite = filter_suite_by_re(suite, pattern, exclude_pattern,
2299
result = runner.run(suite)
2302
return result.wasStrictlySuccessful()
2304
return result.wasSuccessful()
2994
return self.random_seed
2997
class TestFirstDecorator(TestDecorator):
2998
"""A decorator which moves named tests to the front."""
3000
def __init__(self, suite, pattern):
3001
TestDecorator.__init__(self, suite)
3002
self.pattern = pattern
3003
self.filtered = False
3007
return iter(self._tests)
3008
self.filtered = True
3009
suites = split_suite_by_re(self, self.pattern)
3011
self.addTests(suites)
3012
return iter(self._tests)
3015
def partition_tests(suite, count):
3016
"""Partition suite into count lists of tests."""
3018
tests = list(iter_suite_tests(suite))
3019
tests_per_process = int(math.ceil(float(len(tests)) / count))
3020
for block in range(count):
3021
low_test = block * tests_per_process
3022
high_test = low_test + tests_per_process
3023
process_tests = tests[low_test:high_test]
3024
result.append(process_tests)
3028
def fork_for_tests(suite):
3029
"""Take suite and start up one runner per CPU by forking()
3031
:return: An iterable of TestCase-like objects which can each have
3032
run(result) called on them to feed tests to result.
3034
concurrency = osutils.local_concurrency()
3036
from subunit import TestProtocolClient, ProtocolTestCase
3038
from subunit.test_results import AutoTimingTestResultDecorator
3040
AutoTimingTestResultDecorator = lambda x:x
3041
class TestInOtherProcess(ProtocolTestCase):
3042
# Should be in subunit, I think. RBC.
3043
def __init__(self, stream, pid):
3044
ProtocolTestCase.__init__(self, stream)
3047
def run(self, result):
3049
ProtocolTestCase.run(self, result)
3051
os.waitpid(self.pid, os.WNOHANG)
3053
test_blocks = partition_tests(suite, concurrency)
3054
for process_tests in test_blocks:
3055
process_suite = TestSuite()
3056
process_suite.addTests(process_tests)
3057
c2pread, c2pwrite = os.pipe()
3062
# Leave stderr and stdout open so we can see test noise
3063
# Close stdin so that the child goes away if it decides to
3064
# read from stdin (otherwise it's a roulette to see what
3065
# child actually gets keystrokes for pdb etc).
3068
stream = os.fdopen(c2pwrite, 'wb', 1)
3069
subunit_result = AutoTimingTestResultDecorator(
3070
TestProtocolClient(stream))
3071
process_suite.run(subunit_result)
3076
stream = os.fdopen(c2pread, 'rb', 1)
3077
test = TestInOtherProcess(stream, pid)
3082
def reinvoke_for_tests(suite):
3083
"""Take suite and start up one runner per CPU using subprocess().
3085
:return: An iterable of TestCase-like objects which can each have
3086
run(result) called on them to feed tests to result.
3088
concurrency = osutils.local_concurrency()
3090
from subunit import ProtocolTestCase
3091
class TestInSubprocess(ProtocolTestCase):
3092
def __init__(self, process, name):
3093
ProtocolTestCase.__init__(self, process.stdout)
3094
self.process = process
3095
self.process.stdin.close()
3098
def run(self, result):
3100
ProtocolTestCase.run(self, result)
3103
os.unlink(self.name)
3104
# print "pid %d finished" % finished_process
3105
test_blocks = partition_tests(suite, concurrency)
3106
for process_tests in test_blocks:
3107
# ugly; currently reimplements rather than reuses TestCase methods.
3108
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3109
if not os.path.isfile(bzr_path):
3110
# We are probably installed. Assume sys.argv is the right file
3111
bzr_path = sys.argv[0]
3112
fd, test_list_file_name = tempfile.mkstemp()
3113
test_list_file = os.fdopen(fd, 'wb', 1)
3114
for test in process_tests:
3115
test_list_file.write(test.id() + '\n')
3116
test_list_file.close()
3118
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3120
if '--no-plugins' in sys.argv:
3121
argv.append('--no-plugins')
3122
# stderr=STDOUT would be ideal, but until we prevent noise on
3123
# stderr it can interrupt the subunit protocol.
3124
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3126
test = TestInSubprocess(process, test_list_file_name)
3129
os.unlink(test_list_file_name)
3134
class BZRTransformingResult(unittest.TestResult):
3136
def __init__(self, target):
3137
unittest.TestResult.__init__(self)
3138
self.result = target
3140
def startTest(self, test):
3141
self.result.startTest(test)
3143
def stopTest(self, test):
3144
self.result.stopTest(test)
3146
def addError(self, test, err):
3147
feature = self._error_looks_like('UnavailableFeature: ', err)
3148
if feature is not None:
3149
self.result.addNotSupported(test, feature)
3151
self.result.addError(test, err)
3153
def addFailure(self, test, err):
3154
known = self._error_looks_like('KnownFailure: ', err)
3155
if known is not None:
3156
self.result._addKnownFailure(test, [KnownFailure,
3157
KnownFailure(known), None])
3159
self.result.addFailure(test, err)
3161
def addSkip(self, test, reason):
3162
self.result.addSkip(test, reason)
3164
def addSuccess(self, test):
3165
self.result.addSuccess(test)
3167
def _error_looks_like(self, prefix, err):
3168
"""Deserialize exception and returns the stringify value."""
3172
if isinstance(exc, subunit.RemoteException):
3173
# stringify the exception gives access to the remote traceback
3174
# We search the last line for 'prefix'
3175
lines = str(exc).split('\n')
3176
while lines and not lines[-1]:
3179
if lines[-1].startswith(prefix):
3180
value = lines[-1][len(prefix):]
3184
# Controlled by "bzr selftest -E=..." option
3185
# Currently supported:
3186
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3187
# preserves any flags supplied at the command line.
3188
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3189
# rather than failing tests. And no longer raise
3190
#                       LockContention when fcntl locks are not being used
3191
# with proper exclusion rules.
3192
selftest_debug_flags = set()
2307
3195
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2341
3251
list_only=list_only,
2342
3252
random_seed=random_seed,
2343
3253
exclude_pattern=exclude_pattern,
3255
runner_class=runner_class,
3256
suite_decorators=suite_decorators,
2346
3260
default_transport = old_transport
3261
selftest_debug_flags = old_debug_flags
3264
def load_test_id_list(file_name):
3265
"""Load a test id list from a text file.
3267
The format is one test id by line. No special care is taken to impose
3268
strict rules, these test ids are used to filter the test suite so a test id
3269
that do not match an existing test will do no harm. This allows user to add
3270
comments, leave blank lines, etc.
3274
ftest = open(file_name, 'rt')
3276
if e.errno != errno.ENOENT:
3279
raise errors.NoSuchFile(file_name)
3281
for test_name in ftest.readlines():
3282
test_list.append(test_name.strip())
3287
def suite_matches_id_list(test_suite, id_list):
3288
"""Warns about tests not appearing or appearing more than once.
3290
:param test_suite: A TestSuite object.
3291
:param test_id_list: The list of test ids that should be found in
3294
:return: (absents, duplicates) absents is a list containing the test found
3295
in id_list but not in test_suite, duplicates is a list containing the
3296
test found multiple times in test_suite.
3298
When using a predefined test id list, it may occur that some tests do not
3299
exist anymore or that some tests use the same id. This function warns the
3300
tester about potential problems in his workflow (test lists are volatile)
3301
or in the test suite itself (using the same id for several tests does not
3302
help to localize defects).
3304
# Build a dict counting id occurrences
3306
for test in iter_suite_tests(test_suite):
3308
tests[id] = tests.get(id, 0) + 1
3313
occurs = tests.get(id, 0)
3315
not_found.append(id)
3317
duplicates.append(id)
3319
return not_found, duplicates
3322
class TestIdList(object):
3323
"""Test id list to filter a test suite.
3325
Relying on the assumption that test ids are built as:
3326
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3327
notation, this class offers methods to :
3328
- avoid building a test suite for modules not refered to in the test list,
3329
- keep only the tests listed from the module test suite.
3332
def __init__(self, test_id_list):
3333
# When a test suite needs to be filtered against us we compare test ids
3334
# for equality, so a simple dict offers a quick and simple solution.
3335
self.tests = dict().fromkeys(test_id_list, True)
3337
# While unittest.TestCase have ids like:
3338
# <module>.<class>.<method>[(<param+)],
3339
# doctest.DocTestCase can have ids like:
3342
# <module>.<function>
3343
# <module>.<class>.<method>
3345
# Since we can't predict a test class from its name only, we settle on
3346
# a simple constraint: a test id always begins with its module name.
3349
for test_id in test_id_list:
3350
parts = test_id.split('.')
3351
mod_name = parts.pop(0)
3352
modules[mod_name] = True
3354
mod_name += '.' + part
3355
modules[mod_name] = True
3356
self.modules = modules
3358
def refers_to(self, module_name):
    """Tell whether the id list has tests for a module or its sub modules.

    :param module_name: A module name in python dotted notation.
    :return: True if ``module_name`` is one of the keys of
        ``self.modules``, False otherwise.
    """
    # dict.has_key() is deprecated (and removed in Python 3); the 'in'
    # operator performs the same membership test.
    return module_name in self.modules
3362
def includes(self, test_id):
    """Tell whether a test id is part of the id list.

    :param test_id: The full id of a test.
    :return: True if ``test_id`` is one of the keys of ``self.tests``,
        False otherwise. The comparison is an exact match on the id.
    """
    # dict.has_key() is deprecated (and removed in Python 3); the 'in'
    # operator performs the same membership test.
    return test_id in self.tests
3366
class TestPrefixAliasRegistry(registry.Registry):
3367
"""A registry for test prefix aliases.
3369
This helps implement shortcuts for the --starting-with selftest
3370
option. Overriding existing prefixes is not allowed but not fatal (a
3371
warning will be emitted).
3374
def register(self, key, obj, help=None, info=None,
3375
override_existing=False):
3376
"""See Registry.register.
3378
Trying to override an existing alias causes a warning to be emitted,
3379
not a fatal exception.
3382
super(TestPrefixAliasRegistry, self).register(
3383
key, obj, help=help, info=info, override_existing=False)
3385
actual = self.get(key)
3386
note('Test prefix alias %s is already used for %s, ignoring %s'
3387
% (key, actual, obj))
3389
def resolve_alias(self, id_start):
3390
"""Replace the alias by the prefix in the given string.
3392
Using an unknown prefix is an error to help catching typos.
3394
parts = id_start.split('.')
3396
parts[0] = self.get(parts[0])
3398
raise errors.BzrCommandError(
3399
'%s is not a known test prefix alias' % parts[0])
3400
return '.'.join(parts)
3403
# Module-level registry instance holding the test prefix aliases registered
# below.
test_prefix_alias_registry = TestPrefixAliasRegistry()
3404
"""Registry of test prefix aliases."""
3407
# Registering 'bzrlib' as an alias for itself makes typos (e.g. 'bzrlin.')
3408
# detectable: every valid test id then starts with a known alias.
3409
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3411
# Obvious highest-level prefixes; feel free to add your own via a plugin.
3412
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3413
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3414
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3415
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3416
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3419
def test_suite(keep_only=None, starting_with=None):
2350
3420
"""Build and return TestSuite for the whole of bzrlib.
3422
:param keep_only: A list of test ids limiting the suite returned.
3424
:param starting_with: An id limiting the suite returned to the tests
2352
3427
This function can be replaced if you need to change the default test
2353
3428
suite on a global basis, but it is not encouraged.
2355
3430
testmod_names = [
2356
'bzrlib.util.tests.test_bencode',
3432
'bzrlib.tests.blackbox',
3433
'bzrlib.tests.commands',
3434
'bzrlib.tests.per_branch',
3435
'bzrlib.tests.per_bzrdir',
3436
'bzrlib.tests.per_interrepository',
3437
'bzrlib.tests.per_intertree',
3438
'bzrlib.tests.per_inventory',
3439
'bzrlib.tests.per_interbranch',
3440
'bzrlib.tests.per_lock',
3441
'bzrlib.tests.per_transport',
3442
'bzrlib.tests.per_tree',
3443
'bzrlib.tests.per_pack_repository',
3444
'bzrlib.tests.per_repository',
3445
'bzrlib.tests.per_repository_chk',
3446
'bzrlib.tests.per_repository_reference',
3447
'bzrlib.tests.per_versionedfile',
3448
'bzrlib.tests.per_workingtree',
3449
'bzrlib.tests.test__annotator',
3450
'bzrlib.tests.test__chk_map',
2357
3451
'bzrlib.tests.test__dirstate_helpers',
3452
'bzrlib.tests.test__groupcompress',
3453
'bzrlib.tests.test__known_graph',
3454
'bzrlib.tests.test__rio',
3455
'bzrlib.tests.test__walkdirs_win32',
2358
3456
'bzrlib.tests.test_ancestry',
2359
3457
'bzrlib.tests.test_annotate',
2360
3458
'bzrlib.tests.test_api',
2361
3459
'bzrlib.tests.test_atomicfile',
2362
3460
'bzrlib.tests.test_bad_files',
3461
'bzrlib.tests.test_bencode',
3462
'bzrlib.tests.test_bisect_multi',
2363
3463
'bzrlib.tests.test_branch',
2364
3464
'bzrlib.tests.test_branchbuilder',
3465
'bzrlib.tests.test_btree_index',
2365
3466
'bzrlib.tests.test_bugtracker',
2366
3467
'bzrlib.tests.test_bundle',
2367
3468
'bzrlib.tests.test_bzrdir',
3469
'bzrlib.tests.test__chunks_to_lines',
2368
3470
'bzrlib.tests.test_cache_utf8',
3471
'bzrlib.tests.test_chk_map',
3472
'bzrlib.tests.test_chk_serializer',
3473
'bzrlib.tests.test_chunk_writer',
3474
'bzrlib.tests.test_clean_tree',
2369
3475
'bzrlib.tests.test_commands',
2370
3476
'bzrlib.tests.test_commit',
2371
3477
'bzrlib.tests.test_commit_merge',
2372
3478
'bzrlib.tests.test_config',
2373
3479
'bzrlib.tests.test_conflicts',
2374
3480
'bzrlib.tests.test_counted_lock',
3481
'bzrlib.tests.test_crash',
2375
3482
'bzrlib.tests.test_decorators',
2376
3483
'bzrlib.tests.test_delta',
3484
'bzrlib.tests.test_debug',
2377
3485
'bzrlib.tests.test_deprecated_graph',
2378
3486
'bzrlib.tests.test_diff',
3487
'bzrlib.tests.test_directory_service',
2379
3488
'bzrlib.tests.test_dirstate',
2380
3489
'bzrlib.tests.test_email_message',
3490
'bzrlib.tests.test_eol_filters',
2381
3491
'bzrlib.tests.test_errors',
2382
'bzrlib.tests.test_escaped_store',
3492
'bzrlib.tests.test_export',
2383
3493
'bzrlib.tests.test_extract',
2384
3494
'bzrlib.tests.test_fetch',
3495
'bzrlib.tests.test_fifo_cache',
3496
'bzrlib.tests.test_filters',
2385
3497
'bzrlib.tests.test_ftp_transport',
3498
'bzrlib.tests.test_foreign',
2386
3499
'bzrlib.tests.test_generate_docs',
2387
3500
'bzrlib.tests.test_generate_ids',
2388
3501
'bzrlib.tests.test_globbing',
2389
3502
'bzrlib.tests.test_gpg',
2390
3503
'bzrlib.tests.test_graph',
3504
'bzrlib.tests.test_groupcompress',
2391
3505
'bzrlib.tests.test_hashcache',
2392
3506
'bzrlib.tests.test_help',
2393
3507
'bzrlib.tests.test_hooks',
2476
3605
'bzrlib.tests.test_wsgi',
2477
3606
'bzrlib.tests.test_xml',
2479
test_transport_implementations = [
2480
'bzrlib.tests.test_transport_implementations',
2481
'bzrlib.tests.test_read_bundle',
2483
suite = TestUtil.TestSuite()
2484
3609
loader = TestUtil.TestLoader()
3611
if keep_only is not None:
3612
id_filter = TestIdList(keep_only)
3614
# We take precedence over keep_only because *at loading time* using
3615
# both options means we will load less tests for the same final result.
3616
def interesting_module(name):
3617
for start in starting_with:
3619
# Either the module name starts with the specified string
3620
name.startswith(start)
3621
# or it may contain tests starting with the specified string
3622
or start.startswith(name)
3626
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3628
elif keep_only is not None:
3629
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3630
def interesting_module(name):
3631
return id_filter.refers_to(name)
3634
loader = TestUtil.TestLoader()
3635
def interesting_module(name):
3636
# No filtering, all modules are interesting
3639
suite = loader.suiteClass()
3641
# modules building their suite with loadTestsFromModuleNames
2485
3642
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2486
from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
2487
adapter = TransportTestProviderAdapter()
2488
adapt_modules(test_transport_implementations, adapter, loader, suite)
2489
for package in packages_to_test():
2490
suite.addTest(package.test_suite())
2491
for m in MODULES_TO_TEST:
2492
suite.addTest(loader.loadTestsFromModule(m))
2493
for m in MODULES_TO_DOCTEST:
3644
modules_to_doctest = [
3646
'bzrlib.branchbuilder',
3649
'bzrlib.iterablefile',
3653
'bzrlib.symbol_versioning',
3656
'bzrlib.version_info_formats.format_custom',
3659
for mod in modules_to_doctest:
3660
if not interesting_module(mod):
3661
# No tests to keep here, move along
2495
suite.addTest(doctest.DocTestSuite(m))
3664
# note that this really does mean "report only" -- doctest
3665
# still runs the rest of the examples
3666
doc_suite = doctest.DocTestSuite(mod,
3667
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
2496
3668
except ValueError, e:
2497
print '**failed to get doctest for: %s\n%s' %(m,e)
3669
print '**failed to get doctest for: %s\n%s' % (mod, e)
3671
if len(doc_suite._tests) == 0:
3672
raise errors.BzrError("no doctests found in %s" % (mod,))
3673
suite.addTest(doc_suite)
2499
3675
default_encoding = sys.getdefaultencoding()
2500
3676
for name, plugin in bzrlib.plugin.plugins().items():
2502
plugin_suite = plugin.test_suite()
2503
except ImportError, e:
2504
bzrlib.trace.warning(
2505
'Unable to test plugin "%s": %s', name, e)
2507
if plugin_suite is not None:
2508
suite.addTest(plugin_suite)
3677
if not interesting_module(plugin.module.__name__):
3679
plugin_suite = plugin.test_suite()
3680
# We used to catch ImportError here and turn it into just a warning,
3681
# but really if you don't have --no-plugins this should be a failure.
3682
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3683
if plugin_suite is None:
3684
plugin_suite = plugin.load_plugin_tests(loader)
3685
if plugin_suite is not None:
3686
suite.addTest(plugin_suite)
2509
3687
if default_encoding != sys.getdefaultencoding():
2510
3688
bzrlib.trace.warning(
2511
3689
'Plugin "%s" tried to reset default encoding to: %s', name,
2512
3690
sys.getdefaultencoding())
2514
3692
sys.setdefaultencoding(default_encoding)
3694
if keep_only is not None:
3695
# Now that the referred modules have loaded their tests, keep only the
3697
suite = filter_suite_by_id_list(suite, id_filter)
3698
# Do some sanity checks on the id_list filtering
3699
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3701
# The tester has used both keep_only and starting_with, so he is
3702
# already aware that some tests are excluded from the list, there
3703
# is no need to tell him which.
3706
# Some tests mentioned in the list are not in the test suite. The
3707
# list may be out of date, report to the tester.
3708
for id in not_found:
3709
bzrlib.trace.warning('"%s" not found in the test suite', id)
3710
for id in duplicates:
3711
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
2518
def multiply_tests_from_modules(module_name_list, scenario_iter):
2519
"""Adapt all tests in some given modules to given scenarios.
2521
This is the recommended public interface for test parameterization.
2522
Typically the test_suite() method for a per-implementation test
2523
suite will call multiply_tests_from_modules and return the
2526
:param module_name_list: List of fully-qualified names of test
2528
:param scenario_iter: Iterable of pairs of (scenario_name,
3716
def multiply_scenarios(scenarios_left, scenarios_right):
3717
"""Multiply two sets of scenarios.
3719
:returns: the cartesian product of the two sets of scenarios, that is
3720
a scenario for every possible combination of a left scenario and a
3724
('%s,%s' % (left_name, right_name),
3725
dict(left_dict.items() + right_dict.items()))
3726
for left_name, left_dict in scenarios_left
3727
for right_name, right_dict in scenarios_right]
3730
def multiply_tests(tests, scenarios, result):
3731
"""Multiply tests_list by scenarios into result.
3733
This is the core workhorse for test parameterisation.
3735
Typically the load_tests() method for a per-implementation test suite will
3736
call multiply_tests and return the result.
3738
:param tests: The tests to parameterise.
3739
:param scenarios: The scenarios to apply: pairs of (scenario_name,
2529
3740
scenario_param_dict).
2531
This returns a new TestSuite containing the cross product of
2532
all the tests in all the modules, each repeated for each scenario.
2533
Each test is adapted by adding the scenario name at the end
2534
of its name, and updating the test object's __dict__ with the
2535
scenario_param_dict.
2537
>>> r = multiply_tests_from_modules(
2538
... ['bzrlib.tests.test_sampler'],
2539
... [('one', dict(param=1)),
2540
... ('two', dict(param=2))])
3741
:param result: A TestSuite to add created tests to.
3743
This returns the passed in result TestSuite with the cross product of all
3744
the tests repeated once for each scenario. Each test is adapted by adding
3745
the scenario name at the end of its id(), and updating the test object's
3746
__dict__ with the scenario_param_dict.
3748
>>> import bzrlib.tests.test_sampler
3749
>>> r = multiply_tests(
3750
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3751
... [('one', dict(param=1)),
3752
... ('two', dict(param=2))],
2541
3754
>>> tests = list(iter_suite_tests(r))
2613
3859
return self.__class__.__name__
2616
class TestScenarioApplier(object):
2617
"""A tool to apply scenarios to tests."""
2619
def adapt(self, test):
2620
"""Return a TestSuite containing a copy of test for each scenario."""
2621
result = unittest.TestSuite()
2622
for scenario in self.scenarios:
2623
result.addTest(self.adapt_test_to_scenario(test, scenario))
3862
class _SymlinkFeature(Feature):
3865
return osutils.has_symlinks()
3867
def feature_name(self):
3870
SymlinkFeature = _SymlinkFeature()
3873
class _HardlinkFeature(Feature):
3876
return osutils.has_hardlinks()
3878
def feature_name(self):
3881
HardlinkFeature = _HardlinkFeature()
3884
class _OsFifoFeature(Feature):
3887
return getattr(os, 'mkfifo', None)
3889
# Return the fixed, human-readable label for this feature.
def feature_name(self):
3890
return 'filesystem fifos'
3892
OsFifoFeature = _OsFifoFeature()
3895
class _UnicodeFilenameFeature(Feature):
3896
"""Does the filesystem support Unicode filenames?"""
3900
# Check for character combinations unlikely to be covered by any
3901
# single non-unicode encoding. We use the characters
3902
# - greek small letter alpha (U+03B1) and
3903
# - braille pattern dots-123456 (U+283F).
3904
os.stat(u'\u03b1\u283f')
3905
except UnicodeEncodeError:
3907
except (IOError, OSError):
3908
# The filesystem allows the Unicode filename but the file doesn't
3912
# The filesystem allows the Unicode filename and the file exists,
3916
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3919
def probe_unicode_in_user_encoding():
3920
"""Try to encode several unicode strings to use in unicode-aware tests.
3921
Return the first successful match.
3923
:return: (unicode value, encoded plain string value) or (None, None)
3925
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3926
for uni_val in possible_vals:
3928
str_val = uni_val.encode(osutils.get_user_encoding())
3929
except UnicodeEncodeError:
3930
# Try a different character
3933
return uni_val, str_val
3937
def probe_bad_non_ascii(encoding):
3938
"""Try to find [bad] character with code [128..255]
3939
that cannot be decoded to unicode in some encoding.
3940
Return None if all non-ascii characters is valid
3943
for i in xrange(128, 256):
3946
char.decode(encoding)
3947
except UnicodeDecodeError:
3952
class _HTTPSServerFeature(Feature):
3953
"""Some tests want an https Server, check if one is available.
3955
Right now, the only way this is available is under python2.6 which provides
3966
# Return the fixed label for this feature.
def feature_name(self):
3967
return 'HTTPSServer'
3970
HTTPSServerFeature = _HTTPSServerFeature()
3973
class _UnicodeFilename(Feature):
3974
"""Does the filesystem support Unicode filenames?"""
3979
except UnicodeEncodeError:
3981
except (IOError, OSError):
3982
# The filesystem allows the Unicode filename but the file doesn't
3986
# The filesystem allows the Unicode filename and the file exists,
3990
UnicodeFilename = _UnicodeFilename()
3993
class _UTF8Filesystem(Feature):
3994
"""Is the filesystem UTF-8?"""
3997
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4001
UTF8Filesystem = _UTF8Filesystem()
4004
class _CaseInsCasePresFilenameFeature(Feature):
4005
"""Is the file-system case insensitive, but case-preserving?"""
4008
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4010
# first check truly case-preserving for created files, then check
4011
# case insensitive when opening existing files.
4012
name = osutils.normpath(name)
4013
base, rel = osutils.split(name)
4014
found_rel = osutils.canonical_relpath(base, name)
4015
return (found_rel == rel
4016
and os.path.isfile(name.upper())
4017
and os.path.isfile(name.lower()))
4022
# Return the fixed, human-readable label for this feature.
def feature_name(self):
4023
return "case-insensitive case-preserving filesystem"
4025
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4028
class _CaseInsensitiveFilesystemFeature(Feature):
4029
"""Check if underlying filesystem is case-insensitive but *not* case
4032
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4033
# more likely to be case preserving, so this case is rare.
4036
if CaseInsCasePresFilenameFeature.available():
4039
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4040
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4041
TestCaseWithMemoryTransport.TEST_ROOT = root
4043
root = TestCaseWithMemoryTransport.TEST_ROOT
4044
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4046
name_a = osutils.pathjoin(tdir, 'a')
4047
name_A = osutils.pathjoin(tdir, 'A')
4049
result = osutils.isdir(name_A)
4050
_rmtree_temp_dir(tdir)
2626
def adapt_test_to_scenario(self, test, scenario):
2627
"""Copy test and apply scenario to it.
2629
:param test: A test to adapt.
2630
:param scenario: A tuple describing the scenarion.
2631
The first element of the tuple is the new test id.
2632
The second element is a dict containing attributes to set on the
2634
:return: The adapted test.
2636
from copy import deepcopy
2637
new_test = deepcopy(test)
2638
for name, value in scenario[1].items():
2639
setattr(new_test, name, value)
2640
new_id = "%s(%s)" % (new_test.id(), scenario[0])
2641
new_test.id = lambda: new_id
4053
# Return the fixed, human-readable label for this feature.
def feature_name(self):
4054
return 'case-insensitive filesystem'
4056
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4059
class _SubUnitFeature(Feature):
4060
"""Check if subunit is available."""
4069
def feature_name(self):
4072
SubUnitFeature = _SubUnitFeature()
4073
# Only define SubUnitBzrRunner if subunit is available.
4075
from subunit import TestProtocolClient
4077
from subunit.test_results import AutoTimingTestResultDecorator
4079
AutoTimingTestResultDecorator = lambda x:x
4080
class SubUnitBzrRunner(TextTestRunner):
4081
def run(self, test):
4082
result = AutoTimingTestResultDecorator(
4083
TestProtocolClient(self.stream))