171
254
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
172
255
self._bench_history = bench_history
173
self.ui = bzrlib.ui.ui_factory
174
self.num_tests = num_tests
256
self.ui = ui.ui_factory
175
258
self.error_count = 0
176
259
self.failure_count = 0
260
self.known_failure_count = 0
177
261
self.skip_count = 0
262
self.not_applicable_count = 0
263
self.unsupported = {}
179
265
self._overall_start_time = time.time()
181
def extractBenchmarkTime(self, testCase):
266
self._strict = strict
267
self._first_thread_leaker_id = None
268
self._tests_leaking_threads_count = 0
269
self._traceback_from_test = None
271
def stopTestRun(self):
274
stopTime = time.time()
275
timeTaken = stopTime - self.startTime
276
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
277
# the parent class method is similar have to duplicate
278
self._show_list('ERROR', self.errors)
279
self._show_list('FAIL', self.failures)
280
self.stream.write(self.sep2)
281
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
282
run, run != 1 and "s" or "", timeTaken))
283
if not self.wasSuccessful():
284
self.stream.write("FAILED (")
285
failed, errored = map(len, (self.failures, self.errors))
287
self.stream.write("failures=%d" % failed)
289
if failed: self.stream.write(", ")
290
self.stream.write("errors=%d" % errored)
291
if self.known_failure_count:
292
if failed or errored: self.stream.write(", ")
293
self.stream.write("known_failure_count=%d" %
294
self.known_failure_count)
295
self.stream.write(")\n")
297
if self.known_failure_count:
298
self.stream.write("OK (known_failures=%d)\n" %
299
self.known_failure_count)
301
self.stream.write("OK\n")
302
if self.skip_count > 0:
303
skipped = self.skip_count
304
self.stream.write('%d test%s skipped\n' %
305
(skipped, skipped != 1 and "s" or ""))
307
for feature, count in sorted(self.unsupported.items()):
308
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
311
ok = self.wasStrictlySuccessful()
313
ok = self.wasSuccessful()
314
if self._first_thread_leaker_id:
316
'%s is leaking threads among %d leaking tests.\n' % (
317
self._first_thread_leaker_id,
318
self._tests_leaking_threads_count))
319
# We don't report the main thread as an active one.
321
'%d non-main threads were left active in the end.\n'
322
% (len(self._active_threads) - 1))
324
def getDescription(self, test):
327
def _extractBenchmarkTime(self, testCase, details=None):
182
328
"""Add a benchmark time for the current test case."""
183
self._benchmarkTime = getattr(testCase, "_benchtime", None)
329
if details and 'benchtime' in details:
330
return float(''.join(details['benchtime'].iter_bytes()))
331
return getattr(testCase, "_benchtime", None)
185
333
def _elapsedTestTimeString(self):
186
334
"""Return a time string for the overall time the current test has taken."""
187
return self._formatTime(time.time() - self._start_time)
335
return self._formatTime(self._delta_to_float(
336
self._now() - self._start_datetime))
189
def _testTimeString(self):
190
if self._benchmarkTime is not None:
192
self._formatTime(self._benchmarkTime),
193
self._elapsedTestTimeString())
338
def _testTimeString(self, testCase):
339
benchmark_time = self._extractBenchmarkTime(testCase)
340
if benchmark_time is not None:
341
return self._formatTime(benchmark_time) + "*"
195
return " %s" % self._elapsedTestTimeString()
343
return self._elapsedTestTimeString()
197
345
def _formatTime(self, seconds):
198
346
"""Format seconds as milliseconds with leading spaces."""
199
return "%5dms" % (1000 * seconds)
347
# some benchmarks can take thousands of seconds to run, so we need 8
349
return "%8dms" % (1000 * seconds)
201
351
def _shortened_test_description(self, test):
203
what = re.sub(r'^bzrlib\.(tests|benchmark)\.', '', what)
353
what = re.sub(r'^bzrlib\.tests\.', '', what)
356
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
357
# multiple times in a row, because the handler is added for
358
# each test but the container list is shared between cases.
359
# See lp:498869 lp:625574 and lp:637725 for background.
360
def _record_traceback_from_test(self, exc_info):
361
"""Store the traceback from passed exc_info tuple till"""
362
self._traceback_from_test = exc_info[2]
206
364
def startTest(self, test):
207
unittest.TestResult.startTest(self, test)
365
super(ExtendedTestResult, self).startTest(test)
208
369
self.report_test_start(test)
370
test.number = self.count
209
371
self._recordTestStartTime()
372
# Make testtools cases give us the real traceback on failure
373
addOnException = getattr(test, "addOnException", None)
374
if addOnException is not None:
375
addOnException(self._record_traceback_from_test)
376
# Only check for thread leaks on bzrlib derived test cases
377
if isinstance(test, TestCase):
378
test.addCleanup(self._check_leaked_threads, test)
380
def startTests(self):
381
self.report_tests_starting()
382
self._active_threads = threading.enumerate()
384
def stopTest(self, test):
385
self._traceback_from_test = None
387
def _check_leaked_threads(self, test):
388
"""See if any threads have leaked since last call
390
A sample of live threads is stored in the _active_threads attribute,
391
when this method runs it compares the current live threads and any not
392
in the previous sample are treated as having leaked.
394
now_active_threads = set(threading.enumerate())
395
threads_leaked = now_active_threads.difference(self._active_threads)
397
self._report_thread_leak(test, threads_leaked, now_active_threads)
398
self._tests_leaking_threads_count += 1
399
if self._first_thread_leaker_id is None:
400
self._first_thread_leaker_id = test.id()
401
self._active_threads = now_active_threads
211
403
def _recordTestStartTime(self):
212
404
"""Record that a test has started."""
213
self._start_time = time.time()
405
self._start_datetime = self._now()
215
407
def addError(self, test, err):
216
if isinstance(err[1], TestSkipped):
217
return self.addSkipped(test, err)
218
unittest.TestResult.addError(self, test, err)
219
# We can only do this if we have one of our TestCases, not if
221
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
222
if setKeepLogfile is not None:
224
self.extractBenchmarkTime(test)
408
"""Tell result that test finished with an error.
410
Called from the TestCase run() method when the test
411
fails with an unexpected error.
413
self._post_mortem(self._traceback_from_test)
414
super(ExtendedTestResult, self).addError(test, err)
415
self.error_count += 1
225
416
self.report_error(test, err)
226
417
if self.stop_early:
229
420
def addFailure(self, test, err):
230
unittest.TestResult.addFailure(self, test, err)
231
# We can only do this if we have one of our TestCases, not if
233
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
234
if setKeepLogfile is not None:
236
self.extractBenchmarkTime(test)
421
"""Tell result that test failed.
423
Called from the TestCase run() method when the test
424
fails because e.g. an assert() method failed.
426
self._post_mortem(self._traceback_from_test)
427
super(ExtendedTestResult, self).addFailure(test, err)
428
self.failure_count += 1
237
429
self.report_failure(test, err)
238
430
if self.stop_early:
241
def addSuccess(self, test):
242
self.extractBenchmarkTime(test)
433
def addSuccess(self, test, details=None):
434
"""Tell result that test completed successfully.
436
Called from the TestCase run()
243
438
if self._bench_history is not None:
244
if self._benchmarkTime is not None:
439
benchmark_time = self._extractBenchmarkTime(test, details)
440
if benchmark_time is not None:
245
441
self._bench_history.write("%s %s\n" % (
246
self._formatTime(self._benchmarkTime),
442
self._formatTime(benchmark_time),
248
444
self.report_success(test)
249
unittest.TestResult.addSuccess(self, test)
251
def addSkipped(self, test, skip_excinfo):
252
self.extractBenchmarkTime(test)
253
self.report_skip(test, skip_excinfo)
254
# seems best to treat this as success from point-of-view of unittest
255
# -- it actually does nothing so it barely matters :)
258
except KeyboardInterrupt:
261
self.addError(test, test.__exc_info())
263
unittest.TestResult.addSuccess(self, test)
265
def printErrorList(self, flavour, errors):
266
for test, err in errors:
267
self.stream.writeln(self.separator1)
268
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
269
if getattr(test, '_get_log', None) is not None:
271
print >>self.stream, \
272
('vvvv[log from %s]' % test.id()).ljust(78,'-')
273
print >>self.stream, test._get_log()
274
print >>self.stream, \
275
('^^^^[log from %s]' % test.id()).ljust(78,'-')
276
self.stream.writeln(self.separator2)
277
self.stream.writeln("%s" % err)
282
def report_cleaning_up(self):
445
super(ExtendedTestResult, self).addSuccess(test)
446
test._log_contents = ''
448
def addExpectedFailure(self, test, err):
449
self.known_failure_count += 1
450
self.report_known_failure(test, err)
452
def addNotSupported(self, test, feature):
453
"""The test will not be run because of a missing feature.
455
# this can be called in two different ways: it may be that the
456
# test started running, and then raised (through requireFeature)
457
# UnavailableFeature. Alternatively this method can be called
458
# while probing for features before running the test code proper; in
459
# that case we will see startTest and stopTest, but the test will
460
# never actually run.
461
self.unsupported.setdefault(str(feature), 0)
462
self.unsupported[str(feature)] += 1
463
self.report_unsupported(test, feature)
465
def addSkip(self, test, reason):
466
"""A test has not run for 'reason'."""
468
self.report_skip(test, reason)
470
def addNotApplicable(self, test, reason):
471
self.not_applicable_count += 1
472
self.report_not_applicable(test, reason)
474
def _post_mortem(self, tb=None):
475
"""Start a PDB post mortem session."""
476
if os.environ.get('BZR_TEST_PDB', None):
480
def progress(self, offset, whence):
481
"""The test is adjusting the count of tests to run."""
482
if whence == SUBUNIT_SEEK_SET:
483
self.num_tests = offset
484
elif whence == SUBUNIT_SEEK_CUR:
485
self.num_tests += offset
487
raise errors.BzrError("Unknown whence %r" % whence)
489
def report_tests_starting(self):
490
"""Display information before the test run begins"""
491
if getattr(sys, 'frozen', None) is None:
492
bzr_path = osutils.realpath(sys.argv[0])
494
bzr_path = sys.executable
496
'bzr selftest: %s\n' % (bzr_path,))
499
bzrlib.__path__[0],))
501
' bzr-%s python-%s %s\n' % (
502
bzrlib.version_string,
503
bzrlib._format_version_tuple(sys.version_info),
504
platform.platform(aliased=1),
506
self.stream.write('\n')
508
def report_test_start(self, test):
509
"""Display information on the test just about to be run"""
511
def _report_thread_leak(self, test, leaked_threads, active_threads):
512
"""Display information on a test that leaked one or more threads"""
513
# GZ 2010-09-09: A leak summary reported separately from the general
514
# thread debugging would be nice. Tests under subunit
515
# need something not using stream, perhaps adding a
516
# testtools details object would be fitting.
517
if 'threads' in selftest_debug_flags:
518
self.stream.write('%s is leaking, active is now %d\n' %
519
(test.id(), len(active_threads)))
521
def startTestRun(self):
522
self.startTime = time.time()
285
524
def report_success(self, test):
527
def wasStrictlySuccessful(self):
528
if self.unsupported or self.known_failure_count:
530
return self.wasSuccessful()
289
533
class TextTestResult(ExtendedTestResult):
290
534
"""Displays progress and results of tests in text form"""
292
def __init__(self, *args, **kw):
293
ExtendedTestResult.__init__(self, *args, **kw)
536
def __init__(self, stream, descriptions, verbosity,
541
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
542
bench_history, strict)
543
# We no longer pass them around, but just rely on the UIFactory stack
546
warnings.warn("Passing pb to TextTestResult is deprecated")
294
547
self.pb = self.ui.nested_progress_bar()
295
548
self.pb.show_pct = False
296
549
self.pb.show_spinner = False
297
self.pb.show_eta = False,
550
self.pb.show_eta = False,
298
551
self.pb.show_count = False
299
552
self.pb.show_bar = False
301
def report_starting(self):
302
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
553
self.pb.update_latency = 0
554
self.pb.show_transport_activity = False
556
def stopTestRun(self):
557
# called when the tests that are going to run have run
560
super(TextTestResult, self).stopTestRun()
562
def report_tests_starting(self):
563
super(TextTestResult, self).report_tests_starting()
564
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
304
566
def _progress_prefix_text(self):
305
a = '[%d' % self.count
306
if self.num_tests is not None:
567
# the longer this text, the less space we have to show the test
569
a = '[%d' % self.count # total that have been run
570
# tests skipped as known not to be relevant are not important enough
572
## if self.skip_count:
573
## a += ', %d skip' % self.skip_count
574
## if self.known_failure_count:
575
## a += '+%dX' % self.known_failure_count
307
577
a +='/%d' % self.num_tests
308
a += ' in %ds' % (time.time() - self._overall_start_time)
310
a += ', %d errors' % self.error_count
311
if self.failure_count:
312
a += ', %d failed' % self.failure_count
314
a += ', %d skipped' % self.skip_count
579
runtime = time.time() - self._overall_start_time
581
a += '%dm%ds' % (runtime / 60, runtime % 60)
584
total_fail_count = self.error_count + self.failure_count
586
a += ', %d failed' % total_fail_count
587
# if self.unsupported:
588
# a += ', %d missing' % len(self.unsupported)
318
592
def report_test_start(self, test):
321
594
self._progress_prefix_text()
323
596
+ self._shortened_test_description(test))
598
def _test_description(self, test):
599
return self._shortened_test_description(test)
325
601
def report_error(self, test, err):
326
self.error_count += 1
327
self.pb.note('ERROR: %s\n %s\n',
328
self._shortened_test_description(test),
602
self.stream.write('ERROR: %s\n %s\n' % (
603
self._test_description(test),
332
607
def report_failure(self, test, err):
333
self.failure_count += 1
334
self.pb.note('FAIL: %s\n %s\n',
335
self._shortened_test_description(test),
608
self.stream.write('FAIL: %s\n %s\n' % (
609
self._test_description(test),
339
def report_skip(self, test, skip_excinfo):
342
# at the moment these are mostly not things we can fix
343
# and so they just produce stipple; use the verbose reporter
346
# show test and reason for skip
347
self.pb.note('SKIP: %s\n %s\n',
348
self._shortened_test_description(test),
351
# since the class name was left behind in the still-visible
353
self.pb.note('SKIP: %s', skip_excinfo[1])
355
def report_cleaning_up(self):
356
self.pb.update('cleaning up...')
613
def report_known_failure(self, test, err):
616
def report_skip(self, test, reason):
619
def report_not_applicable(self, test, reason):
622
def report_unsupported(self, test, feature):
623
"""test cannot be run because feature is missing."""
362
626
class VerboseTestResult(ExtendedTestResult):
536
911
retrieved by _get_log(). We use a real OS file, not an in-memory object,
537
912
so that it can also capture file IO. When the test completes this file
538
913
is read into memory and removed from disk.
540
915
There are also convenience functions to invoke bzr's command-line
541
916
routine, and to build and check bzr trees.
543
918
In addition to the usual method of overriding tearDown(), this class also
544
allows subclasses to register functions into the _cleanups list, which is
919
allows subclasses to register cleanup functions via addCleanup, which are
545
920
run in order as the object is torn down. It's less likely this will be
546
921
accidentally overlooked.
549
_log_file_name = None
551
_keep_log_file = False
552
925
# record lsprof data when performing benchmark calls.
553
926
_gather_lsprof_in_benchmarks = False
555
928
def __init__(self, methodName='testMethod'):
556
929
super(TestCase, self).__init__(methodName)
930
self._directory_isolation = True
931
self.exception_handlers.insert(0,
932
(UnavailableFeature, self._do_unsupported_or_skip))
933
self.exception_handlers.insert(0,
934
(TestNotApplicable, self._do_not_applicable))
560
unittest.TestCase.setUp(self)
937
super(TestCase, self).setUp()
938
for feature in getattr(self, '_test_needs_features', []):
939
self.requireFeature(feature)
940
self._log_contents = None
941
self.addDetail("log", content.Content(content.ContentType("text",
942
"plain", {"charset": "utf8"}),
943
lambda:[self._get_log(keep_log_file=True)]))
561
944
self._cleanEnvironment()
562
bzrlib.trace.disable_default_logging()
563
945
self._silenceUI()
564
946
self._startLogFile()
565
947
self._benchcalls = []
566
948
self._benchtime = None
950
self._track_transports()
952
self._clear_debug_flags()
953
# Isolate global verbosity level, to make sure it's reproducible
954
# between tests. We should get rid of this altogether: bug 656694. --
956
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
957
# Isolate config option expansion until its default value for bzrlib is
958
# settled on or a the FIXME associated with _get_expand_default_value
959
# is addressed -- vila 20110219
960
self.overrideAttr(config, '_expand_default_value', None)
965
pdb.Pdb().set_trace(sys._getframe().f_back)
967
def discardDetail(self, name):
968
"""Extend the addDetail, getDetails api so we can remove a detail.
970
eg. bzr always adds the 'log' detail at startup, but we don't want to
971
include it for skipped, xfail, etc tests.
973
It is safe to call this for a detail that doesn't exist, in case this
974
gets called multiple times.
976
# We cheat. details is stored in __details which means we shouldn't
977
# touch it. but getDetails() returns the dict directly, so we can
979
details = self.getDetails()
983
def _clear_debug_flags(self):
984
"""Prevent externally set debug flags affecting tests.
986
Tests that want to use debug flags can just set them in the
987
debug_flags set during setup/teardown.
989
# Start with a copy of the current debug flags we can safely modify.
990
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
991
if 'allow_debug' not in selftest_debug_flags:
992
debug.debug_flags.clear()
993
if 'disable_lock_checks' not in selftest_debug_flags:
994
debug.debug_flags.add('strict_locks')
996
def _clear_hooks(self):
997
# prevent hooks affecting tests
998
known_hooks = hooks.known_hooks
999
self._preserved_hooks = {}
1000
for key, (parent, name) in known_hooks.iter_parent_objects():
1001
current_hooks = getattr(parent, name)
1002
self._preserved_hooks[parent] = (name, current_hooks)
1003
self._preserved_lazy_hooks = hooks._lazy_hooks
1004
hooks._lazy_hooks = {}
1005
self.addCleanup(self._restoreHooks)
1006
for key, (parent, name) in known_hooks.iter_parent_objects():
1007
factory = known_hooks.get(key)
1008
setattr(parent, name, factory())
1009
# this hook should always be installed
1010
request._install_hook()
1012
def disable_directory_isolation(self):
1013
"""Turn off directory isolation checks."""
1014
self._directory_isolation = False
1016
def enable_directory_isolation(self):
1017
"""Enable directory isolation checks."""
1018
self._directory_isolation = True
568
1020
def _silenceUI(self):
569
1021
"""Turn off UI for duration of test"""
570
1022
# by default the UI is off; tests can turn it on if they want it.
571
saved = bzrlib.ui.ui_factory
573
bzrlib.ui.ui_factory = saved
574
bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
575
self.addCleanup(_restore)
1023
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
1025
def _check_locks(self):
1026
"""Check that all lock take/release actions have been paired."""
1027
# We always check for mismatched locks. If a mismatch is found, we
1028
# fail unless -Edisable_lock_checks is supplied to selftest, in which
1029
# case we just print a warning.
1031
acquired_locks = [lock for action, lock in self._lock_actions
1032
if action == 'acquired']
1033
released_locks = [lock for action, lock in self._lock_actions
1034
if action == 'released']
1035
broken_locks = [lock for action, lock in self._lock_actions
1036
if action == 'broken']
1037
# trivially, given the tests for lock acquistion and release, if we
1038
# have as many in each list, it should be ok. Some lock tests also
1039
# break some locks on purpose and should be taken into account by
1040
# considering that breaking a lock is just a dirty way of releasing it.
1041
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1042
message = ('Different number of acquired and '
1043
'released or broken locks. (%s, %s + %s)' %
1044
(acquired_locks, released_locks, broken_locks))
1045
if not self._lock_check_thorough:
1046
# Rather than fail, just warn
1047
print "Broken test %s: %s" % (self, message)
1051
def _track_locks(self):
1052
"""Track lock activity during tests."""
1053
self._lock_actions = []
1054
if 'disable_lock_checks' in selftest_debug_flags:
1055
self._lock_check_thorough = False
1057
self._lock_check_thorough = True
1059
self.addCleanup(self._check_locks)
1060
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
1061
self._lock_acquired, None)
1062
_mod_lock.Lock.hooks.install_named_hook('lock_released',
1063
self._lock_released, None)
1064
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
1065
self._lock_broken, None)
1067
def _lock_acquired(self, result):
1068
self._lock_actions.append(('acquired', result))
1070
def _lock_released(self, result):
1071
self._lock_actions.append(('released', result))
1073
def _lock_broken(self, result):
1074
self._lock_actions.append(('broken', result))
1076
def permit_dir(self, name):
1077
"""Permit a directory to be used by this test. See permit_url."""
1078
name_transport = _mod_transport.get_transport(name)
1079
self.permit_url(name)
1080
self.permit_url(name_transport.base)
1082
def permit_url(self, url):
1083
"""Declare that url is an ok url to use in this test.
1085
Do this for memory transports, temporary test directory etc.
1087
Do not do this for the current working directory, /tmp, or any other
1088
preexisting non isolated url.
1090
if not url.endswith('/'):
1092
self._bzr_selftest_roots.append(url)
1094
def permit_source_tree_branch_repo(self):
1095
"""Permit the source tree bzr is running from to be opened.
1097
Some code such as bzrlib.version attempts to read from the bzr branch
1098
that bzr is executing from (if any). This method permits that directory
1099
to be used in the test suite.
1101
path = self.get_source_path()
1102
self.record_directory_isolation()
1105
workingtree.WorkingTree.open(path)
1106
except (errors.NotBranchError, errors.NoWorkingTree):
1107
raise TestSkipped('Needs a working tree of bzr sources')
1109
self.enable_directory_isolation()
1111
def _preopen_isolate_transport(self, transport):
1112
"""Check that all transport openings are done in the test work area."""
1113
while isinstance(transport, pathfilter.PathFilteringTransport):
1114
# Unwrap pathfiltered transports
1115
transport = transport.server.backing_transport.clone(
1116
transport._filter('.'))
1117
url = transport.base
1118
# ReadonlySmartTCPServer_for_testing decorates the backing transport
1119
# urls it is given by prepending readonly+. This is appropriate as the
1120
# client shouldn't know that the server is readonly (or not readonly).
1121
# We could register all servers twice, with readonly+ prepending, but
1122
# that makes for a long list; this is about the same but easier to
1124
if url.startswith('readonly+'):
1125
url = url[len('readonly+'):]
1126
self._preopen_isolate_url(url)
1128
def _preopen_isolate_url(self, url):
1129
if not self._directory_isolation:
1131
if self._directory_isolation == 'record':
1132
self._bzr_selftest_roots.append(url)
1134
# This prevents all transports, including e.g. sftp ones backed on disk
1135
# from working unless they are explicitly granted permission. We then
1136
# depend on the code that sets up test transports to check that they are
1137
# appropriately isolated and enable their use by calling
1138
# self.permit_transport()
1139
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1140
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1141
% (url, self._bzr_selftest_roots))
1143
def record_directory_isolation(self):
1144
"""Gather accessed directories to permit later access.
1146
This is used for tests that access the branch bzr is running from.
1148
self._directory_isolation = "record"
1150
def start_server(self, transport_server, backing_server=None):
1151
"""Start transport_server for this test.
1153
This starts the server, registers a cleanup for it and permits the
1154
server's urls to be used.
1156
if backing_server is None:
1157
transport_server.start_server()
1159
transport_server.start_server(backing_server)
1160
self.addCleanup(transport_server.stop_server)
1161
# Obtain a real transport because if the server supplies a password, it
1162
# will be hidden from the base on the client side.
1163
t = _mod_transport.get_transport(transport_server.get_url())
1164
# Some transport servers effectively chroot the backing transport;
1165
# others like SFTPServer don't - users of the transport can walk up the
1166
# transport to read the entire backing transport. This wouldn't matter
1167
# except that the workdir tests are given - and that they expect the
1168
# server's url to point at - is one directory under the safety net. So
1169
# Branch operations into the transport will attempt to walk up one
1170
# directory. Chrooting all servers would avoid this but also mean that
1171
# we wouldn't be testing directly against non-root urls. Alternatively
1172
# getting the test framework to start the server with a backing server
1173
# at the actual safety net directory would work too, but this then
1174
# means that the self.get_url/self.get_transport methods would need
1175
# to transform all their results. On balance its cleaner to handle it
1176
# here, and permit a higher url when we have one of these transports.
1177
if t.base.endswith('/work/'):
1178
# we have safety net/test root/work
1179
t = t.clone('../..')
1180
elif isinstance(transport_server,
1181
test_server.SmartTCPServer_for_testing):
1182
# The smart server adds a path similar to work, which is traversed
1183
# up from by the client. But the server is chrooted - the actual
1184
# backing transport is not escaped from, and VFS requests to the
1185
# root will error (because they try to escape the chroot).
1187
while t2.base != t.base:
1190
self.permit_url(t.base)
1192
def _track_transports(self):
1193
"""Install checks for transport usage."""
1194
# TestCase has no safe place it can write to.
1195
self._bzr_selftest_roots = []
1196
# Currently the easiest way to be sure that nothing is going on is to
1197
# hook into bzr dir opening. This leaves a small window of error for
1198
# transport tests, but they are well known, and we can improve on this
1200
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1201
self._preopen_isolate_transport, "Check bzr directories are safe.")
577
1203
def _ndiff_strings(self, a, b):
578
1204
"""Return ndiff between two strings containing lines.
580
1206
A trailing newline is added if missing to make the strings
581
1207
print properly."""
582
1208
if b and b[-1] != '\n':
1625
2819
for readonly urls.
1627
2821
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1628
be used without needed to redo it when a different
2822
be used without needed to redo it when a different
1629
2823
subclass is in use ?
1632
2826
def setUp(self):
2827
from bzrlib.tests import http_server
1633
2828
super(ChrootedTestCase, self).setUp()
1634
if not self.transport_server == MemoryServer:
1635
self.transport_readonly_server = HttpServer
2829
if not self.vfs_transport_factory == memory.MemoryServer:
2830
self.transport_readonly_server = http_server.HttpServer
2833
def condition_id_re(pattern):
2834
"""Create a condition filter which performs a re check on a test's id.
2836
:param pattern: A regular expression string.
2837
:return: A callable that returns True if the re matches.
2839
filter_re = re.compile(pattern, 0)
2840
def condition(test):
2842
return filter_re.search(test_id)
2846
def condition_isinstance(klass_or_klass_list):
2847
"""Create a condition filter which returns isinstance(param, klass).
2849
:return: A callable which when called with one parameter obj return the
2850
result of isinstance(obj, klass_or_klass_list).
2853
return isinstance(obj, klass_or_klass_list)
2857
def condition_id_in_list(id_list):
2858
"""Create a condition filter which verify that test's id in a list.
2860
:param id_list: A TestIdList object.
2861
:return: A callable that returns True if the test's id appears in the list.
2863
def condition(test):
2864
return id_list.includes(test.id())
2868
def condition_id_startswith(starts):
2869
"""Create a condition filter verifying that test's id starts with a string.
2871
:param starts: A list of string.
2872
:return: A callable that returns True if the test's id starts with one of
2875
def condition(test):
2876
for start in starts:
2877
if test.id().startswith(start):
2883
def exclude_tests_by_condition(suite, condition):
2884
"""Create a test suite which excludes some tests from suite.
2886
:param suite: The suite to get tests from.
2887
:param condition: A callable whose result evaluates True when called with a
2888
test case which should be excluded from the result.
2889
:return: A suite which contains the tests found in suite that fail
2893
for test in iter_suite_tests(suite):
2894
if not condition(test):
2896
return TestUtil.TestSuite(result)
2899
def filter_suite_by_condition(suite, condition):
2900
"""Create a test suite by filtering another one.
2902
:param suite: The source suite.
2903
:param condition: A callable whose result evaluates True when called with a
2904
test case which should be included in the result.
2905
:return: A suite which contains the tests found in suite that pass
2909
for test in iter_suite_tests(suite):
2912
return TestUtil.TestSuite(result)
1638
2915
def filter_suite_by_re(suite, pattern):
1639
result = TestUtil.TestSuite()
1640
filter_re = re.compile(pattern)
2916
"""Create a test suite by filtering another one.
2918
:param suite: the source suite
2919
:param pattern: pattern that names must match
2920
:returns: the newly created suite
2922
condition = condition_id_re(pattern)
2923
result_suite = filter_suite_by_condition(suite, condition)
2927
def filter_suite_by_id_list(suite, test_id_list):
2928
"""Create a test suite by filtering another one.
2930
:param suite: The source suite.
2931
:param test_id_list: A list of the test ids to keep as strings.
2932
:returns: the newly created suite
2934
condition = condition_id_in_list(test_id_list)
2935
result_suite = filter_suite_by_condition(suite, condition)
2939
def filter_suite_by_id_startswith(suite, start):
2940
"""Create a test suite by filtering another one.
2942
:param suite: The source suite.
2943
:param start: A list of string the test id must start with one of.
2944
:returns: the newly created suite
2946
condition = condition_id_startswith(start)
2947
result_suite = filter_suite_by_condition(suite, condition)
2951
def exclude_tests_by_re(suite, pattern):
2952
"""Create a test suite which excludes some tests from suite.
2954
:param suite: The suite to get tests from.
2955
:param pattern: A regular expression string. Test ids that match this
2956
pattern will be excluded from the result.
2957
:return: A TestSuite that contains all the tests from suite without the
2958
tests that matched pattern. The order of tests is the same as it was in
2961
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2964
def preserve_input(something):
2965
"""A helper for performing test suite transformation chains.
2967
:param something: Anything you want to preserve.
2973
def randomize_suite(suite):
2974
"""Return a new TestSuite with suite's tests in random order.
2976
The tests in the input suite are flattened into a single suite in order to
2977
accomplish this. Any nested TestSuites are removed to provide global
2980
tests = list(iter_suite_tests(suite))
2981
random.shuffle(tests)
2982
return TestUtil.TestSuite(tests)
2985
def split_suite_by_condition(suite, condition):
2986
"""Split a test suite into two by a condition.
2988
:param suite: The suite to split.
2989
:param condition: The condition to match on. Tests that match this
2990
condition are returned in the first test suite, ones that do not match
2991
are in the second suite.
2992
:return: A tuple of two test suites, where the first contains tests from
2993
suite matching the condition, and the second contains the remainder
2994
from suite. The order within each output suite is the same as it was in
1641
2999
for test in iter_suite_tests(suite):
1642
if filter_re.search(test.id()):
1643
result.addTest(test)
3001
matched.append(test)
3003
did_not_match.append(test)
3004
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
3007
def split_suite_by_re(suite, pattern):
3008
"""Split a test suite into two by a regular expression.
3010
:param suite: The suite to split.
3011
:param pattern: A regular expression string. Test ids that match this
3012
pattern will be in the first test suite returned, and the others in the
3013
second test suite returned.
3014
:return: A tuple of two test suites, where the first contains tests from
3015
suite matching pattern, and the second contains the remainder from
3016
suite. The order within each output suite is the same as it was in
3019
return split_suite_by_condition(suite, condition_id_re(pattern))
1647
3022
def run_suite(suite, name='test', verbose=False, pattern=".*",
1648
stop_on_failure=False, keep_output=False,
1649
transport=None, lsprof_timed=None, bench_history=None):
3023
stop_on_failure=False,
3024
transport=None, lsprof_timed=None, bench_history=None,
3025
matching_tests_first=None,
3028
exclude_pattern=None,
3031
suite_decorators=None,
3033
result_decorators=None,
3035
"""Run a test suite for bzr selftest.
3037
:param runner_class: The class of runner to use. Must support the
3038
constructor arguments passed by run_suite which are more than standard
3040
:return: A boolean indicating success.
1650
3042
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1655
runner = TextTestRunner(stream=sys.stdout,
3047
if runner_class is None:
3048
runner_class = TextTestRunner
3051
runner = runner_class(stream=stream,
1656
3052
descriptions=0,
1657
3053
verbosity=verbosity,
1658
keep_output=keep_output,
1659
bench_history=bench_history)
3054
bench_history=bench_history,
3056
result_decorators=result_decorators,
1660
3058
runner.stop_on_failure=stop_on_failure
1662
suite = filter_suite_by_re(suite, pattern)
3059
# built in decorator factories:
3061
random_order(random_seed, runner),
3062
exclude_tests(exclude_pattern),
3064
if matching_tests_first:
3065
decorators.append(tests_first(pattern))
3067
decorators.append(filter_tests(pattern))
3068
if suite_decorators:
3069
decorators.extend(suite_decorators)
3070
# tell the result object how many tests will be running: (except if
3071
# --parallel=fork is being used. Robert said he will provide a better
3072
# progress design later -- vila 20090817)
3073
if fork_decorator not in decorators:
3074
decorators.append(CountingDecorator)
3075
for decorator in decorators:
3076
suite = decorator(suite)
3078
# Done after test suite decoration to allow randomisation etc
3079
# to take effect, though that is of marginal benefit.
3081
stream.write("Listing tests only ...\n")
3082
for t in iter_suite_tests(suite):
3083
stream.write("%s\n" % (t.id()))
1663
3085
result = runner.run(suite)
1664
return result.wasSuccessful()
3087
return result.wasStrictlySuccessful()
3089
return result.wasSuccessful()
3092
# A registry where get() returns a suite decorator.
3093
parallel_registry = registry.Registry()
3096
def fork_decorator(suite):
3097
if getattr(os, "fork", None) is None:
3098
raise errors.BzrCommandError("platform does not support fork,"
3099
" try --parallel=subprocess instead.")
3100
concurrency = osutils.local_concurrency()
3101
if concurrency == 1:
3103
from testtools import ConcurrentTestSuite
3104
return ConcurrentTestSuite(suite, fork_for_tests)
3105
parallel_registry.register('fork', fork_decorator)
3108
def subprocess_decorator(suite):
3109
concurrency = osutils.local_concurrency()
3110
if concurrency == 1:
3112
from testtools import ConcurrentTestSuite
3113
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3114
parallel_registry.register('subprocess', subprocess_decorator)
3117
def exclude_tests(exclude_pattern):
3118
"""Return a test suite decorator that excludes tests."""
3119
if exclude_pattern is None:
3120
return identity_decorator
3121
def decorator(suite):
3122
return ExcludeDecorator(suite, exclude_pattern)
3126
def filter_tests(pattern):
3128
return identity_decorator
3129
def decorator(suite):
3130
return FilterTestsDecorator(suite, pattern)
3134
def random_order(random_seed, runner):
3135
"""Return a test suite decorator factory for randomising tests order.
3137
:param random_seed: now, a string which casts to a long, or a long.
3138
:param runner: A test runner with a stream attribute to report on.
3140
if random_seed is None:
3141
return identity_decorator
3142
def decorator(suite):
3143
return RandomDecorator(suite, random_seed, runner.stream)
3147
def tests_first(pattern):
3149
return identity_decorator
3150
def decorator(suite):
3151
return TestFirstDecorator(suite, pattern)
3155
def identity_decorator(suite):
3160
class TestDecorator(TestUtil.TestSuite):
3161
"""A decorator for TestCase/TestSuite objects.
3163
Usually, subclasses should override __iter__(used when flattening test
3164
suites), which we do to filter, reorder, parallelise and so on, run() and
3168
def __init__(self, suite):
3169
TestUtil.TestSuite.__init__(self)
3172
def countTestCases(self):
3175
cases += test.countTestCases()
3182
def run(self, result):
3183
# Use iteration on self, not self._tests, to allow subclasses to hook
3186
if result.shouldStop:
3192
class CountingDecorator(TestDecorator):
3193
"""A decorator which calls result.progress(self.countTestCases)."""
3195
def run(self, result):
3196
progress_method = getattr(result, 'progress', None)
3197
if callable(progress_method):
3198
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3199
return super(CountingDecorator, self).run(result)
3202
class ExcludeDecorator(TestDecorator):
3203
"""A decorator which excludes test matching an exclude pattern."""
3205
def __init__(self, suite, exclude_pattern):
3206
TestDecorator.__init__(self, suite)
3207
self.exclude_pattern = exclude_pattern
3208
self.excluded = False
3212
return iter(self._tests)
3213
self.excluded = True
3214
suite = exclude_tests_by_re(self, self.exclude_pattern)
3216
self.addTests(suite)
3217
return iter(self._tests)
3220
class FilterTestsDecorator(TestDecorator):
3221
"""A decorator which filters tests to those matching a pattern."""
3223
def __init__(self, suite, pattern):
3224
TestDecorator.__init__(self, suite)
3225
self.pattern = pattern
3226
self.filtered = False
3230
return iter(self._tests)
3231
self.filtered = True
3232
suite = filter_suite_by_re(self, self.pattern)
3234
self.addTests(suite)
3235
return iter(self._tests)
3238
class RandomDecorator(TestDecorator):
3239
"""A decorator which randomises the order of its tests."""
3241
def __init__(self, suite, random_seed, stream):
3242
TestDecorator.__init__(self, suite)
3243
self.random_seed = random_seed
3244
self.randomised = False
3245
self.stream = stream
3249
return iter(self._tests)
3250
self.randomised = True
3251
self.stream.write("Randomizing test order using seed %s\n\n" %
3252
(self.actual_seed()))
3253
# Initialise the random number generator.
3254
random.seed(self.actual_seed())
3255
suite = randomize_suite(self)
3257
self.addTests(suite)
3258
return iter(self._tests)
3260
def actual_seed(self):
3261
if self.random_seed == "now":
3262
# We convert the seed to a long to make it reuseable across
3263
# invocations (because the user can reenter it).
3264
self.random_seed = long(time.time())
3266
# Convert the seed to a long if we can
3268
self.random_seed = long(self.random_seed)
3271
return self.random_seed
3274
class TestFirstDecorator(TestDecorator):
3275
"""A decorator which moves named tests to the front."""
3277
def __init__(self, suite, pattern):
3278
TestDecorator.__init__(self, suite)
3279
self.pattern = pattern
3280
self.filtered = False
3284
return iter(self._tests)
3285
self.filtered = True
3286
suites = split_suite_by_re(self, self.pattern)
3288
self.addTests(suites)
3289
return iter(self._tests)
3292
def partition_tests(suite, count):
3293
"""Partition suite into count lists of tests."""
3294
# This just assigns tests in a round-robin fashion. On one hand this
3295
# splits up blocks of related tests that might run faster if they shared
3296
# resources, but on the other it avoids assigning blocks of slow tests to
3297
# just one partition. So the slowest partition shouldn't be much slower
3299
partitions = [list() for i in range(count)]
3300
tests = iter_suite_tests(suite)
3301
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3302
partition.append(test)
3306
def workaround_zealous_crypto_random():
3307
"""Crypto.Random want to help us being secure, but we don't care here.
3309
This workaround some test failure related to the sftp server. Once paramiko
3310
stop using the controversial API in Crypto.Random, we may get rid of it.
3313
from Crypto.Random import atfork
3319
def fork_for_tests(suite):
3320
"""Take suite and start up one runner per CPU by forking()
3322
:return: An iterable of TestCase-like objects which can each have
3323
run(result) called on them to feed tests to result.
3325
concurrency = osutils.local_concurrency()
3327
from subunit import TestProtocolClient, ProtocolTestCase
3328
from subunit.test_results import AutoTimingTestResultDecorator
3329
class TestInOtherProcess(ProtocolTestCase):
3330
# Should be in subunit, I think. RBC.
3331
def __init__(self, stream, pid):
3332
ProtocolTestCase.__init__(self, stream)
3335
def run(self, result):
3337
ProtocolTestCase.run(self, result)
3339
os.waitpid(self.pid, 0)
3341
test_blocks = partition_tests(suite, concurrency)
3342
for process_tests in test_blocks:
3343
process_suite = TestUtil.TestSuite()
3344
process_suite.addTests(process_tests)
3345
c2pread, c2pwrite = os.pipe()
3348
workaround_zealous_crypto_random()
3351
# Leave stderr and stdout open so we can see test noise
3352
# Close stdin so that the child goes away if it decides to
3353
# read from stdin (otherwise its a roulette to see what
3354
# child actually gets keystrokes for pdb etc).
3357
stream = os.fdopen(c2pwrite, 'wb', 1)
3358
subunit_result = AutoTimingTestResultDecorator(
3359
TestProtocolClient(stream))
3360
process_suite.run(subunit_result)
3365
stream = os.fdopen(c2pread, 'rb', 1)
3366
test = TestInOtherProcess(stream, pid)
3371
def reinvoke_for_tests(suite):
3372
"""Take suite and start up one runner per CPU using subprocess().
3374
:return: An iterable of TestCase-like objects which can each have
3375
run(result) called on them to feed tests to result.
3377
concurrency = osutils.local_concurrency()
3379
from subunit import ProtocolTestCase
3380
class TestInSubprocess(ProtocolTestCase):
3381
def __init__(self, process, name):
3382
ProtocolTestCase.__init__(self, process.stdout)
3383
self.process = process
3384
self.process.stdin.close()
3387
def run(self, result):
3389
ProtocolTestCase.run(self, result)
3392
os.unlink(self.name)
3393
# print "pid %d finished" % finished_process
3394
test_blocks = partition_tests(suite, concurrency)
3395
for process_tests in test_blocks:
3396
# ugly; currently reimplement rather than reuses TestCase methods.
3397
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3398
if not os.path.isfile(bzr_path):
3399
# We are probably installed. Assume sys.argv is the right file
3400
bzr_path = sys.argv[0]
3401
bzr_path = [bzr_path]
3402
if sys.platform == "win32":
3403
# if we're on windows, we can't execute the bzr script directly
3404
bzr_path = [sys.executable] + bzr_path
3405
fd, test_list_file_name = tempfile.mkstemp()
3406
test_list_file = os.fdopen(fd, 'wb', 1)
3407
for test in process_tests:
3408
test_list_file.write(test.id() + '\n')
3409
test_list_file.close()
3411
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3413
if '--no-plugins' in sys.argv:
3414
argv.append('--no-plugins')
3415
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3416
# noise on stderr it can interrupt the subunit protocol.
3417
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3418
stdout=subprocess.PIPE,
3419
stderr=subprocess.PIPE,
3421
test = TestInSubprocess(process, test_list_file_name)
3424
os.unlink(test_list_file_name)
3429
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3430
"""Generate profiling data for all activity between start and success.
3432
The profile data is appended to the test's _benchcalls attribute and can
3433
be accessed by the forwarded-to TestResult.
3435
While it might be cleaner do accumulate this in stopTest, addSuccess is
3436
where our existing output support for lsprof is, and this class aims to
3437
fit in with that: while it could be moved it's not necessary to accomplish
3438
test profiling, nor would it be dramatically cleaner.
3441
def startTest(self, test):
3442
self.profiler = bzrlib.lsprof.BzrProfiler()
3443
# Prevent deadlocks in tests that use lsprof: those tests will
3445
bzrlib.lsprof.BzrProfiler.profiler_block = 0
3446
self.profiler.start()
3447
testtools.ExtendedToOriginalDecorator.startTest(self, test)
3449
def addSuccess(self, test):
3450
stats = self.profiler.stop()
3452
calls = test._benchcalls
3453
except AttributeError:
3454
test._benchcalls = []
3455
calls = test._benchcalls
3456
calls.append(((test.id(), "", ""), stats))
3457
testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
3459
def stopTest(self, test):
3460
testtools.ExtendedToOriginalDecorator.stopTest(self, test)
3461
self.profiler = None
3464
# Controlled by "bzr selftest -E=..." option
3465
# Currently supported:
3466
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3467
# preserves any flags supplied at the command line.
3468
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3469
# rather than failing tests. And no longer raise
3470
# LockContention when fctnl locks are not being used
3471
# with proper exclusion rules.
3472
# -Ethreads Will display thread ident at creation/join time to
3473
# help track thread leaks
3474
selftest_debug_flags = set()
1667
3477
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1669
3478
transport=None,
1670
3479
test_suite_factory=None,
1671
3480
lsprof_timed=None,
1672
bench_history=None):
3482
matching_tests_first=None,
3485
exclude_pattern=None,
3491
suite_decorators=None,
1673
3495
"""Run the whole test suite under the enhanced runner"""
1674
3496
# XXX: Very ugly way to do this...
1675
3497
# Disable warning about old formats because we don't want it to disturb
1682
3504
transport = default_transport
1683
3505
old_transport = default_transport
1684
3506
default_transport = transport
3507
global selftest_debug_flags
3508
old_debug_flags = selftest_debug_flags
3509
if debug_flags is not None:
3510
selftest_debug_flags = set(debug_flags)
3512
if load_list is None:
3515
keep_only = load_test_id_list(load_list)
3517
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3518
for start in starting_with]
1686
3519
if test_suite_factory is None:
1687
suite = test_suite()
3520
# Reduce loading time by loading modules based on the starting_with
3522
suite = test_suite(keep_only, starting_with)
1689
3524
suite = test_suite_factory()
3526
# But always filter as requested.
3527
suite = filter_suite_by_id_startswith(suite, starting_with)
3528
result_decorators = []
3530
result_decorators.append(ProfileResult)
1690
3531
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1691
stop_on_failure=stop_on_failure, keep_output=keep_output,
3532
stop_on_failure=stop_on_failure,
1692
3533
transport=transport,
1693
3534
lsprof_timed=lsprof_timed,
1694
bench_history=bench_history)
3535
bench_history=bench_history,
3536
matching_tests_first=matching_tests_first,
3537
list_only=list_only,
3538
random_seed=random_seed,
3539
exclude_pattern=exclude_pattern,
3541
runner_class=runner_class,
3542
suite_decorators=suite_decorators,
3544
result_decorators=result_decorators,
1696
3547
default_transport = old_transport
3548
selftest_debug_flags = old_debug_flags
3551
def load_test_id_list(file_name):
3552
"""Load a test id list from a text file.
3554
The format is one test id by line. No special care is taken to impose
3555
strict rules, these test ids are used to filter the test suite so a test id
3556
that do not match an existing test will do no harm. This allows user to add
3557
comments, leave blank lines, etc.
3561
ftest = open(file_name, 'rt')
3563
if e.errno != errno.ENOENT:
3566
raise errors.NoSuchFile(file_name)
3568
for test_name in ftest.readlines():
3569
test_list.append(test_name.strip())
3574
def suite_matches_id_list(test_suite, id_list):
3575
"""Warns about tests not appearing or appearing more than once.
3577
:param test_suite: A TestSuite object.
3578
:param test_id_list: The list of test ids that should be found in
3581
:return: (absents, duplicates) absents is a list containing the test found
3582
in id_list but not in test_suite, duplicates is a list containing the
3583
test found multiple times in test_suite.
3585
When using a prefined test id list, it may occurs that some tests do not
3586
exist anymore or that some tests use the same id. This function warns the
3587
tester about potential problems in his workflow (test lists are volatile)
3588
or in the test suite itself (using the same id for several tests does not
3589
help to localize defects).
3591
# Build a dict counting id occurrences
3593
for test in iter_suite_tests(test_suite):
3595
tests[id] = tests.get(id, 0) + 1
3600
occurs = tests.get(id, 0)
3602
not_found.append(id)
3604
duplicates.append(id)
3606
return not_found, duplicates
3609
class TestIdList(object):
3610
"""Test id list to filter a test suite.
3612
Relying on the assumption that test ids are built as:
3613
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3614
notation, this class offers methods to :
3615
- avoid building a test suite for modules not refered to in the test list,
3616
- keep only the tests listed from the module test suite.
3619
def __init__(self, test_id_list):
3620
# When a test suite needs to be filtered against us we compare test ids
3621
# for equality, so a simple dict offers a quick and simple solution.
3622
self.tests = dict().fromkeys(test_id_list, True)
3624
# While unittest.TestCase have ids like:
3625
# <module>.<class>.<method>[(<param+)],
3626
# doctest.DocTestCase can have ids like:
3629
# <module>.<function>
3630
# <module>.<class>.<method>
3632
# Since we can't predict a test class from its name only, we settle on
3633
# a simple constraint: a test id always begins with its module name.
3636
for test_id in test_id_list:
3637
parts = test_id.split('.')
3638
mod_name = parts.pop(0)
3639
modules[mod_name] = True
3641
mod_name += '.' + part
3642
modules[mod_name] = True
3643
self.modules = modules
3645
def refers_to(self, module_name):
3646
"""Is there tests for the module or one of its sub modules."""
3647
return self.modules.has_key(module_name)
3649
def includes(self, test_id):
3650
return self.tests.has_key(test_id)
3653
class TestPrefixAliasRegistry(registry.Registry):
3654
"""A registry for test prefix aliases.
3656
This helps implement shorcuts for the --starting-with selftest
3657
option. Overriding existing prefixes is not allowed but not fatal (a
3658
warning will be emitted).
3661
def register(self, key, obj, help=None, info=None,
3662
override_existing=False):
3663
"""See Registry.register.
3665
Trying to override an existing alias causes a warning to be emitted,
3666
not a fatal execption.
3669
super(TestPrefixAliasRegistry, self).register(
3670
key, obj, help=help, info=info, override_existing=False)
3672
actual = self.get(key)
3674
'Test prefix alias %s is already used for %s, ignoring %s'
3675
% (key, actual, obj))
3677
def resolve_alias(self, id_start):
3678
"""Replace the alias by the prefix in the given string.
3680
Using an unknown prefix is an error to help catching typos.
3682
parts = id_start.split('.')
3684
parts[0] = self.get(parts[0])
3686
raise errors.BzrCommandError(
3687
'%s is not a known test prefix alias' % parts[0])
3688
return '.'.join(parts)
3691
test_prefix_alias_registry = TestPrefixAliasRegistry()
3692
"""Registry of test prefix aliases."""
3695
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3696
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3697
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3699
# Obvious highest levels prefixes, feel free to add your own via a plugin
3700
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3701
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3702
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3703
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3704
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3707
def _test_suite_testmod_names():
3708
"""Return the standard list of test module names to test."""
3711
'bzrlib.tests.blackbox',
3712
'bzrlib.tests.commands',
3713
'bzrlib.tests.doc_generate',
3714
'bzrlib.tests.per_branch',
3715
'bzrlib.tests.per_bzrdir',
3716
'bzrlib.tests.per_controldir',
3717
'bzrlib.tests.per_controldir_colo',
3718
'bzrlib.tests.per_foreign_vcs',
3719
'bzrlib.tests.per_interrepository',
3720
'bzrlib.tests.per_intertree',
3721
'bzrlib.tests.per_inventory',
3722
'bzrlib.tests.per_interbranch',
3723
'bzrlib.tests.per_lock',
3724
'bzrlib.tests.per_merger',
3725
'bzrlib.tests.per_transport',
3726
'bzrlib.tests.per_tree',
3727
'bzrlib.tests.per_pack_repository',
3728
'bzrlib.tests.per_repository',
3729
'bzrlib.tests.per_repository_chk',
3730
'bzrlib.tests.per_repository_reference',
3731
'bzrlib.tests.per_repository_vf',
3732
'bzrlib.tests.per_uifactory',
3733
'bzrlib.tests.per_versionedfile',
3734
'bzrlib.tests.per_workingtree',
3735
'bzrlib.tests.test__annotator',
3736
'bzrlib.tests.test__bencode',
3737
'bzrlib.tests.test__btree_serializer',
3738
'bzrlib.tests.test__chk_map',
3739
'bzrlib.tests.test__dirstate_helpers',
3740
'bzrlib.tests.test__groupcompress',
3741
'bzrlib.tests.test__known_graph',
3742
'bzrlib.tests.test__rio',
3743
'bzrlib.tests.test__simple_set',
3744
'bzrlib.tests.test__static_tuple',
3745
'bzrlib.tests.test__walkdirs_win32',
3746
'bzrlib.tests.test_ancestry',
3747
'bzrlib.tests.test_annotate',
3748
'bzrlib.tests.test_api',
3749
'bzrlib.tests.test_atomicfile',
3750
'bzrlib.tests.test_bad_files',
3751
'bzrlib.tests.test_bisect_multi',
3752
'bzrlib.tests.test_branch',
3753
'bzrlib.tests.test_branchbuilder',
3754
'bzrlib.tests.test_btree_index',
3755
'bzrlib.tests.test_bugtracker',
3756
'bzrlib.tests.test_bundle',
3757
'bzrlib.tests.test_bzrdir',
3758
'bzrlib.tests.test__chunks_to_lines',
3759
'bzrlib.tests.test_cache_utf8',
3760
'bzrlib.tests.test_chk_map',
3761
'bzrlib.tests.test_chk_serializer',
3762
'bzrlib.tests.test_chunk_writer',
3763
'bzrlib.tests.test_clean_tree',
3764
'bzrlib.tests.test_cleanup',
3765
'bzrlib.tests.test_cmdline',
3766
'bzrlib.tests.test_commands',
3767
'bzrlib.tests.test_commit',
3768
'bzrlib.tests.test_commit_merge',
3769
'bzrlib.tests.test_config',
3770
'bzrlib.tests.test_conflicts',
3771
'bzrlib.tests.test_controldir',
3772
'bzrlib.tests.test_counted_lock',
3773
'bzrlib.tests.test_crash',
3774
'bzrlib.tests.test_decorators',
3775
'bzrlib.tests.test_delta',
3776
'bzrlib.tests.test_debug',
3777
'bzrlib.tests.test_deprecated_graph',
3778
'bzrlib.tests.test_diff',
3779
'bzrlib.tests.test_directory_service',
3780
'bzrlib.tests.test_dirstate',
3781
'bzrlib.tests.test_email_message',
3782
'bzrlib.tests.test_eol_filters',
3783
'bzrlib.tests.test_errors',
3784
'bzrlib.tests.test_export',
3785
'bzrlib.tests.test_extract',
3786
'bzrlib.tests.test_fetch',
3787
'bzrlib.tests.test_fixtures',
3788
'bzrlib.tests.test_fifo_cache',
3789
'bzrlib.tests.test_filters',
3790
'bzrlib.tests.test_ftp_transport',
3791
'bzrlib.tests.test_foreign',
3792
'bzrlib.tests.test_generate_docs',
3793
'bzrlib.tests.test_generate_ids',
3794
'bzrlib.tests.test_globbing',
3795
'bzrlib.tests.test_gpg',
3796
'bzrlib.tests.test_graph',
3797
'bzrlib.tests.test_groupcompress',
3798
'bzrlib.tests.test_hashcache',
3799
'bzrlib.tests.test_help',
3800
'bzrlib.tests.test_hooks',
3801
'bzrlib.tests.test_http',
3802
'bzrlib.tests.test_http_response',
3803
'bzrlib.tests.test_https_ca_bundle',
3804
'bzrlib.tests.test_identitymap',
3805
'bzrlib.tests.test_ignores',
3806
'bzrlib.tests.test_index',
3807
'bzrlib.tests.test_import_tariff',
3808
'bzrlib.tests.test_info',
3809
'bzrlib.tests.test_inv',
3810
'bzrlib.tests.test_inventory_delta',
3811
'bzrlib.tests.test_knit',
3812
'bzrlib.tests.test_lazy_import',
3813
'bzrlib.tests.test_lazy_regex',
3814
'bzrlib.tests.test_library_state',
3815
'bzrlib.tests.test_lock',
3816
'bzrlib.tests.test_lockable_files',
3817
'bzrlib.tests.test_lockdir',
3818
'bzrlib.tests.test_log',
3819
'bzrlib.tests.test_lru_cache',
3820
'bzrlib.tests.test_lsprof',
3821
'bzrlib.tests.test_mail_client',
3822
'bzrlib.tests.test_matchers',
3823
'bzrlib.tests.test_memorytree',
3824
'bzrlib.tests.test_merge',
3825
'bzrlib.tests.test_merge3',
3826
'bzrlib.tests.test_merge_core',
3827
'bzrlib.tests.test_merge_directive',
3828
'bzrlib.tests.test_mergetools',
3829
'bzrlib.tests.test_missing',
3830
'bzrlib.tests.test_msgeditor',
3831
'bzrlib.tests.test_multiparent',
3832
'bzrlib.tests.test_mutabletree',
3833
'bzrlib.tests.test_nonascii',
3834
'bzrlib.tests.test_options',
3835
'bzrlib.tests.test_osutils',
3836
'bzrlib.tests.test_osutils_encodings',
3837
'bzrlib.tests.test_pack',
3838
'bzrlib.tests.test_patch',
3839
'bzrlib.tests.test_patches',
3840
'bzrlib.tests.test_permissions',
3841
'bzrlib.tests.test_plugins',
3842
'bzrlib.tests.test_progress',
3843
'bzrlib.tests.test_pyutils',
3844
'bzrlib.tests.test_read_bundle',
3845
'bzrlib.tests.test_reconcile',
3846
'bzrlib.tests.test_reconfigure',
3847
'bzrlib.tests.test_registry',
3848
'bzrlib.tests.test_remote',
3849
'bzrlib.tests.test_rename_map',
3850
'bzrlib.tests.test_repository',
3851
'bzrlib.tests.test_revert',
3852
'bzrlib.tests.test_revision',
3853
'bzrlib.tests.test_revisionspec',
3854
'bzrlib.tests.test_revisiontree',
3855
'bzrlib.tests.test_rio',
3856
'bzrlib.tests.test_rules',
3857
'bzrlib.tests.test_sampler',
3858
'bzrlib.tests.test_scenarios',
3859
'bzrlib.tests.test_script',
3860
'bzrlib.tests.test_selftest',
3861
'bzrlib.tests.test_serializer',
3862
'bzrlib.tests.test_setup',
3863
'bzrlib.tests.test_sftp_transport',
3864
'bzrlib.tests.test_shelf',
3865
'bzrlib.tests.test_shelf_ui',
3866
'bzrlib.tests.test_smart',
3867
'bzrlib.tests.test_smart_add',
3868
'bzrlib.tests.test_smart_request',
3869
'bzrlib.tests.test_smart_transport',
3870
'bzrlib.tests.test_smtp_connection',
3871
'bzrlib.tests.test_source',
3872
'bzrlib.tests.test_ssh_transport',
3873
'bzrlib.tests.test_status',
3874
'bzrlib.tests.test_store',
3875
'bzrlib.tests.test_strace',
3876
'bzrlib.tests.test_subsume',
3877
'bzrlib.tests.test_switch',
3878
'bzrlib.tests.test_symbol_versioning',
3879
'bzrlib.tests.test_tag',
3880
'bzrlib.tests.test_test_server',
3881
'bzrlib.tests.test_testament',
3882
'bzrlib.tests.test_textfile',
3883
'bzrlib.tests.test_textmerge',
3884
'bzrlib.tests.test_cethread',
3885
'bzrlib.tests.test_timestamp',
3886
'bzrlib.tests.test_trace',
3887
'bzrlib.tests.test_transactions',
3888
'bzrlib.tests.test_transform',
3889
'bzrlib.tests.test_transport',
3890
'bzrlib.tests.test_transport_log',
3891
'bzrlib.tests.test_tree',
3892
'bzrlib.tests.test_treebuilder',
3893
'bzrlib.tests.test_treeshape',
3894
'bzrlib.tests.test_tsort',
3895
'bzrlib.tests.test_tuned_gzip',
3896
'bzrlib.tests.test_ui',
3897
'bzrlib.tests.test_uncommit',
3898
'bzrlib.tests.test_upgrade',
3899
'bzrlib.tests.test_upgrade_stacked',
3900
'bzrlib.tests.test_urlutils',
3901
'bzrlib.tests.test_version',
3902
'bzrlib.tests.test_version_info',
3903
'bzrlib.tests.test_versionedfile',
3904
'bzrlib.tests.test_weave',
3905
'bzrlib.tests.test_whitebox',
3906
'bzrlib.tests.test_win32utils',
3907
'bzrlib.tests.test_workingtree',
3908
'bzrlib.tests.test_workingtree_4',
3909
'bzrlib.tests.test_wsgi',
3910
'bzrlib.tests.test_xml',
3914
def _test_suite_modules_to_doctest():
3915
"""Return the list of modules to doctest."""
3917
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3921
'bzrlib.branchbuilder',
3922
'bzrlib.decorators',
3924
'bzrlib.iterablefile',
3929
'bzrlib.symbol_versioning',
3931
'bzrlib.tests.fixtures',
3933
'bzrlib.transport.http',
3934
'bzrlib.version_info_formats.format_custom',
3938
def test_suite(keep_only=None, starting_with=None):
1700
3939
"""Build and return TestSuite for the whole of bzrlib.
3941
:param keep_only: A list of test ids limiting the suite returned.
3943
:param starting_with: An id limiting the suite returned to the tests
1702
3946
This function can be replaced if you need to change the default test
1703
3947
suite on a global basis, but it is not encouraged.
1706
'bzrlib.tests.test_ancestry',
1707
'bzrlib.tests.test_api',
1708
'bzrlib.tests.test_atomicfile',
1709
'bzrlib.tests.test_bad_files',
1710
'bzrlib.tests.test_branch',
1711
'bzrlib.tests.test_bundle',
1712
'bzrlib.tests.test_bzrdir',
1713
'bzrlib.tests.test_cache_utf8',
1714
'bzrlib.tests.test_command',
1715
'bzrlib.tests.test_commit',
1716
'bzrlib.tests.test_commit_merge',
1717
'bzrlib.tests.test_config',
1718
'bzrlib.tests.test_conflicts',
1719
'bzrlib.tests.test_decorators',
1720
'bzrlib.tests.test_diff',
1721
'bzrlib.tests.test_doc_generate',
1722
'bzrlib.tests.test_errors',
1723
'bzrlib.tests.test_escaped_store',
1724
'bzrlib.tests.test_fetch',
1725
'bzrlib.tests.test_ftp_transport',
1726
'bzrlib.tests.test_generate_ids',
1727
'bzrlib.tests.test_gpg',
1728
'bzrlib.tests.test_graph',
1729
'bzrlib.tests.test_hashcache',
1730
'bzrlib.tests.test_http',
1731
'bzrlib.tests.test_http_response',
1732
'bzrlib.tests.test_identitymap',
1733
'bzrlib.tests.test_ignores',
1734
'bzrlib.tests.test_inv',
1735
'bzrlib.tests.test_knit',
1736
'bzrlib.tests.test_lazy_import',
1737
'bzrlib.tests.test_lazy_regex',
1738
'bzrlib.tests.test_lockdir',
1739
'bzrlib.tests.test_lockable_files',
1740
'bzrlib.tests.test_log',
1741
'bzrlib.tests.test_memorytree',
1742
'bzrlib.tests.test_merge',
1743
'bzrlib.tests.test_merge3',
1744
'bzrlib.tests.test_merge_core',
1745
'bzrlib.tests.test_missing',
1746
'bzrlib.tests.test_msgeditor',
1747
'bzrlib.tests.test_nonascii',
1748
'bzrlib.tests.test_options',
1749
'bzrlib.tests.test_osutils',
1750
'bzrlib.tests.test_patch',
1751
'bzrlib.tests.test_patches',
1752
'bzrlib.tests.test_permissions',
1753
'bzrlib.tests.test_plugins',
1754
'bzrlib.tests.test_progress',
1755
'bzrlib.tests.test_reconcile',
1756
'bzrlib.tests.test_registry',
1757
'bzrlib.tests.test_repository',
1758
'bzrlib.tests.test_revert',
1759
'bzrlib.tests.test_revision',
1760
'bzrlib.tests.test_revisionnamespaces',
1761
'bzrlib.tests.test_revisiontree',
1762
'bzrlib.tests.test_rio',
1763
'bzrlib.tests.test_sampler',
1764
'bzrlib.tests.test_selftest',
1765
'bzrlib.tests.test_setup',
1766
'bzrlib.tests.test_sftp_transport',
1767
'bzrlib.tests.test_smart_add',
1768
'bzrlib.tests.test_smart_transport',
1769
'bzrlib.tests.test_source',
1770
'bzrlib.tests.test_status',
1771
'bzrlib.tests.test_store',
1772
'bzrlib.tests.test_symbol_versioning',
1773
'bzrlib.tests.test_testament',
1774
'bzrlib.tests.test_textfile',
1775
'bzrlib.tests.test_textmerge',
1776
'bzrlib.tests.test_trace',
1777
'bzrlib.tests.test_transactions',
1778
'bzrlib.tests.test_transform',
1779
'bzrlib.tests.test_transport',
1780
'bzrlib.tests.test_tree',
1781
'bzrlib.tests.test_treebuilder',
1782
'bzrlib.tests.test_tsort',
1783
'bzrlib.tests.test_tuned_gzip',
1784
'bzrlib.tests.test_ui',
1785
'bzrlib.tests.test_upgrade',
1786
'bzrlib.tests.test_urlutils',
1787
'bzrlib.tests.test_versionedfile',
1788
'bzrlib.tests.test_version',
1789
'bzrlib.tests.test_version_info',
1790
'bzrlib.tests.test_weave',
1791
'bzrlib.tests.test_whitebox',
1792
'bzrlib.tests.test_workingtree',
1793
'bzrlib.tests.test_wsgi',
1794
'bzrlib.tests.test_xml',
1796
test_transport_implementations = [
1797
'bzrlib.tests.test_transport_implementations',
1798
'bzrlib.tests.test_read_bundle',
1800
suite = TestUtil.TestSuite()
1801
3950
loader = TestUtil.TestLoader()
1802
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1803
from bzrlib.transport import TransportTestProviderAdapter
1804
adapter = TransportTestProviderAdapter()
1805
adapt_modules(test_transport_implementations, adapter, loader, suite)
1806
for package in packages_to_test():
1807
suite.addTest(package.test_suite())
1808
for m in MODULES_TO_TEST:
1809
suite.addTest(loader.loadTestsFromModule(m))
1810
for m in MODULES_TO_DOCTEST:
3952
if keep_only is not None:
3953
id_filter = TestIdList(keep_only)
3955
# We take precedence over keep_only because *at loading time* using
3956
# both options means we will load less tests for the same final result.
3957
def interesting_module(name):
3958
for start in starting_with:
3960
# Either the module name starts with the specified string
3961
name.startswith(start)
3962
# or it may contain tests starting with the specified string
3963
or start.startswith(name)
3967
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3969
elif keep_only is not None:
3970
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3971
def interesting_module(name):
3972
return id_filter.refers_to(name)
3975
loader = TestUtil.TestLoader()
3976
def interesting_module(name):
3977
# No filtering, all modules are interesting
3980
suite = loader.suiteClass()
3982
# modules building their suite with loadTestsFromModuleNames
3983
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3985
for mod in _test_suite_modules_to_doctest():
3986
if not interesting_module(mod):
3987
# No tests to keep here, move along
1812
suite.addTest(doctest.DocTestSuite(m))
3990
# note that this really does mean "report only" -- doctest
3991
# still runs the rest of the examples
3992
doc_suite = IsolatedDocTestSuite(
3993
mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
1813
3994
except ValueError, e:
1814
print '**failed to get doctest for: %s\n%s' %(m,e)
3995
print '**failed to get doctest for: %s\n%s' % (mod, e)
1816
for name, plugin in bzrlib.plugin.all_plugins().items():
1817
if getattr(plugin, 'test_suite', None) is not None:
1818
suite.addTest(plugin.test_suite())
3997
if len(doc_suite._tests) == 0:
3998
raise errors.BzrError("no doctests found in %s" % (mod,))
3999
suite.addTest(doc_suite)
4001
default_encoding = sys.getdefaultencoding()
4002
for name, plugin in _mod_plugin.plugins().items():
4003
if not interesting_module(plugin.module.__name__):
4005
plugin_suite = plugin.test_suite()
4006
# We used to catch ImportError here and turn it into just a warning,
4007
# but really if you don't have --no-plugins this should be a failure.
4008
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
4009
if plugin_suite is None:
4010
plugin_suite = plugin.load_plugin_tests(loader)
4011
if plugin_suite is not None:
4012
suite.addTest(plugin_suite)
4013
if default_encoding != sys.getdefaultencoding():
4015
'Plugin "%s" tried to reset default encoding to: %s', name,
4016
sys.getdefaultencoding())
4018
sys.setdefaultencoding(default_encoding)
4020
if keep_only is not None:
4021
# Now that the referred modules have loaded their tests, keep only the
4023
suite = filter_suite_by_id_list(suite, id_filter)
4024
# Do some sanity checks on the id_list filtering
4025
not_found, duplicates = suite_matches_id_list(suite, keep_only)
4027
# The tester has used both keep_only and starting_with, so he is
4028
# already aware that some tests are excluded from the list, there
4029
# is no need to tell him which.
4032
# Some tests mentioned in the list are not in the test suite. The
4033
# list may be out of date, report to the tester.
4034
for id in not_found:
4035
trace.warning('"%s" not found in the test suite', id)
4036
for id in duplicates:
4037
trace.warning('"%s" is used as an id by several tests', id)
1822
def adapt_modules(mods_list, adapter, loader, suite):
1823
"""Adapt the modules in mods_list using adapter and add to suite."""
1824
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1825
suite.addTests(adapter.adapt(test))
4042
def multiply_scenarios(*scenarios):
4043
"""Multiply two or more iterables of scenarios.
4045
It is safe to pass scenario generators or iterators.
4047
:returns: A list of compound scenarios: the cross-product of all
4048
scenarios, with the names concatenated and the parameters
4051
return reduce(_multiply_two_scenarios, map(list, scenarios))
4054
def _multiply_two_scenarios(scenarios_left, scenarios_right):
4055
"""Multiply two sets of scenarios.
4057
:returns: the cartesian product of the two sets of scenarios, that is
4058
a scenario for every possible combination of a left scenario and a
4062
('%s,%s' % (left_name, right_name),
4063
dict(left_dict.items() + right_dict.items()))
4064
for left_name, left_dict in scenarios_left
4065
for right_name, right_dict in scenarios_right]
4068
def multiply_tests(tests, scenarios, result):
4069
"""Multiply tests_list by scenarios into result.
4071
This is the core workhorse for test parameterisation.
4073
Typically the load_tests() method for a per-implementation test suite will
4074
call multiply_tests and return the result.
4076
:param tests: The tests to parameterise.
4077
:param scenarios: The scenarios to apply: pairs of (scenario_name,
4078
scenario_param_dict).
4079
:param result: A TestSuite to add created tests to.
4081
This returns the passed in result TestSuite with the cross product of all
4082
the tests repeated once for each scenario. Each test is adapted by adding
4083
the scenario name at the end of its id(), and updating the test object's
4084
__dict__ with the scenario_param_dict.
4086
>>> import bzrlib.tests.test_sampler
4087
>>> r = multiply_tests(
4088
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4089
... [('one', dict(param=1)),
4090
... ('two', dict(param=2))],
4091
... TestUtil.TestSuite())
4092
>>> tests = list(iter_suite_tests(r))
4096
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4102
for test in iter_suite_tests(tests):
4103
apply_scenarios(test, scenarios, result)
4107
def apply_scenarios(test, scenarios, result):
4108
"""Apply the scenarios in scenarios to test and add to result.
4110
:param test: The test to apply scenarios to.
4111
:param scenarios: An iterable of scenarios to apply to test.
4113
:seealso: apply_scenario
4115
for scenario in scenarios:
4116
result.addTest(apply_scenario(test, scenario))
4120
def apply_scenario(test, scenario):
4121
"""Copy test and apply scenario to it.
4123
:param test: A test to adapt.
4124
:param scenario: A tuple describing the scenarion.
4125
The first element of the tuple is the new test id.
4126
The second element is a dict containing attributes to set on the
4128
:return: The adapted test.
4130
new_id = "%s(%s)" % (test.id(), scenario[0])
4131
new_test = clone_test(test, new_id)
4132
for name, value in scenario[1].items():
4133
setattr(new_test, name, value)
4137
def clone_test(test, new_id):
4138
"""Clone a test giving it a new id.
4140
:param test: The test to clone.
4141
:param new_id: The id to assign to it.
4142
:return: The new test.
4144
new_test = copy.copy(test)
4145
new_test.id = lambda: new_id
4146
# XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4147
# causes cloned tests to share the 'details' dict. This makes it hard to
4148
# read the test output for parameterized tests, because tracebacks will be
4149
# associated with irrelevant tests.
4151
details = new_test._TestCase__details
4152
except AttributeError:
4153
# must be a different version of testtools than expected. Do nothing.
4156
# Reset the '__details' dict.
4157
new_test._TestCase__details = {}
4161
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4163
"""Helper for permutating tests against an extension module.
4165
This is meant to be used inside a modules 'load_tests()' function. It will
4166
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4167
against both implementations. Setting 'test.module' to the appropriate
4168
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4170
:param standard_tests: A test suite to permute
4171
:param loader: A TestLoader
4172
:param py_module_name: The python path to a python module that can always
4173
be loaded, and will be considered the 'python' implementation. (eg
4174
'bzrlib._chk_map_py')
4175
:param ext_module_name: The python path to an extension module. If the
4176
module cannot be loaded, a single test will be added, which notes that
4177
the module is not available. If it can be loaded, all standard_tests
4178
will be run against that module.
4179
:return: (suite, feature) suite is a test-suite that has all the permuted
4180
tests. feature is the Feature object that can be used to determine if
4181
the module is available.
4184
py_module = pyutils.get_named_object(py_module_name)
4186
('python', {'module': py_module}),
4188
suite = loader.suiteClass()
4189
feature = ModuleAvailableFeature(ext_module_name)
4190
if feature.available():
4191
scenarios.append(('C', {'module': feature.module}))
4193
# the compiled module isn't available, so we add a failing test
4194
class FailWithoutFeature(TestCase):
4195
def test_fail(self):
4196
self.requireFeature(feature)
4197
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4198
result = multiply_tests(standard_tests, scenarios, suite)
4199
return result, feature
4202
def _rmtree_temp_dir(dirname, test_id=None):
4203
# If LANG=C we probably have created some bogus paths
4204
# which rmtree(unicode) will fail to delete
4205
# so make sure we are using rmtree(str) to delete everything
4206
# except on win32, where rmtree(str) will fail
4207
# since it doesn't have the property of byte-stream paths
4208
# (they are either ascii or mbcs)
4209
if sys.platform == 'win32':
4210
# make sure we are using the unicode win32 api
4211
dirname = unicode(dirname)
4213
dirname = dirname.encode(sys.getfilesystemencoding())
4215
osutils.rmtree(dirname)
4217
# We don't want to fail here because some useful display will be lost
4218
# otherwise. Polluting the tmp dir is bad, but not giving all the
4219
# possible info to the test runner is even worse.
4221
ui.ui_factory.clear_term()
4222
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4223
# Ugly, but the last thing we want here is fail, so bear with it.
4224
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4225
).encode('ascii', 'replace')
4226
sys.stderr.write('Unable to remove testing dir %s\n%s'
4227
% (os.path.basename(dirname), printable_e))
4230
class Feature(object):
4231
"""An operating system Feature."""
4234
self._available = None
4236
def available(self):
4237
"""Is the feature available?
4239
:return: True if the feature is available.
4241
if self._available is None:
4242
self._available = self._probe()
4243
return self._available
4246
"""Implement this method in concrete features.
4248
:return: True if the feature is available.
4250
raise NotImplementedError
4253
if getattr(self, 'feature_name', None):
4254
return self.feature_name()
4255
return self.__class__.__name__
4258
class _SymlinkFeature(Feature):
4261
return osutils.has_symlinks()
4263
def feature_name(self):
4266
SymlinkFeature = _SymlinkFeature()
4269
class _HardlinkFeature(Feature):
4272
return osutils.has_hardlinks()
4274
def feature_name(self):
4277
HardlinkFeature = _HardlinkFeature()
4280
class _OsFifoFeature(Feature):
4283
return getattr(os, 'mkfifo', None)
4285
def feature_name(self):
4286
return 'filesystem fifos'
4288
OsFifoFeature = _OsFifoFeature()
4291
class _UnicodeFilenameFeature(Feature):
4292
"""Does the filesystem support Unicode filenames?"""
4296
# Check for character combinations unlikely to be covered by any
4297
# single non-unicode encoding. We use the characters
4298
# - greek small letter alpha (U+03B1) and
4299
# - braille pattern dots-123456 (U+283F).
4300
os.stat(u'\u03b1\u283f')
4301
except UnicodeEncodeError:
4303
except (IOError, OSError):
4304
# The filesystem allows the Unicode filename but the file doesn't
4308
# The filesystem allows the Unicode filename and the file exists,
4312
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4315
class _CompatabilityThunkFeature(Feature):
4316
"""This feature is just a thunk to another feature.
4318
It issues a deprecation warning if it is accessed, to let you know that you
4319
should really use a different feature.
4322
def __init__(self, dep_version, module, name,
4323
replacement_name, replacement_module=None):
4324
super(_CompatabilityThunkFeature, self).__init__()
4325
self._module = module
4326
if replacement_module is None:
4327
replacement_module = module
4328
self._replacement_module = replacement_module
4330
self._replacement_name = replacement_name
4331
self._dep_version = dep_version
4332
self._feature = None
4335
if self._feature is None:
4336
depr_msg = self._dep_version % ('%s.%s'
4337
% (self._module, self._name))
4338
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4339
self._replacement_name)
4340
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4341
# Import the new feature and use it as a replacement for the
4343
self._feature = pyutils.get_named_object(
4344
self._replacement_module, self._replacement_name)
4348
return self._feature._probe()
4351
class ModuleAvailableFeature(Feature):
4352
"""This is a feature than describes a module we want to be available.
4354
Declare the name of the module in __init__(), and then after probing, the
4355
module will be available as 'self.module'.
4357
:ivar module: The module if it is available, else None.
4360
def __init__(self, module_name):
4361
super(ModuleAvailableFeature, self).__init__()
4362
self.module_name = module_name
4366
self._module = __import__(self.module_name, {}, {}, [''])
4373
if self.available(): # Make sure the probe has been done
4377
def feature_name(self):
4378
return self.module_name
4381
def probe_unicode_in_user_encoding():
4382
"""Try to encode several unicode strings to use in unicode-aware tests.
4383
Return first successfull match.
4385
:return: (unicode value, encoded plain string value) or (None, None)
4387
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4388
for uni_val in possible_vals:
4390
str_val = uni_val.encode(osutils.get_user_encoding())
4391
except UnicodeEncodeError:
4392
# Try a different character
4395
return uni_val, str_val
4399
def probe_bad_non_ascii(encoding):
4400
"""Try to find [bad] character with code [128..255]
4401
that cannot be decoded to unicode in some encoding.
4402
Return None if all non-ascii characters is valid
4405
for i in xrange(128, 256):
4408
char.decode(encoding)
4409
except UnicodeDecodeError:
4414
class _HTTPSServerFeature(Feature):
4415
"""Some tests want an https Server, check if one is available.
4417
Right now, the only way this is available is under python2.6 which provides
4428
def feature_name(self):
4429
return 'HTTPSServer'
4432
HTTPSServerFeature = _HTTPSServerFeature()
4435
class _UnicodeFilename(Feature):
4436
"""Does the filesystem support Unicode filenames?"""
4441
except UnicodeEncodeError:
4443
except (IOError, OSError):
4444
# The filesystem allows the Unicode filename but the file doesn't
4448
# The filesystem allows the Unicode filename and the file exists,
4452
UnicodeFilename = _UnicodeFilename()
4455
class _ByteStringNamedFilesystem(Feature):
4456
"""Is the filesystem based on bytes?"""
4459
if os.name == "posix":
4463
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4466
class _UTF8Filesystem(Feature):
4467
"""Is the filesystem UTF-8?"""
4470
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4474
UTF8Filesystem = _UTF8Filesystem()
4477
class _BreakinFeature(Feature):
4478
"""Does this platform support the breakin feature?"""
4481
from bzrlib import breakin
4482
if breakin.determine_signal() is None:
4484
if sys.platform == 'win32':
4485
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4486
# We trigger SIGBREAK via a Console api so we need ctypes to
4487
# access the function
4494
def feature_name(self):
4495
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4498
BreakinFeature = _BreakinFeature()
4501
class _CaseInsCasePresFilenameFeature(Feature):
4502
"""Is the file-system case insensitive, but case-preserving?"""
4505
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4507
# first check truly case-preserving for created files, then check
4508
# case insensitive when opening existing files.
4509
name = osutils.normpath(name)
4510
base, rel = osutils.split(name)
4511
found_rel = osutils.canonical_relpath(base, name)
4512
return (found_rel == rel
4513
and os.path.isfile(name.upper())
4514
and os.path.isfile(name.lower()))
4519
def feature_name(self):
4520
return "case-insensitive case-preserving filesystem"
4522
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4525
class _CaseInsensitiveFilesystemFeature(Feature):
4526
"""Check if underlying filesystem is case-insensitive but *not* case
4529
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4530
# more likely to be case preserving, so this case is rare.
4533
if CaseInsCasePresFilenameFeature.available():
4536
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4537
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4538
TestCaseWithMemoryTransport.TEST_ROOT = root
4540
root = TestCaseWithMemoryTransport.TEST_ROOT
4541
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4543
name_a = osutils.pathjoin(tdir, 'a')
4544
name_A = osutils.pathjoin(tdir, 'A')
4546
result = osutils.isdir(name_A)
4547
_rmtree_temp_dir(tdir)
4550
def feature_name(self):
4551
return 'case-insensitive filesystem'
4553
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4556
class _CaseSensitiveFilesystemFeature(Feature):
4559
if CaseInsCasePresFilenameFeature.available():
4561
elif CaseInsensitiveFilesystemFeature.available():
4566
def feature_name(self):
4567
return 'case-sensitive filesystem'
4569
# new coding style is for feature instances to be lowercase
4570
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4573
# Only define SubUnitBzrRunner if subunit is available.
4575
from subunit import TestProtocolClient
4576
from subunit.test_results import AutoTimingTestResultDecorator
4577
class SubUnitBzrProtocolClient(TestProtocolClient):
4579
def addSuccess(self, test, details=None):
4580
# The subunit client always includes the details in the subunit
4581
# stream, but we don't want to include it in ours.
4582
if details is not None and 'log' in details:
4584
return super(SubUnitBzrProtocolClient, self).addSuccess(
4587
class SubUnitBzrRunner(TextTestRunner):
4588
def run(self, test):
4589
result = AutoTimingTestResultDecorator(
4590
SubUnitBzrProtocolClient(self.stream))
4596
class _PosixPermissionsFeature(Feature):
4600
# create temporary file and check if specified perms are maintained.
4603
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4604
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4607
os.chmod(name, write_perms)
4609
read_perms = os.stat(name).st_mode & 0777
4611
return (write_perms == read_perms)
4613
return (os.name == 'posix') and has_perms()
4615
def feature_name(self):
4616
return 'POSIX permissions support'
4618
posix_permissions_feature = _PosixPermissionsFeature()