314
246
def addError(self, test, err):
315
"""Tell result that test finished with an error.
317
Called from the TestCase run() method when the test
318
fails with an unexpected error.
247
self.extractBenchmarkTime(test)
248
self._cleanupLogFile(test)
249
if isinstance(err[1], TestSkipped):
250
return self.addSkipped(test, err)
251
elif isinstance(err[1], UnavailableFeature):
252
return self.addNotSupported(test, err[1].args[0])
321
253
unittest.TestResult.addError(self, test, err)
322
254
self.error_count += 1
323
255
self.report_error(test, err)
324
256
if self.stop_early:
259
def addFailure(self, test, err):
326
260
self._cleanupLogFile(test)
328
def addFailure(self, test, err):
329
"""Tell result that test failed.
331
Called from the TestCase run() method when the test
332
fails because e.g. an assert() method failed.
261
self.extractBenchmarkTime(test)
262
if isinstance(err[1], KnownFailure):
263
return self.addKnownFailure(test, err)
335
264
unittest.TestResult.addFailure(self, test, err)
336
265
self.failure_count += 1
337
266
self.report_failure(test, err)
338
267
if self.stop_early:
340
self._cleanupLogFile(test)
342
def addSuccess(self, test, details=None):
343
"""Tell result that test completed successfully.
345
Called from the TestCase run()
347
if self._bench_history is not None:
348
benchmark_time = self._extractBenchmarkTime(test, details)
349
if benchmark_time is not None:
350
self._bench_history.write("%s %s\n" % (
351
self._formatTime(benchmark_time),
353
self.report_success(test)
354
self._cleanupLogFile(test)
355
unittest.TestResult.addSuccess(self, test)
356
test._log_contents = ''
358
def addExpectedFailure(self, test, err):
270
def addKnownFailure(self, test, err):
359
271
self.known_failure_count += 1
360
272
self.report_known_failure(test, err)
362
274
def addNotSupported(self, test, feature):
363
"""The test will not be run because of a missing feature.
365
# this can be called in two different ways: it may be that the
366
# test started running, and then raised (through requireFeature)
367
# UnavailableFeature. Alternatively this method can be called
368
# while probing for features before running the test code proper; in
369
# that case we will see startTest and stopTest, but the test will
370
# never actually run.
371
275
self.unsupported.setdefault(str(feature), 0)
372
276
self.unsupported[str(feature)] += 1
373
277
self.report_unsupported(test, feature)
375
def addSkip(self, test, reason):
376
"""A test has not run for 'reason'."""
378
self.report_skip(test, reason)
380
def addNotApplicable(self, test, reason):
381
self.not_applicable_count += 1
382
self.report_not_applicable(test, reason)
384
def _post_mortem(self):
385
"""Start a PDB post mortem session."""
386
if os.environ.get('BZR_TEST_PDB', None):
387
import pdb;pdb.post_mortem()
389
def progress(self, offset, whence):
390
"""The test is adjusting the count of tests to run."""
391
if whence == SUBUNIT_SEEK_SET:
392
self.num_tests = offset
393
elif whence == SUBUNIT_SEEK_CUR:
394
self.num_tests += offset
279
def addSuccess(self, test):
280
self.extractBenchmarkTime(test)
281
if self._bench_history is not None:
282
if self._benchmarkTime is not None:
283
self._bench_history.write("%s %s\n" % (
284
self._formatTime(self._benchmarkTime),
286
self.report_success(test)
287
unittest.TestResult.addSuccess(self, test)
289
def addSkipped(self, test, skip_excinfo):
290
self.report_skip(test, skip_excinfo)
291
# seems best to treat this as success from point-of-view of unittest
292
# -- it actually does nothing so it barely matters :)
295
except KeyboardInterrupt:
298
self.addError(test, test.__exc_info())
396
raise errors.BzrError("Unknown whence %r" % whence)
300
unittest.TestResult.addSuccess(self, test)
302
def printErrorList(self, flavour, errors):
303
for test, err in errors:
304
self.stream.writeln(self.separator1)
305
self.stream.write("%s: " % flavour)
306
self.stream.writeln(self.getDescription(test))
307
if getattr(test, '_get_log', None) is not None:
309
print >>self.stream, \
310
('vvvv[log from %s]' % test.id()).ljust(78,'-')
311
print >>self.stream, test._get_log()
312
print >>self.stream, \
313
('^^^^[log from %s]' % test.id()).ljust(78,'-')
314
self.stream.writeln(self.separator2)
315
self.stream.writeln("%s" % err)
398
320
def report_cleaning_up(self):
401
def startTestRun(self):
402
self.startTime = time.time()
404
323
def report_success(self, test):
407
def wasStrictlySuccessful(self):
408
if self.unsupported or self.known_failure_count:
410
return self.wasSuccessful()
413
327
class TextTestResult(ExtendedTestResult):
414
328
"""Displays progress and results of tests in text form"""
416
330
def __init__(self, stream, descriptions, verbosity,
417
331
bench_history=None,
421
335
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
422
bench_history, strict)
423
# We no longer pass them around, but just rely on the UIFactory stack
426
warnings.warn("Passing pb to TextTestResult is deprecated")
427
self.pb = self.ui.nested_progress_bar()
336
bench_history, num_tests)
338
self.pb = self.ui.nested_progress_bar()
339
self._supplied_pb = False
342
self._supplied_pb = True
428
343
self.pb.show_pct = False
429
344
self.pb.show_spinner = False
430
345
self.pb.show_eta = False,
431
346
self.pb.show_count = False
432
347
self.pb.show_bar = False
433
self.pb.update_latency = 0
434
self.pb.show_transport_activity = False
436
def stopTestRun(self):
437
# called when the tests that are going to run have run
440
super(TextTestResult, self).stopTestRun()
442
def startTestRun(self):
443
super(TextTestResult, self).startTestRun()
444
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
446
def printErrors(self):
447
# clear the pb to make room for the error listing
449
super(TextTestResult, self).printErrors()
349
def report_starting(self):
350
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
451
352
def _progress_prefix_text(self):
452
# the longer this text, the less space we have to show the test
454
a = '[%d' % self.count # total that have been run
455
# tests skipped as known not to be relevant are not important enough
457
## if self.skip_count:
458
## a += ', %d skip' % self.skip_count
459
## if self.known_failure_count:
460
## a += '+%dX' % self.known_failure_count
353
a = '[%d' % self.count
354
if self.num_tests is not None:
462
355
a +='/%d' % self.num_tests
464
runtime = time.time() - self._overall_start_time
466
a += '%dm%ds' % (runtime / 60, runtime % 60)
469
total_fail_count = self.error_count + self.failure_count
471
a += ', %d failed' % total_fail_count
472
# if self.unsupported:
473
# a += ', %d missing' % len(self.unsupported)
356
a += ' in %ds' % (time.time() - self._overall_start_time)
358
a += ', %d errors' % self.error_count
359
if self.failure_count:
360
a += ', %d failed' % self.failure_count
361
if self.known_failure_count:
362
a += ', %d known failures' % self.known_failure_count
364
a += ', %d skipped' % self.skip_count
366
a += ', %d missing features' % len(self.unsupported)
779
706
retrieved by _get_log(). We use a real OS file, not an in-memory object,
780
707
so that it can also capture file IO. When the test completes this file
781
708
is read into memory and removed from disk.
783
710
There are also convenience functions to invoke bzr's command-line
784
711
routine, and to build and check bzr trees.
786
713
In addition to the usual method of overriding tearDown(), this class also
787
714
allows subclasses to register functions into the _cleanups list, which is
788
715
run in order as the object is torn down. It's less likely this will be
789
716
accidentally overlooked.
792
_active_threads = None
793
_leaking_threads_tests = 0
794
_first_thread_leaker_id = None
795
719
_log_file_name = None
721
_keep_log_file = False
796
722
# record lsprof data when performing benchmark calls.
797
723
_gather_lsprof_in_benchmarks = False
799
725
def __init__(self, methodName='testMethod'):
800
726
super(TestCase, self).__init__(methodName)
801
727
self._cleanups = []
802
self._directory_isolation = True
803
self.exception_handlers.insert(0,
804
(UnavailableFeature, self._do_unsupported_or_skip))
805
self.exception_handlers.insert(0,
806
(TestNotApplicable, self._do_not_applicable))
809
super(TestCase, self).setUp()
810
for feature in getattr(self, '_test_needs_features', []):
811
self.requireFeature(feature)
812
self._log_contents = None
813
self.addDetail("log", content.Content(content.ContentType("text",
814
"plain", {"charset": "utf8"}),
815
lambda:[self._get_log(keep_log_file=True)]))
730
unittest.TestCase.setUp(self)
816
731
self._cleanEnvironment()
732
bzrlib.trace.disable_default_logging()
817
733
self._silenceUI()
818
734
self._startLogFile()
819
735
self._benchcalls = []
820
736
self._benchtime = None
821
737
self._clear_hooks()
822
self._track_transports()
824
738
self._clear_debug_flags()
825
TestCase._active_threads = threading.activeCount()
826
self.addCleanup(self._check_leaked_threads)
831
pdb.Pdb().set_trace(sys._getframe().f_back)
833
def _check_leaked_threads(self):
834
active = threading.activeCount()
835
leaked_threads = active - TestCase._active_threads
836
TestCase._active_threads = active
837
# If some tests make the number of threads *decrease*, we'll consider
838
# that they are just observing old threads dieing, not agressively kill
839
# random threads. So we don't report these tests as leaking. The risk
840
# is that we have false positives that way (the test see 2 threads
841
# going away but leak one) but it seems less likely than the actual
842
# false positives (the test see threads going away and does not leak).
843
if leaked_threads > 0:
844
TestCase._leaking_threads_tests += 1
845
if TestCase._first_thread_leaker_id is None:
846
TestCase._first_thread_leaker_id = self.id()
848
740
def _clear_debug_flags(self):
849
741
"""Prevent externally set debug flags affecting tests.
851
743
Tests that want to use debug flags can just set them in the
852
744
debug_flags set during setup/teardown.
854
# Start with a copy of the current debug flags we can safely modify.
855
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
856
if 'allow_debug' not in selftest_debug_flags:
857
debug.debug_flags.clear()
858
if 'disable_lock_checks' not in selftest_debug_flags:
859
debug.debug_flags.add('strict_locks')
746
self._preserved_debug_flags = set(debug.debug_flags)
747
debug.debug_flags.clear()
748
self.addCleanup(self._restore_debug_flags)
861
750
def _clear_hooks(self):
862
751
# prevent hooks affecting tests
863
self._preserved_hooks = {}
864
for key, factory in hooks.known_hooks.items():
865
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
866
current_hooks = hooks.known_hooks_key_to_object(key)
867
self._preserved_hooks[parent] = (name, current_hooks)
753
import bzrlib.smart.server
754
self._preserved_hooks = {
755
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
756
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
868
758
self.addCleanup(self._restoreHooks)
869
for key, factory in hooks.known_hooks.items():
870
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
871
setattr(parent, name, factory())
872
# this hook should always be installed
873
request._install_hook()
875
def disable_directory_isolation(self):
876
"""Turn off directory isolation checks."""
877
self._directory_isolation = False
879
def enable_directory_isolation(self):
880
"""Enable directory isolation checks."""
881
self._directory_isolation = True
759
# reset all hooks to an empty instance of the appropriate type
760
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
761
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
883
763
def _silenceUI(self):
884
764
"""Turn off UI for duration of test"""
885
765
# by default the UI is off; tests can turn it on if they want it.
886
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
888
def _check_locks(self):
889
"""Check that all lock take/release actions have been paired."""
890
# We always check for mismatched locks. If a mismatch is found, we
891
# fail unless -Edisable_lock_checks is supplied to selftest, in which
892
# case we just print a warning.
894
acquired_locks = [lock for action, lock in self._lock_actions
895
if action == 'acquired']
896
released_locks = [lock for action, lock in self._lock_actions
897
if action == 'released']
898
broken_locks = [lock for action, lock in self._lock_actions
899
if action == 'broken']
900
# trivially, given the tests for lock acquistion and release, if we
901
# have as many in each list, it should be ok. Some lock tests also
902
# break some locks on purpose and should be taken into account by
903
# considering that breaking a lock is just a dirty way of releasing it.
904
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
905
message = ('Different number of acquired and '
906
'released or broken locks. (%s, %s + %s)' %
907
(acquired_locks, released_locks, broken_locks))
908
if not self._lock_check_thorough:
909
# Rather than fail, just warn
910
print "Broken test %s: %s" % (self, message)
914
def _track_locks(self):
915
"""Track lock activity during tests."""
916
self._lock_actions = []
917
if 'disable_lock_checks' in selftest_debug_flags:
918
self._lock_check_thorough = False
920
self._lock_check_thorough = True
922
self.addCleanup(self._check_locks)
923
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
924
self._lock_acquired, None)
925
_mod_lock.Lock.hooks.install_named_hook('lock_released',
926
self._lock_released, None)
927
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
928
self._lock_broken, None)
930
def _lock_acquired(self, result):
931
self._lock_actions.append(('acquired', result))
933
def _lock_released(self, result):
934
self._lock_actions.append(('released', result))
936
def _lock_broken(self, result):
937
self._lock_actions.append(('broken', result))
939
def permit_dir(self, name):
940
"""Permit a directory to be used by this test. See permit_url."""
941
name_transport = get_transport(name)
942
self.permit_url(name)
943
self.permit_url(name_transport.base)
945
def permit_url(self, url):
946
"""Declare that url is an ok url to use in this test.
948
Do this for memory transports, temporary test directory etc.
950
Do not do this for the current working directory, /tmp, or any other
951
preexisting non isolated url.
953
if not url.endswith('/'):
955
self._bzr_selftest_roots.append(url)
957
def permit_source_tree_branch_repo(self):
958
"""Permit the source tree bzr is running from to be opened.
960
Some code such as bzrlib.version attempts to read from the bzr branch
961
that bzr is executing from (if any). This method permits that directory
962
to be used in the test suite.
964
path = self.get_source_path()
965
self.record_directory_isolation()
968
workingtree.WorkingTree.open(path)
969
except (errors.NotBranchError, errors.NoWorkingTree):
972
self.enable_directory_isolation()
974
def _preopen_isolate_transport(self, transport):
975
"""Check that all transport openings are done in the test work area."""
976
while isinstance(transport, pathfilter.PathFilteringTransport):
977
# Unwrap pathfiltered transports
978
transport = transport.server.backing_transport.clone(
979
transport._filter('.'))
981
# ReadonlySmartTCPServer_for_testing decorates the backing transport
982
# urls it is given by prepending readonly+. This is appropriate as the
983
# client shouldn't know that the server is readonly (or not readonly).
984
# We could register all servers twice, with readonly+ prepending, but
985
# that makes for a long list; this is about the same but easier to
987
if url.startswith('readonly+'):
988
url = url[len('readonly+'):]
989
self._preopen_isolate_url(url)
991
def _preopen_isolate_url(self, url):
992
if not self._directory_isolation:
994
if self._directory_isolation == 'record':
995
self._bzr_selftest_roots.append(url)
997
# This prevents all transports, including e.g. sftp ones backed on disk
998
# from working unless they are explicitly granted permission. We then
999
# depend on the code that sets up test transports to check that they are
1000
# appropriately isolated and enable their use by calling
1001
# self.permit_transport()
1002
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1003
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1004
% (url, self._bzr_selftest_roots))
1006
def record_directory_isolation(self):
1007
"""Gather accessed directories to permit later access.
1009
This is used for tests that access the branch bzr is running from.
1011
self._directory_isolation = "record"
1013
def start_server(self, transport_server, backing_server=None):
1014
"""Start transport_server for this test.
1016
This starts the server, registers a cleanup for it and permits the
1017
server's urls to be used.
1019
if backing_server is None:
1020
transport_server.start_server()
1022
transport_server.start_server(backing_server)
1023
self.addCleanup(transport_server.stop_server)
1024
# Obtain a real transport because if the server supplies a password, it
1025
# will be hidden from the base on the client side.
1026
t = get_transport(transport_server.get_url())
1027
# Some transport servers effectively chroot the backing transport;
1028
# others like SFTPServer don't - users of the transport can walk up the
1029
# transport to read the entire backing transport. This wouldn't matter
1030
# except that the workdir tests are given - and that they expect the
1031
# server's url to point at - is one directory under the safety net. So
1032
# Branch operations into the transport will attempt to walk up one
1033
# directory. Chrooting all servers would avoid this but also mean that
1034
# we wouldn't be testing directly against non-root urls. Alternatively
1035
# getting the test framework to start the server with a backing server
1036
# at the actual safety net directory would work too, but this then
1037
# means that the self.get_url/self.get_transport methods would need
1038
# to transform all their results. On balance its cleaner to handle it
1039
# here, and permit a higher url when we have one of these transports.
1040
if t.base.endswith('/work/'):
1041
# we have safety net/test root/work
1042
t = t.clone('../..')
1043
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1044
# The smart server adds a path similar to work, which is traversed
1045
# up from by the client. But the server is chrooted - the actual
1046
# backing transport is not escaped from, and VFS requests to the
1047
# root will error (because they try to escape the chroot).
1049
while t2.base != t.base:
1052
self.permit_url(t.base)
1054
def _track_transports(self):
1055
"""Install checks for transport usage."""
1056
# TestCase has no safe place it can write to.
1057
self._bzr_selftest_roots = []
1058
# Currently the easiest way to be sure that nothing is going on is to
1059
# hook into bzr dir opening. This leaves a small window of error for
1060
# transport tests, but they are well known, and we can improve on this
1062
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1063
self._preopen_isolate_transport, "Check bzr directories are safe.")
766
saved = ui.ui_factory
768
ui.ui_factory = saved
769
ui.ui_factory = ui.SilentUIFactory()
770
self.addCleanup(_restore)
1065
772
def _ndiff_strings(self, a, b):
1066
773
"""Return ndiff between two strings containing lines.
1068
775
A trailing newline is added if missing to make the strings
1069
776
print properly."""
1070
777
if b and b[-1] != '\n':
2685
2114
self.transport_readonly_server = HttpServer
2688
def condition_id_re(pattern):
2689
"""Create a condition filter which performs a re check on a test's id.
2691
:param pattern: A regular expression string.
2692
:return: A callable that returns True if the re matches.
2694
filter_re = osutils.re_compile_checked(pattern, 0,
2696
def condition(test):
2117
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2118
random_order=False):
2119
"""Create a test suite by filtering another one.
2121
:param suite: the source suite
2122
:param pattern: pattern that names must match
2123
:param exclude_pattern: pattern that names must not match, if any
2124
:param random_order: if True, tests in the new suite will be put in
2126
:returns: the newly created suite
2128
return sort_suite_by_re(suite, pattern, exclude_pattern,
2129
random_order, False)
2132
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2133
random_order=False, append_rest=True):
2134
"""Create a test suite by sorting another one.
2136
:param suite: the source suite
2137
:param pattern: pattern that names must match in order to go
2138
first in the new suite
2139
:param exclude_pattern: pattern that names must not match, if any
2140
:param random_order: if True, tests in the new suite will be put in
2142
:param append_rest: if False, pattern is a strict filter and not
2143
just an ordering directive
2144
:returns: the newly created suite
2148
filter_re = re.compile(pattern)
2149
if exclude_pattern is not None:
2150
exclude_re = re.compile(exclude_pattern)
2151
for test in iter_suite_tests(suite):
2697
2152
test_id = test.id()
2698
return filter_re.search(test_id)
2702
def condition_isinstance(klass_or_klass_list):
2703
"""Create a condition filter which returns isinstance(param, klass).
2705
:return: A callable which when called with one parameter obj return the
2706
result of isinstance(obj, klass_or_klass_list).
2709
return isinstance(obj, klass_or_klass_list)
2713
def condition_id_in_list(id_list):
2714
"""Create a condition filter which verify that test's id in a list.
2716
:param id_list: A TestIdList object.
2717
:return: A callable that returns True if the test's id appears in the list.
2719
def condition(test):
2720
return id_list.includes(test.id())
2724
def condition_id_startswith(starts):
2725
"""Create a condition filter verifying that test's id starts with a string.
2727
:param starts: A list of string.
2728
:return: A callable that returns True if the test's id starts with one of
2731
def condition(test):
2732
for start in starts:
2733
if test.id().startswith(start):
2739
def exclude_tests_by_condition(suite, condition):
2740
"""Create a test suite which excludes some tests from suite.
2742
:param suite: The suite to get tests from.
2743
:param condition: A callable whose result evaluates True when called with a
2744
test case which should be excluded from the result.
2745
:return: A suite which contains the tests found in suite that fail
2749
for test in iter_suite_tests(suite):
2750
if not condition(test):
2752
return TestUtil.TestSuite(result)
2755
def filter_suite_by_condition(suite, condition):
2756
"""Create a test suite by filtering another one.
2758
:param suite: The source suite.
2759
:param condition: A callable whose result evaluates True when called with a
2760
test case which should be included in the result.
2761
:return: A suite which contains the tests found in suite that pass
2765
for test in iter_suite_tests(suite):
2768
return TestUtil.TestSuite(result)
2771
def filter_suite_by_re(suite, pattern):
2772
"""Create a test suite by filtering another one.
2774
:param suite: the source suite
2775
:param pattern: pattern that names must match
2776
:returns: the newly created suite
2778
condition = condition_id_re(pattern)
2779
result_suite = filter_suite_by_condition(suite, condition)
2783
def filter_suite_by_id_list(suite, test_id_list):
2784
"""Create a test suite by filtering another one.
2786
:param suite: The source suite.
2787
:param test_id_list: A list of the test ids to keep as strings.
2788
:returns: the newly created suite
2790
condition = condition_id_in_list(test_id_list)
2791
result_suite = filter_suite_by_condition(suite, condition)
2795
def filter_suite_by_id_startswith(suite, start):
2796
"""Create a test suite by filtering another one.
2798
:param suite: The source suite.
2799
:param start: A list of string the test id must start with one of.
2800
:returns: the newly created suite
2802
condition = condition_id_startswith(start)
2803
result_suite = filter_suite_by_condition(suite, condition)
2807
def exclude_tests_by_re(suite, pattern):
2808
"""Create a test suite which excludes some tests from suite.
2810
:param suite: The suite to get tests from.
2811
:param pattern: A regular expression string. Test ids that match this
2812
pattern will be excluded from the result.
2813
:return: A TestSuite that contains all the tests from suite without the
2814
tests that matched pattern. The order of tests is the same as it was in
2817
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2820
def preserve_input(something):
2821
"""A helper for performing test suite transformation chains.
2823
:param something: Anything you want to preserve.
2829
def randomize_suite(suite):
2830
"""Return a new TestSuite with suite's tests in random order.
2832
The tests in the input suite are flattened into a single suite in order to
2833
accomplish this. Any nested TestSuites are removed to provide global
2836
tests = list(iter_suite_tests(suite))
2837
random.shuffle(tests)
2838
return TestUtil.TestSuite(tests)
2841
def split_suite_by_condition(suite, condition):
2842
"""Split a test suite into two by a condition.
2844
:param suite: The suite to split.
2845
:param condition: The condition to match on. Tests that match this
2846
condition are returned in the first test suite, ones that do not match
2847
are in the second suite.
2848
:return: A tuple of two test suites, where the first contains tests from
2849
suite matching the condition, and the second contains the remainder
2850
from suite. The order within each output suite is the same as it was in
2855
for test in iter_suite_tests(suite):
2857
matched.append(test)
2859
did_not_match.append(test)
2860
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2863
def split_suite_by_re(suite, pattern):
2864
"""Split a test suite into two by a regular expression.
2866
:param suite: The suite to split.
2867
:param pattern: A regular expression string. Test ids that match this
2868
pattern will be in the first test suite returned, and the others in the
2869
second test suite returned.
2870
:return: A tuple of two test suites, where the first contains tests from
2871
suite matching pattern, and the second contains the remainder from
2872
suite. The order within each output suite is the same as it was in
2875
return split_suite_by_condition(suite, condition_id_re(pattern))
2153
if exclude_pattern is None or not exclude_re.search(test_id):
2154
if filter_re.search(test_id):
2159
random.shuffle(first)
2160
random.shuffle(second)
2161
return TestUtil.TestSuite(first + second)
2878
2164
def run_suite(suite, name='test', verbose=False, pattern=".*",
2882
2168
list_only=False,
2883
2169
random_seed=None,
2884
2170
exclude_pattern=None,
2887
suite_decorators=None,
2889
result_decorators=None,
2891
"""Run a test suite for bzr selftest.
2893
:param runner_class: The class of runner to use. Must support the
2894
constructor arguments passed by run_suite which are more than standard
2896
:return: A boolean indicating success.
2898
2172
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2903
if runner_class is None:
2904
runner_class = TextTestRunner
2907
runner = runner_class(stream=stream,
2177
runner = TextTestRunner(stream=sys.stdout,
2908
2178
descriptions=0,
2909
2179
verbosity=verbosity,
2910
2180
bench_history=bench_history,
2912
result_decorators=result_decorators,
2181
list_only=list_only,
2914
2183
runner.stop_on_failure=stop_on_failure
2915
# built in decorator factories:
2917
random_order(random_seed, runner),
2918
exclude_tests(exclude_pattern),
2920
if matching_tests_first:
2921
decorators.append(tests_first(pattern))
2923
decorators.append(filter_tests(pattern))
2924
if suite_decorators:
2925
decorators.extend(suite_decorators)
2926
# tell the result object how many tests will be running: (except if
2927
# --parallel=fork is being used. Robert said he will provide a better
2928
# progress design later -- vila 20090817)
2929
if fork_decorator not in decorators:
2930
decorators.append(CountingDecorator)
2931
for decorator in decorators:
2932
suite = decorator(suite)
2934
# Done after test suite decoration to allow randomisation etc
2935
# to take effect, though that is of marginal benefit.
2937
stream.write("Listing tests only ...\n")
2938
for t in iter_suite_tests(suite):
2939
stream.write("%s\n" % (t.id()))
2941
result = runner.run(suite)
2943
return result.wasStrictlySuccessful()
2945
return result.wasSuccessful()
2948
# A registry where get() returns a suite decorator.
2949
parallel_registry = registry.Registry()
2952
def fork_decorator(suite):
2953
concurrency = osutils.local_concurrency()
2954
if concurrency == 1:
2956
from testtools import ConcurrentTestSuite
2957
return ConcurrentTestSuite(suite, fork_for_tests)
2958
parallel_registry.register('fork', fork_decorator)
2961
def subprocess_decorator(suite):
2962
concurrency = osutils.local_concurrency()
2963
if concurrency == 1:
2965
from testtools import ConcurrentTestSuite
2966
return ConcurrentTestSuite(suite, reinvoke_for_tests)
2967
parallel_registry.register('subprocess', subprocess_decorator)
2970
def exclude_tests(exclude_pattern):
2971
"""Return a test suite decorator that excludes tests."""
2972
if exclude_pattern is None:
2973
return identity_decorator
2974
def decorator(suite):
2975
return ExcludeDecorator(suite, exclude_pattern)
2979
def filter_tests(pattern):
2981
return identity_decorator
2982
def decorator(suite):
2983
return FilterTestsDecorator(suite, pattern)
2987
def random_order(random_seed, runner):
2988
"""Return a test suite decorator factory for randomising tests order.
2990
:param random_seed: now, a string which casts to a long, or a long.
2991
:param runner: A test runner with a stream attribute to report on.
2993
if random_seed is None:
2994
return identity_decorator
2995
def decorator(suite):
2996
return RandomDecorator(suite, random_seed, runner.stream)
3000
def tests_first(pattern):
3002
return identity_decorator
3003
def decorator(suite):
3004
return TestFirstDecorator(suite, pattern)
3008
def identity_decorator(suite):
3013
class TestDecorator(TestSuite):
3014
"""A decorator for TestCase/TestSuite objects.
3016
Usually, subclasses should override __iter__(used when flattening test
3017
suites), which we do to filter, reorder, parallelise and so on, run() and
3021
def __init__(self, suite):
3022
TestSuite.__init__(self)
3025
def countTestCases(self):
3028
cases += test.countTestCases()
3035
def run(self, result):
3036
# Use iteration on self, not self._tests, to allow subclasses to hook
3039
if result.shouldStop:
3045
class CountingDecorator(TestDecorator):
3046
"""A decorator which calls result.progress(self.countTestCases)."""
3048
def run(self, result):
3049
progress_method = getattr(result, 'progress', None)
3050
if callable(progress_method):
3051
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3052
return super(CountingDecorator, self).run(result)
3055
class ExcludeDecorator(TestDecorator):
3056
"""A decorator which excludes test matching an exclude pattern."""
3058
def __init__(self, suite, exclude_pattern):
3059
TestDecorator.__init__(self, suite)
3060
self.exclude_pattern = exclude_pattern
3061
self.excluded = False
3065
return iter(self._tests)
3066
self.excluded = True
3067
suite = exclude_tests_by_re(self, self.exclude_pattern)
3069
self.addTests(suite)
3070
return iter(self._tests)
3073
class FilterTestsDecorator(TestDecorator):
3074
"""A decorator which filters tests to those matching a pattern."""
3076
def __init__(self, suite, pattern):
3077
TestDecorator.__init__(self, suite)
3078
self.pattern = pattern
3079
self.filtered = False
3083
return iter(self._tests)
3084
self.filtered = True
3085
suite = filter_suite_by_re(self, self.pattern)
3087
self.addTests(suite)
3088
return iter(self._tests)
3091
class RandomDecorator(TestDecorator):
3092
"""A decorator which randomises the order of its tests."""
3094
def __init__(self, suite, random_seed, stream):
3095
TestDecorator.__init__(self, suite)
3096
self.random_seed = random_seed
3097
self.randomised = False
3098
self.stream = stream
3102
return iter(self._tests)
3103
self.randomised = True
3104
self.stream.write("Randomizing test order using seed %s\n\n" %
3105
(self.actual_seed()))
3106
# Initialise the random number generator.
3107
random.seed(self.actual_seed())
3108
suite = randomize_suite(self)
3110
self.addTests(suite)
3111
return iter(self._tests)
3113
def actual_seed(self):
3114
if self.random_seed == "now":
3115
# We convert the seed to a long to make it reuseable across
3116
# invocations (because the user can reenter it).
3117
self.random_seed = long(time.time())
2184
# Initialise the random number generator and display the seed used.
2185
# We convert the seed to a long to make it reuseable across invocations.
2186
random_order = False
2187
if random_seed is not None:
2189
if random_seed == "now":
2190
random_seed = long(time.time())
3119
2192
# Convert the seed to a long if we can
3121
self.random_seed = long(self.random_seed)
2194
random_seed = long(random_seed)
3124
return self.random_seed
3127
class TestFirstDecorator(TestDecorator):
3128
"""A decorator which moves named tests to the front."""
3130
def __init__(self, suite, pattern):
3131
TestDecorator.__init__(self, suite)
3132
self.pattern = pattern
3133
self.filtered = False
3137
return iter(self._tests)
3138
self.filtered = True
3139
suites = split_suite_by_re(self, self.pattern)
3141
self.addTests(suites)
3142
return iter(self._tests)
3145
def partition_tests(suite, count):
3146
"""Partition suite into count lists of tests."""
3148
tests = list(iter_suite_tests(suite))
3149
tests_per_process = int(math.ceil(float(len(tests)) / count))
3150
for block in range(count):
3151
low_test = block * tests_per_process
3152
high_test = low_test + tests_per_process
3153
process_tests = tests[low_test:high_test]
3154
result.append(process_tests)
3158
def fork_for_tests(suite):
3159
"""Take suite and start up one runner per CPU by forking()
3161
:return: An iterable of TestCase-like objects which can each have
3162
run(result) called on them to feed tests to result.
3164
concurrency = osutils.local_concurrency()
3166
from subunit import TestProtocolClient, ProtocolTestCase
3167
from subunit.test_results import AutoTimingTestResultDecorator
3168
class TestInOtherProcess(ProtocolTestCase):
3169
# Should be in subunit, I think. RBC.
3170
def __init__(self, stream, pid):
3171
ProtocolTestCase.__init__(self, stream)
3174
def run(self, result):
3176
ProtocolTestCase.run(self, result)
3178
os.waitpid(self.pid, os.WNOHANG)
3180
test_blocks = partition_tests(suite, concurrency)
3181
for process_tests in test_blocks:
3182
process_suite = TestSuite()
3183
process_suite.addTests(process_tests)
3184
c2pread, c2pwrite = os.pipe()
3189
# Leave stderr and stdout open so we can see test noise
3190
# Close stdin so that the child goes away if it decides to
3191
# read from stdin (otherwise its a roulette to see what
3192
# child actually gets keystrokes for pdb etc).
3195
stream = os.fdopen(c2pwrite, 'wb', 1)
3196
subunit_result = AutoTimingTestResultDecorator(
3197
TestProtocolClient(stream))
3198
process_suite.run(subunit_result)
2197
runner.stream.writeln("Randomizing test order using seed %s\n" %
2199
random.seed(random_seed)
2200
# Customise the list of tests if requested
2201
if pattern != '.*' or exclude_pattern is not None or random_order:
2202
if matching_tests_first:
2203
suite = sort_suite_by_re(suite, pattern, exclude_pattern,
3203
stream = os.fdopen(c2pread, 'rb', 1)
3204
test = TestInOtherProcess(stream, pid)
3209
def reinvoke_for_tests(suite):
3210
"""Take suite and start up one runner per CPU using subprocess().
3212
:return: An iterable of TestCase-like objects which can each have
3213
run(result) called on them to feed tests to result.
3215
concurrency = osutils.local_concurrency()
3217
from subunit import ProtocolTestCase
3218
class TestInSubprocess(ProtocolTestCase):
3219
def __init__(self, process, name):
3220
ProtocolTestCase.__init__(self, process.stdout)
3221
self.process = process
3222
self.process.stdin.close()
3225
def run(self, result):
3227
ProtocolTestCase.run(self, result)
3230
os.unlink(self.name)
3231
# print "pid %d finished" % finished_process
3232
test_blocks = partition_tests(suite, concurrency)
3233
for process_tests in test_blocks:
3234
# ugly; currently reimplement rather than reuses TestCase methods.
3235
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3236
if not os.path.isfile(bzr_path):
3237
# We are probably installed. Assume sys.argv is the right file
3238
bzr_path = sys.argv[0]
3239
bzr_path = [bzr_path]
3240
if sys.platform == "win32":
3241
# if we're on windows, we can't execute the bzr script directly
3242
bzr_path = [sys.executable] + bzr_path
3243
fd, test_list_file_name = tempfile.mkstemp()
3244
test_list_file = os.fdopen(fd, 'wb', 1)
3245
for test in process_tests:
3246
test_list_file.write(test.id() + '\n')
3247
test_list_file.close()
3249
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3251
if '--no-plugins' in sys.argv:
3252
argv.append('--no-plugins')
3253
# stderr=STDOUT would be ideal, but until we prevent noise on
3254
# stderr it can interrupt the subunit protocol.
3255
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3257
test = TestInSubprocess(process, test_list_file_name)
3260
os.unlink(test_list_file_name)
3265
class ForwardingResult(unittest.TestResult):
3267
def __init__(self, target):
3268
unittest.TestResult.__init__(self)
3269
self.result = target
3271
def startTest(self, test):
3272
self.result.startTest(test)
3274
def stopTest(self, test):
3275
self.result.stopTest(test)
3277
def startTestRun(self):
3278
self.result.startTestRun()
3280
def stopTestRun(self):
3281
self.result.stopTestRun()
3283
def addSkip(self, test, reason):
3284
self.result.addSkip(test, reason)
3286
def addSuccess(self, test):
3287
self.result.addSuccess(test)
3289
def addError(self, test, err):
3290
self.result.addError(test, err)
3292
def addFailure(self, test, err):
3293
self.result.addFailure(test, err)
3294
ForwardingResult = testtools.ExtendedToOriginalDecorator
3297
class ProfileResult(ForwardingResult):
3298
"""Generate profiling data for all activity between start and success.
3300
The profile data is appended to the test's _benchcalls attribute and can
3301
be accessed by the forwarded-to TestResult.
3303
While it might be cleaner do accumulate this in stopTest, addSuccess is
3304
where our existing output support for lsprof is, and this class aims to
3305
fit in with that: while it could be moved it's not necessary to accomplish
3306
test profiling, nor would it be dramatically cleaner.
3309
def startTest(self, test):
3310
self.profiler = bzrlib.lsprof.BzrProfiler()
3311
self.profiler.start()
3312
ForwardingResult.startTest(self, test)
3314
def addSuccess(self, test):
3315
stats = self.profiler.stop()
3317
calls = test._benchcalls
3318
except AttributeError:
3319
test._benchcalls = []
3320
calls = test._benchcalls
3321
calls.append(((test.id(), "", ""), stats))
3322
ForwardingResult.addSuccess(self, test)
3324
def stopTest(self, test):
3325
ForwardingResult.stopTest(self, test)
3326
self.profiler = None
3329
# Controlled by "bzr selftest -E=..." option
3330
# Currently supported:
3331
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3332
# preserves any flags supplied at the command line.
3333
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3334
# rather than failing tests. And no longer raise
3335
# LockContention when fctnl locks are not being used
3336
# with proper exclusion rules.
3337
selftest_debug_flags = set()
2206
suite = filter_suite_by_re(suite, pattern, exclude_pattern,
2208
result = runner.run(suite)
2209
return result.wasSuccessful()
3340
2212
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3399
2243
matching_tests_first=matching_tests_first,
3400
2244
list_only=list_only,
3401
2245
random_seed=random_seed,
3402
exclude_pattern=exclude_pattern,
3404
runner_class=runner_class,
3405
suite_decorators=suite_decorators,
3407
result_decorators=result_decorators,
2246
exclude_pattern=exclude_pattern)
3410
2248
default_transport = old_transport
3411
selftest_debug_flags = old_debug_flags
3414
def load_test_id_list(file_name):
3415
"""Load a test id list from a text file.
3417
The format is one test id by line. No special care is taken to impose
3418
strict rules, these test ids are used to filter the test suite so a test id
3419
that do not match an existing test will do no harm. This allows user to add
3420
comments, leave blank lines, etc.
3424
ftest = open(file_name, 'rt')
3426
if e.errno != errno.ENOENT:
3429
raise errors.NoSuchFile(file_name)
3431
for test_name in ftest.readlines():
3432
test_list.append(test_name.strip())
3437
def suite_matches_id_list(test_suite, id_list):
3438
"""Warns about tests not appearing or appearing more than once.
3440
:param test_suite: A TestSuite object.
3441
:param test_id_list: The list of test ids that should be found in
3444
:return: (absents, duplicates) absents is a list containing the test found
3445
in id_list but not in test_suite, duplicates is a list containing the
3446
test found multiple times in test_suite.
3448
When using a prefined test id list, it may occurs that some tests do not
3449
exist anymore or that some tests use the same id. This function warns the
3450
tester about potential problems in his workflow (test lists are volatile)
3451
or in the test suite itself (using the same id for several tests does not
3452
help to localize defects).
3454
# Build a dict counting id occurrences
3456
for test in iter_suite_tests(test_suite):
3458
tests[id] = tests.get(id, 0) + 1
3463
occurs = tests.get(id, 0)
3465
not_found.append(id)
3467
duplicates.append(id)
3469
return not_found, duplicates
3472
class TestIdList(object):
3473
"""Test id list to filter a test suite.
3475
Relying on the assumption that test ids are built as:
3476
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3477
notation, this class offers methods to :
3478
- avoid building a test suite for modules not refered to in the test list,
3479
- keep only the tests listed from the module test suite.
3482
def __init__(self, test_id_list):
3483
# When a test suite needs to be filtered against us we compare test ids
3484
# for equality, so a simple dict offers a quick and simple solution.
3485
self.tests = dict().fromkeys(test_id_list, True)
3487
# While unittest.TestCase have ids like:
3488
# <module>.<class>.<method>[(<param+)],
3489
# doctest.DocTestCase can have ids like:
3492
# <module>.<function>
3493
# <module>.<class>.<method>
3495
# Since we can't predict a test class from its name only, we settle on
3496
# a simple constraint: a test id always begins with its module name.
3499
for test_id in test_id_list:
3500
parts = test_id.split('.')
3501
mod_name = parts.pop(0)
3502
modules[mod_name] = True
3504
mod_name += '.' + part
3505
modules[mod_name] = True
3506
self.modules = modules
3508
def refers_to(self, module_name):
3509
"""Is there tests for the module or one of its sub modules."""
3510
return self.modules.has_key(module_name)
3512
def includes(self, test_id):
3513
return self.tests.has_key(test_id)
3516
class TestPrefixAliasRegistry(registry.Registry):
3517
"""A registry for test prefix aliases.
3519
This helps implement shorcuts for the --starting-with selftest
3520
option. Overriding existing prefixes is not allowed but not fatal (a
3521
warning will be emitted).
3524
def register(self, key, obj, help=None, info=None,
3525
override_existing=False):
3526
"""See Registry.register.
3528
Trying to override an existing alias causes a warning to be emitted,
3529
not a fatal execption.
3532
super(TestPrefixAliasRegistry, self).register(
3533
key, obj, help=help, info=info, override_existing=False)
3535
actual = self.get(key)
3536
note('Test prefix alias %s is already used for %s, ignoring %s'
3537
% (key, actual, obj))
3539
def resolve_alias(self, id_start):
3540
"""Replace the alias by the prefix in the given string.
3542
Using an unknown prefix is an error to help catching typos.
3544
parts = id_start.split('.')
3546
parts[0] = self.get(parts[0])
3548
raise errors.BzrCommandError(
3549
'%s is not a known test prefix alias' % parts[0])
3550
return '.'.join(parts)
3553
test_prefix_alias_registry = TestPrefixAliasRegistry()
3554
"""Registry of test prefix aliases."""
3557
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3558
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3559
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3561
# Obvious highest levels prefixes, feel free to add your own via a plugin
3562
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3563
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3564
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3565
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3566
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3569
def _test_suite_testmod_names():
3570
"""Return the standard list of test module names to test."""
3573
'bzrlib.tests.blackbox',
3574
'bzrlib.tests.commands',
3575
'bzrlib.tests.per_branch',
3576
'bzrlib.tests.per_bzrdir',
3577
'bzrlib.tests.per_foreign_vcs',
3578
'bzrlib.tests.per_interrepository',
3579
'bzrlib.tests.per_intertree',
3580
'bzrlib.tests.per_inventory',
3581
'bzrlib.tests.per_interbranch',
3582
'bzrlib.tests.per_lock',
3583
'bzrlib.tests.per_merger',
3584
'bzrlib.tests.per_transport',
3585
'bzrlib.tests.per_tree',
3586
'bzrlib.tests.per_pack_repository',
3587
'bzrlib.tests.per_repository',
3588
'bzrlib.tests.per_repository_chk',
3589
'bzrlib.tests.per_repository_reference',
3590
'bzrlib.tests.per_uifactory',
3591
'bzrlib.tests.per_versionedfile',
3592
'bzrlib.tests.per_workingtree',
3593
'bzrlib.tests.test__annotator',
3594
'bzrlib.tests.test__bencode',
3595
'bzrlib.tests.test__chk_map',
3596
'bzrlib.tests.test__dirstate_helpers',
3597
'bzrlib.tests.test__groupcompress',
3598
'bzrlib.tests.test__known_graph',
3599
'bzrlib.tests.test__rio',
3600
'bzrlib.tests.test__simple_set',
3601
'bzrlib.tests.test__static_tuple',
3602
'bzrlib.tests.test__walkdirs_win32',
3603
'bzrlib.tests.test_ancestry',
3604
'bzrlib.tests.test_annotate',
3605
'bzrlib.tests.test_api',
3606
'bzrlib.tests.test_atomicfile',
3607
'bzrlib.tests.test_bad_files',
3608
'bzrlib.tests.test_bisect_multi',
3609
'bzrlib.tests.test_branch',
3610
'bzrlib.tests.test_branchbuilder',
3611
'bzrlib.tests.test_btree_index',
3612
'bzrlib.tests.test_bugtracker',
3613
'bzrlib.tests.test_bundle',
3614
'bzrlib.tests.test_bzrdir',
3615
'bzrlib.tests.test__chunks_to_lines',
3616
'bzrlib.tests.test_cache_utf8',
3617
'bzrlib.tests.test_chk_map',
3618
'bzrlib.tests.test_chk_serializer',
3619
'bzrlib.tests.test_chunk_writer',
3620
'bzrlib.tests.test_clean_tree',
3621
'bzrlib.tests.test_cleanup',
3622
'bzrlib.tests.test_commands',
3623
'bzrlib.tests.test_commit',
3624
'bzrlib.tests.test_commit_merge',
3625
'bzrlib.tests.test_config',
3626
'bzrlib.tests.test_conflicts',
3627
'bzrlib.tests.test_counted_lock',
3628
'bzrlib.tests.test_crash',
3629
'bzrlib.tests.test_decorators',
3630
'bzrlib.tests.test_delta',
3631
'bzrlib.tests.test_debug',
3632
'bzrlib.tests.test_deprecated_graph',
3633
'bzrlib.tests.test_diff',
3634
'bzrlib.tests.test_directory_service',
3635
'bzrlib.tests.test_dirstate',
3636
'bzrlib.tests.test_email_message',
3637
'bzrlib.tests.test_eol_filters',
3638
'bzrlib.tests.test_errors',
3639
'bzrlib.tests.test_export',
3640
'bzrlib.tests.test_extract',
3641
'bzrlib.tests.test_fetch',
3642
'bzrlib.tests.test_fifo_cache',
3643
'bzrlib.tests.test_filters',
3644
'bzrlib.tests.test_ftp_transport',
3645
'bzrlib.tests.test_foreign',
3646
'bzrlib.tests.test_generate_docs',
3647
'bzrlib.tests.test_generate_ids',
3648
'bzrlib.tests.test_globbing',
3649
'bzrlib.tests.test_gpg',
3650
'bzrlib.tests.test_graph',
3651
'bzrlib.tests.test_groupcompress',
3652
'bzrlib.tests.test_hashcache',
3653
'bzrlib.tests.test_help',
3654
'bzrlib.tests.test_hooks',
3655
'bzrlib.tests.test_http',
3656
'bzrlib.tests.test_http_response',
3657
'bzrlib.tests.test_https_ca_bundle',
3658
'bzrlib.tests.test_identitymap',
3659
'bzrlib.tests.test_ignores',
3660
'bzrlib.tests.test_index',
3661
'bzrlib.tests.test_info',
3662
'bzrlib.tests.test_inv',
3663
'bzrlib.tests.test_inventory_delta',
3664
'bzrlib.tests.test_knit',
3665
'bzrlib.tests.test_lazy_import',
3666
'bzrlib.tests.test_lazy_regex',
3667
'bzrlib.tests.test_lock',
3668
'bzrlib.tests.test_lockable_files',
3669
'bzrlib.tests.test_lockdir',
3670
'bzrlib.tests.test_log',
3671
'bzrlib.tests.test_lru_cache',
3672
'bzrlib.tests.test_lsprof',
3673
'bzrlib.tests.test_mail_client',
3674
'bzrlib.tests.test_memorytree',
3675
'bzrlib.tests.test_merge',
3676
'bzrlib.tests.test_merge3',
3677
'bzrlib.tests.test_merge_core',
3678
'bzrlib.tests.test_merge_directive',
3679
'bzrlib.tests.test_missing',
3680
'bzrlib.tests.test_msgeditor',
3681
'bzrlib.tests.test_multiparent',
3682
'bzrlib.tests.test_mutabletree',
3683
'bzrlib.tests.test_nonascii',
3684
'bzrlib.tests.test_options',
3685
'bzrlib.tests.test_osutils',
3686
'bzrlib.tests.test_osutils_encodings',
3687
'bzrlib.tests.test_pack',
3688
'bzrlib.tests.test_patch',
3689
'bzrlib.tests.test_patches',
3690
'bzrlib.tests.test_permissions',
3691
'bzrlib.tests.test_plugins',
3692
'bzrlib.tests.test_progress',
3693
'bzrlib.tests.test_read_bundle',
3694
'bzrlib.tests.test_reconcile',
3695
'bzrlib.tests.test_reconfigure',
3696
'bzrlib.tests.test_registry',
3697
'bzrlib.tests.test_remote',
3698
'bzrlib.tests.test_rename_map',
3699
'bzrlib.tests.test_repository',
3700
'bzrlib.tests.test_revert',
3701
'bzrlib.tests.test_revision',
3702
'bzrlib.tests.test_revisionspec',
3703
'bzrlib.tests.test_revisiontree',
3704
'bzrlib.tests.test_rio',
3705
'bzrlib.tests.test_rules',
3706
'bzrlib.tests.test_sampler',
3707
'bzrlib.tests.test_script',
3708
'bzrlib.tests.test_selftest',
3709
'bzrlib.tests.test_serializer',
3710
'bzrlib.tests.test_setup',
3711
'bzrlib.tests.test_sftp_transport',
3712
'bzrlib.tests.test_shelf',
3713
'bzrlib.tests.test_shelf_ui',
3714
'bzrlib.tests.test_smart',
3715
'bzrlib.tests.test_smart_add',
3716
'bzrlib.tests.test_smart_request',
3717
'bzrlib.tests.test_smart_transport',
3718
'bzrlib.tests.test_smtp_connection',
3719
'bzrlib.tests.test_source',
3720
'bzrlib.tests.test_ssh_transport',
3721
'bzrlib.tests.test_status',
3722
'bzrlib.tests.test_store',
3723
'bzrlib.tests.test_strace',
3724
'bzrlib.tests.test_subsume',
3725
'bzrlib.tests.test_switch',
3726
'bzrlib.tests.test_symbol_versioning',
3727
'bzrlib.tests.test_tag',
3728
'bzrlib.tests.test_testament',
3729
'bzrlib.tests.test_textfile',
3730
'bzrlib.tests.test_textmerge',
3731
'bzrlib.tests.test_timestamp',
3732
'bzrlib.tests.test_trace',
3733
'bzrlib.tests.test_transactions',
3734
'bzrlib.tests.test_transform',
3735
'bzrlib.tests.test_transport',
3736
'bzrlib.tests.test_transport_log',
3737
'bzrlib.tests.test_tree',
3738
'bzrlib.tests.test_treebuilder',
3739
'bzrlib.tests.test_tsort',
3740
'bzrlib.tests.test_tuned_gzip',
3741
'bzrlib.tests.test_ui',
3742
'bzrlib.tests.test_uncommit',
3743
'bzrlib.tests.test_upgrade',
3744
'bzrlib.tests.test_upgrade_stacked',
3745
'bzrlib.tests.test_urlutils',
3746
'bzrlib.tests.test_version',
3747
'bzrlib.tests.test_version_info',
3748
'bzrlib.tests.test_weave',
3749
'bzrlib.tests.test_whitebox',
3750
'bzrlib.tests.test_win32utils',
3751
'bzrlib.tests.test_workingtree',
3752
'bzrlib.tests.test_workingtree_4',
3753
'bzrlib.tests.test_wsgi',
3754
'bzrlib.tests.test_xml',
3758
def _test_suite_modules_to_doctest():
3759
"""Return the list of modules to doctest."""
3762
'bzrlib.branchbuilder',
3763
'bzrlib.decorators',
3766
'bzrlib.iterablefile',
3770
'bzrlib.symbol_versioning',
3773
'bzrlib.version_info_formats.format_custom',
3777
def test_suite(keep_only=None, starting_with=None):
3778
2252
"""Build and return TestSuite for the whole of bzrlib.
3780
:param keep_only: A list of test ids limiting the suite returned.
3782
:param starting_with: An id limiting the suite returned to the tests
3785
2254
This function can be replaced if you need to change the default test
3786
2255
suite on a global basis, but it is not encouraged.
2258
'bzrlib.util.tests.test_bencode',
2259
'bzrlib.tests.test__dirstate_helpers',
2260
'bzrlib.tests.test_ancestry',
2261
'bzrlib.tests.test_annotate',
2262
'bzrlib.tests.test_api',
2263
'bzrlib.tests.test_atomicfile',
2264
'bzrlib.tests.test_bad_files',
2265
'bzrlib.tests.test_branch',
2266
'bzrlib.tests.test_branchbuilder',
2267
'bzrlib.tests.test_bugtracker',
2268
'bzrlib.tests.test_bundle',
2269
'bzrlib.tests.test_bzrdir',
2270
'bzrlib.tests.test_cache_utf8',
2271
'bzrlib.tests.test_commands',
2272
'bzrlib.tests.test_commit',
2273
'bzrlib.tests.test_commit_merge',
2274
'bzrlib.tests.test_config',
2275
'bzrlib.tests.test_conflicts',
2276
'bzrlib.tests.test_counted_lock',
2277
'bzrlib.tests.test_decorators',
2278
'bzrlib.tests.test_delta',
2279
'bzrlib.tests.test_deprecated_graph',
2280
'bzrlib.tests.test_diff',
2281
'bzrlib.tests.test_dirstate',
2282
'bzrlib.tests.test_email_message',
2283
'bzrlib.tests.test_errors',
2284
'bzrlib.tests.test_escaped_store',
2285
'bzrlib.tests.test_extract',
2286
'bzrlib.tests.test_fetch',
2287
'bzrlib.tests.test_file_names',
2288
'bzrlib.tests.test_ftp_transport',
2289
'bzrlib.tests.test_generate_docs',
2290
'bzrlib.tests.test_generate_ids',
2291
'bzrlib.tests.test_globbing',
2292
'bzrlib.tests.test_gpg',
2293
'bzrlib.tests.test_graph',
2294
'bzrlib.tests.test_hashcache',
2295
'bzrlib.tests.test_help',
2296
'bzrlib.tests.test_hooks',
2297
'bzrlib.tests.test_http',
2298
'bzrlib.tests.test_http_response',
2299
'bzrlib.tests.test_https_ca_bundle',
2300
'bzrlib.tests.test_identitymap',
2301
'bzrlib.tests.test_ignores',
2302
'bzrlib.tests.test_index',
2303
'bzrlib.tests.test_info',
2304
'bzrlib.tests.test_inv',
2305
'bzrlib.tests.test_knit',
2306
'bzrlib.tests.test_lazy_import',
2307
'bzrlib.tests.test_lazy_regex',
2308
'bzrlib.tests.test_lockdir',
2309
'bzrlib.tests.test_lockable_files',
2310
'bzrlib.tests.test_log',
2311
'bzrlib.tests.test_lsprof',
2312
'bzrlib.tests.test_memorytree',
2313
'bzrlib.tests.test_merge',
2314
'bzrlib.tests.test_merge3',
2315
'bzrlib.tests.test_merge_core',
2316
'bzrlib.tests.test_merge_directive',
2317
'bzrlib.tests.test_missing',
2318
'bzrlib.tests.test_msgeditor',
2319
'bzrlib.tests.test_multiparent',
2320
'bzrlib.tests.test_nonascii',
2321
'bzrlib.tests.test_options',
2322
'bzrlib.tests.test_osutils',
2323
'bzrlib.tests.test_osutils_encodings',
2324
'bzrlib.tests.test_pack',
2325
'bzrlib.tests.test_patch',
2326
'bzrlib.tests.test_patches',
2327
'bzrlib.tests.test_permissions',
2328
'bzrlib.tests.test_plugins',
2329
'bzrlib.tests.test_progress',
2330
'bzrlib.tests.test_reconcile',
2331
'bzrlib.tests.test_registry',
2332
'bzrlib.tests.test_remote',
2333
'bzrlib.tests.test_repository',
2334
'bzrlib.tests.test_revert',
2335
'bzrlib.tests.test_revision',
2336
'bzrlib.tests.test_revisionnamespaces',
2337
'bzrlib.tests.test_revisiontree',
2338
'bzrlib.tests.test_rio',
2339
'bzrlib.tests.test_sampler',
2340
'bzrlib.tests.test_selftest',
2341
'bzrlib.tests.test_setup',
2342
'bzrlib.tests.test_sftp_transport',
2343
'bzrlib.tests.test_smart',
2344
'bzrlib.tests.test_smart_add',
2345
'bzrlib.tests.test_smart_transport',
2346
'bzrlib.tests.test_smtp_connection',
2347
'bzrlib.tests.test_source',
2348
'bzrlib.tests.test_ssh_transport',
2349
'bzrlib.tests.test_status',
2350
'bzrlib.tests.test_store',
2351
'bzrlib.tests.test_strace',
2352
'bzrlib.tests.test_subsume',
2353
'bzrlib.tests.test_symbol_versioning',
2354
'bzrlib.tests.test_tag',
2355
'bzrlib.tests.test_testament',
2356
'bzrlib.tests.test_textfile',
2357
'bzrlib.tests.test_textmerge',
2358
'bzrlib.tests.test_timestamp',
2359
'bzrlib.tests.test_trace',
2360
'bzrlib.tests.test_transactions',
2361
'bzrlib.tests.test_transform',
2362
'bzrlib.tests.test_transport',
2363
'bzrlib.tests.test_tree',
2364
'bzrlib.tests.test_treebuilder',
2365
'bzrlib.tests.test_tsort',
2366
'bzrlib.tests.test_tuned_gzip',
2367
'bzrlib.tests.test_ui',
2368
'bzrlib.tests.test_upgrade',
2369
'bzrlib.tests.test_urlutils',
2370
'bzrlib.tests.test_versionedfile',
2371
'bzrlib.tests.test_version',
2372
'bzrlib.tests.test_version_info',
2373
'bzrlib.tests.test_weave',
2374
'bzrlib.tests.test_whitebox',
2375
'bzrlib.tests.test_win32utils',
2376
'bzrlib.tests.test_workingtree',
2377
'bzrlib.tests.test_workingtree_4',
2378
'bzrlib.tests.test_wsgi',
2379
'bzrlib.tests.test_xml',
2381
test_transport_implementations = [
2382
'bzrlib.tests.test_transport_implementations',
2383
'bzrlib.tests.test_read_bundle',
2385
suite = TestUtil.TestSuite()
3789
2386
loader = TestUtil.TestLoader()
3791
if keep_only is not None:
3792
id_filter = TestIdList(keep_only)
3794
# We take precedence over keep_only because *at loading time* using
3795
# both options means we will load less tests for the same final result.
3796
def interesting_module(name):
3797
for start in starting_with:
3799
# Either the module name starts with the specified string
3800
name.startswith(start)
3801
# or it may contain tests starting with the specified string
3802
or start.startswith(name)
3806
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3808
elif keep_only is not None:
3809
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3810
def interesting_module(name):
3811
return id_filter.refers_to(name)
3814
loader = TestUtil.TestLoader()
3815
def interesting_module(name):
3816
# No filtering, all modules are interesting
3819
suite = loader.suiteClass()
3821
# modules building their suite with loadTestsFromModuleNames
3822
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3824
for mod in _test_suite_modules_to_doctest():
3825
if not interesting_module(mod):
3826
# No tests to keep here, move along
2387
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2388
from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
2389
adapter = TransportTestProviderAdapter()
2390
adapt_modules(test_transport_implementations, adapter, loader, suite)
2391
for package in packages_to_test():
2392
suite.addTest(package.test_suite())
2393
for m in MODULES_TO_TEST:
2394
suite.addTest(loader.loadTestsFromModule(m))
2395
for m in MODULES_TO_DOCTEST:
3829
# note that this really does mean "report only" -- doctest
3830
# still runs the rest of the examples
3831
doc_suite = doctest.DocTestSuite(mod,
3832
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
2397
suite.addTest(doctest.DocTestSuite(m))
3833
2398
except ValueError, e:
3834
print '**failed to get doctest for: %s\n%s' % (mod, e)
2399
print '**failed to get doctest for: %s\n%s' %(m,e)
3836
if len(doc_suite._tests) == 0:
3837
raise errors.BzrError("no doctests found in %s" % (mod,))
3838
suite.addTest(doc_suite)
3840
default_encoding = sys.getdefaultencoding()
3841
for name, plugin in bzrlib.plugin.plugins().items():
3842
if not interesting_module(plugin.module.__name__):
3844
plugin_suite = plugin.test_suite()
3845
# We used to catch ImportError here and turn it into just a warning,
3846
# but really if you don't have --no-plugins this should be a failure.
3847
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3848
if plugin_suite is None:
3849
plugin_suite = plugin.load_plugin_tests(loader)
3850
if plugin_suite is not None:
3851
suite.addTest(plugin_suite)
3852
if default_encoding != sys.getdefaultencoding():
3853
bzrlib.trace.warning(
3854
'Plugin "%s" tried to reset default encoding to: %s', name,
3855
sys.getdefaultencoding())
3857
sys.setdefaultencoding(default_encoding)
3859
if keep_only is not None:
3860
# Now that the referred modules have loaded their tests, keep only the
3862
suite = filter_suite_by_id_list(suite, id_filter)
3863
# Do some sanity checks on the id_list filtering
3864
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3866
# The tester has used both keep_only and starting_with, so he is
3867
# already aware that some tests are excluded from the list, there
3868
# is no need to tell him which.
3871
# Some tests mentioned in the list are not in the test suite. The
3872
# list may be out of date, report to the tester.
3873
for id in not_found:
3874
bzrlib.trace.warning('"%s" not found in the test suite', id)
3875
for id in duplicates:
3876
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
2401
for name, plugin in bzrlib.plugin.all_plugins().items():
2402
if getattr(plugin, 'test_suite', None) is not None:
2403
default_encoding = sys.getdefaultencoding()
2405
plugin_suite = plugin.test_suite()
2406
except ImportError, e:
2407
bzrlib.trace.warning(
2408
'Unable to test plugin "%s": %s', name, e)
2410
suite.addTest(plugin_suite)
2411
if default_encoding != sys.getdefaultencoding():
2412
bzrlib.trace.warning(
2413
'Plugin "%s" tried to reset default encoding to: %s', name,
2414
sys.getdefaultencoding())
2416
sys.setdefaultencoding(default_encoding)
3881
def multiply_scenarios(scenarios_left, scenarios_right):
3882
"""Multiply two sets of scenarios.
3884
:returns: the cartesian product of the two sets of scenarios, that is
3885
a scenario for every possible combination of a left scenario and a
3889
('%s,%s' % (left_name, right_name),
3890
dict(left_dict.items() + right_dict.items()))
3891
for left_name, left_dict in scenarios_left
3892
for right_name, right_dict in scenarios_right]
3895
def multiply_tests(tests, scenarios, result):
3896
"""Multiply tests_list by scenarios into result.
3898
This is the core workhorse for test parameterisation.
3900
Typically the load_tests() method for a per-implementation test suite will
3901
call multiply_tests and return the result.
3903
:param tests: The tests to parameterise.
3904
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3905
scenario_param_dict).
3906
:param result: A TestSuite to add created tests to.
3908
This returns the passed in result TestSuite with the cross product of all
3909
the tests repeated once for each scenario. Each test is adapted by adding
3910
the scenario name at the end of its id(), and updating the test object's
3911
__dict__ with the scenario_param_dict.
3913
>>> import bzrlib.tests.test_sampler
3914
>>> r = multiply_tests(
3915
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3916
... [('one', dict(param=1)),
3917
... ('two', dict(param=2))],
3919
>>> tests = list(iter_suite_tests(r))
3923
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3929
for test in iter_suite_tests(tests):
3930
apply_scenarios(test, scenarios, result)
3934
def apply_scenarios(test, scenarios, result):
3935
"""Apply the scenarios in scenarios to test and add to result.
3937
:param test: The test to apply scenarios to.
3938
:param scenarios: An iterable of scenarios to apply to test.
3940
:seealso: apply_scenario
3942
for scenario in scenarios:
3943
result.addTest(apply_scenario(test, scenario))
3947
def apply_scenario(test, scenario):
3948
"""Copy test and apply scenario to it.
3950
:param test: A test to adapt.
3951
:param scenario: A tuple describing the scenarion.
3952
The first element of the tuple is the new test id.
3953
The second element is a dict containing attributes to set on the
3955
:return: The adapted test.
3957
new_id = "%s(%s)" % (test.id(), scenario[0])
3958
new_test = clone_test(test, new_id)
3959
for name, value in scenario[1].items():
3960
setattr(new_test, name, value)
3964
def clone_test(test, new_id):
3965
"""Clone a test giving it a new id.
3967
:param test: The test to clone.
3968
:param new_id: The id to assign to it.
3969
:return: The new test.
3971
new_test = copy(test)
3972
new_test.id = lambda: new_id
3976
def permute_tests_for_extension(standard_tests, loader, py_module_name,
3978
"""Helper for permutating tests against an extension module.
3980
This is meant to be used inside a modules 'load_tests()' function. It will
3981
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
3982
against both implementations. Setting 'test.module' to the appropriate
3983
module. See bzrlib.tests.test__chk_map.load_tests as an example.
3985
:param standard_tests: A test suite to permute
3986
:param loader: A TestLoader
3987
:param py_module_name: The python path to a python module that can always
3988
be loaded, and will be considered the 'python' implementation. (eg
3989
'bzrlib._chk_map_py')
3990
:param ext_module_name: The python path to an extension module. If the
3991
module cannot be loaded, a single test will be added, which notes that
3992
the module is not available. If it can be loaded, all standard_tests
3993
will be run against that module.
3994
:return: (suite, feature) suite is a test-suite that has all the permuted
3995
tests. feature is the Feature object that can be used to determine if
3996
the module is available.
3999
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
4001
('python', {'module': py_module}),
4003
suite = loader.suiteClass()
4004
feature = ModuleAvailableFeature(ext_module_name)
4005
if feature.available():
4006
scenarios.append(('C', {'module': feature.module}))
4008
# the compiled module isn't available, so we add a failing test
4009
class FailWithoutFeature(TestCase):
4010
def test_fail(self):
4011
self.requireFeature(feature)
4012
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4013
result = multiply_tests(standard_tests, scenarios, suite)
4014
return result, feature
4017
def _rmtree_temp_dir(dirname, test_id=None):
2420
def adapt_modules(mods_list, adapter, loader, suite):
2421
"""Adapt the modules in mods_list using adapter and add to suite."""
2422
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
2423
suite.addTests(adapter.adapt(test))
2426
def _rmtree_temp_dir(dirname):
4018
2427
# If LANG=C we probably have created some bogus paths
4019
2428
# which rmtree(unicode) will fail to delete
4020
2429
# so make sure we are using rmtree(str) to delete everything
4067
2474
return self.__class__.__name__
4070
class _SymlinkFeature(Feature):
4073
return osutils.has_symlinks()
4075
def feature_name(self):
4078
SymlinkFeature = _SymlinkFeature()
4081
class _HardlinkFeature(Feature):
4084
return osutils.has_hardlinks()
4086
def feature_name(self):
4089
HardlinkFeature = _HardlinkFeature()
4092
class _OsFifoFeature(Feature):
4095
return getattr(os, 'mkfifo', None)
4097
def feature_name(self):
4098
return 'filesystem fifos'
4100
OsFifoFeature = _OsFifoFeature()
4103
class _UnicodeFilenameFeature(Feature):
4104
"""Does the filesystem support Unicode filenames?"""
4108
# Check for character combinations unlikely to be covered by any
4109
# single non-unicode encoding. We use the characters
4110
# - greek small letter alpha (U+03B1) and
4111
# - braille pattern dots-123456 (U+283F).
4112
os.stat(u'\u03b1\u283f')
4113
except UnicodeEncodeError:
4115
except (IOError, OSError):
4116
# The filesystem allows the Unicode filename but the file doesn't
4120
# The filesystem allows the Unicode filename and the file exists,
4124
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4127
class _CompatabilityThunkFeature(Feature):
4128
"""This feature is just a thunk to another feature.
4130
It issues a deprecation warning if it is accessed, to let you know that you
4131
should really use a different feature.
4134
def __init__(self, dep_version, module, name,
4135
replacement_name, replacement_module=None):
4136
super(_CompatabilityThunkFeature, self).__init__()
4137
self._module = module
4138
if replacement_module is None:
4139
replacement_module = module
4140
self._replacement_module = replacement_module
4142
self._replacement_name = replacement_name
4143
self._dep_version = dep_version
4144
self._feature = None
4147
if self._feature is None:
4148
depr_msg = self._dep_version % ('%s.%s'
4149
% (self._module, self._name))
4150
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4151
self._replacement_name)
4152
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4153
# Import the new feature and use it as a replacement for the
4155
mod = __import__(self._replacement_module, {}, {},
4156
[self._replacement_name])
4157
self._feature = getattr(mod, self._replacement_name)
4161
return self._feature._probe()
4164
class ModuleAvailableFeature(Feature):
4165
"""This is a feature than describes a module we want to be available.
4167
Declare the name of the module in __init__(), and then after probing, the
4168
module will be available as 'self.module'.
4170
:ivar module: The module if it is available, else None.
4173
def __init__(self, module_name):
4174
super(ModuleAvailableFeature, self).__init__()
4175
self.module_name = module_name
4179
self._module = __import__(self.module_name, {}, {}, [''])
4186
if self.available(): # Make sure the probe has been done
4190
def feature_name(self):
4191
return self.module_name
4194
# This is kept here for compatibility, it is recommended to use
4195
# 'bzrlib.tests.feature.paramiko' instead
4196
ParamikoFeature = _CompatabilityThunkFeature(
4197
deprecated_in((2,1,0)),
4198
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4201
def probe_unicode_in_user_encoding():
4202
"""Try to encode several unicode strings to use in unicode-aware tests.
4203
Return first successfull match.
4205
:return: (unicode value, encoded plain string value) or (None, None)
4207
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4208
for uni_val in possible_vals:
4210
str_val = uni_val.encode(osutils.get_user_encoding())
4211
except UnicodeEncodeError:
4212
# Try a different character
4215
return uni_val, str_val
4219
def probe_bad_non_ascii(encoding):
4220
"""Try to find [bad] character with code [128..255]
4221
that cannot be decoded to unicode in some encoding.
4222
Return None if all non-ascii characters is valid
4225
for i in xrange(128, 256):
4228
char.decode(encoding)
4229
except UnicodeDecodeError:
4234
class _HTTPSServerFeature(Feature):
4235
"""Some tests want an https Server, check if one is available.
4237
Right now, the only way this is available is under python2.6 which provides
4248
def feature_name(self):
4249
return 'HTTPSServer'
4252
HTTPSServerFeature = _HTTPSServerFeature()
4255
class _UnicodeFilename(Feature):
4256
"""Does the filesystem support Unicode filenames?"""
4261
except UnicodeEncodeError:
4263
except (IOError, OSError):
4264
# The filesystem allows the Unicode filename but the file doesn't
4268
# The filesystem allows the Unicode filename and the file exists,
4272
UnicodeFilename = _UnicodeFilename()
4275
class _UTF8Filesystem(Feature):
4276
"""Is the filesystem UTF-8?"""
4279
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4283
UTF8Filesystem = _UTF8Filesystem()
4286
class _BreakinFeature(Feature):
4287
"""Does this platform support the breakin feature?"""
4290
from bzrlib import breakin
4291
if breakin.determine_signal() is None:
4293
if sys.platform == 'win32':
4294
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4295
# We trigger SIGBREAK via a Console api so we need ctypes to
4296
# access the function
4303
def feature_name(self):
4304
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4307
BreakinFeature = _BreakinFeature()
4310
class _CaseInsCasePresFilenameFeature(Feature):
4311
"""Is the file-system case insensitive, but case-preserving?"""
4314
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4316
# first check truly case-preserving for created files, then check
4317
# case insensitive when opening existing files.
4318
name = osutils.normpath(name)
4319
base, rel = osutils.split(name)
4320
found_rel = osutils.canonical_relpath(base, name)
4321
return (found_rel == rel
4322
and os.path.isfile(name.upper())
4323
and os.path.isfile(name.lower()))
4328
def feature_name(self):
4329
return "case-insensitive case-preserving filesystem"
4331
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4334
class _CaseInsensitiveFilesystemFeature(Feature):
4335
"""Check if underlying filesystem is case-insensitive but *not* case
4338
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4339
# more likely to be case preserving, so this case is rare.
4342
if CaseInsCasePresFilenameFeature.available():
4345
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4346
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4347
TestCaseWithMemoryTransport.TEST_ROOT = root
4349
root = TestCaseWithMemoryTransport.TEST_ROOT
4350
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4352
name_a = osutils.pathjoin(tdir, 'a')
4353
name_A = osutils.pathjoin(tdir, 'A')
4355
result = osutils.isdir(name_A)
4356
_rmtree_temp_dir(tdir)
2477
class TestScenarioApplier(object):
2478
"""A tool to apply scenarios to tests."""
2480
def adapt(self, test):
2481
"""Return a TestSuite containing a copy of test for each scenario."""
2482
result = unittest.TestSuite()
2483
for scenario in self.scenarios:
2484
result.addTest(self.adapt_test_to_scenario(test, scenario))
4359
def feature_name(self):
4360
return 'case-insensitive filesystem'
4362
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4365
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4366
SubUnitFeature = _CompatabilityThunkFeature(
4367
deprecated_in((2,1,0)),
4368
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4369
# Only define SubUnitBzrRunner if subunit is available.
4371
from subunit import TestProtocolClient
4372
from subunit.test_results import AutoTimingTestResultDecorator
4373
class SubUnitBzrRunner(TextTestRunner):
4374
def run(self, test):
4375
result = AutoTimingTestResultDecorator(
4376
TestProtocolClient(self.stream))
2487
def adapt_test_to_scenario(self, test, scenario):
2488
"""Copy test and apply scenario to it.
2490
:param test: A test to adapt.
2491
:param scenario: A tuple describing the scenarion.
2492
The first element of the tuple is the new test id.
2493
The second element is a dict containing attributes to set on the
2495
:return: The adapted test.
2497
from copy import deepcopy
2498
new_test = deepcopy(test)
2499
for name, value in scenario[1].items():
2500
setattr(new_test, name, value)
2501
new_id = "%s(%s)" % (new_test.id(), scenario[0])
2502
new_test.id = lambda: new_id