166
254
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
167
255
self._bench_history = bench_history
169
def extractBenchmarkTime(self, testCase):
256
self.ui = ui.ui_factory
259
self.failure_count = 0
260
self.known_failure_count = 0
262
self.not_applicable_count = 0
263
self.unsupported = {}
265
self._overall_start_time = time.time()
266
self._strict = strict
267
self._first_thread_leaker_id = None
268
self._tests_leaking_threads_count = 0
269
self._traceback_from_test = None
271
def stopTestRun(self):
274
stopTime = time.time()
275
timeTaken = stopTime - self.startTime
276
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
277
# the parent class method is similar have to duplicate
278
self._show_list('ERROR', self.errors)
279
self._show_list('FAIL', self.failures)
280
self.stream.write(self.sep2)
281
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
282
run, run != 1 and "s" or "", timeTaken))
283
if not self.wasSuccessful():
284
self.stream.write("FAILED (")
285
failed, errored = map(len, (self.failures, self.errors))
287
self.stream.write("failures=%d" % failed)
289
if failed: self.stream.write(", ")
290
self.stream.write("errors=%d" % errored)
291
if self.known_failure_count:
292
if failed or errored: self.stream.write(", ")
293
self.stream.write("known_failure_count=%d" %
294
self.known_failure_count)
295
self.stream.write(")\n")
297
if self.known_failure_count:
298
self.stream.write("OK (known_failures=%d)\n" %
299
self.known_failure_count)
301
self.stream.write("OK\n")
302
if self.skip_count > 0:
303
skipped = self.skip_count
304
self.stream.write('%d test%s skipped\n' %
305
(skipped, skipped != 1 and "s" or ""))
307
for feature, count in sorted(self.unsupported.items()):
308
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
311
ok = self.wasStrictlySuccessful()
313
ok = self.wasSuccessful()
314
if self._first_thread_leaker_id:
316
'%s is leaking threads among %d leaking tests.\n' % (
317
self._first_thread_leaker_id,
318
self._tests_leaking_threads_count))
319
# We don't report the main thread as an active one.
321
'%d non-main threads were left active in the end.\n'
322
% (len(self._active_threads) - 1))
324
def getDescription(self, test):
327
def _extractBenchmarkTime(self, testCase, details=None):
170
328
"""Add a benchmark time for the current test case."""
171
self._benchmarkTime = getattr(testCase, "_benchtime", None)
329
if details and 'benchtime' in details:
330
return float(''.join(details['benchtime'].iter_bytes()))
331
return getattr(testCase, "_benchtime", None)
173
333
def _elapsedTestTimeString(self):
174
334
"""Return a time string for the overall time the current test has taken."""
175
return self._formatTime(time.time() - self._start_time)
335
return self._formatTime(self._delta_to_float(
336
self._now() - self._start_datetime))
177
def _testTimeString(self):
178
if self._benchmarkTime is not None:
180
self._formatTime(self._benchmarkTime),
181
self._elapsedTestTimeString())
338
def _testTimeString(self, testCase):
339
benchmark_time = self._extractBenchmarkTime(testCase)
340
if benchmark_time is not None:
341
return self._formatTime(benchmark_time) + "*"
183
return " %s" % self._elapsedTestTimeString()
343
return self._elapsedTestTimeString()
185
345
def _formatTime(self, seconds):
186
346
"""Format seconds as milliseconds with leading spaces."""
187
return "%5dms" % (1000 * seconds)
189
def _ellipsise_unimportant_words(self, a_string, final_width,
191
"""Add ellipses (sp?) for overly long strings.
193
:param keep_start: If true preserve the start of a_string rather
197
if len(a_string) > final_width:
198
result = a_string[:final_width-3] + '...'
202
if len(a_string) > final_width:
203
result = '...' + a_string[3-final_width:]
206
return result.ljust(final_width)
347
# some benchmarks can take thousands of seconds to run, so we need 8
349
return "%8dms" % (1000 * seconds)
351
def _shortened_test_description(self, test):
353
what = re.sub(r'^bzrlib\.tests\.', '', what)
356
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
357
# multiple times in a row, because the handler is added for
358
# each test but the container list is shared between cases.
359
# See lp:498869 lp:625574 and lp:637725 for background.
360
def _record_traceback_from_test(self, exc_info):
361
"""Store the traceback from passed exc_info tuple till"""
362
self._traceback_from_test = exc_info[2]
208
364
def startTest(self, test):
209
unittest.TestResult.startTest(self, test)
210
# In a short description, the important words are in
211
# the beginning, but in an id, the important words are
213
SHOW_DESCRIPTIONS = False
215
if not self.showAll and self.dots and self.pb is not None:
218
final_width = osutils.terminal_width()
219
final_width = final_width - 15 - 8
221
if SHOW_DESCRIPTIONS:
222
what = test.shortDescription()
224
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
227
if what.startswith('bzrlib.tests.'):
229
what = self._ellipsise_unimportant_words(what, final_width)
231
self.stream.write(what)
232
elif self.dots and self.pb is not None:
233
self.pb.update(what, self.testsRun - 1, None)
365
super(ExtendedTestResult, self).startTest(test)
369
self.report_test_start(test)
370
test.number = self.count
235
371
self._recordTestStartTime()
372
# Make testtools cases give us the real traceback on failure
373
addOnException = getattr(test, "addOnException", None)
374
if addOnException is not None:
375
addOnException(self._record_traceback_from_test)
376
# Only check for thread leaks on bzrlib derived test cases
377
if isinstance(test, TestCase):
378
test.addCleanup(self._check_leaked_threads, test)
380
def startTests(self):
381
self.report_tests_starting()
382
self._active_threads = threading.enumerate()
384
def stopTest(self, test):
385
self._traceback_from_test = None
387
def _check_leaked_threads(self, test):
388
"""See if any threads have leaked since last call
390
A sample of live threads is stored in the _active_threads attribute,
391
when this method runs it compares the current live threads and any not
392
in the previous sample are treated as having leaked.
394
now_active_threads = set(threading.enumerate())
395
threads_leaked = now_active_threads.difference(self._active_threads)
397
self._report_thread_leak(test, threads_leaked, now_active_threads)
398
self._tests_leaking_threads_count += 1
399
if self._first_thread_leaker_id is None:
400
self._first_thread_leaker_id = test.id()
401
self._active_threads = now_active_threads
237
403
def _recordTestStartTime(self):
238
404
"""Record that a test has started."""
239
self._start_time = time.time()
405
self._start_datetime = self._now()
241
407
def addError(self, test, err):
242
if isinstance(err[1], TestSkipped):
243
return self.addSkipped(test, err)
244
unittest.TestResult.addError(self, test, err)
245
# We can only do this if we have one of our TestCases, not if
247
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
248
if setKeepLogfile is not None:
250
self.extractBenchmarkTime(test)
252
self.stream.writeln("ERROR %s" % self._testTimeString())
253
elif self.dots and self.pb is None:
254
self.stream.write('E')
256
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
257
self.pb.note(self._ellipsise_unimportant_words(
258
test.id() + ': ERROR',
259
osutils.terminal_width()))
408
"""Tell result that test finished with an error.
410
Called from the TestCase run() method when the test
411
fails with an unexpected error.
413
self._post_mortem(self._traceback_from_test)
414
super(ExtendedTestResult, self).addError(test, err)
415
self.error_count += 1
416
self.report_error(test, err)
261
417
if self.stop_early:
264
420
def addFailure(self, test, err):
265
unittest.TestResult.addFailure(self, test, err)
266
# We can only do this if we have one of our TestCases, not if
268
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
269
if setKeepLogfile is not None:
271
self.extractBenchmarkTime(test)
273
self.stream.writeln(" FAIL %s" % self._testTimeString())
274
elif self.dots and self.pb is None:
275
self.stream.write('F')
277
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
278
self.pb.note(self._ellipsise_unimportant_words(
279
test.id() + ': FAIL',
280
osutils.terminal_width()))
421
"""Tell result that test failed.
423
Called from the TestCase run() method when the test
424
fails because e.g. an assert() method failed.
426
self._post_mortem(self._traceback_from_test)
427
super(ExtendedTestResult, self).addFailure(test, err)
428
self.failure_count += 1
429
self.report_failure(test, err)
282
430
if self.stop_early:
285
def addSuccess(self, test):
286
self.extractBenchmarkTime(test)
433
def addSuccess(self, test, details=None):
434
"""Tell result that test completed successfully.
436
Called from the TestCase run()
287
438
if self._bench_history is not None:
288
if self._benchmarkTime is not None:
439
benchmark_time = self._extractBenchmarkTime(test, details)
440
if benchmark_time is not None:
289
441
self._bench_history.write("%s %s\n" % (
290
self._formatTime(self._benchmarkTime),
442
self._formatTime(benchmark_time),
293
self.stream.writeln(' OK %s' % self._testTimeString())
294
for bench_called, stats in getattr(test, '_benchcalls', []):
295
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
296
stats.pprint(file=self.stream)
297
elif self.dots and self.pb is None:
298
self.stream.write('~')
300
self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
302
unittest.TestResult.addSuccess(self, test)
304
def addSkipped(self, test, skip_excinfo):
305
self.extractBenchmarkTime(test)
307
print >>self.stream, ' SKIP %s' % self._testTimeString()
308
print >>self.stream, ' %s' % skip_excinfo[1]
309
elif self.dots and self.pb is None:
310
self.stream.write('S')
312
self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
314
# seems best to treat this as success from point-of-view of unittest
315
# -- it actually does nothing so it barely matters :)
318
except KeyboardInterrupt:
321
self.addError(test, test.__exc_info())
323
unittest.TestResult.addSuccess(self, test)
325
def printErrorList(self, flavour, errors):
326
for test, err in errors:
327
self.stream.writeln(self.separator1)
328
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
329
if getattr(test, '_get_log', None) is not None:
331
print >>self.stream, \
332
('vvvv[log from %s]' % test.id()).ljust(78,'-')
333
print >>self.stream, test._get_log()
334
print >>self.stream, \
335
('^^^^[log from %s]' % test.id()).ljust(78,'-')
336
self.stream.writeln(self.separator2)
337
self.stream.writeln("%s" % err)
444
self.report_success(test)
445
super(ExtendedTestResult, self).addSuccess(test)
446
test._log_contents = ''
448
def addExpectedFailure(self, test, err):
449
self.known_failure_count += 1
450
self.report_known_failure(test, err)
452
def addNotSupported(self, test, feature):
453
"""The test will not be run because of a missing feature.
455
# this can be called in two different ways: it may be that the
456
# test started running, and then raised (through requireFeature)
457
# UnavailableFeature. Alternatively this method can be called
458
# while probing for features before running the test code proper; in
459
# that case we will see startTest and stopTest, but the test will
460
# never actually run.
461
self.unsupported.setdefault(str(feature), 0)
462
self.unsupported[str(feature)] += 1
463
self.report_unsupported(test, feature)
465
def addSkip(self, test, reason):
466
"""A test has not run for 'reason'."""
468
self.report_skip(test, reason)
470
def addNotApplicable(self, test, reason):
471
self.not_applicable_count += 1
472
self.report_not_applicable(test, reason)
474
def _post_mortem(self, tb=None):
475
"""Start a PDB post mortem session."""
476
if os.environ.get('BZR_TEST_PDB', None):
480
def progress(self, offset, whence):
481
"""The test is adjusting the count of tests to run."""
482
if whence == SUBUNIT_SEEK_SET:
483
self.num_tests = offset
484
elif whence == SUBUNIT_SEEK_CUR:
485
self.num_tests += offset
487
raise errors.BzrError("Unknown whence %r" % whence)
489
def report_tests_starting(self):
490
"""Display information before the test run begins"""
491
if getattr(sys, 'frozen', None) is None:
492
bzr_path = osutils.realpath(sys.argv[0])
494
bzr_path = sys.executable
496
'bzr selftest: %s\n' % (bzr_path,))
499
bzrlib.__path__[0],))
501
' bzr-%s python-%s %s\n' % (
502
bzrlib.version_string,
503
bzrlib._format_version_tuple(sys.version_info),
504
platform.platform(aliased=1),
506
self.stream.write('\n')
508
def report_test_start(self, test):
509
"""Display information on the test just about to be run"""
511
def _report_thread_leak(self, test, leaked_threads, active_threads):
512
"""Display information on a test that leaked one or more threads"""
513
# GZ 2010-09-09: A leak summary reported separately from the general
514
# thread debugging would be nice. Tests under subunit
515
# need something not using stream, perhaps adding a
516
# testtools details object would be fitting.
517
if 'threads' in selftest_debug_flags:
518
self.stream.write('%s is leaking, active is now %d\n' %
519
(test.id(), len(active_threads)))
521
def startTestRun(self):
522
self.startTime = time.time()
524
def report_success(self, test):
527
def wasStrictlySuccessful(self):
528
if self.unsupported or self.known_failure_count:
530
return self.wasSuccessful()
533
class TextTestResult(ExtendedTestResult):
534
"""Displays progress and results of tests in text form"""
536
def __init__(self, stream, descriptions, verbosity,
541
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
542
bench_history, strict)
543
# We no longer pass them around, but just rely on the UIFactory stack
546
warnings.warn("Passing pb to TextTestResult is deprecated")
547
self.pb = self.ui.nested_progress_bar()
548
self.pb.show_pct = False
549
self.pb.show_spinner = False
550
self.pb.show_eta = False,
551
self.pb.show_count = False
552
self.pb.show_bar = False
553
self.pb.update_latency = 0
554
self.pb.show_transport_activity = False
556
def stopTestRun(self):
557
# called when the tests that are going to run have run
560
super(TextTestResult, self).stopTestRun()
562
def report_tests_starting(self):
563
super(TextTestResult, self).report_tests_starting()
564
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
566
def _progress_prefix_text(self):
567
# the longer this text, the less space we have to show the test
569
a = '[%d' % self.count # total that have been run
570
# tests skipped as known not to be relevant are not important enough
572
## if self.skip_count:
573
## a += ', %d skip' % self.skip_count
574
## if self.known_failure_count:
575
## a += '+%dX' % self.known_failure_count
577
a +='/%d' % self.num_tests
579
runtime = time.time() - self._overall_start_time
581
a += '%dm%ds' % (runtime / 60, runtime % 60)
584
total_fail_count = self.error_count + self.failure_count
586
a += ', %d failed' % total_fail_count
587
# if self.unsupported:
588
# a += ', %d missing' % len(self.unsupported)
592
def report_test_start(self, test):
594
self._progress_prefix_text()
596
+ self._shortened_test_description(test))
598
def _test_description(self, test):
599
return self._shortened_test_description(test)
601
def report_error(self, test, err):
602
self.stream.write('ERROR: %s\n %s\n' % (
603
self._test_description(test),
607
def report_failure(self, test, err):
608
self.stream.write('FAIL: %s\n %s\n' % (
609
self._test_description(test),
613
def report_known_failure(self, test, err):
616
def report_skip(self, test, reason):
619
def report_not_applicable(self, test, reason):
622
def report_unsupported(self, test, feature):
623
"""test cannot be run because feature is missing."""
626
class VerboseTestResult(ExtendedTestResult):
627
"""Produce long output, with one line per test run plus times"""
629
def _ellipsize_to_right(self, a_string, final_width):
630
"""Truncate and pad a string, keeping the right hand side"""
631
if len(a_string) > final_width:
632
result = '...' + a_string[3-final_width:]
635
return result.ljust(final_width)
637
def report_tests_starting(self):
638
self.stream.write('running %d tests...\n' % self.num_tests)
639
super(VerboseTestResult, self).report_tests_starting()
641
def report_test_start(self, test):
642
name = self._shortened_test_description(test)
643
width = osutils.terminal_width()
644
if width is not None:
645
# width needs space for 6 char status, plus 1 for slash, plus an
646
# 11-char time string, plus a trailing blank
647
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
649
self.stream.write(self._ellipsize_to_right(name, width-18))
651
self.stream.write(name)
654
def _error_summary(self, err):
656
return '%s%s' % (indent, err[1])
658
def report_error(self, test, err):
659
self.stream.write('ERROR %s\n%s\n'
660
% (self._testTimeString(test),
661
self._error_summary(err)))
663
def report_failure(self, test, err):
664
self.stream.write(' FAIL %s\n%s\n'
665
% (self._testTimeString(test),
666
self._error_summary(err)))
668
def report_known_failure(self, test, err):
669
self.stream.write('XFAIL %s\n%s\n'
670
% (self._testTimeString(test),
671
self._error_summary(err)))
673
def report_success(self, test):
674
self.stream.write(' OK %s\n' % self._testTimeString(test))
675
for bench_called, stats in getattr(test, '_benchcalls', []):
676
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
677
stats.pprint(file=self.stream)
678
# flush the stream so that we get smooth output. This verbose mode is
679
# used to show the output in PQM.
682
def report_skip(self, test, reason):
683
self.stream.write(' SKIP %s\n%s\n'
684
% (self._testTimeString(test), reason))
686
def report_not_applicable(self, test, reason):
687
self.stream.write(' N/A %s\n %s\n'
688
% (self._testTimeString(test), reason))
690
def report_unsupported(self, test, feature):
691
"""test cannot be run because feature is missing."""
692
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
693
%(self._testTimeString(test), feature))
340
696
class TextTestRunner(object):
482
911
retrieved by _get_log(). We use a real OS file, not an in-memory object,
483
912
so that it can also capture file IO. When the test completes this file
484
913
is read into memory and removed from disk.
486
915
There are also convenience functions to invoke bzr's command-line
487
916
routine, and to build and check bzr trees.
489
918
In addition to the usual method of overriding tearDown(), this class also
490
allows subclasses to register functions into the _cleanups list, which is
919
allows subclasses to register cleanup functions via addCleanup, which are
491
920
run in order as the object is torn down. It's less likely this will be
492
921
accidentally overlooked.
495
_log_file_name = None
497
_keep_log_file = False
498
925
# record lsprof data when performing benchmark calls.
499
926
_gather_lsprof_in_benchmarks = False
501
928
def __init__(self, methodName='testMethod'):
502
929
super(TestCase, self).__init__(methodName)
930
self._directory_isolation = True
931
self.exception_handlers.insert(0,
932
(UnavailableFeature, self._do_unsupported_or_skip))
933
self.exception_handlers.insert(0,
934
(TestNotApplicable, self._do_not_applicable))
506
unittest.TestCase.setUp(self)
937
super(TestCase, self).setUp()
938
for feature in getattr(self, '_test_needs_features', []):
939
self.requireFeature(feature)
940
self._log_contents = None
941
self.addDetail("log", content.Content(content.ContentType("text",
942
"plain", {"charset": "utf8"}),
943
lambda:[self._get_log(keep_log_file=True)]))
507
944
self._cleanEnvironment()
508
bzrlib.trace.disable_default_logging()
509
946
self._startLogFile()
510
947
self._benchcalls = []
511
948
self._benchtime = None
950
self._track_transports()
952
self._clear_debug_flags()
953
# Isolate global verbosity level, to make sure it's reproducible
954
# between tests. We should get rid of this altogether: bug 656694. --
956
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
957
# Isolate config option expansion until its default value for bzrlib is
958
# settled on or a the FIXME associated with _get_expand_default_value
959
# is addressed -- vila 20110219
960
self.overrideAttr(config, '_expand_default_value', None)
965
pdb.Pdb().set_trace(sys._getframe().f_back)
967
def discardDetail(self, name):
968
"""Extend the addDetail, getDetails api so we can remove a detail.
970
eg. bzr always adds the 'log' detail at startup, but we don't want to
971
include it for skipped, xfail, etc tests.
973
It is safe to call this for a detail that doesn't exist, in case this
974
gets called multiple times.
976
# We cheat. details is stored in __details which means we shouldn't
977
# touch it. but getDetails() returns the dict directly, so we can
979
details = self.getDetails()
983
def _clear_debug_flags(self):
984
"""Prevent externally set debug flags affecting tests.
986
Tests that want to use debug flags can just set them in the
987
debug_flags set during setup/teardown.
989
# Start with a copy of the current debug flags we can safely modify.
990
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
991
if 'allow_debug' not in selftest_debug_flags:
992
debug.debug_flags.clear()
993
if 'disable_lock_checks' not in selftest_debug_flags:
994
debug.debug_flags.add('strict_locks')
996
def _clear_hooks(self):
997
# prevent hooks affecting tests
998
known_hooks = hooks.known_hooks
999
self._preserved_hooks = {}
1000
for key, (parent, name) in known_hooks.iter_parent_objects():
1001
current_hooks = getattr(parent, name)
1002
self._preserved_hooks[parent] = (name, current_hooks)
1003
self._preserved_lazy_hooks = hooks._lazy_hooks
1004
hooks._lazy_hooks = {}
1005
self.addCleanup(self._restoreHooks)
1006
for key, (parent, name) in known_hooks.iter_parent_objects():
1007
factory = known_hooks.get(key)
1008
setattr(parent, name, factory())
1009
# this hook should always be installed
1010
request._install_hook()
1012
def disable_directory_isolation(self):
1013
"""Turn off directory isolation checks."""
1014
self._directory_isolation = False
1016
def enable_directory_isolation(self):
1017
"""Enable directory isolation checks."""
1018
self._directory_isolation = True
1020
def _silenceUI(self):
1021
"""Turn off UI for duration of test"""
1022
# by default the UI is off; tests can turn it on if they want it.
1023
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
1025
def _check_locks(self):
1026
"""Check that all lock take/release actions have been paired."""
1027
# We always check for mismatched locks. If a mismatch is found, we
1028
# fail unless -Edisable_lock_checks is supplied to selftest, in which
1029
# case we just print a warning.
1031
acquired_locks = [lock for action, lock in self._lock_actions
1032
if action == 'acquired']
1033
released_locks = [lock for action, lock in self._lock_actions
1034
if action == 'released']
1035
broken_locks = [lock for action, lock in self._lock_actions
1036
if action == 'broken']
1037
# trivially, given the tests for lock acquistion and release, if we
1038
# have as many in each list, it should be ok. Some lock tests also
1039
# break some locks on purpose and should be taken into account by
1040
# considering that breaking a lock is just a dirty way of releasing it.
1041
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1042
message = ('Different number of acquired and '
1043
'released or broken locks. (%s, %s + %s)' %
1044
(acquired_locks, released_locks, broken_locks))
1045
if not self._lock_check_thorough:
1046
# Rather than fail, just warn
1047
print "Broken test %s: %s" % (self, message)
1051
def _track_locks(self):
1052
"""Track lock activity during tests."""
1053
self._lock_actions = []
1054
if 'disable_lock_checks' in selftest_debug_flags:
1055
self._lock_check_thorough = False
1057
self._lock_check_thorough = True
1059
self.addCleanup(self._check_locks)
1060
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
1061
self._lock_acquired, None)
1062
_mod_lock.Lock.hooks.install_named_hook('lock_released',
1063
self._lock_released, None)
1064
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
1065
self._lock_broken, None)
1067
def _lock_acquired(self, result):
1068
self._lock_actions.append(('acquired', result))
1070
def _lock_released(self, result):
1071
self._lock_actions.append(('released', result))
1073
def _lock_broken(self, result):
1074
self._lock_actions.append(('broken', result))
1076
def permit_dir(self, name):
1077
"""Permit a directory to be used by this test. See permit_url."""
1078
name_transport = _mod_transport.get_transport(name)
1079
self.permit_url(name)
1080
self.permit_url(name_transport.base)
1082
def permit_url(self, url):
1083
"""Declare that url is an ok url to use in this test.
1085
Do this for memory transports, temporary test directory etc.
1087
Do not do this for the current working directory, /tmp, or any other
1088
preexisting non isolated url.
1090
if not url.endswith('/'):
1092
self._bzr_selftest_roots.append(url)
1094
def permit_source_tree_branch_repo(self):
1095
"""Permit the source tree bzr is running from to be opened.
1097
Some code such as bzrlib.version attempts to read from the bzr branch
1098
that bzr is executing from (if any). This method permits that directory
1099
to be used in the test suite.
1101
path = self.get_source_path()
1102
self.record_directory_isolation()
1105
workingtree.WorkingTree.open(path)
1106
except (errors.NotBranchError, errors.NoWorkingTree):
1107
raise TestSkipped('Needs a working tree of bzr sources')
1109
self.enable_directory_isolation()
1111
def _preopen_isolate_transport(self, transport):
1112
"""Check that all transport openings are done in the test work area."""
1113
while isinstance(transport, pathfilter.PathFilteringTransport):
1114
# Unwrap pathfiltered transports
1115
transport = transport.server.backing_transport.clone(
1116
transport._filter('.'))
1117
url = transport.base
1118
# ReadonlySmartTCPServer_for_testing decorates the backing transport
1119
# urls it is given by prepending readonly+. This is appropriate as the
1120
# client shouldn't know that the server is readonly (or not readonly).
1121
# We could register all servers twice, with readonly+ prepending, but
1122
# that makes for a long list; this is about the same but easier to
1124
if url.startswith('readonly+'):
1125
url = url[len('readonly+'):]
1126
self._preopen_isolate_url(url)
1128
def _preopen_isolate_url(self, url):
1129
if not self._directory_isolation:
1131
if self._directory_isolation == 'record':
1132
self._bzr_selftest_roots.append(url)
1134
# This prevents all transports, including e.g. sftp ones backed on disk
1135
# from working unless they are explicitly granted permission. We then
1136
# depend on the code that sets up test transports to check that they are
1137
# appropriately isolated and enable their use by calling
1138
# self.permit_transport()
1139
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1140
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1141
% (url, self._bzr_selftest_roots))
1143
def record_directory_isolation(self):
1144
"""Gather accessed directories to permit later access.
1146
This is used for tests that access the branch bzr is running from.
1148
self._directory_isolation = "record"
1150
def start_server(self, transport_server, backing_server=None):
1151
"""Start transport_server for this test.
1153
This starts the server, registers a cleanup for it and permits the
1154
server's urls to be used.
1156
if backing_server is None:
1157
transport_server.start_server()
1159
transport_server.start_server(backing_server)
1160
self.addCleanup(transport_server.stop_server)
1161
# Obtain a real transport because if the server supplies a password, it
1162
# will be hidden from the base on the client side.
1163
t = _mod_transport.get_transport(transport_server.get_url())
1164
# Some transport servers effectively chroot the backing transport;
1165
# others like SFTPServer don't - users of the transport can walk up the
1166
# transport to read the entire backing transport. This wouldn't matter
1167
# except that the workdir tests are given - and that they expect the
1168
# server's url to point at - is one directory under the safety net. So
1169
# Branch operations into the transport will attempt to walk up one
1170
# directory. Chrooting all servers would avoid this but also mean that
1171
# we wouldn't be testing directly against non-root urls. Alternatively
1172
# getting the test framework to start the server with a backing server
1173
# at the actual safety net directory would work too, but this then
1174
# means that the self.get_url/self.get_transport methods would need
1175
# to transform all their results. On balance its cleaner to handle it
1176
# here, and permit a higher url when we have one of these transports.
1177
if t.base.endswith('/work/'):
1178
# we have safety net/test root/work
1179
t = t.clone('../..')
1180
elif isinstance(transport_server,
1181
test_server.SmartTCPServer_for_testing):
1182
# The smart server adds a path similar to work, which is traversed
1183
# up from by the client. But the server is chrooted - the actual
1184
# backing transport is not escaped from, and VFS requests to the
1185
# root will error (because they try to escape the chroot).
1187
while t2.base != t.base:
1190
self.permit_url(t.base)
1192
def _track_transports(self):
1193
"""Install checks for transport usage."""
1194
# TestCase has no safe place it can write to.
1195
self._bzr_selftest_roots = []
1196
# Currently the easiest way to be sure that nothing is going on is to
1197
# hook into bzr dir opening. This leaves a small window of error for
1198
# transport tests, but they are well known, and we can improve on this
1200
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1201
self._preopen_isolate_transport, "Check bzr directories are safe.")
513
1203
def _ndiff_strings(self, a, b):
514
1204
"""Return ndiff between two strings containing lines.
516
1206
A trailing newline is added if missing to make the strings
517
1207
print properly."""
518
1208
if b and b[-1] != '\n':
1087
2179
sys.stderr = real_stderr
1088
2180
sys.stdin = real_stdin
1090
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
1091
def merge(self, branch_from, wt_to):
1092
"""A helper for tests to do a ui-less merge.
1094
This should move to the main library when someone has time to integrate
1097
# minimal ui-less merge.
1098
wt_to.branch.fetch(branch_from)
1099
base_rev = common_ancestor(branch_from.last_revision(),
1100
wt_to.branch.last_revision(),
1101
wt_to.branch.repository)
1102
merge_inner(wt_to.branch, branch_from.basis_tree(),
1103
wt_to.branch.repository.revision_tree(base_rev),
1105
wt_to.add_parent_tree_id(branch_from.last_revision())
1108
BzrTestBase = TestCase
1111
class TestCaseInTempDir(TestCase):
1112
"""Derived class that runs a test within a temporary directory.
1114
This is useful for tests that need to create a branch, etc.
1116
The directory is created in a slightly complex way: for each
1117
Python invocation, a new temporary top-level directory is created.
1118
All test cases create their own directory within that. If the
1119
tests complete successfully, the directory is removed.
1121
InTempDir is an old alias for FunctionalTestCase.
2182
def reduceLockdirTimeout(self):
2183
"""Reduce the default lock timeout for the duration of the test, so that
2184
if LockContention occurs during a test, it does so quickly.
2186
Tests that expect to provoke LockContention errors should call this.
2188
self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2190
def make_utf8_encoded_stringio(self, encoding_type=None):
2191
"""Return a StringIOWrapper instance, that will encode Unicode
2194
if encoding_type is None:
2195
encoding_type = 'strict'
2197
output_encoding = 'utf-8'
2198
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
2199
sio.encoding = output_encoding
2202
def disable_verb(self, verb):
2203
"""Disable a smart server verb for one test."""
2204
from bzrlib.smart import request
2205
request_handlers = request.request_handlers
2206
orig_method = request_handlers.get(verb)
2207
request_handlers.remove(verb)
2208
self.addCleanup(request_handlers.register, verb, orig_method)
2211
class CapturedCall(object):
2212
"""A helper for capturing smart server calls for easy debug analysis."""
2214
def __init__(self, params, prefix_length):
2215
"""Capture the call with params and skip prefix_length stack frames."""
2218
# The last 5 frames are the __init__, the hook frame, and 3 smart
2219
# client frames. Beyond this we could get more clever, but this is good
2221
stack = traceback.extract_stack()[prefix_length:-5]
2222
self.stack = ''.join(traceback.format_list(stack))
2225
return self.call.method
2228
return self.call.method
2234
class TestCaseWithMemoryTransport(TestCase):
2235
"""Common test class for tests that do not need disk resources.
2237
Tests that need disk resources should derive from TestCaseWithTransport.
2239
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2241
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2242
a directory which does not exist. This serves to help ensure test isolation
2243
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2244
must exist. However, TestCaseWithMemoryTransport does not offer local
2245
file defaults for the transport in tests, nor does it obey the command line
2246
override, so tests that accidentally write to the common directory should
2249
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2250
a .bzr directory that stops us ascending higher into the filesystem.
1124
2253
TEST_ROOT = None
1125
2254
_TEST_NAME = 'test'
1126
OVERRIDE_PYTHON = 'python'
1128
def check_file_contents(self, filename, expect):
1129
self.log("check contents of file %s" % filename)
1130
contents = file(filename, 'r').read()
1131
if contents != expect:
1132
self.log("expected: %r" % expect)
1133
self.log("actually: %r" % contents)
1134
self.fail("contents of %s not as expected" % filename)
1136
def _make_test_root(self):
1137
if TestCaseInTempDir.TEST_ROOT is not None:
1141
root = u'test%04d.tmp' % i
1145
if e.errno == errno.EEXIST:
1150
# successfully created
1151
TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
1153
# make a fake bzr directory there to prevent any tests propagating
1154
# up onto the source directory's real branch
1155
bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
1158
super(TestCaseInTempDir, self).setUp()
1159
self._make_test_root()
1160
_currentdir = os.getcwdu()
1161
# shorten the name, to avoid test failures due to path length
1162
short_id = self.id().replace('bzrlib.tests.', '') \
1163
.replace('__main__.', '')[-100:]
1164
# it's possible the same test class is run several times for
1165
# parameterized tests, so make sure the names don't collide.
1169
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1171
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1172
if os.path.exists(candidate_dir):
1176
os.mkdir(candidate_dir)
1177
self.test_home_dir = candidate_dir + '/home'
1178
os.mkdir(self.test_home_dir)
1179
self.test_dir = candidate_dir + '/work'
1180
os.mkdir(self.test_dir)
1181
os.chdir(self.test_dir)
1183
os.environ['HOME'] = self.test_home_dir
1184
os.environ['APPDATA'] = self.test_home_dir
1185
def _leaveDirectory():
1186
os.chdir(_currentdir)
1187
self.addCleanup(_leaveDirectory)
1189
def build_tree(self, shape, line_endings='native', transport=None):
1190
"""Build a test tree according to a pattern.
1192
shape is a sequence of file specifications. If the final
1193
character is '/', a directory is created.
1195
This assumes that all the elements in the tree being built are new.
1197
This doesn't add anything to a branch.
1198
:param line_endings: Either 'binary' or 'native'
1199
in binary mode, exact contents are written
1200
in native mode, the line endings match the
1201
default platform endings.
1203
:param transport: A transport to write to, for building trees on
1204
VFS's. If the transport is readonly or None,
1205
"." is opened automatically.
1207
# It's OK to just create them using forward slashes on windows.
1208
if transport is None or transport.is_readonly():
1209
transport = get_transport(".")
1211
self.assert_(isinstance(name, basestring))
1213
transport.mkdir(urlutils.escape(name[:-1]))
1215
if line_endings == 'binary':
1217
elif line_endings == 'native':
1220
raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
1221
content = "contents of %s%s" % (name.encode('utf-8'), end)
1222
# Technically 'put()' is the right command. However, put
1223
# uses an AtomicFile, which requires an extra rename into place
1224
# As long as the files didn't exist in the past, append() will
1225
# do the same thing as put()
1226
# On jam's machine, make_kernel_like_tree is:
1227
# put: 4.5-7.5s (averaging 6s)
1229
# put_non_atomic: 2.9-4.5s
1230
transport.put_bytes_non_atomic(urlutils.escape(name), content)
1232
def build_tree_contents(self, shape):
1233
build_tree_contents(shape)
1235
def failUnlessExists(self, path):
1236
"""Fail unless path, which may be abs or relative, exists."""
1237
self.failUnless(osutils.lexists(path))
1239
def failIfExists(self, path):
1240
"""Fail if path, which may be abs or relative, exists."""
1241
self.failIf(osutils.lexists(path))
1243
def assertFileEqual(self, content, path):
1244
"""Fail if path does not contain 'content'."""
1245
self.failUnless(osutils.lexists(path))
1246
# TODO: jam 20060427 Shouldn't this be 'rb'?
1247
self.assertEqualDiff(content, open(path, 'r').read())
1250
class TestCaseWithTransport(TestCaseInTempDir):
1251
"""A test case that provides get_url and get_readonly_url facilities.
1253
These back onto two transport servers, one for readonly access and one for
1256
If no explicit class is provided for readonly access, a
1257
ReadonlyTransportDecorator is used instead which allows the use of non disk
1258
based read write transports.
1260
If an explicit class is provided for readonly access, that server and the
1261
readwrite one must both define get_url() as resolving to os.getcwd().
1264
def __init__(self, methodName='testMethod'):
1265
super(TestCaseWithTransport, self).__init__(methodName)
1266
self.__readonly_server = None
1267
self.__server = None
1268
self.transport_server = default_transport
2256
def __init__(self, methodName='runTest'):
2257
# allow test parameterization after test construction and before test
2258
# execution. Variables that the parameterizer sets need to be
2259
# ones that are not set by setUp, or setUp will trash them.
2260
super(TestCaseWithMemoryTransport, self).__init__(methodName)
2261
self.vfs_transport_factory = default_transport
2262
self.transport_server = None
1269
2263
self.transport_readonly_server = None
1271
def get_readonly_url(self, relpath=None):
1272
"""Get a URL for the readonly transport.
1274
This will either be backed by '.' or a decorator to the transport
1275
used by self.get_url()
1276
relpath provides for clients to get a path relative to the base url.
1277
These should only be downwards relative, not upwards.
1279
base = self.get_readonly_server().get_url()
1280
if relpath is not None:
1281
if not base.endswith('/'):
1283
base = base + relpath
2264
self.__vfs_server = None
2266
def get_transport(self, relpath=None):
2267
"""Return a writeable transport.
2269
This transport is for the test scratch space relative to
2272
:param relpath: a path relative to the base url.
2274
t = _mod_transport.get_transport(self.get_url(relpath))
2275
self.assertFalse(t.is_readonly())
2278
def get_readonly_transport(self, relpath=None):
2279
"""Return a readonly transport for the test scratch space
2281
This can be used to test that operations which should only need
2282
readonly access in fact do not try to write.
2284
:param relpath: a path relative to the base url.
2286
t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2287
self.assertTrue(t.is_readonly())
2290
def create_transport_readonly_server(self):
2291
"""Create a transport server from class defined at init.
2293
This is mostly a hook for daughter classes.
2295
return self.transport_readonly_server()
1286
2297
def get_readonly_server(self):
1287
2298
"""Get the server instance for the readonly transport
1361
2470
# might be a relative or absolute path
1362
2471
maybe_a_url = self.get_url(relpath)
1363
2472
segments = maybe_a_url.rsplit('/', 1)
1364
t = get_transport(maybe_a_url)
2473
t = _mod_transport.get_transport(maybe_a_url)
1365
2474
if len(segments) > 1 and segments[-1] not in ('', '.'):
1368
except errors.FileExists:
1370
2476
if format is None:
1371
format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
2478
if isinstance(format, basestring):
2479
format = bzrdir.format_registry.make_bzrdir(format)
1372
2480
return format.initialize_on_transport(t)
1373
2481
except errors.UninitializableFormat:
1374
2482
raise TestSkipped("Format %s is not initializable." % format)
1376
2484
def make_repository(self, relpath, shared=False, format=None):
1377
"""Create a repository on our default transport at relpath."""
2485
"""Create a repository on our default transport at relpath.
2487
Note that relpath must be a relative path, not a full url.
2489
# FIXME: If you create a remoterepository this returns the underlying
2490
# real format, which is incorrect. Actually we should make sure that
2491
# RemoteBzrDir returns a RemoteRepository.
2492
# maybe mbp 20070410
1378
2493
made_control = self.make_bzrdir(relpath, format=format)
1379
2494
return made_control.create_repository(shared=shared)
1381
def make_branch_and_memory_tree(self, relpath):
2496
def make_smart_server(self, path, backing_server=None):
2497
if backing_server is None:
2498
backing_server = self.get_server()
2499
smart_server = test_server.SmartTCPServer_for_testing()
2500
self.start_server(smart_server, backing_server)
2501
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2503
return remote_transport
2505
def make_branch_and_memory_tree(self, relpath, format=None):
1382
2506
"""Create a branch on the default transport and a MemoryTree for it."""
1383
b = self.make_branch(relpath)
2507
b = self.make_branch(relpath, format=format)
1384
2508
return memorytree.MemoryTree.create_on_branch(b)
2510
def make_branch_builder(self, relpath, format=None):
2511
branch = self.make_branch(relpath, format=format)
2512
return branchbuilder.BranchBuilder(branch=branch)
2514
def overrideEnvironmentForTesting(self):
2515
test_home_dir = self.test_home_dir
2516
if isinstance(test_home_dir, unicode):
2517
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2518
self.overrideEnv('HOME', test_home_dir)
2519
self.overrideEnv('BZR_HOME', test_home_dir)
2522
super(TestCaseWithMemoryTransport, self).setUp()
2523
# Ensure that ConnectedTransport doesn't leak sockets
2524
def get_transport_with_cleanup(*args, **kwargs):
2525
t = orig_get_transport(*args, **kwargs)
2526
if isinstance(t, _mod_transport.ConnectedTransport):
2527
self.addCleanup(t.disconnect)
2530
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2531
get_transport_with_cleanup)
2532
self._make_test_root()
2533
self.addCleanup(os.chdir, os.getcwdu())
2534
self.makeAndChdirToTestDir()
2535
self.overrideEnvironmentForTesting()
2536
self.__readonly_server = None
2537
self.__server = None
2538
self.reduceLockdirTimeout()
2540
def setup_smart_server_with_call_log(self):
2541
"""Sets up a smart server as the transport server with a call log."""
2542
self.transport_server = test_server.SmartTCPServer_for_testing
2543
self.hpss_calls = []
2545
# Skip the current stack down to the caller of
2546
# setup_smart_server_with_call_log
2547
prefix_length = len(traceback.extract_stack()) - 2
2548
def capture_hpss_call(params):
2549
self.hpss_calls.append(
2550
CapturedCall(params, prefix_length))
2551
client._SmartClient.hooks.install_named_hook(
2552
'call', capture_hpss_call, None)
2554
def reset_smart_call_log(self):
2555
self.hpss_calls = []
2558
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2559
"""Derived class that runs a test within a temporary directory.
2561
This is useful for tests that need to create a branch, etc.
2563
The directory is created in a slightly complex way: for each
2564
Python invocation, a new temporary top-level directory is created.
2565
All test cases create their own directory within that. If the
2566
tests complete successfully, the directory is removed.
2568
:ivar test_base_dir: The path of the top-level directory for this
2569
test, which contains a home directory and a work directory.
2571
:ivar test_home_dir: An initially empty directory under test_base_dir
2572
which is used as $HOME for this test.
2574
:ivar test_dir: A directory under test_base_dir used as the current
2575
directory when the test proper is run.
2578
OVERRIDE_PYTHON = 'python'
2580
def check_file_contents(self, filename, expect):
2581
self.log("check contents of file %s" % filename)
2587
if contents != expect:
2588
self.log("expected: %r" % expect)
2589
self.log("actually: %r" % contents)
2590
self.fail("contents of %s not as expected" % filename)
2592
def _getTestDirPrefix(self):
2593
# create a directory within the top level test directory
2594
if sys.platform in ('win32', 'cygwin'):
2595
name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
2596
# windows is likely to have path-length limits so use a short name
2597
name_prefix = name_prefix[-30:]
2599
name_prefix = re.sub('[/]', '_', self.id())
2602
def makeAndChdirToTestDir(self):
2603
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2605
For TestCaseInTempDir we create a temporary directory based on the test
2606
name and then create two subdirs - test and home under it.
2608
name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2609
self._getTestDirPrefix())
2611
for i in range(100):
2612
if os.path.exists(name):
2613
name = name_prefix + '_' + str(i)
2615
# now create test and home directories within this dir
2616
self.test_base_dir = name
2617
self.addCleanup(self.deleteTestDir)
2618
os.mkdir(self.test_base_dir)
2620
self.permit_dir(self.test_base_dir)
2621
# 'sprouting' and 'init' of a branch both walk up the tree to find
2622
# stacking policy to honour; create a bzr dir with an unshared
2623
# repository (but not a branch - our code would be trying to escape
2624
# then!) to stop them, and permit it to be read.
2625
# control = bzrdir.BzrDir.create(self.test_base_dir)
2626
# control.create_repository()
2627
self.test_home_dir = self.test_base_dir + '/home'
2628
os.mkdir(self.test_home_dir)
2629
self.test_dir = self.test_base_dir + '/work'
2630
os.mkdir(self.test_dir)
2631
os.chdir(self.test_dir)
2632
# put name of test inside
2633
f = file(self.test_base_dir + '/name', 'w')
2639
def deleteTestDir(self):
2640
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2641
_rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2643
def build_tree(self, shape, line_endings='binary', transport=None):
2644
"""Build a test tree according to a pattern.
2646
shape is a sequence of file specifications. If the final
2647
character is '/', a directory is created.
2649
This assumes that all the elements in the tree being built are new.
2651
This doesn't add anything to a branch.
2653
:type shape: list or tuple.
2654
:param line_endings: Either 'binary' or 'native'
2655
in binary mode, exact contents are written in native mode, the
2656
line endings match the default platform endings.
2657
:param transport: A transport to write to, for building trees on VFS's.
2658
If the transport is readonly or None, "." is opened automatically.
2661
if type(shape) not in (list, tuple):
2662
raise AssertionError("Parameter 'shape' should be "
2663
"a list or a tuple. Got %r instead" % (shape,))
2664
# It's OK to just create them using forward slashes on windows.
2665
if transport is None or transport.is_readonly():
2666
transport = _mod_transport.get_transport(".")
2668
self.assertIsInstance(name, basestring)
2670
transport.mkdir(urlutils.escape(name[:-1]))
2672
if line_endings == 'binary':
2674
elif line_endings == 'native':
2677
raise errors.BzrError(
2678
'Invalid line ending request %r' % line_endings)
2679
content = "contents of %s%s" % (name.encode('utf-8'), end)
2680
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2682
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2684
def assertInWorkingTree(self, path, root_path='.', tree=None):
2685
"""Assert whether path or paths are in the WorkingTree"""
2687
tree = workingtree.WorkingTree.open(root_path)
2688
if not isinstance(path, basestring):
2690
self.assertInWorkingTree(p, tree=tree)
2692
self.assertIsNot(tree.path2id(path), None,
2693
path+' not in working tree.')
2695
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2696
"""Assert whether path or paths are not in the WorkingTree"""
2698
tree = workingtree.WorkingTree.open(root_path)
2699
if not isinstance(path, basestring):
2701
self.assertNotInWorkingTree(p,tree=tree)
2703
self.assertIs(tree.path2id(path), None, path+' in working tree.')
2706
class TestCaseWithTransport(TestCaseInTempDir):
2707
"""A test case that provides get_url and get_readonly_url facilities.
2709
These back onto two transport servers, one for readonly access and one for
2712
If no explicit class is provided for readonly access, a
2713
ReadonlyTransportDecorator is used instead which allows the use of non disk
2714
based read write transports.
2716
If an explicit class is provided for readonly access, that server and the
2717
readwrite one must both define get_url() as resolving to os.getcwd().
2720
def get_vfs_only_server(self):
2721
"""See TestCaseWithMemoryTransport.
2723
This is useful for some tests with specific servers that need
2726
if self.__vfs_server is None:
2727
self.__vfs_server = self.vfs_transport_factory()
2728
self.start_server(self.__vfs_server)
2729
return self.__vfs_server
1386
2731
def make_branch_and_tree(self, relpath, format=None):
1387
2732
"""Create a branch on the transport and a tree locally.
1389
2734
If the transport is not a LocalTransport, the Tree can't be created on
1390
the transport. In that case the working tree is created in the local
1391
directory, and the returned tree's branch and repository will also be
2735
the transport. In that case if the vfs_transport_factory is
2736
LocalURLServer the working tree is created in the local
2737
directory backing the transport, and the returned tree's branch and
2738
repository will also be accessed locally. Otherwise a lightweight
2739
checkout is created and returned.
1394
This will fail if the original default transport for this test
1395
case wasn't backed by the working directory, as the branch won't
1396
be on disk for us to open it.
2741
We do this because we can't physically create a tree in the local
2742
path, with a branch reference to the transport_factory url, and
2743
a branch + repository in the vfs_transport, unless the vfs_transport
2744
namespace is distinct from the local disk - the two branch objects
2745
would collide. While we could construct a tree with its branch object
2746
pointing at the transport_factory transport in memory, reopening it
2747
would behaving unexpectedly, and has in the past caused testing bugs
2748
when we tried to do it that way.
1398
2750
:param format: The BzrDirFormat.
1399
2751
:returns: the WorkingTree.
1439
2821
for readonly urls.
1441
2823
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1442
be used without needed to redo it when a different
2824
be used without needed to redo it when a different
1443
2825
subclass is in use ?
1446
2828
def setUp(self):
2829
from bzrlib.tests import http_server
1447
2830
super(ChrootedTestCase, self).setUp()
1448
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1449
self.transport_readonly_server = bzrlib.transport.http.HttpServer
2831
if not self.vfs_transport_factory == memory.MemoryServer:
2832
self.transport_readonly_server = http_server.HttpServer
2835
def condition_id_re(pattern):
2836
"""Create a condition filter which performs a re check on a test's id.
2838
:param pattern: A regular expression string.
2839
:return: A callable that returns True if the re matches.
2841
filter_re = re.compile(pattern, 0)
2842
def condition(test):
2844
return filter_re.search(test_id)
2848
def condition_isinstance(klass_or_klass_list):
2849
"""Create a condition filter which returns isinstance(param, klass).
2851
:return: A callable which when called with one parameter obj return the
2852
result of isinstance(obj, klass_or_klass_list).
2855
return isinstance(obj, klass_or_klass_list)
2859
def condition_id_in_list(id_list):
2860
"""Create a condition filter which verify that test's id in a list.
2862
:param id_list: A TestIdList object.
2863
:return: A callable that returns True if the test's id appears in the list.
2865
def condition(test):
2866
return id_list.includes(test.id())
2870
def condition_id_startswith(starts):
2871
"""Create a condition filter verifying that test's id starts with a string.
2873
:param starts: A list of string.
2874
:return: A callable that returns True if the test's id starts with one of
2877
def condition(test):
2878
for start in starts:
2879
if test.id().startswith(start):
2885
def exclude_tests_by_condition(suite, condition):
2886
"""Create a test suite which excludes some tests from suite.
2888
:param suite: The suite to get tests from.
2889
:param condition: A callable whose result evaluates True when called with a
2890
test case which should be excluded from the result.
2891
:return: A suite which contains the tests found in suite that fail
2895
for test in iter_suite_tests(suite):
2896
if not condition(test):
2898
return TestUtil.TestSuite(result)
2901
def filter_suite_by_condition(suite, condition):
2902
"""Create a test suite by filtering another one.
2904
:param suite: The source suite.
2905
:param condition: A callable whose result evaluates True when called with a
2906
test case which should be included in the result.
2907
:return: A suite which contains the tests found in suite that pass
2911
for test in iter_suite_tests(suite):
2914
return TestUtil.TestSuite(result)
1452
2917
def filter_suite_by_re(suite, pattern):
1453
result = TestUtil.TestSuite()
1454
filter_re = re.compile(pattern)
2918
"""Create a test suite by filtering another one.
2920
:param suite: the source suite
2921
:param pattern: pattern that names must match
2922
:returns: the newly created suite
2924
condition = condition_id_re(pattern)
2925
result_suite = filter_suite_by_condition(suite, condition)
2929
def filter_suite_by_id_list(suite, test_id_list):
2930
"""Create a test suite by filtering another one.
2932
:param suite: The source suite.
2933
:param test_id_list: A list of the test ids to keep as strings.
2934
:returns: the newly created suite
2936
condition = condition_id_in_list(test_id_list)
2937
result_suite = filter_suite_by_condition(suite, condition)
2941
def filter_suite_by_id_startswith(suite, start):
2942
"""Create a test suite by filtering another one.
2944
:param suite: The source suite.
2945
:param start: A list of string the test id must start with one of.
2946
:returns: the newly created suite
2948
condition = condition_id_startswith(start)
2949
result_suite = filter_suite_by_condition(suite, condition)
2953
def exclude_tests_by_re(suite, pattern):
2954
"""Create a test suite which excludes some tests from suite.
2956
:param suite: The suite to get tests from.
2957
:param pattern: A regular expression string. Test ids that match this
2958
pattern will be excluded from the result.
2959
:return: A TestSuite that contains all the tests from suite without the
2960
tests that matched pattern. The order of tests is the same as it was in
2963
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2966
def preserve_input(something):
2967
"""A helper for performing test suite transformation chains.
2969
:param something: Anything you want to preserve.
2975
def randomize_suite(suite):
2976
"""Return a new TestSuite with suite's tests in random order.
2978
The tests in the input suite are flattened into a single suite in order to
2979
accomplish this. Any nested TestSuites are removed to provide global
2982
tests = list(iter_suite_tests(suite))
2983
random.shuffle(tests)
2984
return TestUtil.TestSuite(tests)
2987
def split_suite_by_condition(suite, condition):
2988
"""Split a test suite into two by a condition.
2990
:param suite: The suite to split.
2991
:param condition: The condition to match on. Tests that match this
2992
condition are returned in the first test suite, ones that do not match
2993
are in the second suite.
2994
:return: A tuple of two test suites, where the first contains tests from
2995
suite matching the condition, and the second contains the remainder
2996
from suite. The order within each output suite is the same as it was in
1455
3001
for test in iter_suite_tests(suite):
1456
if filter_re.search(test.id()):
1457
result.addTest(test)
3003
matched.append(test)
3005
did_not_match.append(test)
3006
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
3009
def split_suite_by_re(suite, pattern):
3010
"""Split a test suite into two by a regular expression.
3012
:param suite: The suite to split.
3013
:param pattern: A regular expression string. Test ids that match this
3014
pattern will be in the first test suite returned, and the others in the
3015
second test suite returned.
3016
:return: A tuple of two test suites, where the first contains tests from
3017
suite matching pattern, and the second contains the remainder from
3018
suite. The order within each output suite is the same as it was in
3021
return split_suite_by_condition(suite, condition_id_re(pattern))
1461
3024
def run_suite(suite, name='test', verbose=False, pattern=".*",
1462
stop_on_failure=False, keep_output=False,
1463
transport=None, lsprof_timed=None, bench_history=None):
1464
TestCaseInTempDir._TEST_NAME = name
3025
stop_on_failure=False,
3026
transport=None, lsprof_timed=None, bench_history=None,
3027
matching_tests_first=None,
3030
exclude_pattern=None,
3033
suite_decorators=None,
3035
result_decorators=None,
3037
"""Run a test suite for bzr selftest.
3039
:param runner_class: The class of runner to use. Must support the
3040
constructor arguments passed by run_suite which are more than standard
3042
:return: A boolean indicating success.
1465
3044
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1471
pb = progress.ProgressBar()
1472
runner = TextTestRunner(stream=sys.stdout,
3049
if runner_class is None:
3050
runner_class = TextTestRunner
3053
runner = runner_class(stream=stream,
1473
3054
descriptions=0,
1474
3055
verbosity=verbosity,
1475
keep_output=keep_output,
1477
bench_history=bench_history)
3056
bench_history=bench_history,
3058
result_decorators=result_decorators,
1478
3060
runner.stop_on_failure=stop_on_failure
1480
suite = filter_suite_by_re(suite, pattern)
3061
# built in decorator factories:
3063
random_order(random_seed, runner),
3064
exclude_tests(exclude_pattern),
3066
if matching_tests_first:
3067
decorators.append(tests_first(pattern))
3069
decorators.append(filter_tests(pattern))
3070
if suite_decorators:
3071
decorators.extend(suite_decorators)
3072
# tell the result object how many tests will be running: (except if
3073
# --parallel=fork is being used. Robert said he will provide a better
3074
# progress design later -- vila 20090817)
3075
if fork_decorator not in decorators:
3076
decorators.append(CountingDecorator)
3077
for decorator in decorators:
3078
suite = decorator(suite)
3080
# Done after test suite decoration to allow randomisation etc
3081
# to take effect, though that is of marginal benefit.
3083
stream.write("Listing tests only ...\n")
3084
for t in iter_suite_tests(suite):
3085
stream.write("%s\n" % (t.id()))
1481
3087
result = runner.run(suite)
1482
return result.wasSuccessful()
3089
return result.wasStrictlySuccessful()
3091
return result.wasSuccessful()
3094
# A registry where get() returns a suite decorator.
3095
parallel_registry = registry.Registry()
3098
def fork_decorator(suite):
3099
if getattr(os, "fork", None) is None:
3100
raise errors.BzrCommandError("platform does not support fork,"
3101
" try --parallel=subprocess instead.")
3102
concurrency = osutils.local_concurrency()
3103
if concurrency == 1:
3105
from testtools import ConcurrentTestSuite
3106
return ConcurrentTestSuite(suite, fork_for_tests)
3107
parallel_registry.register('fork', fork_decorator)
3110
def subprocess_decorator(suite):
3111
concurrency = osutils.local_concurrency()
3112
if concurrency == 1:
3114
from testtools import ConcurrentTestSuite
3115
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3116
parallel_registry.register('subprocess', subprocess_decorator)
3119
def exclude_tests(exclude_pattern):
3120
"""Return a test suite decorator that excludes tests."""
3121
if exclude_pattern is None:
3122
return identity_decorator
3123
def decorator(suite):
3124
return ExcludeDecorator(suite, exclude_pattern)
3128
def filter_tests(pattern):
3130
return identity_decorator
3131
def decorator(suite):
3132
return FilterTestsDecorator(suite, pattern)
3136
def random_order(random_seed, runner):
3137
"""Return a test suite decorator factory for randomising tests order.
3139
:param random_seed: now, a string which casts to a long, or a long.
3140
:param runner: A test runner with a stream attribute to report on.
3142
if random_seed is None:
3143
return identity_decorator
3144
def decorator(suite):
3145
return RandomDecorator(suite, random_seed, runner.stream)
3149
def tests_first(pattern):
3151
return identity_decorator
3152
def decorator(suite):
3153
return TestFirstDecorator(suite, pattern)
3157
def identity_decorator(suite):
3162
class TestDecorator(TestUtil.TestSuite):
3163
"""A decorator for TestCase/TestSuite objects.
3165
Usually, subclasses should override __iter__(used when flattening test
3166
suites), which we do to filter, reorder, parallelise and so on, run() and
3170
def __init__(self, suite):
3171
TestUtil.TestSuite.__init__(self)
3174
def countTestCases(self):
3177
cases += test.countTestCases()
3184
def run(self, result):
3185
# Use iteration on self, not self._tests, to allow subclasses to hook
3188
if result.shouldStop:
3194
class CountingDecorator(TestDecorator):
3195
"""A decorator which calls result.progress(self.countTestCases)."""
3197
def run(self, result):
3198
progress_method = getattr(result, 'progress', None)
3199
if callable(progress_method):
3200
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3201
return super(CountingDecorator, self).run(result)
3204
class ExcludeDecorator(TestDecorator):
3205
"""A decorator which excludes test matching an exclude pattern."""
3207
def __init__(self, suite, exclude_pattern):
3208
TestDecorator.__init__(self, suite)
3209
self.exclude_pattern = exclude_pattern
3210
self.excluded = False
3214
return iter(self._tests)
3215
self.excluded = True
3216
suite = exclude_tests_by_re(self, self.exclude_pattern)
3218
self.addTests(suite)
3219
return iter(self._tests)
3222
class FilterTestsDecorator(TestDecorator):
3223
"""A decorator which filters tests to those matching a pattern."""
3225
def __init__(self, suite, pattern):
3226
TestDecorator.__init__(self, suite)
3227
self.pattern = pattern
3228
self.filtered = False
3232
return iter(self._tests)
3233
self.filtered = True
3234
suite = filter_suite_by_re(self, self.pattern)
3236
self.addTests(suite)
3237
return iter(self._tests)
3240
class RandomDecorator(TestDecorator):
3241
"""A decorator which randomises the order of its tests."""
3243
def __init__(self, suite, random_seed, stream):
3244
TestDecorator.__init__(self, suite)
3245
self.random_seed = random_seed
3246
self.randomised = False
3247
self.stream = stream
3251
return iter(self._tests)
3252
self.randomised = True
3253
self.stream.write("Randomizing test order using seed %s\n\n" %
3254
(self.actual_seed()))
3255
# Initialise the random number generator.
3256
random.seed(self.actual_seed())
3257
suite = randomize_suite(self)
3259
self.addTests(suite)
3260
return iter(self._tests)
3262
def actual_seed(self):
3263
if self.random_seed == "now":
3264
# We convert the seed to a long to make it reuseable across
3265
# invocations (because the user can reenter it).
3266
self.random_seed = long(time.time())
3268
# Convert the seed to a long if we can
3270
self.random_seed = long(self.random_seed)
3273
return self.random_seed
3276
class TestFirstDecorator(TestDecorator):
3277
"""A decorator which moves named tests to the front."""
3279
def __init__(self, suite, pattern):
3280
TestDecorator.__init__(self, suite)
3281
self.pattern = pattern
3282
self.filtered = False
3286
return iter(self._tests)
3287
self.filtered = True
3288
suites = split_suite_by_re(self, self.pattern)
3290
self.addTests(suites)
3291
return iter(self._tests)
3294
def partition_tests(suite, count):
3295
"""Partition suite into count lists of tests."""
3296
# This just assigns tests in a round-robin fashion. On one hand this
3297
# splits up blocks of related tests that might run faster if they shared
3298
# resources, but on the other it avoids assigning blocks of slow tests to
3299
# just one partition. So the slowest partition shouldn't be much slower
3301
partitions = [list() for i in range(count)]
3302
tests = iter_suite_tests(suite)
3303
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3304
partition.append(test)
3308
def workaround_zealous_crypto_random():
3309
"""Crypto.Random want to help us being secure, but we don't care here.
3311
This workaround some test failure related to the sftp server. Once paramiko
3312
stop using the controversial API in Crypto.Random, we may get rid of it.
3315
from Crypto.Random import atfork
3321
def fork_for_tests(suite):
3322
"""Take suite and start up one runner per CPU by forking()
3324
:return: An iterable of TestCase-like objects which can each have
3325
run(result) called on them to feed tests to result.
3327
concurrency = osutils.local_concurrency()
3329
from subunit import TestProtocolClient, ProtocolTestCase
3330
from subunit.test_results import AutoTimingTestResultDecorator
3331
class TestInOtherProcess(ProtocolTestCase):
3332
# Should be in subunit, I think. RBC.
3333
def __init__(self, stream, pid):
3334
ProtocolTestCase.__init__(self, stream)
3337
def run(self, result):
3339
ProtocolTestCase.run(self, result)
3341
os.waitpid(self.pid, 0)
3343
test_blocks = partition_tests(suite, concurrency)
3344
for process_tests in test_blocks:
3345
process_suite = TestUtil.TestSuite()
3346
process_suite.addTests(process_tests)
3347
c2pread, c2pwrite = os.pipe()
3350
workaround_zealous_crypto_random()
3353
# Leave stderr and stdout open so we can see test noise
3354
# Close stdin so that the child goes away if it decides to
3355
# read from stdin (otherwise its a roulette to see what
3356
# child actually gets keystrokes for pdb etc).
3359
stream = os.fdopen(c2pwrite, 'wb', 1)
3360
subunit_result = AutoTimingTestResultDecorator(
3361
TestProtocolClient(stream))
3362
process_suite.run(subunit_result)
3367
stream = os.fdopen(c2pread, 'rb', 1)
3368
test = TestInOtherProcess(stream, pid)
3373
def reinvoke_for_tests(suite):
3374
"""Take suite and start up one runner per CPU using subprocess().
3376
:return: An iterable of TestCase-like objects which can each have
3377
run(result) called on them to feed tests to result.
3379
concurrency = osutils.local_concurrency()
3381
from subunit import ProtocolTestCase
3382
class TestInSubprocess(ProtocolTestCase):
3383
def __init__(self, process, name):
3384
ProtocolTestCase.__init__(self, process.stdout)
3385
self.process = process
3386
self.process.stdin.close()
3389
def run(self, result):
3391
ProtocolTestCase.run(self, result)
3394
os.unlink(self.name)
3395
# print "pid %d finished" % finished_process
3396
test_blocks = partition_tests(suite, concurrency)
3397
for process_tests in test_blocks:
3398
# ugly; currently reimplement rather than reuses TestCase methods.
3399
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3400
if not os.path.isfile(bzr_path):
3401
# We are probably installed. Assume sys.argv is the right file
3402
bzr_path = sys.argv[0]
3403
bzr_path = [bzr_path]
3404
if sys.platform == "win32":
3405
# if we're on windows, we can't execute the bzr script directly
3406
bzr_path = [sys.executable] + bzr_path
3407
fd, test_list_file_name = tempfile.mkstemp()
3408
test_list_file = os.fdopen(fd, 'wb', 1)
3409
for test in process_tests:
3410
test_list_file.write(test.id() + '\n')
3411
test_list_file.close()
3413
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3415
if '--no-plugins' in sys.argv:
3416
argv.append('--no-plugins')
3417
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3418
# noise on stderr it can interrupt the subunit protocol.
3419
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3420
stdout=subprocess.PIPE,
3421
stderr=subprocess.PIPE,
3423
test = TestInSubprocess(process, test_list_file_name)
3426
os.unlink(test_list_file_name)
3431
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3432
"""Generate profiling data for all activity between start and success.
3434
The profile data is appended to the test's _benchcalls attribute and can
3435
be accessed by the forwarded-to TestResult.
3437
While it might be cleaner do accumulate this in stopTest, addSuccess is
3438
where our existing output support for lsprof is, and this class aims to
3439
fit in with that: while it could be moved it's not necessary to accomplish
3440
test profiling, nor would it be dramatically cleaner.
3443
def startTest(self, test):
3444
self.profiler = bzrlib.lsprof.BzrProfiler()
3445
# Prevent deadlocks in tests that use lsprof: those tests will
3447
bzrlib.lsprof.BzrProfiler.profiler_block = 0
3448
self.profiler.start()
3449
testtools.ExtendedToOriginalDecorator.startTest(self, test)
3451
def addSuccess(self, test):
3452
stats = self.profiler.stop()
3454
calls = test._benchcalls
3455
except AttributeError:
3456
test._benchcalls = []
3457
calls = test._benchcalls
3458
calls.append(((test.id(), "", ""), stats))
3459
testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
3461
def stopTest(self, test):
3462
testtools.ExtendedToOriginalDecorator.stopTest(self, test)
3463
self.profiler = None
3466
# Controlled by "bzr selftest -E=..." option
3467
# Currently supported:
3468
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3469
# preserves any flags supplied at the command line.
3470
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3471
# rather than failing tests. And no longer raise
3472
# LockContention when fctnl locks are not being used
3473
# with proper exclusion rules.
3474
# -Ethreads Will display thread ident at creation/join time to
3475
# help track thread leaks
3476
selftest_debug_flags = set()
1485
3479
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1487
3480
transport=None,
1488
3481
test_suite_factory=None,
1489
3482
lsprof_timed=None,
1490
bench_history=None):
3484
matching_tests_first=None,
3487
exclude_pattern=None,
3493
suite_decorators=None,
1491
3497
"""Run the whole test suite under the enhanced runner"""
1492
3498
# XXX: Very ugly way to do this...
1493
3499
# Disable warning about old formats because we don't want it to disturb
1500
3506
transport = default_transport
1501
3507
old_transport = default_transport
1502
3508
default_transport = transport
3509
global selftest_debug_flags
3510
old_debug_flags = selftest_debug_flags
3511
if debug_flags is not None:
3512
selftest_debug_flags = set(debug_flags)
3514
if load_list is None:
3517
keep_only = load_test_id_list(load_list)
3519
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3520
for start in starting_with]
1504
3521
if test_suite_factory is None:
1505
suite = test_suite()
3522
# Reduce loading time by loading modules based on the starting_with
3524
suite = test_suite(keep_only, starting_with)
1507
3526
suite = test_suite_factory()
3528
# But always filter as requested.
3529
suite = filter_suite_by_id_startswith(suite, starting_with)
3530
result_decorators = []
3532
result_decorators.append(ProfileResult)
1508
3533
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1509
stop_on_failure=stop_on_failure, keep_output=keep_output,
3534
stop_on_failure=stop_on_failure,
1510
3535
transport=transport,
1511
3536
lsprof_timed=lsprof_timed,
1512
bench_history=bench_history)
3537
bench_history=bench_history,
3538
matching_tests_first=matching_tests_first,
3539
list_only=list_only,
3540
random_seed=random_seed,
3541
exclude_pattern=exclude_pattern,
3543
runner_class=runner_class,
3544
suite_decorators=suite_decorators,
3546
result_decorators=result_decorators,
1514
3549
default_transport = old_transport
3550
selftest_debug_flags = old_debug_flags
3553
def load_test_id_list(file_name):
3554
"""Load a test id list from a text file.
3556
The format is one test id by line. No special care is taken to impose
3557
strict rules, these test ids are used to filter the test suite so a test id
3558
that do not match an existing test will do no harm. This allows user to add
3559
comments, leave blank lines, etc.
3563
ftest = open(file_name, 'rt')
3565
if e.errno != errno.ENOENT:
3568
raise errors.NoSuchFile(file_name)
3570
for test_name in ftest.readlines():
3571
test_list.append(test_name.strip())
3576
def suite_matches_id_list(test_suite, id_list):
3577
"""Warns about tests not appearing or appearing more than once.
3579
:param test_suite: A TestSuite object.
3580
:param test_id_list: The list of test ids that should be found in
3583
:return: (absents, duplicates) absents is a list containing the test found
3584
in id_list but not in test_suite, duplicates is a list containing the
3585
test found multiple times in test_suite.
3587
When using a prefined test id list, it may occurs that some tests do not
3588
exist anymore or that some tests use the same id. This function warns the
3589
tester about potential problems in his workflow (test lists are volatile)
3590
or in the test suite itself (using the same id for several tests does not
3591
help to localize defects).
3593
# Build a dict counting id occurrences
3595
for test in iter_suite_tests(test_suite):
3597
tests[id] = tests.get(id, 0) + 1
3602
occurs = tests.get(id, 0)
3604
not_found.append(id)
3606
duplicates.append(id)
3608
return not_found, duplicates
3611
class TestIdList(object):
3612
"""Test id list to filter a test suite.
3614
Relying on the assumption that test ids are built as:
3615
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3616
notation, this class offers methods to :
3617
- avoid building a test suite for modules not refered to in the test list,
3618
- keep only the tests listed from the module test suite.
3621
def __init__(self, test_id_list):
3622
# When a test suite needs to be filtered against us we compare test ids
3623
# for equality, so a simple dict offers a quick and simple solution.
3624
self.tests = dict().fromkeys(test_id_list, True)
3626
# While unittest.TestCase have ids like:
3627
# <module>.<class>.<method>[(<param+)],
3628
# doctest.DocTestCase can have ids like:
3631
# <module>.<function>
3632
# <module>.<class>.<method>
3634
# Since we can't predict a test class from its name only, we settle on
3635
# a simple constraint: a test id always begins with its module name.
3638
for test_id in test_id_list:
3639
parts = test_id.split('.')
3640
mod_name = parts.pop(0)
3641
modules[mod_name] = True
3643
mod_name += '.' + part
3644
modules[mod_name] = True
3645
self.modules = modules
3647
def refers_to(self, module_name):
3648
"""Is there tests for the module or one of its sub modules."""
3649
return self.modules.has_key(module_name)
3651
def includes(self, test_id):
3652
return self.tests.has_key(test_id)
3655
class TestPrefixAliasRegistry(registry.Registry):
3656
"""A registry for test prefix aliases.
3658
This helps implement shorcuts for the --starting-with selftest
3659
option. Overriding existing prefixes is not allowed but not fatal (a
3660
warning will be emitted).
3663
def register(self, key, obj, help=None, info=None,
3664
override_existing=False):
3665
"""See Registry.register.
3667
Trying to override an existing alias causes a warning to be emitted,
3668
not a fatal execption.
3671
super(TestPrefixAliasRegistry, self).register(
3672
key, obj, help=help, info=info, override_existing=False)
3674
actual = self.get(key)
3676
'Test prefix alias %s is already used for %s, ignoring %s'
3677
% (key, actual, obj))
3679
def resolve_alias(self, id_start):
3680
"""Replace the alias by the prefix in the given string.
3682
Using an unknown prefix is an error to help catching typos.
3684
parts = id_start.split('.')
3686
parts[0] = self.get(parts[0])
3688
raise errors.BzrCommandError(
3689
'%s is not a known test prefix alias' % parts[0])
3690
return '.'.join(parts)
3693
test_prefix_alias_registry = TestPrefixAliasRegistry()
3694
"""Registry of test prefix aliases."""
3697
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3698
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3699
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3701
# Obvious highest levels prefixes, feel free to add your own via a plugin
3702
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3703
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3704
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3705
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3706
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3709
def _test_suite_testmod_names():
3710
"""Return the standard list of test module names to test."""
3713
'bzrlib.tests.blackbox',
3714
'bzrlib.tests.commands',
3715
'bzrlib.tests.doc_generate',
3716
'bzrlib.tests.per_branch',
3717
'bzrlib.tests.per_bzrdir',
3718
'bzrlib.tests.per_controldir',
3719
'bzrlib.tests.per_controldir_colo',
3720
'bzrlib.tests.per_foreign_vcs',
3721
'bzrlib.tests.per_interrepository',
3722
'bzrlib.tests.per_intertree',
3723
'bzrlib.tests.per_inventory',
3724
'bzrlib.tests.per_interbranch',
3725
'bzrlib.tests.per_lock',
3726
'bzrlib.tests.per_merger',
3727
'bzrlib.tests.per_transport',
3728
'bzrlib.tests.per_tree',
3729
'bzrlib.tests.per_pack_repository',
3730
'bzrlib.tests.per_repository',
3731
'bzrlib.tests.per_repository_chk',
3732
'bzrlib.tests.per_repository_reference',
3733
'bzrlib.tests.per_repository_vf',
3734
'bzrlib.tests.per_uifactory',
3735
'bzrlib.tests.per_versionedfile',
3736
'bzrlib.tests.per_workingtree',
3737
'bzrlib.tests.test__annotator',
3738
'bzrlib.tests.test__bencode',
3739
'bzrlib.tests.test__btree_serializer',
3740
'bzrlib.tests.test__chk_map',
3741
'bzrlib.tests.test__dirstate_helpers',
3742
'bzrlib.tests.test__groupcompress',
3743
'bzrlib.tests.test__known_graph',
3744
'bzrlib.tests.test__rio',
3745
'bzrlib.tests.test__simple_set',
3746
'bzrlib.tests.test__static_tuple',
3747
'bzrlib.tests.test__walkdirs_win32',
3748
'bzrlib.tests.test_ancestry',
3749
'bzrlib.tests.test_annotate',
3750
'bzrlib.tests.test_api',
3751
'bzrlib.tests.test_atomicfile',
3752
'bzrlib.tests.test_bad_files',
3753
'bzrlib.tests.test_bisect_multi',
3754
'bzrlib.tests.test_branch',
3755
'bzrlib.tests.test_branchbuilder',
3756
'bzrlib.tests.test_btree_index',
3757
'bzrlib.tests.test_bugtracker',
3758
'bzrlib.tests.test_bundle',
3759
'bzrlib.tests.test_bzrdir',
3760
'bzrlib.tests.test__chunks_to_lines',
3761
'bzrlib.tests.test_cache_utf8',
3762
'bzrlib.tests.test_chk_map',
3763
'bzrlib.tests.test_chk_serializer',
3764
'bzrlib.tests.test_chunk_writer',
3765
'bzrlib.tests.test_clean_tree',
3766
'bzrlib.tests.test_cleanup',
3767
'bzrlib.tests.test_cmdline',
3768
'bzrlib.tests.test_commands',
3769
'bzrlib.tests.test_commit',
3770
'bzrlib.tests.test_commit_merge',
3771
'bzrlib.tests.test_config',
3772
'bzrlib.tests.test_conflicts',
3773
'bzrlib.tests.test_controldir',
3774
'bzrlib.tests.test_counted_lock',
3775
'bzrlib.tests.test_crash',
3776
'bzrlib.tests.test_decorators',
3777
'bzrlib.tests.test_delta',
3778
'bzrlib.tests.test_debug',
3779
'bzrlib.tests.test_diff',
3780
'bzrlib.tests.test_directory_service',
3781
'bzrlib.tests.test_dirstate',
3782
'bzrlib.tests.test_email_message',
3783
'bzrlib.tests.test_eol_filters',
3784
'bzrlib.tests.test_errors',
3785
'bzrlib.tests.test_export',
3786
'bzrlib.tests.test_extract',
3787
'bzrlib.tests.test_fetch',
3788
'bzrlib.tests.test_fixtures',
3789
'bzrlib.tests.test_fifo_cache',
3790
'bzrlib.tests.test_filters',
3791
'bzrlib.tests.test_ftp_transport',
3792
'bzrlib.tests.test_foreign',
3793
'bzrlib.tests.test_generate_docs',
3794
'bzrlib.tests.test_generate_ids',
3795
'bzrlib.tests.test_globbing',
3796
'bzrlib.tests.test_gpg',
3797
'bzrlib.tests.test_graph',
3798
'bzrlib.tests.test_groupcompress',
3799
'bzrlib.tests.test_hashcache',
3800
'bzrlib.tests.test_help',
3801
'bzrlib.tests.test_hooks',
3802
'bzrlib.tests.test_http',
3803
'bzrlib.tests.test_http_response',
3804
'bzrlib.tests.test_https_ca_bundle',
3805
'bzrlib.tests.test_identitymap',
3806
'bzrlib.tests.test_ignores',
3807
'bzrlib.tests.test_index',
3808
'bzrlib.tests.test_import_tariff',
3809
'bzrlib.tests.test_info',
3810
'bzrlib.tests.test_inv',
3811
'bzrlib.tests.test_inventory_delta',
3812
'bzrlib.tests.test_knit',
3813
'bzrlib.tests.test_lazy_import',
3814
'bzrlib.tests.test_lazy_regex',
3815
'bzrlib.tests.test_library_state',
3816
'bzrlib.tests.test_lock',
3817
'bzrlib.tests.test_lockable_files',
3818
'bzrlib.tests.test_lockdir',
3819
'bzrlib.tests.test_log',
3820
'bzrlib.tests.test_lru_cache',
3821
'bzrlib.tests.test_lsprof',
3822
'bzrlib.tests.test_mail_client',
3823
'bzrlib.tests.test_matchers',
3824
'bzrlib.tests.test_memorytree',
3825
'bzrlib.tests.test_merge',
3826
'bzrlib.tests.test_merge3',
3827
'bzrlib.tests.test_merge_core',
3828
'bzrlib.tests.test_merge_directive',
3829
'bzrlib.tests.test_mergetools',
3830
'bzrlib.tests.test_missing',
3831
'bzrlib.tests.test_msgeditor',
3832
'bzrlib.tests.test_multiparent',
3833
'bzrlib.tests.test_mutabletree',
3834
'bzrlib.tests.test_nonascii',
3835
'bzrlib.tests.test_options',
3836
'bzrlib.tests.test_osutils',
3837
'bzrlib.tests.test_osutils_encodings',
3838
'bzrlib.tests.test_pack',
3839
'bzrlib.tests.test_patch',
3840
'bzrlib.tests.test_patches',
3841
'bzrlib.tests.test_permissions',
3842
'bzrlib.tests.test_plugins',
3843
'bzrlib.tests.test_progress',
3844
'bzrlib.tests.test_pyutils',
3845
'bzrlib.tests.test_read_bundle',
3846
'bzrlib.tests.test_reconcile',
3847
'bzrlib.tests.test_reconfigure',
3848
'bzrlib.tests.test_registry',
3849
'bzrlib.tests.test_remote',
3850
'bzrlib.tests.test_rename_map',
3851
'bzrlib.tests.test_repository',
3852
'bzrlib.tests.test_revert',
3853
'bzrlib.tests.test_revision',
3854
'bzrlib.tests.test_revisionspec',
3855
'bzrlib.tests.test_revisiontree',
3856
'bzrlib.tests.test_rio',
3857
'bzrlib.tests.test_rules',
3858
'bzrlib.tests.test_sampler',
3859
'bzrlib.tests.test_scenarios',
3860
'bzrlib.tests.test_script',
3861
'bzrlib.tests.test_selftest',
3862
'bzrlib.tests.test_serializer',
3863
'bzrlib.tests.test_setup',
3864
'bzrlib.tests.test_sftp_transport',
3865
'bzrlib.tests.test_shelf',
3866
'bzrlib.tests.test_shelf_ui',
3867
'bzrlib.tests.test_smart',
3868
'bzrlib.tests.test_smart_add',
3869
'bzrlib.tests.test_smart_request',
3870
'bzrlib.tests.test_smart_transport',
3871
'bzrlib.tests.test_smtp_connection',
3872
'bzrlib.tests.test_source',
3873
'bzrlib.tests.test_ssh_transport',
3874
'bzrlib.tests.test_status',
3875
'bzrlib.tests.test_store',
3876
'bzrlib.tests.test_strace',
3877
'bzrlib.tests.test_subsume',
3878
'bzrlib.tests.test_switch',
3879
'bzrlib.tests.test_symbol_versioning',
3880
'bzrlib.tests.test_tag',
3881
'bzrlib.tests.test_test_server',
3882
'bzrlib.tests.test_testament',
3883
'bzrlib.tests.test_textfile',
3884
'bzrlib.tests.test_textmerge',
3885
'bzrlib.tests.test_cethread',
3886
'bzrlib.tests.test_timestamp',
3887
'bzrlib.tests.test_trace',
3888
'bzrlib.tests.test_transactions',
3889
'bzrlib.tests.test_transform',
3890
'bzrlib.tests.test_transport',
3891
'bzrlib.tests.test_transport_log',
3892
'bzrlib.tests.test_tree',
3893
'bzrlib.tests.test_treebuilder',
3894
'bzrlib.tests.test_treeshape',
3895
'bzrlib.tests.test_tsort',
3896
'bzrlib.tests.test_tuned_gzip',
3897
'bzrlib.tests.test_ui',
3898
'bzrlib.tests.test_uncommit',
3899
'bzrlib.tests.test_upgrade',
3900
'bzrlib.tests.test_upgrade_stacked',
3901
'bzrlib.tests.test_urlutils',
3902
'bzrlib.tests.test_version',
3903
'bzrlib.tests.test_version_info',
3904
'bzrlib.tests.test_versionedfile',
3905
'bzrlib.tests.test_weave',
3906
'bzrlib.tests.test_whitebox',
3907
'bzrlib.tests.test_win32utils',
3908
'bzrlib.tests.test_workingtree',
3909
'bzrlib.tests.test_workingtree_4',
3910
'bzrlib.tests.test_wsgi',
3911
'bzrlib.tests.test_xml',
3915
def _test_suite_modules_to_doctest():
3916
"""Return the list of modules to doctest."""
3918
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3922
'bzrlib.branchbuilder',
3923
'bzrlib.decorators',
3925
'bzrlib.iterablefile',
3930
'bzrlib.symbol_versioning',
3932
'bzrlib.tests.fixtures',
3934
'bzrlib.transport.http',
3935
'bzrlib.version_info_formats.format_custom',
3939
def test_suite(keep_only=None, starting_with=None):
1518
3940
"""Build and return TestSuite for the whole of bzrlib.
3942
:param keep_only: A list of test ids limiting the suite returned.
3944
:param starting_with: An id limiting the suite returned to the tests
1520
3947
This function can be replaced if you need to change the default test
1521
3948
suite on a global basis, but it is not encouraged.
1524
'bzrlib.tests.test_ancestry',
1525
'bzrlib.tests.test_api',
1526
'bzrlib.tests.test_atomicfile',
1527
'bzrlib.tests.test_bad_files',
1528
'bzrlib.tests.test_branch',
1529
'bzrlib.tests.test_bundle',
1530
'bzrlib.tests.test_bzrdir',
1531
'bzrlib.tests.test_cache_utf8',
1532
'bzrlib.tests.test_command',
1533
'bzrlib.tests.test_commit',
1534
'bzrlib.tests.test_commit_merge',
1535
'bzrlib.tests.test_config',
1536
'bzrlib.tests.test_conflicts',
1537
'bzrlib.tests.test_decorators',
1538
'bzrlib.tests.test_diff',
1539
'bzrlib.tests.test_doc_generate',
1540
'bzrlib.tests.test_errors',
1541
'bzrlib.tests.test_escaped_store',
1542
'bzrlib.tests.test_fetch',
1543
'bzrlib.tests.test_ftp_transport',
1544
'bzrlib.tests.test_gpg',
1545
'bzrlib.tests.test_graph',
1546
'bzrlib.tests.test_hashcache',
1547
'bzrlib.tests.test_http',
1548
'bzrlib.tests.test_http_response',
1549
'bzrlib.tests.test_identitymap',
1550
'bzrlib.tests.test_ignores',
1551
'bzrlib.tests.test_inv',
1552
'bzrlib.tests.test_knit',
1553
'bzrlib.tests.test_lazy_import',
1554
'bzrlib.tests.test_lockdir',
1555
'bzrlib.tests.test_lockable_files',
1556
'bzrlib.tests.test_log',
1557
'bzrlib.tests.test_memorytree',
1558
'bzrlib.tests.test_merge',
1559
'bzrlib.tests.test_merge3',
1560
'bzrlib.tests.test_merge_core',
1561
'bzrlib.tests.test_missing',
1562
'bzrlib.tests.test_msgeditor',
1563
'bzrlib.tests.test_nonascii',
1564
'bzrlib.tests.test_options',
1565
'bzrlib.tests.test_osutils',
1566
'bzrlib.tests.test_patch',
1567
'bzrlib.tests.test_patches',
1568
'bzrlib.tests.test_permissions',
1569
'bzrlib.tests.test_plugins',
1570
'bzrlib.tests.test_progress',
1571
'bzrlib.tests.test_reconcile',
1572
'bzrlib.tests.test_repository',
1573
'bzrlib.tests.test_revert',
1574
'bzrlib.tests.test_revision',
1575
'bzrlib.tests.test_revisionnamespaces',
1576
'bzrlib.tests.test_revisiontree',
1577
'bzrlib.tests.test_rio',
1578
'bzrlib.tests.test_sampler',
1579
'bzrlib.tests.test_selftest',
1580
'bzrlib.tests.test_setup',
1581
'bzrlib.tests.test_sftp_transport',
1582
'bzrlib.tests.test_smart_add',
1583
'bzrlib.tests.test_smart_transport',
1584
'bzrlib.tests.test_source',
1585
'bzrlib.tests.test_status',
1586
'bzrlib.tests.test_store',
1587
'bzrlib.tests.test_symbol_versioning',
1588
'bzrlib.tests.test_testament',
1589
'bzrlib.tests.test_textfile',
1590
'bzrlib.tests.test_textmerge',
1591
'bzrlib.tests.test_trace',
1592
'bzrlib.tests.test_transactions',
1593
'bzrlib.tests.test_transform',
1594
'bzrlib.tests.test_transport',
1595
'bzrlib.tests.test_tree',
1596
'bzrlib.tests.test_treebuilder',
1597
'bzrlib.tests.test_tsort',
1598
'bzrlib.tests.test_tuned_gzip',
1599
'bzrlib.tests.test_ui',
1600
'bzrlib.tests.test_upgrade',
1601
'bzrlib.tests.test_urlutils',
1602
'bzrlib.tests.test_versionedfile',
1603
'bzrlib.tests.test_version',
1604
'bzrlib.tests.test_version_info',
1605
'bzrlib.tests.test_weave',
1606
'bzrlib.tests.test_whitebox',
1607
'bzrlib.tests.test_workingtree',
1608
'bzrlib.tests.test_xml',
1610
test_transport_implementations = [
1611
'bzrlib.tests.test_transport_implementations',
1612
'bzrlib.tests.test_read_bundle',
1614
suite = TestUtil.TestSuite()
1615
3951
loader = TestUtil.TestLoader()
1616
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1617
from bzrlib.transport import TransportTestProviderAdapter
1618
adapter = TransportTestProviderAdapter()
1619
adapt_modules(test_transport_implementations, adapter, loader, suite)
1620
for package in packages_to_test():
1621
suite.addTest(package.test_suite())
1622
for m in MODULES_TO_TEST:
1623
suite.addTest(loader.loadTestsFromModule(m))
1624
for m in MODULES_TO_DOCTEST:
1625
suite.addTest(doctest.DocTestSuite(m))
1626
for name, plugin in bzrlib.plugin.all_plugins().items():
1627
if getattr(plugin, 'test_suite', None) is not None:
1628
suite.addTest(plugin.test_suite())
3953
if keep_only is not None:
3954
id_filter = TestIdList(keep_only)
3956
# We take precedence over keep_only because *at loading time* using
3957
# both options means we will load less tests for the same final result.
3958
def interesting_module(name):
3959
for start in starting_with:
3961
# Either the module name starts with the specified string
3962
name.startswith(start)
3963
# or it may contain tests starting with the specified string
3964
or start.startswith(name)
3968
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3970
elif keep_only is not None:
3971
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3972
def interesting_module(name):
3973
return id_filter.refers_to(name)
3976
loader = TestUtil.TestLoader()
3977
def interesting_module(name):
3978
# No filtering, all modules are interesting
3981
suite = loader.suiteClass()
3983
# modules building their suite with loadTestsFromModuleNames
3984
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3986
for mod in _test_suite_modules_to_doctest():
3987
if not interesting_module(mod):
3988
# No tests to keep here, move along
3991
# note that this really does mean "report only" -- doctest
3992
# still runs the rest of the examples
3993
doc_suite = IsolatedDocTestSuite(
3994
mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
3995
except ValueError, e:
3996
print '**failed to get doctest for: %s\n%s' % (mod, e)
3998
if len(doc_suite._tests) == 0:
3999
raise errors.BzrError("no doctests found in %s" % (mod,))
4000
suite.addTest(doc_suite)
4002
default_encoding = sys.getdefaultencoding()
4003
for name, plugin in _mod_plugin.plugins().items():
4004
if not interesting_module(plugin.module.__name__):
4006
plugin_suite = plugin.test_suite()
4007
# We used to catch ImportError here and turn it into just a warning,
4008
# but really if you don't have --no-plugins this should be a failure.
4009
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
4010
if plugin_suite is None:
4011
plugin_suite = plugin.load_plugin_tests(loader)
4012
if plugin_suite is not None:
4013
suite.addTest(plugin_suite)
4014
if default_encoding != sys.getdefaultencoding():
4016
'Plugin "%s" tried to reset default encoding to: %s', name,
4017
sys.getdefaultencoding())
4019
sys.setdefaultencoding(default_encoding)
4021
if keep_only is not None:
4022
# Now that the referred modules have loaded their tests, keep only the
4024
suite = filter_suite_by_id_list(suite, id_filter)
4025
# Do some sanity checks on the id_list filtering
4026
not_found, duplicates = suite_matches_id_list(suite, keep_only)
4028
# The tester has used both keep_only and starting_with, so he is
4029
# already aware that some tests are excluded from the list, there
4030
# is no need to tell him which.
4033
# Some tests mentioned in the list are not in the test suite. The
4034
# list may be out of date, report to the tester.
4035
for id in not_found:
4036
trace.warning('"%s" not found in the test suite', id)
4037
for id in duplicates:
4038
trace.warning('"%s" is used as an id by several tests', id)
1632
def adapt_modules(mods_list, adapter, loader, suite):
1633
"""Adapt the modules in mods_list using adapter and add to suite."""
1634
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1635
suite.addTests(adapter.adapt(test))
4043
def multiply_scenarios(*scenarios):
4044
"""Multiply two or more iterables of scenarios.
4046
It is safe to pass scenario generators or iterators.
4048
:returns: A list of compound scenarios: the cross-product of all
4049
scenarios, with the names concatenated and the parameters
4052
return reduce(_multiply_two_scenarios, map(list, scenarios))
4055
def _multiply_two_scenarios(scenarios_left, scenarios_right):
4056
"""Multiply two sets of scenarios.
4058
:returns: the cartesian product of the two sets of scenarios, that is
4059
a scenario for every possible combination of a left scenario and a
4063
('%s,%s' % (left_name, right_name),
4064
dict(left_dict.items() + right_dict.items()))
4065
for left_name, left_dict in scenarios_left
4066
for right_name, right_dict in scenarios_right]
4069
def multiply_tests(tests, scenarios, result):
4070
"""Multiply tests_list by scenarios into result.
4072
This is the core workhorse for test parameterisation.
4074
Typically the load_tests() method for a per-implementation test suite will
4075
call multiply_tests and return the result.
4077
:param tests: The tests to parameterise.
4078
:param scenarios: The scenarios to apply: pairs of (scenario_name,
4079
scenario_param_dict).
4080
:param result: A TestSuite to add created tests to.
4082
This returns the passed in result TestSuite with the cross product of all
4083
the tests repeated once for each scenario. Each test is adapted by adding
4084
the scenario name at the end of its id(), and updating the test object's
4085
__dict__ with the scenario_param_dict.
4087
>>> import bzrlib.tests.test_sampler
4088
>>> r = multiply_tests(
4089
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4090
... [('one', dict(param=1)),
4091
... ('two', dict(param=2))],
4092
... TestUtil.TestSuite())
4093
>>> tests = list(iter_suite_tests(r))
4097
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4103
for test in iter_suite_tests(tests):
4104
apply_scenarios(test, scenarios, result)
4108
def apply_scenarios(test, scenarios, result):
4109
"""Apply the scenarios in scenarios to test and add to result.
4111
:param test: The test to apply scenarios to.
4112
:param scenarios: An iterable of scenarios to apply to test.
4114
:seealso: apply_scenario
4116
for scenario in scenarios:
4117
result.addTest(apply_scenario(test, scenario))
4121
def apply_scenario(test, scenario):
4122
"""Copy test and apply scenario to it.
4124
:param test: A test to adapt.
4125
:param scenario: A tuple describing the scenarion.
4126
The first element of the tuple is the new test id.
4127
The second element is a dict containing attributes to set on the
4129
:return: The adapted test.
4131
new_id = "%s(%s)" % (test.id(), scenario[0])
4132
new_test = clone_test(test, new_id)
4133
for name, value in scenario[1].items():
4134
setattr(new_test, name, value)
4138
def clone_test(test, new_id):
4139
"""Clone a test giving it a new id.
4141
:param test: The test to clone.
4142
:param new_id: The id to assign to it.
4143
:return: The new test.
4145
new_test = copy.copy(test)
4146
new_test.id = lambda: new_id
4147
# XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4148
# causes cloned tests to share the 'details' dict. This makes it hard to
4149
# read the test output for parameterized tests, because tracebacks will be
4150
# associated with irrelevant tests.
4152
details = new_test._TestCase__details
4153
except AttributeError:
4154
# must be a different version of testtools than expected. Do nothing.
4157
# Reset the '__details' dict.
4158
new_test._TestCase__details = {}
4162
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4164
"""Helper for permutating tests against an extension module.
4166
This is meant to be used inside a modules 'load_tests()' function. It will
4167
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4168
against both implementations. Setting 'test.module' to the appropriate
4169
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4171
:param standard_tests: A test suite to permute
4172
:param loader: A TestLoader
4173
:param py_module_name: The python path to a python module that can always
4174
be loaded, and will be considered the 'python' implementation. (eg
4175
'bzrlib._chk_map_py')
4176
:param ext_module_name: The python path to an extension module. If the
4177
module cannot be loaded, a single test will be added, which notes that
4178
the module is not available. If it can be loaded, all standard_tests
4179
will be run against that module.
4180
:return: (suite, feature) suite is a test-suite that has all the permuted
4181
tests. feature is the Feature object that can be used to determine if
4182
the module is available.
4185
py_module = pyutils.get_named_object(py_module_name)
4187
('python', {'module': py_module}),
4189
suite = loader.suiteClass()
4190
feature = ModuleAvailableFeature(ext_module_name)
4191
if feature.available():
4192
scenarios.append(('C', {'module': feature.module}))
4194
# the compiled module isn't available, so we add a failing test
4195
class FailWithoutFeature(TestCase):
4196
def test_fail(self):
4197
self.requireFeature(feature)
4198
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4199
result = multiply_tests(standard_tests, scenarios, suite)
4200
return result, feature
4203
def _rmtree_temp_dir(dirname, test_id=None):
4204
# If LANG=C we probably have created some bogus paths
4205
# which rmtree(unicode) will fail to delete
4206
# so make sure we are using rmtree(str) to delete everything
4207
# except on win32, where rmtree(str) will fail
4208
# since it doesn't have the property of byte-stream paths
4209
# (they are either ascii or mbcs)
4210
if sys.platform == 'win32':
4211
# make sure we are using the unicode win32 api
4212
dirname = unicode(dirname)
4214
dirname = dirname.encode(sys.getfilesystemencoding())
4216
osutils.rmtree(dirname)
4218
# We don't want to fail here because some useful display will be lost
4219
# otherwise. Polluting the tmp dir is bad, but not giving all the
4220
# possible info to the test runner is even worse.
4222
ui.ui_factory.clear_term()
4223
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4224
# Ugly, but the last thing we want here is fail, so bear with it.
4225
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4226
).encode('ascii', 'replace')
4227
sys.stderr.write('Unable to remove testing dir %s\n%s'
4228
% (os.path.basename(dirname), printable_e))
4231
class Feature(object):
4232
"""An operating system Feature."""
4235
self._available = None
4237
def available(self):
4238
"""Is the feature available?
4240
:return: True if the feature is available.
4242
if self._available is None:
4243
self._available = self._probe()
4244
return self._available
4247
"""Implement this method in concrete features.
4249
:return: True if the feature is available.
4251
raise NotImplementedError
4254
if getattr(self, 'feature_name', None):
4255
return self.feature_name()
4256
return self.__class__.__name__
4259
class _SymlinkFeature(Feature):
4262
return osutils.has_symlinks()
4264
def feature_name(self):
4267
SymlinkFeature = _SymlinkFeature()
4270
class _HardlinkFeature(Feature):
4273
return osutils.has_hardlinks()
4275
def feature_name(self):
4278
HardlinkFeature = _HardlinkFeature()
4281
class _OsFifoFeature(Feature):
4284
return getattr(os, 'mkfifo', None)
4286
def feature_name(self):
4287
return 'filesystem fifos'
4289
OsFifoFeature = _OsFifoFeature()
4292
class _UnicodeFilenameFeature(Feature):
4293
"""Does the filesystem support Unicode filenames?"""
4297
# Check for character combinations unlikely to be covered by any
4298
# single non-unicode encoding. We use the characters
4299
# - greek small letter alpha (U+03B1) and
4300
# - braille pattern dots-123456 (U+283F).
4301
os.stat(u'\u03b1\u283f')
4302
except UnicodeEncodeError:
4304
except (IOError, OSError):
4305
# The filesystem allows the Unicode filename but the file doesn't
4309
# The filesystem allows the Unicode filename and the file exists,
4313
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4316
class _CompatabilityThunkFeature(Feature):
4317
"""This feature is just a thunk to another feature.
4319
It issues a deprecation warning if it is accessed, to let you know that you
4320
should really use a different feature.
4323
def __init__(self, dep_version, module, name,
4324
replacement_name, replacement_module=None):
4325
super(_CompatabilityThunkFeature, self).__init__()
4326
self._module = module
4327
if replacement_module is None:
4328
replacement_module = module
4329
self._replacement_module = replacement_module
4331
self._replacement_name = replacement_name
4332
self._dep_version = dep_version
4333
self._feature = None
4336
if self._feature is None:
4337
depr_msg = self._dep_version % ('%s.%s'
4338
% (self._module, self._name))
4339
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4340
self._replacement_name)
4341
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4342
# Import the new feature and use it as a replacement for the
4344
self._feature = pyutils.get_named_object(
4345
self._replacement_module, self._replacement_name)
4349
return self._feature._probe()
4352
class ModuleAvailableFeature(Feature):
4353
"""This is a feature than describes a module we want to be available.
4355
Declare the name of the module in __init__(), and then after probing, the
4356
module will be available as 'self.module'.
4358
:ivar module: The module if it is available, else None.
4361
def __init__(self, module_name):
4362
super(ModuleAvailableFeature, self).__init__()
4363
self.module_name = module_name
4367
self._module = __import__(self.module_name, {}, {}, [''])
4374
if self.available(): # Make sure the probe has been done
4378
def feature_name(self):
4379
return self.module_name
4382
def probe_unicode_in_user_encoding():
4383
"""Try to encode several unicode strings to use in unicode-aware tests.
4384
Return first successfull match.
4386
:return: (unicode value, encoded plain string value) or (None, None)
4388
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4389
for uni_val in possible_vals:
4391
str_val = uni_val.encode(osutils.get_user_encoding())
4392
except UnicodeEncodeError:
4393
# Try a different character
4396
return uni_val, str_val
4400
def probe_bad_non_ascii(encoding):
4401
"""Try to find [bad] character with code [128..255]
4402
that cannot be decoded to unicode in some encoding.
4403
Return None if all non-ascii characters is valid
4406
for i in xrange(128, 256):
4409
char.decode(encoding)
4410
except UnicodeDecodeError:
4415
class _HTTPSServerFeature(Feature):
4416
"""Some tests want an https Server, check if one is available.
4418
Right now, the only way this is available is under python2.6 which provides
4429
def feature_name(self):
4430
return 'HTTPSServer'
4433
HTTPSServerFeature = _HTTPSServerFeature()
4436
class _UnicodeFilename(Feature):
4437
"""Does the filesystem support Unicode filenames?"""
4442
except UnicodeEncodeError:
4444
except (IOError, OSError):
4445
# The filesystem allows the Unicode filename but the file doesn't
4449
# The filesystem allows the Unicode filename and the file exists,
4453
UnicodeFilename = _UnicodeFilename()
4456
class _ByteStringNamedFilesystem(Feature):
4457
"""Is the filesystem based on bytes?"""
4460
if os.name == "posix":
4464
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4467
class _UTF8Filesystem(Feature):
4468
"""Is the filesystem UTF-8?"""
4471
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4475
UTF8Filesystem = _UTF8Filesystem()
4478
class _BreakinFeature(Feature):
4479
"""Does this platform support the breakin feature?"""
4482
from bzrlib import breakin
4483
if breakin.determine_signal() is None:
4485
if sys.platform == 'win32':
4486
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4487
# We trigger SIGBREAK via a Console api so we need ctypes to
4488
# access the function
4495
def feature_name(self):
4496
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4499
BreakinFeature = _BreakinFeature()
4502
class _CaseInsCasePresFilenameFeature(Feature):
4503
"""Is the file-system case insensitive, but case-preserving?"""
4506
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4508
# first check truly case-preserving for created files, then check
4509
# case insensitive when opening existing files.
4510
name = osutils.normpath(name)
4511
base, rel = osutils.split(name)
4512
found_rel = osutils.canonical_relpath(base, name)
4513
return (found_rel == rel
4514
and os.path.isfile(name.upper())
4515
and os.path.isfile(name.lower()))
4520
def feature_name(self):
4521
return "case-insensitive case-preserving filesystem"
4523
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4526
class _CaseInsensitiveFilesystemFeature(Feature):
4527
"""Check if underlying filesystem is case-insensitive but *not* case
4530
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4531
# more likely to be case preserving, so this case is rare.
4534
if CaseInsCasePresFilenameFeature.available():
4537
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4538
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4539
TestCaseWithMemoryTransport.TEST_ROOT = root
4541
root = TestCaseWithMemoryTransport.TEST_ROOT
4542
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4544
name_a = osutils.pathjoin(tdir, 'a')
4545
name_A = osutils.pathjoin(tdir, 'A')
4547
result = osutils.isdir(name_A)
4548
_rmtree_temp_dir(tdir)
4551
def feature_name(self):
4552
return 'case-insensitive filesystem'
4554
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4557
class _CaseSensitiveFilesystemFeature(Feature):
4560
if CaseInsCasePresFilenameFeature.available():
4562
elif CaseInsensitiveFilesystemFeature.available():
4567
def feature_name(self):
4568
return 'case-sensitive filesystem'
4570
# new coding style is for feature instances to be lowercase
4571
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4574
# Only define SubUnitBzrRunner if subunit is available.
4576
from subunit import TestProtocolClient
4577
from subunit.test_results import AutoTimingTestResultDecorator
4578
class SubUnitBzrProtocolClient(TestProtocolClient):
4580
def addSuccess(self, test, details=None):
4581
# The subunit client always includes the details in the subunit
4582
# stream, but we don't want to include it in ours.
4583
if details is not None and 'log' in details:
4585
return super(SubUnitBzrProtocolClient, self).addSuccess(
4588
class SubUnitBzrRunner(TextTestRunner):
4589
def run(self, test):
4590
result = AutoTimingTestResultDecorator(
4591
SubUnitBzrProtocolClient(self.stream))
4597
class _PosixPermissionsFeature(Feature):
4601
# create temporary file and check if specified perms are maintained.
4604
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4605
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4608
os.chmod(name, write_perms)
4610
read_perms = os.stat(name).st_mode & 0777
4612
return (write_perms == read_perms)
4614
return (os.name == 'posix') and has_perms()
4616
def feature_name(self):
4617
return 'POSIX permissions support'
4619
posix_permissions_feature = _PosixPermissionsFeature()