56
90
except ImportError:
57
91
# lsprof not available
59
from bzrlib.merge import merge_inner
60
93
import bzrlib.merge3
62
import bzrlib.osutils as osutils
63
94
import bzrlib.plugin
64
import bzrlib.progress as progress
65
from bzrlib.revision import common_ancestor
95
from bzrlib.smart import client, request
66
96
import bzrlib.store
97
from bzrlib import symbol_versioning
98
from bzrlib.symbol_versioning import (
67
105
import bzrlib.trace
68
from bzrlib.transport import get_transport
69
import bzrlib.transport
70
from bzrlib.transport.local import LocalRelpathServer
71
from bzrlib.transport.readonly import ReadonlyServer
72
from bzrlib.trace import mutter
73
from bzrlib.tests import TestUtil
74
from bzrlib.tests.TestUtil import (
78
from bzrlib.tests.treeshape import build_tree_contents
79
import bzrlib.urlutils as urlutils
80
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
82
default_transport = LocalRelpathServer
85
MODULES_TO_DOCTEST = [
87
bzrlib.bundle.serializer,
100
def packages_to_test():
101
"""Return a list of packages to test.
103
The packages are not globally imported so that import failures are
104
triggered when running selftest, not when importing the command.
107
import bzrlib.tests.blackbox
108
import bzrlib.tests.branch_implementations
109
import bzrlib.tests.bzrdir_implementations
110
import bzrlib.tests.interrepository_implementations
111
import bzrlib.tests.interversionedfile_implementations
112
import bzrlib.tests.repository_implementations
113
import bzrlib.tests.revisionstore_implementations
114
import bzrlib.tests.workingtree_implementations
117
bzrlib.tests.blackbox,
118
bzrlib.tests.branch_implementations,
119
bzrlib.tests.bzrdir_implementations,
120
bzrlib.tests.interrepository_implementations,
121
bzrlib.tests.interversionedfile_implementations,
122
bzrlib.tests.repository_implementations,
123
bzrlib.tests.revisionstore_implementations,
124
bzrlib.tests.workingtree_implementations,
128
class _MyResult(unittest._TextTestResult):
129
"""Custom TestResult.
131
Shows output in a different format, including displaying runtime for tests.
106
from bzrlib.transport import (
110
from bzrlib.trace import mutter, note
111
from bzrlib.tests import (
116
from bzrlib.ui import NullProgressView
117
from bzrlib.ui.text import TextUIFactory
118
import bzrlib.version_info_formats.format_custom
120
# Mark this python module as being part of the implementation
121
# of unittest: this gives us better tracebacks where the last
122
# shown frame is the test code, not our assertXYZ.
125
default_transport = test_server.LocalURLServer
128
_unitialized_attr = object()
129
"""A sentinel needed to act as a default value in a method signature."""
132
# Subunit result codes, defined here to prevent a hard dependency on subunit.
136
# These are intentionally brought into this namespace. That way plugins, etc
137
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
138
TestSuite = TestUtil.TestSuite
139
TestLoader = TestUtil.TestLoader
141
class ExtendedTestResult(testtools.TextTestResult):
142
"""Accepts, reports and accumulates the results of running tests.
144
Compared to the unittest version this class adds support for
145
profiling, benchmarking, stopping as soon as a test fails, and
146
skipping tests. There are further-specialized subclasses for
147
different types of display.
149
When a test finishes, in whatever way, it calls one of the addSuccess,
150
addFailure or addError classes. These in turn may redirect to a more
151
specific case for the special test results supported by our extended
154
Note that just one of these objects is fed the results from many tests.
133
157
stop_early = False
135
def __init__(self, stream, descriptions, verbosity, pb=None):
136
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
139
def extractBenchmarkTime(self, testCase):
159
def __init__(self, stream, descriptions, verbosity,
163
"""Construct new TestResult.
165
:param bench_history: Optionally, a writable file object to accumulate
168
testtools.TextTestResult.__init__(self, stream)
169
if bench_history is not None:
170
from bzrlib.version import _get_bzr_source_tree
171
src_tree = _get_bzr_source_tree()
174
revision_id = src_tree.get_parent_ids()[0]
176
# XXX: if this is a brand new tree, do the same as if there
180
# XXX: If there's no branch, what should we do?
182
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
183
self._bench_history = bench_history
184
self.ui = ui.ui_factory
187
self.failure_count = 0
188
self.known_failure_count = 0
190
self.not_applicable_count = 0
191
self.unsupported = {}
193
self._overall_start_time = time.time()
194
self._strict = strict
195
self._first_thread_leaker_id = None
196
self._tests_leaking_threads_count = 0
197
self._traceback_from_test = None
199
def stopTestRun(self):
202
stopTime = time.time()
203
timeTaken = stopTime - self.startTime
204
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
205
# the parent class method is similar have to duplicate
206
self._show_list('ERROR', self.errors)
207
self._show_list('FAIL', self.failures)
208
self.stream.write(self.sep2)
209
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
210
run, run != 1 and "s" or "", timeTaken))
211
if not self.wasSuccessful():
212
self.stream.write("FAILED (")
213
failed, errored = map(len, (self.failures, self.errors))
215
self.stream.write("failures=%d" % failed)
217
if failed: self.stream.write(", ")
218
self.stream.write("errors=%d" % errored)
219
if self.known_failure_count:
220
if failed or errored: self.stream.write(", ")
221
self.stream.write("known_failure_count=%d" %
222
self.known_failure_count)
223
self.stream.write(")\n")
225
if self.known_failure_count:
226
self.stream.write("OK (known_failures=%d)\n" %
227
self.known_failure_count)
229
self.stream.write("OK\n")
230
if self.skip_count > 0:
231
skipped = self.skip_count
232
self.stream.write('%d test%s skipped\n' %
233
(skipped, skipped != 1 and "s" or ""))
235
for feature, count in sorted(self.unsupported.items()):
236
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
239
ok = self.wasStrictlySuccessful()
241
ok = self.wasSuccessful()
242
if self._first_thread_leaker_id:
244
'%s is leaking threads among %d leaking tests.\n' % (
245
self._first_thread_leaker_id,
246
self._tests_leaking_threads_count))
247
# We don't report the main thread as an active one.
249
'%d non-main threads were left active in the end.\n'
250
% (len(self._active_threads) - 1))
252
def getDescription(self, test):
255
def _extractBenchmarkTime(self, testCase, details=None):
140
256
"""Add a benchmark time for the current test case."""
141
self._benchmarkTime = getattr(testCase, "_benchtime", None)
257
if details and 'benchtime' in details:
258
return float(''.join(details['benchtime'].iter_bytes()))
259
return getattr(testCase, "_benchtime", None)
143
261
def _elapsedTestTimeString(self):
144
262
"""Return a time string for the overall time the current test has taken."""
145
return self._formatTime(time.time() - self._start_time)
263
return self._formatTime(self._delta_to_float(
264
self._now() - self._start_datetime))
147
def _testTimeString(self):
148
if self._benchmarkTime is not None:
150
self._formatTime(self._benchmarkTime),
151
self._elapsedTestTimeString())
266
def _testTimeString(self, testCase):
267
benchmark_time = self._extractBenchmarkTime(testCase)
268
if benchmark_time is not None:
269
return self._formatTime(benchmark_time) + "*"
153
return " %s" % self._elapsedTestTimeString()
271
return self._elapsedTestTimeString()
155
273
def _formatTime(self, seconds):
156
274
"""Format seconds as milliseconds with leading spaces."""
157
return "%5dms" % (1000 * seconds)
159
def _ellipsise_unimportant_words(self, a_string, final_width,
161
"""Add ellipses (sp?) for overly long strings.
163
:param keep_start: If true preserve the start of a_string rather
167
if len(a_string) > final_width:
168
result = a_string[:final_width-3] + '...'
172
if len(a_string) > final_width:
173
result = '...' + a_string[3-final_width:]
176
return result.ljust(final_width)
275
# some benchmarks can take thousands of seconds to run, so we need 8
277
return "%8dms" % (1000 * seconds)
279
def _shortened_test_description(self, test):
281
what = re.sub(r'^bzrlib\.tests\.', '', what)
284
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
285
# multiple times in a row, because the handler is added for
286
# each test but the container list is shared between cases.
287
# See lp:498869 lp:625574 and lp:637725 for background.
288
def _record_traceback_from_test(self, exc_info):
289
"""Store the traceback from passed exc_info tuple till"""
290
self._traceback_from_test = exc_info[2]
178
292
def startTest(self, test):
179
unittest.TestResult.startTest(self, test)
180
# In a short description, the important words are in
181
# the beginning, but in an id, the important words are
183
SHOW_DESCRIPTIONS = False
185
if not self.showAll and self.dots and self.pb is not None:
188
final_width = osutils.terminal_width()
189
final_width = final_width - 15 - 8
191
if SHOW_DESCRIPTIONS:
192
what = test.shortDescription()
194
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
197
if what.startswith('bzrlib.tests.'):
199
what = self._ellipsise_unimportant_words(what, final_width)
201
self.stream.write(what)
202
elif self.dots and self.pb is not None:
203
self.pb.update(what, self.testsRun - 1, None)
293
super(ExtendedTestResult, self).startTest(test)
297
self.report_test_start(test)
298
test.number = self.count
205
299
self._recordTestStartTime()
300
# Make testtools cases give us the real traceback on failure
301
addOnException = getattr(test, "addOnException", None)
302
if addOnException is not None:
303
addOnException(self._record_traceback_from_test)
304
# Only check for thread leaks if the test case supports cleanups
305
addCleanup = getattr(test, "addCleanup", None)
306
if addCleanup is not None:
307
addCleanup(self._check_leaked_threads, test)
309
def startTests(self):
310
self.report_tests_starting()
311
self._active_threads = threading.enumerate()
313
def stopTest(self, test):
314
self._traceback_from_test = None
316
def _check_leaked_threads(self, test):
317
"""See if any threads have leaked since last call
319
A sample of live threads is stored in the _active_threads attribute,
320
when this method runs it compares the current live threads and any not
321
in the previous sample are treated as having leaked.
323
now_active_threads = set(threading.enumerate())
324
threads_leaked = now_active_threads.difference(self._active_threads)
326
self._report_thread_leak(test, threads_leaked, now_active_threads)
327
self._tests_leaking_threads_count += 1
328
if self._first_thread_leaker_id is None:
329
self._first_thread_leaker_id = test.id()
330
self._active_threads = now_active_threads
207
332
def _recordTestStartTime(self):
208
333
"""Record that a test has started."""
209
self._start_time = time.time()
334
self._start_datetime = self._now()
211
336
def addError(self, test, err):
212
if isinstance(err[1], TestSkipped):
213
return self.addSkipped(test, err)
214
unittest.TestResult.addError(self, test, err)
215
self.extractBenchmarkTime(test)
217
self.stream.writeln("ERROR %s" % self._testTimeString())
218
elif self.dots and self.pb is None:
219
self.stream.write('E')
221
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
337
"""Tell result that test finished with an error.
339
Called from the TestCase run() method when the test
340
fails with an unexpected error.
342
self._post_mortem(self._traceback_from_test)
343
super(ExtendedTestResult, self).addError(test, err)
344
self.error_count += 1
345
self.report_error(test, err)
223
346
if self.stop_early:
226
349
def addFailure(self, test, err):
227
unittest.TestResult.addFailure(self, test, err)
228
self.extractBenchmarkTime(test)
230
self.stream.writeln(" FAIL %s" % self._testTimeString())
231
elif self.dots and self.pb is None:
232
self.stream.write('F')
234
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
350
"""Tell result that test failed.
352
Called from the TestCase run() method when the test
353
fails because e.g. an assert() method failed.
355
self._post_mortem(self._traceback_from_test)
356
super(ExtendedTestResult, self).addFailure(test, err)
357
self.failure_count += 1
358
self.report_failure(test, err)
236
359
if self.stop_early:
239
def addSuccess(self, test):
240
self.extractBenchmarkTime(test)
242
self.stream.writeln(' OK %s' % self._testTimeString())
243
for bench_called, stats in getattr(test, '_benchcalls', []):
244
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
245
stats.pprint(file=self.stream)
246
elif self.dots and self.pb is None:
247
self.stream.write('~')
249
self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
251
unittest.TestResult.addSuccess(self, test)
253
def addSkipped(self, test, skip_excinfo):
254
self.extractBenchmarkTime(test)
256
print >>self.stream, ' SKIP %s' % self._testTimeString()
257
print >>self.stream, ' %s' % skip_excinfo[1]
258
elif self.dots and self.pb is None:
259
self.stream.write('S')
261
self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
263
# seems best to treat this as success from point-of-view of unittest
264
# -- it actually does nothing so it barely matters :)
267
except KeyboardInterrupt:
270
self.addError(test, test.__exc_info())
272
unittest.TestResult.addSuccess(self, test)
274
def printErrorList(self, flavour, errors):
275
for test, err in errors:
276
self.stream.writeln(self.separator1)
277
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
278
if getattr(test, '_get_log', None) is not None:
280
print >>self.stream, \
281
('vvvv[log from %s]' % test.id()).ljust(78,'-')
282
print >>self.stream, test._get_log()
283
print >>self.stream, \
284
('^^^^[log from %s]' % test.id()).ljust(78,'-')
285
self.stream.writeln(self.separator2)
286
self.stream.writeln("%s" % err)
362
def addSuccess(self, test, details=None):
363
"""Tell result that test completed successfully.
365
Called from the TestCase run()
367
if self._bench_history is not None:
368
benchmark_time = self._extractBenchmarkTime(test, details)
369
if benchmark_time is not None:
370
self._bench_history.write("%s %s\n" % (
371
self._formatTime(benchmark_time),
373
self.report_success(test)
374
super(ExtendedTestResult, self).addSuccess(test)
375
test._log_contents = ''
377
def addExpectedFailure(self, test, err):
378
self.known_failure_count += 1
379
self.report_known_failure(test, err)
381
def addNotSupported(self, test, feature):
382
"""The test will not be run because of a missing feature.
384
# this can be called in two different ways: it may be that the
385
# test started running, and then raised (through requireFeature)
386
# UnavailableFeature. Alternatively this method can be called
387
# while probing for features before running the test code proper; in
388
# that case we will see startTest and stopTest, but the test will
389
# never actually run.
390
self.unsupported.setdefault(str(feature), 0)
391
self.unsupported[str(feature)] += 1
392
self.report_unsupported(test, feature)
394
def addSkip(self, test, reason):
395
"""A test has not run for 'reason'."""
397
self.report_skip(test, reason)
399
def addNotApplicable(self, test, reason):
400
self.not_applicable_count += 1
401
self.report_not_applicable(test, reason)
403
def _post_mortem(self, tb=None):
404
"""Start a PDB post mortem session."""
405
if os.environ.get('BZR_TEST_PDB', None):
409
def progress(self, offset, whence):
410
"""The test is adjusting the count of tests to run."""
411
if whence == SUBUNIT_SEEK_SET:
412
self.num_tests = offset
413
elif whence == SUBUNIT_SEEK_CUR:
414
self.num_tests += offset
416
raise errors.BzrError("Unknown whence %r" % whence)
418
def report_tests_starting(self):
419
"""Display information before the test run begins"""
420
if getattr(sys, 'frozen', None) is None:
421
bzr_path = osutils.realpath(sys.argv[0])
423
bzr_path = sys.executable
425
'bzr selftest: %s\n' % (bzr_path,))
428
bzrlib.__path__[0],))
430
' bzr-%s python-%s %s\n' % (
431
bzrlib.version_string,
432
bzrlib._format_version_tuple(sys.version_info),
433
platform.platform(aliased=1),
435
self.stream.write('\n')
437
def report_test_start(self, test):
438
"""Display information on the test just about to be run"""
440
def _report_thread_leak(self, test, leaked_threads, active_threads):
441
"""Display information on a test that leaked one or more threads"""
442
# GZ 2010-09-09: A leak summary reported separately from the general
443
# thread debugging would be nice. Tests under subunit
444
# need something not using stream, perhaps adding a
445
# testtools details object would be fitting.
446
if 'threads' in selftest_debug_flags:
447
self.stream.write('%s is leaking, active is now %d\n' %
448
(test.id(), len(active_threads)))
450
def startTestRun(self):
451
self.startTime = time.time()
453
def report_success(self, test):
456
def wasStrictlySuccessful(self):
457
if self.unsupported or self.known_failure_count:
459
return self.wasSuccessful()
462
class TextTestResult(ExtendedTestResult):
463
"""Displays progress and results of tests in text form"""
465
def __init__(self, stream, descriptions, verbosity,
470
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
471
bench_history, strict)
472
# We no longer pass them around, but just rely on the UIFactory stack
475
warnings.warn("Passing pb to TextTestResult is deprecated")
476
self.pb = self.ui.nested_progress_bar()
477
self.pb.show_pct = False
478
self.pb.show_spinner = False
479
self.pb.show_eta = False,
480
self.pb.show_count = False
481
self.pb.show_bar = False
482
self.pb.update_latency = 0
483
self.pb.show_transport_activity = False
485
def stopTestRun(self):
486
# called when the tests that are going to run have run
489
super(TextTestResult, self).stopTestRun()
491
def report_tests_starting(self):
492
super(TextTestResult, self).report_tests_starting()
493
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
495
def _progress_prefix_text(self):
496
# the longer this text, the less space we have to show the test
498
a = '[%d' % self.count # total that have been run
499
# tests skipped as known not to be relevant are not important enough
501
## if self.skip_count:
502
## a += ', %d skip' % self.skip_count
503
## if self.known_failure_count:
504
## a += '+%dX' % self.known_failure_count
506
a +='/%d' % self.num_tests
508
runtime = time.time() - self._overall_start_time
510
a += '%dm%ds' % (runtime / 60, runtime % 60)
513
total_fail_count = self.error_count + self.failure_count
515
a += ', %d failed' % total_fail_count
516
# if self.unsupported:
517
# a += ', %d missing' % len(self.unsupported)
521
def report_test_start(self, test):
523
self._progress_prefix_text()
525
+ self._shortened_test_description(test))
527
def _test_description(self, test):
528
return self._shortened_test_description(test)
530
def report_error(self, test, err):
531
self.stream.write('ERROR: %s\n %s\n' % (
532
self._test_description(test),
536
def report_failure(self, test, err):
537
self.stream.write('FAIL: %s\n %s\n' % (
538
self._test_description(test),
542
def report_known_failure(self, test, err):
545
def report_skip(self, test, reason):
548
def report_not_applicable(self, test, reason):
551
def report_unsupported(self, test, feature):
552
"""test cannot be run because feature is missing."""
555
class VerboseTestResult(ExtendedTestResult):
556
"""Produce long output, with one line per test run plus times"""
558
def _ellipsize_to_right(self, a_string, final_width):
559
"""Truncate and pad a string, keeping the right hand side"""
560
if len(a_string) > final_width:
561
result = '...' + a_string[3-final_width:]
564
return result.ljust(final_width)
566
def report_tests_starting(self):
567
self.stream.write('running %d tests...\n' % self.num_tests)
568
super(VerboseTestResult, self).report_tests_starting()
570
def report_test_start(self, test):
571
name = self._shortened_test_description(test)
572
width = osutils.terminal_width()
573
if width is not None:
574
# width needs space for 6 char status, plus 1 for slash, plus an
575
# 11-char time string, plus a trailing blank
576
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
578
self.stream.write(self._ellipsize_to_right(name, width-18))
580
self.stream.write(name)
583
def _error_summary(self, err):
585
return '%s%s' % (indent, err[1])
587
def report_error(self, test, err):
588
self.stream.write('ERROR %s\n%s\n'
589
% (self._testTimeString(test),
590
self._error_summary(err)))
592
def report_failure(self, test, err):
593
self.stream.write(' FAIL %s\n%s\n'
594
% (self._testTimeString(test),
595
self._error_summary(err)))
597
def report_known_failure(self, test, err):
598
self.stream.write('XFAIL %s\n%s\n'
599
% (self._testTimeString(test),
600
self._error_summary(err)))
602
def report_success(self, test):
603
self.stream.write(' OK %s\n' % self._testTimeString(test))
604
for bench_called, stats in getattr(test, '_benchcalls', []):
605
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
606
stats.pprint(file=self.stream)
607
# flush the stream so that we get smooth output. This verbose mode is
608
# used to show the output in PQM.
611
def report_skip(self, test, reason):
612
self.stream.write(' SKIP %s\n%s\n'
613
% (self._testTimeString(test), reason))
615
def report_not_applicable(self, test, reason):
616
self.stream.write(' N/A %s\n %s\n'
617
% (self._testTimeString(test), reason))
619
def report_unsupported(self, test, feature):
620
"""test cannot be run because feature is missing."""
621
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
622
%(self._testTimeString(test), feature))
289
625
class TextTestRunner(object):
534
1315
path_stat = transport.stat(path)
535
1316
actual_mode = stat.S_IMODE(path_stat.st_mode)
536
1317
self.assertEqual(mode, actual_mode,
537
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
539
def assertIsInstance(self, obj, kls):
540
"""Fail if obj is not an instance of kls"""
1318
'mode of %r incorrect (%s != %s)'
1319
% (path, oct(mode), oct(actual_mode)))
1321
def assertIsSameRealPath(self, path1, path2):
1322
"""Fail if path1 and path2 points to different files"""
1323
self.assertEqual(osutils.realpath(path1),
1324
osutils.realpath(path2),
1325
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1327
def assertIsInstance(self, obj, kls, msg=None):
1328
"""Fail if obj is not an instance of kls
1330
:param msg: Supplementary message to show if the assertion fails.
541
1332
if not isinstance(obj, kls):
542
self.fail("%r is an instance of %s rather than %s" % (
543
obj, obj.__class__, kls))
1333
m = "%r is an instance of %s rather than %s" % (
1334
obj, obj.__class__, kls)
1339
def assertFileEqual(self, content, path):
1340
"""Fail if path does not contain 'content'."""
1341
self.failUnlessExists(path)
1342
f = file(path, 'rb')
1347
self.assertEqualDiff(content, s)
1349
def assertDocstring(self, expected_docstring, obj):
1350
"""Fail if obj does not have expected_docstring"""
1352
# With -OO the docstring should be None instead
1353
self.assertIs(obj.__doc__, None)
1355
self.assertEqual(expected_docstring, obj.__doc__)
1357
def failUnlessExists(self, path):
1358
"""Fail unless path or paths, which may be abs or relative, exist."""
1359
if not isinstance(path, basestring):
1361
self.failUnlessExists(p)
1363
self.failUnless(osutils.lexists(path),path+" does not exist")
1365
def failIfExists(self, path):
1366
"""Fail if path or paths, which may be abs or relative, exist."""
1367
if not isinstance(path, basestring):
1369
self.failIfExists(p)
1371
self.failIf(osutils.lexists(path),path+" exists")
1373
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1374
"""A helper for callDeprecated and applyDeprecated.
1376
:param a_callable: A callable to call.
1377
:param args: The positional arguments for the callable
1378
:param kwargs: The keyword arguments for the callable
1379
:return: A tuple (warnings, result). result is the result of calling
1380
a_callable(``*args``, ``**kwargs``).
1383
def capture_warnings(msg, cls=None, stacklevel=None):
1384
# we've hooked into a deprecation specific callpath,
1385
# only deprecations should getting sent via it.
1386
self.assertEqual(cls, DeprecationWarning)
1387
local_warnings.append(msg)
1388
original_warning_method = symbol_versioning.warn
1389
symbol_versioning.set_warning_method(capture_warnings)
1391
result = a_callable(*args, **kwargs)
1393
symbol_versioning.set_warning_method(original_warning_method)
1394
return (local_warnings, result)
1396
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1397
"""Call a deprecated callable without warning the user.
1399
Note that this only captures warnings raised by symbol_versioning.warn,
1400
not other callers that go direct to the warning module.
1402
To test that a deprecated method raises an error, do something like
1405
self.assertRaises(errors.ReservedId,
1406
self.applyDeprecated,
1407
deprecated_in((1, 5, 0)),
1411
:param deprecation_format: The deprecation format that the callable
1412
should have been deprecated with. This is the same type as the
1413
parameter to deprecated_method/deprecated_function. If the
1414
callable is not deprecated with this format, an assertion error
1416
:param a_callable: A callable to call. This may be a bound method or
1417
a regular function. It will be called with ``*args`` and
1419
:param args: The positional arguments for the callable
1420
:param kwargs: The keyword arguments for the callable
1421
:return: The result of a_callable(``*args``, ``**kwargs``)
1423
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1425
expected_first_warning = symbol_versioning.deprecation_string(
1426
a_callable, deprecation_format)
1427
if len(call_warnings) == 0:
1428
self.fail("No deprecation warning generated by call to %s" %
1430
self.assertEqual(expected_first_warning, call_warnings[0])
1433
def callCatchWarnings(self, fn, *args, **kw):
1434
"""Call a callable that raises python warnings.
1436
The caller's responsible for examining the returned warnings.
1438
If the callable raises an exception, the exception is not
1439
caught and propagates up to the caller. In that case, the list
1440
of warnings is not available.
1442
:returns: ([warning_object, ...], fn_result)
1444
# XXX: This is not perfect, because it completely overrides the
1445
# warnings filters, and some code may depend on suppressing particular
1446
# warnings. It's the easiest way to insulate ourselves from -Werror,
1447
# though. -- Andrew, 20071062
1449
def _catcher(message, category, filename, lineno, file=None, line=None):
1450
# despite the name, 'message' is normally(?) a Warning subclass
1452
wlist.append(message)
1453
saved_showwarning = warnings.showwarning
1454
saved_filters = warnings.filters
1456
warnings.showwarning = _catcher
1457
warnings.filters = []
1458
result = fn(*args, **kw)
1460
warnings.showwarning = saved_showwarning
1461
warnings.filters = saved_filters
1462
return wlist, result
1464
def callDeprecated(self, expected, callable, *args, **kwargs):
1465
"""Assert that a callable is deprecated in a particular way.
1467
This is a very precise test for unusual requirements. The
1468
applyDeprecated helper function is probably more suited for most tests
1469
as it allows you to simply specify the deprecation format being used
1470
and will ensure that that is issued for the function being called.
1472
Note that this only captures warnings raised by symbol_versioning.warn,
1473
not other callers that go direct to the warning module. To catch
1474
general warnings, use callCatchWarnings.
1476
:param expected: a list of the deprecation warnings expected, in order
1477
:param callable: The callable to call
1478
:param args: The positional arguments for the callable
1479
:param kwargs: The keyword arguments for the callable
1481
call_warnings, result = self._capture_deprecation_warnings(callable,
1483
self.assertEqual(expected, call_warnings)
545
1486
def _startLogFile(self):
546
1487
"""Send bzr and test log messages to a temporary file.
548
1489
The file is removed as the test is torn down.
550
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
551
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
552
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
553
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
554
self._log_file_name = name
1491
self._log_file = StringIO()
1492
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
555
1493
self.addCleanup(self._finishLogFile)
557
1495
def _finishLogFile(self):
558
1496
"""Finished with the log file.
560
Read contents into memory, close, and delete.
562
if self._log_file is None:
564
bzrlib.trace.disable_test_log(self._log_nonce)
565
self._log_file.seek(0)
566
self._log_contents = self._log_file.read()
567
self._log_file.close()
568
os.remove(self._log_file_name)
569
self._log_file = self._log_file_name = None
571
def addCleanup(self, callable):
572
"""Arrange to run a callable when this case is torn down.
574
Callables are run in the reverse of the order they are registered,
575
ie last-in first-out.
577
if callable in self._cleanups:
578
raise ValueError("cleanup function %r already registered on %s"
580
self._cleanups.append(callable)
1498
Close the file and delete it, unless setKeepLogfile was called.
1500
if bzrlib.trace._trace_file:
1501
# flush the log file, to get all content
1502
bzrlib.trace._trace_file.flush()
1503
bzrlib.trace.pop_log_file(self._log_memento)
1504
# Cache the log result and delete the file on disk
1505
self._get_log(False)
1507
def thisFailsStrictLockCheck(self):
1508
"""It is known that this test would fail with -Dstrict_locks.
1510
By default, all tests are run with strict lock checking unless
1511
-Edisable_lock_checks is supplied. However there are some tests which
1512
we know fail strict locks at this point that have not been fixed.
1513
They should call this function to disable the strict checking.
1515
This should be used sparingly, it is much better to fix the locking
1516
issues rather than papering over the problem by calling this function.
1518
debug.debug_flags.discard('strict_locks')
1520
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1521
"""Overrides an object attribute restoring it after the test.
1523
:param obj: The object that will be mutated.
1525
:param attr_name: The attribute name we want to preserve/override in
1528
:param new: The optional value we want to set the attribute to.
1530
:returns: The actual attr value.
1532
value = getattr(obj, attr_name)
1533
# The actual value is captured by the call below
1534
self.addCleanup(setattr, obj, attr_name, value)
1535
if new is not _unitialized_attr:
1536
setattr(obj, attr_name, new)
582
1539
def _cleanEnvironment(self):
1541
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
584
1542
'HOME': os.getcwd(),
585
'APPDATA': os.getcwd(),
1543
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1544
# tests do check our impls match APPDATA
1545
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1549
'BZREMAIL': None, # may still be present in the environment
1550
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1551
'BZR_PROGRESS_BAR': None,
1553
'BZR_PLUGIN_PATH': None,
1554
'BZR_DISABLE_PLUGINS': None,
1555
'BZR_PLUGINS_AT': None,
1556
'BZR_CONCURRENCY': None,
1557
# Make sure that any text ui tests are consistent regardless of
1558
# the environment the test case is run in; you may want tests that
1559
# test other combinations. 'dumb' is a reasonable guess for tests
1560
# going to a pipe or a StringIO.
1564
'BZR_COLUMNS': '80',
1566
'SSH_AUTH_SOCK': None,
1570
'https_proxy': None,
1571
'HTTPS_PROXY': None,
1576
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1577
# least. If you do (care), please update this comment
1581
'BZR_REMOTE_PATH': None,
1582
# Generally speaking, we don't want apport reporting on crashes in
1583
# the test envirnoment unless we're specifically testing apport,
1584
# so that it doesn't leak into the real system environment. We
1585
# use an env var so it propagates to subprocesses.
1586
'APPORT_DISABLE': '1',
590
1589
self.addCleanup(self._restoreEnvironment)
591
1590
for name, value in new_env.iteritems():
592
1591
self._captureVar(name, value)
595
1593
def _captureVar(self, name, newvalue):
596
"""Set an environment variable, preparing it to be reset when finished."""
597
self.__old_env[name] = os.environ.get(name, None)
599
if name in os.environ:
602
os.environ[name] = newvalue
605
def _restoreVar(name, value):
607
if name in os.environ:
610
os.environ[name] = value
1594
"""Set an environment variable, and reset it when finished."""
1595
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
612
1597
def _restoreEnvironment(self):
613
for name, value in self.__old_env.iteritems():
614
self._restoreVar(name, value)
618
unittest.TestCase.tearDown(self)
1598
for name, value in self._old_env.iteritems():
1599
osutils.set_or_unset_env(name, value)
1601
def _restoreHooks(self):
1602
for klass, (name, hooks) in self._preserved_hooks.items():
1603
setattr(klass, name, hooks)
1605
def knownFailure(self, reason):
1606
"""This test has failed for some known reason."""
1607
raise KnownFailure(reason)
1609
def _suppress_log(self):
1610
"""Remove the log info from details."""
1611
self.discardDetail('log')
1613
def _do_skip(self, result, reason):
1614
self._suppress_log()
1615
addSkip = getattr(result, 'addSkip', None)
1616
if not callable(addSkip):
1617
result.addSuccess(result)
1619
addSkip(self, reason)
1622
def _do_known_failure(self, result, e):
1623
self._suppress_log()
1624
err = sys.exc_info()
1625
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1626
if addExpectedFailure is not None:
1627
addExpectedFailure(self, err)
1629
result.addSuccess(self)
1632
def _do_not_applicable(self, result, e):
1634
reason = 'No reason given'
1637
self._suppress_log ()
1638
addNotApplicable = getattr(result, 'addNotApplicable', None)
1639
if addNotApplicable is not None:
1640
result.addNotApplicable(self, reason)
1642
self._do_skip(result, reason)
1645
def _report_skip(self, result, err):
1646
"""Override the default _report_skip.
1648
We want to strip the 'log' detail. If we waint until _do_skip, it has
1649
already been formatted into the 'reason' string, and we can't pull it
1652
self._suppress_log()
1653
super(TestCase, self)._report_skip(self, result, err)
1656
def _report_expected_failure(self, result, err):
1659
See _report_skip for motivation.
1661
self._suppress_log()
1662
super(TestCase, self)._report_expected_failure(self, result, err)
1665
def _do_unsupported_or_skip(self, result, e):
1667
self._suppress_log()
1668
addNotSupported = getattr(result, 'addNotSupported', None)
1669
if addNotSupported is not None:
1670
result.addNotSupported(self, reason)
1672
self._do_skip(result, reason)
620
1674
def time(self, callable, *args, **kwargs):
621
1675
"""Run callable and accrue the time it takes to the benchmark time.
623
1677
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
624
1678
this will cause lsprofile statistics to be gathered and stored in
625
1679
self._benchcalls.
627
1681
if self._benchtime is None:
1682
self.addDetail('benchtime', content.Content(content.ContentType(
1683
"text", "plain"), lambda:[str(self._benchtime)]))
628
1684
self._benchtime = 0
629
1685
start = time.time()
853
2103
sys.stderr = real_stderr
854
2104
sys.stdin = real_stdin
856
def merge(self, branch_from, wt_to):
857
"""A helper for tests to do a ui-less merge.
859
This should move to the main library when someone has time to integrate
862
# minimal ui-less merge.
863
wt_to.branch.fetch(branch_from)
864
base_rev = common_ancestor(branch_from.last_revision(),
865
wt_to.branch.last_revision(),
866
wt_to.branch.repository)
867
merge_inner(wt_to.branch, branch_from.basis_tree(),
868
wt_to.branch.repository.revision_tree(base_rev),
870
wt_to.add_pending_merge(branch_from.last_revision())
873
BzrTestBase = TestCase
876
class TestCaseInTempDir(TestCase):
2106
def reduceLockdirTimeout(self):
2107
"""Reduce the default lock timeout for the duration of the test, so that
2108
if LockContention occurs during a test, it does so quickly.
2110
Tests that expect to provoke LockContention errors should call this.
2112
self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2114
def make_utf8_encoded_stringio(self, encoding_type=None):
2115
"""Return a StringIOWrapper instance, that will encode Unicode
2118
if encoding_type is None:
2119
encoding_type = 'strict'
2121
output_encoding = 'utf-8'
2122
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
2123
sio.encoding = output_encoding
2126
def disable_verb(self, verb):
2127
"""Disable a smart server verb for one test."""
2128
from bzrlib.smart import request
2129
request_handlers = request.request_handlers
2130
orig_method = request_handlers.get(verb)
2131
request_handlers.remove(verb)
2132
self.addCleanup(request_handlers.register, verb, orig_method)
2135
class CapturedCall(object):
2136
"""A helper for capturing smart server calls for easy debug analysis."""
2138
def __init__(self, params, prefix_length):
2139
"""Capture the call with params and skip prefix_length stack frames."""
2142
# The last 5 frames are the __init__, the hook frame, and 3 smart
2143
# client frames. Beyond this we could get more clever, but this is good
2145
stack = traceback.extract_stack()[prefix_length:-5]
2146
self.stack = ''.join(traceback.format_list(stack))
2149
return self.call.method
2152
return self.call.method
2158
class TestCaseWithMemoryTransport(TestCase):
2159
"""Common test class for tests that do not need disk resources.
2161
Tests that need disk resources should derive from TestCaseWithTransport.
2163
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2165
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2166
a directory which does not exist. This serves to help ensure test isolation
2167
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2168
must exist. However, TestCaseWithMemoryTransport does not offer local
2169
file defaults for the transport in tests, nor does it obey the command line
2170
override, so tests that accidentally write to the common directory should
2173
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2174
a .bzr directory that stops us ascending higher into the filesystem.
2180
def __init__(self, methodName='runTest'):
2181
# allow test parameterization after test construction and before test
2182
# execution. Variables that the parameterizer sets need to be
2183
# ones that are not set by setUp, or setUp will trash them.
2184
super(TestCaseWithMemoryTransport, self).__init__(methodName)
2185
self.vfs_transport_factory = default_transport
2186
self.transport_server = None
2187
self.transport_readonly_server = None
2188
self.__vfs_server = None
2190
def get_transport(self, relpath=None):
2191
"""Return a writeable transport.
2193
This transport is for the test scratch space relative to
2196
:param relpath: a path relative to the base url.
2198
t = _mod_transport.get_transport(self.get_url(relpath))
2199
self.assertFalse(t.is_readonly())
2202
def get_readonly_transport(self, relpath=None):
2203
"""Return a readonly transport for the test scratch space
2205
This can be used to test that operations which should only need
2206
readonly access in fact do not try to write.
2208
:param relpath: a path relative to the base url.
2210
t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2211
self.assertTrue(t.is_readonly())
2214
def create_transport_readonly_server(self):
2215
"""Create a transport server from class defined at init.
2217
This is mostly a hook for daughter classes.
2219
return self.transport_readonly_server()
2221
def get_readonly_server(self):
2222
"""Get the server instance for the readonly transport
2224
This is useful for some tests with specific servers to do diagnostics.
2226
if self.__readonly_server is None:
2227
if self.transport_readonly_server is None:
2228
# readonly decorator requested
2229
self.__readonly_server = test_server.ReadonlyServer()
2231
# explicit readonly transport.
2232
self.__readonly_server = self.create_transport_readonly_server()
2233
self.start_server(self.__readonly_server,
2234
self.get_vfs_only_server())
2235
return self.__readonly_server
2237
def get_readonly_url(self, relpath=None):
2238
"""Get a URL for the readonly transport.
2240
This will either be backed by '.' or a decorator to the transport
2241
used by self.get_url()
2242
relpath provides for clients to get a path relative to the base url.
2243
These should only be downwards relative, not upwards.
2245
base = self.get_readonly_server().get_url()
2246
return self._adjust_url(base, relpath)
2248
def get_vfs_only_server(self):
2249
"""Get the vfs only read/write server instance.
2251
This is useful for some tests with specific servers that need
2254
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
2255
is no means to override it.
2257
if self.__vfs_server is None:
2258
self.__vfs_server = memory.MemoryServer()
2259
self.start_server(self.__vfs_server)
2260
return self.__vfs_server
2262
def get_server(self):
2263
"""Get the read/write server instance.
2265
This is useful for some tests with specific servers that need
2268
This is built from the self.transport_server factory. If that is None,
2269
then the self.get_vfs_server is returned.
2271
if self.__server is None:
2272
if (self.transport_server is None or self.transport_server is
2273
self.vfs_transport_factory):
2274
self.__server = self.get_vfs_only_server()
2276
# bring up a decorated means of access to the vfs only server.
2277
self.__server = self.transport_server()
2278
self.start_server(self.__server, self.get_vfs_only_server())
2279
return self.__server
2281
def _adjust_url(self, base, relpath):
2282
"""Get a URL (or maybe a path) for the readwrite transport.
2284
This will either be backed by '.' or to an equivalent non-file based
2286
relpath provides for clients to get a path relative to the base url.
2287
These should only be downwards relative, not upwards.
2289
if relpath is not None and relpath != '.':
2290
if not base.endswith('/'):
2292
# XXX: Really base should be a url; we did after all call
2293
# get_url()! But sometimes it's just a path (from
2294
# LocalAbspathServer), and it'd be wrong to append urlescaped data
2295
# to a non-escaped local path.
2296
if base.startswith('./') or base.startswith('/'):
2299
base += urlutils.escape(relpath)
2302
def get_url(self, relpath=None):
2303
"""Get a URL (or maybe a path) for the readwrite transport.
2305
This will either be backed by '.' or to an equivalent non-file based
2307
relpath provides for clients to get a path relative to the base url.
2308
These should only be downwards relative, not upwards.
2310
base = self.get_server().get_url()
2311
return self._adjust_url(base, relpath)
2313
def get_vfs_only_url(self, relpath=None):
2314
"""Get a URL (or maybe a path for the plain old vfs transport.
2316
This will never be a smart protocol. It always has all the
2317
capabilities of the local filesystem, but it might actually be a
2318
MemoryTransport or some other similar virtual filesystem.
2320
This is the backing transport (if any) of the server returned by
2321
get_url and get_readonly_url.
2323
:param relpath: provides for clients to get a path relative to the base
2324
url. These should only be downwards relative, not upwards.
2327
base = self.get_vfs_only_server().get_url()
2328
return self._adjust_url(base, relpath)
2330
def _create_safety_net(self):
2331
"""Make a fake bzr directory.
2333
This prevents any tests propagating up onto the TEST_ROOT directory's
2336
root = TestCaseWithMemoryTransport.TEST_ROOT
2337
bzrdir.BzrDir.create_standalone_workingtree(root)
2339
def _check_safety_net(self):
2340
"""Check that the safety .bzr directory have not been touched.
2342
_make_test_root have created a .bzr directory to prevent tests from
2343
propagating. This method ensures than a test did not leaked.
2345
root = TestCaseWithMemoryTransport.TEST_ROOT
2346
self.permit_url(_mod_transport.get_transport(root).base)
2347
wt = workingtree.WorkingTree.open(root)
2348
last_rev = wt.last_revision()
2349
if last_rev != 'null:':
2350
# The current test have modified the /bzr directory, we need to
2351
# recreate a new one or all the followng tests will fail.
2352
# If you need to inspect its content uncomment the following line
2353
# import pdb; pdb.set_trace()
2354
_rmtree_temp_dir(root + '/.bzr', test_id=self.id())
2355
self._create_safety_net()
2356
raise AssertionError('%s/.bzr should not be modified' % root)
2358
def _make_test_root(self):
2359
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2360
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
2361
root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
2363
TestCaseWithMemoryTransport.TEST_ROOT = root
2365
self._create_safety_net()
2367
# The same directory is used by all tests, and we're not
2368
# specifically told when all tests are finished. This will do.
2369
atexit.register(_rmtree_temp_dir, root)
2371
self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2372
self.addCleanup(self._check_safety_net)
2374
def makeAndChdirToTestDir(self):
2375
"""Create a temporary directories for this one test.
2377
This must set self.test_home_dir and self.test_dir and chdir to
2380
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
2382
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2383
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2384
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2385
self.permit_dir(self.test_dir)
2387
def make_branch(self, relpath, format=None):
2388
"""Create a branch on the transport at relpath."""
2389
repo = self.make_repository(relpath, format=format)
2390
return repo.bzrdir.create_branch()
2392
def make_bzrdir(self, relpath, format=None):
2394
# might be a relative or absolute path
2395
maybe_a_url = self.get_url(relpath)
2396
segments = maybe_a_url.rsplit('/', 1)
2397
t = _mod_transport.get_transport(maybe_a_url)
2398
if len(segments) > 1 and segments[-1] not in ('', '.'):
2402
if isinstance(format, basestring):
2403
format = bzrdir.format_registry.make_bzrdir(format)
2404
return format.initialize_on_transport(t)
2405
except errors.UninitializableFormat:
2406
raise TestSkipped("Format %s is not initializable." % format)
2408
def make_repository(self, relpath, shared=False, format=None):
2409
"""Create a repository on our default transport at relpath.
2411
Note that relpath must be a relative path, not a full url.
2413
# FIXME: If you create a remoterepository this returns the underlying
2414
# real format, which is incorrect. Actually we should make sure that
2415
# RemoteBzrDir returns a RemoteRepository.
2416
# maybe mbp 20070410
2417
made_control = self.make_bzrdir(relpath, format=format)
2418
return made_control.create_repository(shared=shared)
2420
def make_smart_server(self, path, backing_server=None):
2421
if backing_server is None:
2422
backing_server = self.get_server()
2423
smart_server = test_server.SmartTCPServer_for_testing()
2424
self.start_server(smart_server, backing_server)
2425
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2427
return remote_transport
2429
def make_branch_and_memory_tree(self, relpath, format=None):
2430
"""Create a branch on the default transport and a MemoryTree for it."""
2431
b = self.make_branch(relpath, format=format)
2432
return memorytree.MemoryTree.create_on_branch(b)
2434
def make_branch_builder(self, relpath, format=None):
2435
branch = self.make_branch(relpath, format=format)
2436
return branchbuilder.BranchBuilder(branch=branch)
2438
def overrideEnvironmentForTesting(self):
2439
test_home_dir = self.test_home_dir
2440
if isinstance(test_home_dir, unicode):
2441
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2442
os.environ['HOME'] = test_home_dir
2443
os.environ['BZR_HOME'] = test_home_dir
2446
super(TestCaseWithMemoryTransport, self).setUp()
2447
# Ensure that ConnectedTransport doesn't leak sockets
2448
def get_transport_with_cleanup(*args, **kwargs):
2449
t = orig_get_transport(*args, **kwargs)
2450
if isinstance(t, _mod_transport.ConnectedTransport):
2451
self.addCleanup(t.disconnect)
2454
orig_get_transport = self.overrideAttr(_mod_transport, '_get_transport',
2455
get_transport_with_cleanup)
2456
self._make_test_root()
2457
self.addCleanup(os.chdir, os.getcwdu())
2458
self.makeAndChdirToTestDir()
2459
self.overrideEnvironmentForTesting()
2460
self.__readonly_server = None
2461
self.__server = None
2462
self.reduceLockdirTimeout()
2464
def setup_smart_server_with_call_log(self):
2465
"""Sets up a smart server as the transport server with a call log."""
2466
self.transport_server = test_server.SmartTCPServer_for_testing
2467
self.hpss_calls = []
2469
# Skip the current stack down to the caller of
2470
# setup_smart_server_with_call_log
2471
prefix_length = len(traceback.extract_stack()) - 2
2472
def capture_hpss_call(params):
2473
self.hpss_calls.append(
2474
CapturedCall(params, prefix_length))
2475
client._SmartClient.hooks.install_named_hook(
2476
'call', capture_hpss_call, None)
2478
def reset_smart_call_log(self):
2479
self.hpss_calls = []
2482
class TestCaseInTempDir(TestCaseWithMemoryTransport):
877
2483
"""Derived class that runs a test within a temporary directory.
879
2485
This is useful for tests that need to create a branch, etc.
1168
2745
for readonly urls.
1170
2747
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1171
be used without needed to redo it when a different
2748
be used without needed to redo it when a different
1172
2749
subclass is in use ?
1175
2752
def setUp(self):
2753
from bzrlib.tests import http_server
1176
2754
super(ChrootedTestCase, self).setUp()
1177
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1178
self.transport_readonly_server = bzrlib.transport.http.HttpServer
2755
if not self.vfs_transport_factory == memory.MemoryServer:
2756
self.transport_readonly_server = http_server.HttpServer
2759
def condition_id_re(pattern):
2760
"""Create a condition filter which performs a re check on a test's id.
2762
:param pattern: A regular expression string.
2763
:return: A callable that returns True if the re matches.
2765
filter_re = re.compile(pattern, 0)
2766
def condition(test):
2768
return filter_re.search(test_id)
2772
def condition_isinstance(klass_or_klass_list):
2773
"""Create a condition filter which returns isinstance(param, klass).
2775
:return: A callable which when called with one parameter obj return the
2776
result of isinstance(obj, klass_or_klass_list).
2779
return isinstance(obj, klass_or_klass_list)
2783
def condition_id_in_list(id_list):
2784
"""Create a condition filter which verify that test's id in a list.
2786
:param id_list: A TestIdList object.
2787
:return: A callable that returns True if the test's id appears in the list.
2789
def condition(test):
2790
return id_list.includes(test.id())
2794
def condition_id_startswith(starts):
2795
"""Create a condition filter verifying that test's id starts with a string.
2797
:param starts: A list of string.
2798
:return: A callable that returns True if the test's id starts with one of
2801
def condition(test):
2802
for start in starts:
2803
if test.id().startswith(start):
2809
def exclude_tests_by_condition(suite, condition):
2810
"""Create a test suite which excludes some tests from suite.
2812
:param suite: The suite to get tests from.
2813
:param condition: A callable whose result evaluates True when called with a
2814
test case which should be excluded from the result.
2815
:return: A suite which contains the tests found in suite that fail
2819
for test in iter_suite_tests(suite):
2820
if not condition(test):
2822
return TestUtil.TestSuite(result)
2825
def filter_suite_by_condition(suite, condition):
2826
"""Create a test suite by filtering another one.
2828
:param suite: The source suite.
2829
:param condition: A callable whose result evaluates True when called with a
2830
test case which should be included in the result.
2831
:return: A suite which contains the tests found in suite that pass
2835
for test in iter_suite_tests(suite):
2838
return TestUtil.TestSuite(result)
1181
2841
def filter_suite_by_re(suite, pattern):
1182
result = TestUtil.TestSuite()
1183
filter_re = re.compile(pattern)
2842
"""Create a test suite by filtering another one.
2844
:param suite: the source suite
2845
:param pattern: pattern that names must match
2846
:returns: the newly created suite
2848
condition = condition_id_re(pattern)
2849
result_suite = filter_suite_by_condition(suite, condition)
2853
def filter_suite_by_id_list(suite, test_id_list):
2854
"""Create a test suite by filtering another one.
2856
:param suite: The source suite.
2857
:param test_id_list: A list of the test ids to keep as strings.
2858
:returns: the newly created suite
2860
condition = condition_id_in_list(test_id_list)
2861
result_suite = filter_suite_by_condition(suite, condition)
2865
def filter_suite_by_id_startswith(suite, start):
2866
"""Create a test suite by filtering another one.
2868
:param suite: The source suite.
2869
:param start: A list of string the test id must start with one of.
2870
:returns: the newly created suite
2872
condition = condition_id_startswith(start)
2873
result_suite = filter_suite_by_condition(suite, condition)
2877
def exclude_tests_by_re(suite, pattern):
2878
"""Create a test suite which excludes some tests from suite.
2880
:param suite: The suite to get tests from.
2881
:param pattern: A regular expression string. Test ids that match this
2882
pattern will be excluded from the result.
2883
:return: A TestSuite that contains all the tests from suite without the
2884
tests that matched pattern. The order of tests is the same as it was in
2887
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2890
def preserve_input(something):
2891
"""A helper for performing test suite transformation chains.
2893
:param something: Anything you want to preserve.
2899
def randomize_suite(suite):
2900
"""Return a new TestSuite with suite's tests in random order.
2902
The tests in the input suite are flattened into a single suite in order to
2903
accomplish this. Any nested TestSuites are removed to provide global
2906
tests = list(iter_suite_tests(suite))
2907
random.shuffle(tests)
2908
return TestUtil.TestSuite(tests)
2911
def split_suite_by_condition(suite, condition):
2912
"""Split a test suite into two by a condition.
2914
:param suite: The suite to split.
2915
:param condition: The condition to match on. Tests that match this
2916
condition are returned in the first test suite, ones that do not match
2917
are in the second suite.
2918
:return: A tuple of two test suites, where the first contains tests from
2919
suite matching the condition, and the second contains the remainder
2920
from suite. The order within each output suite is the same as it was in
1184
2925
for test in iter_suite_tests(suite):
1185
if filter_re.search(test.id()):
1186
result.addTest(test)
2927
matched.append(test)
2929
did_not_match.append(test)
2930
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2933
def split_suite_by_re(suite, pattern):
2934
"""Split a test suite into two by a regular expression.
2936
:param suite: The suite to split.
2937
:param pattern: A regular expression string. Test ids that match this
2938
pattern will be in the first test suite returned, and the others in the
2939
second test suite returned.
2940
:return: A tuple of two test suites, where the first contains tests from
2941
suite matching pattern, and the second contains the remainder from
2942
suite. The order within each output suite is the same as it was in
2945
return split_suite_by_condition(suite, condition_id_re(pattern))
1190
2948
def run_suite(suite, name='test', verbose=False, pattern=".*",
1191
stop_on_failure=False, keep_output=False,
1192
transport=None, lsprof_timed=None):
1193
TestCaseInTempDir._TEST_NAME = name
2949
stop_on_failure=False,
2950
transport=None, lsprof_timed=None, bench_history=None,
2951
matching_tests_first=None,
2954
exclude_pattern=None,
2957
suite_decorators=None,
2959
result_decorators=None,
2961
"""Run a test suite for bzr selftest.
2963
:param runner_class: The class of runner to use. Must support the
2964
constructor arguments passed by run_suite which are more than standard
2966
:return: A boolean indicating success.
1194
2968
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1200
pb = progress.ProgressBar()
1201
runner = TextTestRunner(stream=sys.stdout,
2973
if runner_class is None:
2974
runner_class = TextTestRunner
2977
runner = runner_class(stream=stream,
1202
2978
descriptions=0,
1203
2979
verbosity=verbosity,
1204
keep_output=keep_output,
2980
bench_history=bench_history,
2982
result_decorators=result_decorators,
1206
2984
runner.stop_on_failure=stop_on_failure
1208
suite = filter_suite_by_re(suite, pattern)
2985
# built in decorator factories:
2987
random_order(random_seed, runner),
2988
exclude_tests(exclude_pattern),
2990
if matching_tests_first:
2991
decorators.append(tests_first(pattern))
2993
decorators.append(filter_tests(pattern))
2994
if suite_decorators:
2995
decorators.extend(suite_decorators)
2996
# tell the result object how many tests will be running: (except if
2997
# --parallel=fork is being used. Robert said he will provide a better
2998
# progress design later -- vila 20090817)
2999
if fork_decorator not in decorators:
3000
decorators.append(CountingDecorator)
3001
for decorator in decorators:
3002
suite = decorator(suite)
3004
# Done after test suite decoration to allow randomisation etc
3005
# to take effect, though that is of marginal benefit.
3007
stream.write("Listing tests only ...\n")
3008
for t in iter_suite_tests(suite):
3009
stream.write("%s\n" % (t.id()))
1209
3011
result = runner.run(suite)
1210
return result.wasSuccessful()
3013
return result.wasStrictlySuccessful()
3015
return result.wasSuccessful()
3018
# A registry where get() returns a suite decorator.
3019
parallel_registry = registry.Registry()
3022
def fork_decorator(suite):
3023
if getattr(os, "fork", None) is None:
3024
raise errors.BzrCommandError("platform does not support fork,"
3025
" try --parallel=subprocess instead.")
3026
concurrency = osutils.local_concurrency()
3027
if concurrency == 1:
3029
from testtools import ConcurrentTestSuite
3030
return ConcurrentTestSuite(suite, fork_for_tests)
3031
parallel_registry.register('fork', fork_decorator)
3034
def subprocess_decorator(suite):
3035
concurrency = osutils.local_concurrency()
3036
if concurrency == 1:
3038
from testtools import ConcurrentTestSuite
3039
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3040
parallel_registry.register('subprocess', subprocess_decorator)
3043
def exclude_tests(exclude_pattern):
3044
"""Return a test suite decorator that excludes tests."""
3045
if exclude_pattern is None:
3046
return identity_decorator
3047
def decorator(suite):
3048
return ExcludeDecorator(suite, exclude_pattern)
3052
def filter_tests(pattern):
3054
return identity_decorator
3055
def decorator(suite):
3056
return FilterTestsDecorator(suite, pattern)
3060
def random_order(random_seed, runner):
3061
"""Return a test suite decorator factory for randomising tests order.
3063
:param random_seed: now, a string which casts to a long, or a long.
3064
:param runner: A test runner with a stream attribute to report on.
3066
if random_seed is None:
3067
return identity_decorator
3068
def decorator(suite):
3069
return RandomDecorator(suite, random_seed, runner.stream)
3073
def tests_first(pattern):
3075
return identity_decorator
3076
def decorator(suite):
3077
return TestFirstDecorator(suite, pattern)
3081
def identity_decorator(suite):
3086
class TestDecorator(TestUtil.TestSuite):
3087
"""A decorator for TestCase/TestSuite objects.
3089
Usually, subclasses should override __iter__(used when flattening test
3090
suites), which we do to filter, reorder, parallelise and so on, run() and
3094
def __init__(self, suite):
3095
TestUtil.TestSuite.__init__(self)
3098
def countTestCases(self):
3101
cases += test.countTestCases()
3108
def run(self, result):
3109
# Use iteration on self, not self._tests, to allow subclasses to hook
3112
if result.shouldStop:
3118
class CountingDecorator(TestDecorator):
3119
"""A decorator which calls result.progress(self.countTestCases)."""
3121
def run(self, result):
3122
progress_method = getattr(result, 'progress', None)
3123
if callable(progress_method):
3124
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3125
return super(CountingDecorator, self).run(result)
3128
class ExcludeDecorator(TestDecorator):
3129
"""A decorator which excludes test matching an exclude pattern."""
3131
def __init__(self, suite, exclude_pattern):
3132
TestDecorator.__init__(self, suite)
3133
self.exclude_pattern = exclude_pattern
3134
self.excluded = False
3138
return iter(self._tests)
3139
self.excluded = True
3140
suite = exclude_tests_by_re(self, self.exclude_pattern)
3142
self.addTests(suite)
3143
return iter(self._tests)
3146
class FilterTestsDecorator(TestDecorator):
3147
"""A decorator which filters tests to those matching a pattern."""
3149
def __init__(self, suite, pattern):
3150
TestDecorator.__init__(self, suite)
3151
self.pattern = pattern
3152
self.filtered = False
3156
return iter(self._tests)
3157
self.filtered = True
3158
suite = filter_suite_by_re(self, self.pattern)
3160
self.addTests(suite)
3161
return iter(self._tests)
3164
class RandomDecorator(TestDecorator):
3165
"""A decorator which randomises the order of its tests."""
3167
def __init__(self, suite, random_seed, stream):
3168
TestDecorator.__init__(self, suite)
3169
self.random_seed = random_seed
3170
self.randomised = False
3171
self.stream = stream
3175
return iter(self._tests)
3176
self.randomised = True
3177
self.stream.write("Randomizing test order using seed %s\n\n" %
3178
(self.actual_seed()))
3179
# Initialise the random number generator.
3180
random.seed(self.actual_seed())
3181
suite = randomize_suite(self)
3183
self.addTests(suite)
3184
return iter(self._tests)
3186
def actual_seed(self):
3187
if self.random_seed == "now":
3188
# We convert the seed to a long to make it reuseable across
3189
# invocations (because the user can reenter it).
3190
self.random_seed = long(time.time())
3192
# Convert the seed to a long if we can
3194
self.random_seed = long(self.random_seed)
3197
return self.random_seed
3200
class TestFirstDecorator(TestDecorator):
3201
"""A decorator which moves named tests to the front."""
3203
def __init__(self, suite, pattern):
3204
TestDecorator.__init__(self, suite)
3205
self.pattern = pattern
3206
self.filtered = False
3210
return iter(self._tests)
3211
self.filtered = True
3212
suites = split_suite_by_re(self, self.pattern)
3214
self.addTests(suites)
3215
return iter(self._tests)
3218
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: A test suite; it is flattened with iter_suite_tests().
    :param count: The number of partitions to create.
    :return: A list of ``count`` lists. Tests are dealt out round-robin, so
        the i-th test found lands in partition ``i % count``.
    """
    # This just assigns tests in a round-robin fashion.  On one hand this
    # splits up blocks of related tests that might run faster if they shared
    # resources, but on the other it avoids assigning blocks of slow tests to
    # just one partition, which keeps the partitions reasonably balanced.
    partitions = [[] for _ in range(count)]
    if not partitions:
        # Nothing to deal tests into (count <= 0): do not touch the suite.
        return partitions
    for index, test in enumerate(iter_suite_tests(suite)):
        partitions[index % len(partitions)].append(test)
    return partitions
3232
def workaround_zealous_crypto_random():
3233
"""Crypto.Random want to help us being secure, but we don't care here.
3235
This workaround some test failure related to the sftp server. Once paramiko
3236
stop using the controversial API in Crypto.Random, we may get rid of it.
3239
from Crypto.Random import atfork
3245
def fork_for_tests(suite):
3246
"""Take suite and start up one runner per CPU by forking()
3248
:return: An iterable of TestCase-like objects which can each have
3249
run(result) called on them to feed tests to result.
3251
concurrency = osutils.local_concurrency()
3253
from subunit import TestProtocolClient, ProtocolTestCase
3254
from subunit.test_results import AutoTimingTestResultDecorator
3255
class TestInOtherProcess(ProtocolTestCase):
3256
# Should be in subunit, I think. RBC.
3257
def __init__(self, stream, pid):
3258
ProtocolTestCase.__init__(self, stream)
3261
def run(self, result):
3263
ProtocolTestCase.run(self, result)
3265
os.waitpid(self.pid, 0)
3267
test_blocks = partition_tests(suite, concurrency)
3268
for process_tests in test_blocks:
3269
process_suite = TestUtil.TestSuite()
3270
process_suite.addTests(process_tests)
3271
c2pread, c2pwrite = os.pipe()
3274
workaround_zealous_crypto_random()
3277
# Leave stderr and stdout open so we can see test noise
3278
# Close stdin so that the child goes away if it decides to
3279
# read from stdin (otherwise its a roulette to see what
3280
# child actually gets keystrokes for pdb etc).
3283
stream = os.fdopen(c2pwrite, 'wb', 1)
3284
subunit_result = AutoTimingTestResultDecorator(
3285
TestProtocolClient(stream))
3286
process_suite.run(subunit_result)
3291
stream = os.fdopen(c2pread, 'rb', 1)
3292
test = TestInOtherProcess(stream, pid)
3297
def reinvoke_for_tests(suite):
3298
"""Take suite and start up one runner per CPU using subprocess().
3300
:return: An iterable of TestCase-like objects which can each have
3301
run(result) called on them to feed tests to result.
3303
concurrency = osutils.local_concurrency()
3305
from subunit import ProtocolTestCase
3306
class TestInSubprocess(ProtocolTestCase):
3307
def __init__(self, process, name):
3308
ProtocolTestCase.__init__(self, process.stdout)
3309
self.process = process
3310
self.process.stdin.close()
3313
def run(self, result):
3315
ProtocolTestCase.run(self, result)
3318
os.unlink(self.name)
3319
# print "pid %d finished" % finished_process
3320
test_blocks = partition_tests(suite, concurrency)
3321
for process_tests in test_blocks:
3322
# ugly; currently reimplement rather than reuses TestCase methods.
3323
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3324
if not os.path.isfile(bzr_path):
3325
# We are probably installed. Assume sys.argv is the right file
3326
bzr_path = sys.argv[0]
3327
bzr_path = [bzr_path]
3328
if sys.platform == "win32":
3329
# if we're on windows, we can't execute the bzr script directly
3330
bzr_path = [sys.executable] + bzr_path
3331
fd, test_list_file_name = tempfile.mkstemp()
3332
test_list_file = os.fdopen(fd, 'wb', 1)
3333
for test in process_tests:
3334
test_list_file.write(test.id() + '\n')
3335
test_list_file.close()
3337
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3339
if '--no-plugins' in sys.argv:
3340
argv.append('--no-plugins')
3341
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3342
# noise on stderr it can interrupt the subunit protocol.
3343
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3344
stdout=subprocess.PIPE,
3345
stderr=subprocess.PIPE,
3347
test = TestInSubprocess(process, test_list_file_name)
3350
os.unlink(test_list_file_name)
3355
class ProfileResult(testtools.ExtendedToOriginalDecorator):
    """Generate profiling data for all activity between start and success.

    The profile data is appended to the test's _benchcalls attribute and can
    be accessed by the forwarded-to TestResult.

    While it might be cleaner to accumulate this in stopTest, addSuccess is
    where our existing output support for lsprof is, and this class aims to
    fit in with that: while it could be moved it's not necessary to accomplish
    test profiling, nor would it be dramatically cleaner.
    """

    def startTest(self, test):
        """Start a fresh profiler, then forward startTest to the decorated result."""
        self.profiler = bzrlib.lsprof.BzrProfiler()
        # Prevent deadlocks in tests that use lsprof: the class-level
        # profiler_block flag is cleared before this profiler is started.
        # NOTE(review): the tail of the original comment was lost; confirm
        # the exact rationale against bzrlib.lsprof.
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
        self.profiler.start()
        testtools.ExtendedToOriginalDecorator.startTest(self, test)

    def addSuccess(self, test):
        """Stop profiling and record the stats on the test before forwarding."""
        stats = self.profiler.stop()
        try:
            calls = test._benchcalls
        except AttributeError:
            # First profile recorded for this test: create the list.
            test._benchcalls = []
            calls = test._benchcalls
        # Each entry is ((test id, "", ""), stats).
        calls.append(((test.id(), "", ""), stats))
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)

    def stopTest(self, test):
        """Forward stopTest, then drop the reference to this test's profiler."""
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
        self.profiler = None
3390
# Controlled by "bzr selftest -E=..." option
3391
# Currently supported:
3392
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3393
# preserves any flags supplied at the command line.
3394
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3395
# rather than failing tests. And no longer raise
3396
# LockContention when fcntl locks are not being used
3397
# with proper exclusion rules.
3398
# -Ethreads Will display thread ident at creation/join time to
3399
# help track thread leaks
3400
selftest_debug_flags = set()
1213
3403
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1215
3404
transport=None,
1216
3405
test_suite_factory=None,
3408
matching_tests_first=None,
3411
exclude_pattern=None,
3417
suite_decorators=None,
1218
3421
"""Run the whole test suite under the enhanced runner"""
3422
# XXX: Very ugly way to do this...
3423
# Disable warning about old formats because we don't want it to disturb
3424
# any blackbox tests.
3425
from bzrlib import repository
3426
repository._deprecation_warning_done = True
1219
3428
global default_transport
1220
3429
if transport is None:
1221
3430
transport = default_transport
1222
3431
old_transport = default_transport
1223
3432
default_transport = transport
3433
global selftest_debug_flags
3434
old_debug_flags = selftest_debug_flags
3435
if debug_flags is not None:
3436
selftest_debug_flags = set(debug_flags)
3438
if load_list is None:
3441
keep_only = load_test_id_list(load_list)
3443
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3444
for start in starting_with]
1225
3445
if test_suite_factory is None:
1226
suite = test_suite()
3446
# Reduce loading time by loading modules based on the starting_with
3448
suite = test_suite(keep_only, starting_with)
1228
3450
suite = test_suite_factory()
3452
# But always filter as requested.
3453
suite = filter_suite_by_id_startswith(suite, starting_with)
3454
result_decorators = []
3456
result_decorators.append(ProfileResult)
1229
3457
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1230
stop_on_failure=stop_on_failure, keep_output=keep_output,
3458
stop_on_failure=stop_on_failure,
1231
3459
transport=transport,
1232
lsprof_timed=lsprof_timed)
3460
lsprof_timed=lsprof_timed,
3461
bench_history=bench_history,
3462
matching_tests_first=matching_tests_first,
3463
list_only=list_only,
3464
random_seed=random_seed,
3465
exclude_pattern=exclude_pattern,
3467
runner_class=runner_class,
3468
suite_decorators=suite_decorators,
3470
result_decorators=result_decorators,
1234
3473
default_transport = old_transport
3474
selftest_debug_flags = old_debug_flags
3477
def load_test_id_list(file_name):
3478
"""Load a test id list from a text file.
3480
The format is one test id by line. No special care is taken to impose
3481
strict rules, these test ids are used to filter the test suite so a test id
3482
that do not match an existing test will do no harm. This allows user to add
3483
comments, leave blank lines, etc.
3487
ftest = open(file_name, 'rt')
3489
if e.errno != errno.ENOENT:
3492
raise errors.NoSuchFile(file_name)
3494
for test_name in ftest.readlines():
3495
test_list.append(test_name.strip())
3500
def suite_matches_id_list(test_suite, id_list):
3501
"""Warns about tests not appearing or appearing more than once.
3503
:param test_suite: A TestSuite object.
3504
:param test_id_list: The list of test ids that should be found in
3507
:return: (absents, duplicates) absents is a list containing the test found
3508
in id_list but not in test_suite, duplicates is a list containing the
3509
test found multiple times in test_suite.
3511
When using a prefined test id list, it may occurs that some tests do not
3512
exist anymore or that some tests use the same id. This function warns the
3513
tester about potential problems in his workflow (test lists are volatile)
3514
or in the test suite itself (using the same id for several tests does not
3515
help to localize defects).
3517
# Build a dict counting id occurrences
3519
for test in iter_suite_tests(test_suite):
3521
tests[id] = tests.get(id, 0) + 1
3526
occurs = tests.get(id, 0)
3528
not_found.append(id)
3530
duplicates.append(id)
3532
return not_found, duplicates
3535
class TestIdList(object):
    """Test id list to filter a test suite.

    Relying on the assumption that test ids are built as:
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
    notation, this class offers methods to:
    - avoid building a test suite for modules not referred to in the test
      list,
    - keep only the tests listed from the module test suite.
    """

    def __init__(self, test_id_list):
        """Index the given test ids.

        :param test_id_list: An iterable of test id strings.
        """
        # When a test suite needs to be filtered against us we compare test ids
        # for equality, so a simple dict offers a quick and simple solution.
        self.tests = dict.fromkeys(test_id_list, True)

        # While unittest.TestCase have ids like:
        # <module>.<class>.<method>[(<param+)],
        # doctest.DocTestCase can have ids like:
        # <module>.<function>
        # <module>.<class>.<method>

        # Since we can't predict a test class from its name only, we settle on
        # a simple constraint: a test id always begins with its module name.
        # Every dotted prefix of every id is therefore recorded as a possible
        # module name.
        modules = {}
        # Iterate the dict rather than the argument so that a one-shot
        # iterator (already consumed by fromkeys above) is still indexed.
        for test_id in self.tests:
            parts = test_id.split('.')
            mod_name = parts.pop(0)
            modules[mod_name] = True
            for part in parts:
                mod_name += '.' + part
                modules[mod_name] = True
        self.modules = modules

    def refers_to(self, module_name):
        """Is there tests for the module or one of its sub modules."""
        return module_name in self.modules

    def includes(self, test_id):
        """Return True if test_id is exactly one of the listed test ids."""
        return test_id in self.tests
3579
class TestPrefixAliasRegistry(registry.Registry):
3580
"""A registry for test prefix aliases.
3582
This helps implement shorcuts for the --starting-with selftest
3583
option. Overriding existing prefixes is not allowed but not fatal (a
3584
warning will be emitted).
3587
def register(self, key, obj, help=None, info=None,
3588
override_existing=False):
3589
"""See Registry.register.
3591
Trying to override an existing alias causes a warning to be emitted,
3592
not a fatal execption.
3595
super(TestPrefixAliasRegistry, self).register(
3596
key, obj, help=help, info=info, override_existing=False)
3598
actual = self.get(key)
3599
note('Test prefix alias %s is already used for %s, ignoring %s'
3600
% (key, actual, obj))
3602
def resolve_alias(self, id_start):
3603
"""Replace the alias by the prefix in the given string.
3605
Using an unknown prefix is an error to help catching typos.
3607
parts = id_start.split('.')
3609
parts[0] = self.get(parts[0])
3611
raise errors.BzrCommandError(
3612
'%s is not a known test prefix alias' % parts[0])
3613
return '.'.join(parts)
3616
test_prefix_alias_registry = TestPrefixAliasRegistry()
3617
"""Registry of test prefix aliases."""
3620
# This alias makes it possible to detect typos ('bzrlin.') by making all valid
# test ids appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3622
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3624
# Obvious highest-level prefixes; feel free to add your own via a plugin.
3625
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3626
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3627
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3628
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3629
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3632
def _test_suite_testmod_names():
3633
"""Return the standard list of test module names to test."""
3636
'bzrlib.tests.blackbox',
3637
'bzrlib.tests.commands',
3638
'bzrlib.tests.doc_generate',
3639
'bzrlib.tests.per_branch',
3640
'bzrlib.tests.per_bzrdir',
3641
'bzrlib.tests.per_controldir',
3642
'bzrlib.tests.per_controldir_colo',
3643
'bzrlib.tests.per_foreign_vcs',
3644
'bzrlib.tests.per_interrepository',
3645
'bzrlib.tests.per_intertree',
3646
'bzrlib.tests.per_inventory',
3647
'bzrlib.tests.per_interbranch',
3648
'bzrlib.tests.per_lock',
3649
'bzrlib.tests.per_merger',
3650
'bzrlib.tests.per_transport',
3651
'bzrlib.tests.per_tree',
3652
'bzrlib.tests.per_pack_repository',
3653
'bzrlib.tests.per_repository',
3654
'bzrlib.tests.per_repository_chk',
3655
'bzrlib.tests.per_repository_reference',
3656
'bzrlib.tests.per_uifactory',
3657
'bzrlib.tests.per_versionedfile',
3658
'bzrlib.tests.per_workingtree',
3659
'bzrlib.tests.test__annotator',
3660
'bzrlib.tests.test__bencode',
3661
'bzrlib.tests.test__btree_serializer',
3662
'bzrlib.tests.test__chk_map',
3663
'bzrlib.tests.test__dirstate_helpers',
3664
'bzrlib.tests.test__groupcompress',
3665
'bzrlib.tests.test__known_graph',
3666
'bzrlib.tests.test__rio',
3667
'bzrlib.tests.test__simple_set',
3668
'bzrlib.tests.test__static_tuple',
3669
'bzrlib.tests.test__walkdirs_win32',
3670
'bzrlib.tests.test_ancestry',
3671
'bzrlib.tests.test_annotate',
3672
'bzrlib.tests.test_api',
3673
'bzrlib.tests.test_atomicfile',
3674
'bzrlib.tests.test_bad_files',
3675
'bzrlib.tests.test_bisect_multi',
3676
'bzrlib.tests.test_branch',
3677
'bzrlib.tests.test_branchbuilder',
3678
'bzrlib.tests.test_btree_index',
3679
'bzrlib.tests.test_bugtracker',
3680
'bzrlib.tests.test_bundle',
3681
'bzrlib.tests.test_bzrdir',
3682
'bzrlib.tests.test__chunks_to_lines',
3683
'bzrlib.tests.test_cache_utf8',
3684
'bzrlib.tests.test_chk_map',
3685
'bzrlib.tests.test_chk_serializer',
3686
'bzrlib.tests.test_chunk_writer',
3687
'bzrlib.tests.test_clean_tree',
3688
'bzrlib.tests.test_cleanup',
3689
'bzrlib.tests.test_cmdline',
3690
'bzrlib.tests.test_commands',
3691
'bzrlib.tests.test_commit',
3692
'bzrlib.tests.test_commit_merge',
3693
'bzrlib.tests.test_config',
3694
'bzrlib.tests.test_conflicts',
3695
'bzrlib.tests.test_counted_lock',
3696
'bzrlib.tests.test_crash',
3697
'bzrlib.tests.test_decorators',
3698
'bzrlib.tests.test_delta',
3699
'bzrlib.tests.test_debug',
3700
'bzrlib.tests.test_deprecated_graph',
3701
'bzrlib.tests.test_diff',
3702
'bzrlib.tests.test_directory_service',
3703
'bzrlib.tests.test_dirstate',
3704
'bzrlib.tests.test_email_message',
3705
'bzrlib.tests.test_eol_filters',
3706
'bzrlib.tests.test_errors',
3707
'bzrlib.tests.test_export',
3708
'bzrlib.tests.test_extract',
3709
'bzrlib.tests.test_fetch',
3710
'bzrlib.tests.test_fixtures',
3711
'bzrlib.tests.test_fifo_cache',
3712
'bzrlib.tests.test_filters',
3713
'bzrlib.tests.test_ftp_transport',
3714
'bzrlib.tests.test_foreign',
3715
'bzrlib.tests.test_generate_docs',
3716
'bzrlib.tests.test_generate_ids',
3717
'bzrlib.tests.test_globbing',
3718
'bzrlib.tests.test_gpg',
3719
'bzrlib.tests.test_graph',
3720
'bzrlib.tests.test_groupcompress',
3721
'bzrlib.tests.test_hashcache',
3722
'bzrlib.tests.test_help',
3723
'bzrlib.tests.test_hooks',
3724
'bzrlib.tests.test_http',
3725
'bzrlib.tests.test_http_response',
3726
'bzrlib.tests.test_https_ca_bundle',
3727
'bzrlib.tests.test_identitymap',
3728
'bzrlib.tests.test_ignores',
3729
'bzrlib.tests.test_index',
3730
'bzrlib.tests.test_import_tariff',
3731
'bzrlib.tests.test_info',
3732
'bzrlib.tests.test_inv',
3733
'bzrlib.tests.test_inventory_delta',
3734
'bzrlib.tests.test_knit',
3735
'bzrlib.tests.test_lazy_import',
3736
'bzrlib.tests.test_lazy_regex',
3737
'bzrlib.tests.test_library_state',
3738
'bzrlib.tests.test_lock',
3739
'bzrlib.tests.test_lockable_files',
3740
'bzrlib.tests.test_lockdir',
3741
'bzrlib.tests.test_log',
3742
'bzrlib.tests.test_lru_cache',
3743
'bzrlib.tests.test_lsprof',
3744
'bzrlib.tests.test_mail_client',
3745
'bzrlib.tests.test_matchers',
3746
'bzrlib.tests.test_memorytree',
3747
'bzrlib.tests.test_merge',
3748
'bzrlib.tests.test_merge3',
3749
'bzrlib.tests.test_merge_core',
3750
'bzrlib.tests.test_merge_directive',
3751
'bzrlib.tests.test_missing',
3752
'bzrlib.tests.test_msgeditor',
3753
'bzrlib.tests.test_multiparent',
3754
'bzrlib.tests.test_mutabletree',
3755
'bzrlib.tests.test_nonascii',
3756
'bzrlib.tests.test_options',
3757
'bzrlib.tests.test_osutils',
3758
'bzrlib.tests.test_osutils_encodings',
3759
'bzrlib.tests.test_pack',
3760
'bzrlib.tests.test_patch',
3761
'bzrlib.tests.test_patches',
3762
'bzrlib.tests.test_permissions',
3763
'bzrlib.tests.test_plugins',
3764
'bzrlib.tests.test_progress',
3765
'bzrlib.tests.test_pyutils',
3766
'bzrlib.tests.test_read_bundle',
3767
'bzrlib.tests.test_reconcile',
3768
'bzrlib.tests.test_reconfigure',
3769
'bzrlib.tests.test_registry',
3770
'bzrlib.tests.test_remote',
3771
'bzrlib.tests.test_rename_map',
3772
'bzrlib.tests.test_repository',
3773
'bzrlib.tests.test_revert',
3774
'bzrlib.tests.test_revision',
3775
'bzrlib.tests.test_revisionspec',
3776
'bzrlib.tests.test_revisiontree',
3777
'bzrlib.tests.test_rio',
3778
'bzrlib.tests.test_rules',
3779
'bzrlib.tests.test_sampler',
3780
'bzrlib.tests.test_scenarios',
3781
'bzrlib.tests.test_script',
3782
'bzrlib.tests.test_selftest',
3783
'bzrlib.tests.test_serializer',
3784
'bzrlib.tests.test_setup',
3785
'bzrlib.tests.test_sftp_transport',
3786
'bzrlib.tests.test_shelf',
3787
'bzrlib.tests.test_shelf_ui',
3788
'bzrlib.tests.test_smart',
3789
'bzrlib.tests.test_smart_add',
3790
'bzrlib.tests.test_smart_request',
3791
'bzrlib.tests.test_smart_transport',
3792
'bzrlib.tests.test_smtp_connection',
3793
'bzrlib.tests.test_source',
3794
'bzrlib.tests.test_ssh_transport',
3795
'bzrlib.tests.test_status',
3796
'bzrlib.tests.test_store',
3797
'bzrlib.tests.test_strace',
3798
'bzrlib.tests.test_subsume',
3799
'bzrlib.tests.test_switch',
3800
'bzrlib.tests.test_symbol_versioning',
3801
'bzrlib.tests.test_tag',
3802
'bzrlib.tests.test_test_server',
3803
'bzrlib.tests.test_testament',
3804
'bzrlib.tests.test_textfile',
3805
'bzrlib.tests.test_textmerge',
3806
'bzrlib.tests.test_timestamp',
3807
'bzrlib.tests.test_trace',
3808
'bzrlib.tests.test_transactions',
3809
'bzrlib.tests.test_transform',
3810
'bzrlib.tests.test_transport',
3811
'bzrlib.tests.test_transport_log',
3812
'bzrlib.tests.test_tree',
3813
'bzrlib.tests.test_treebuilder',
3814
'bzrlib.tests.test_treeshape',
3815
'bzrlib.tests.test_tsort',
3816
'bzrlib.tests.test_tuned_gzip',
3817
'bzrlib.tests.test_ui',
3818
'bzrlib.tests.test_uncommit',
3819
'bzrlib.tests.test_upgrade',
3820
'bzrlib.tests.test_upgrade_stacked',
3821
'bzrlib.tests.test_urlutils',
3822
'bzrlib.tests.test_version',
3823
'bzrlib.tests.test_version_info',
3824
'bzrlib.tests.test_versionedfile',
3825
'bzrlib.tests.test_weave',
3826
'bzrlib.tests.test_whitebox',
3827
'bzrlib.tests.test_win32utils',
3828
'bzrlib.tests.test_workingtree',
3829
'bzrlib.tests.test_workingtree_4',
3830
'bzrlib.tests.test_wsgi',
3831
'bzrlib.tests.test_xml',
3835
def _test_suite_modules_to_doctest():
3836
"""Return the list of modules to doctest."""
3838
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3842
'bzrlib.branchbuilder',
3843
'bzrlib.decorators',
3846
'bzrlib.iterablefile',
3851
'bzrlib.symbol_versioning',
3853
'bzrlib.tests.fixtures',
3855
'bzrlib.transport.http',
3856
'bzrlib.version_info_formats.format_custom',
3860
def test_suite(keep_only=None, starting_with=None):
1238
3861
"""Build and return TestSuite for the whole of bzrlib.
3863
:param keep_only: A list of test ids limiting the suite returned.
3865
:param starting_with: An id limiting the suite returned to the tests
1240
3868
This function can be replaced if you need to change the default test
1241
3869
suite on a global basis, but it is not encouraged.
1244
'bzrlib.tests.test_ancestry',
1245
'bzrlib.tests.test_api',
1246
'bzrlib.tests.test_bad_files',
1247
'bzrlib.tests.test_branch',
1248
'bzrlib.tests.test_bundle',
1249
'bzrlib.tests.test_bzrdir',
1250
'bzrlib.tests.test_command',
1251
'bzrlib.tests.test_commit',
1252
'bzrlib.tests.test_commit_merge',
1253
'bzrlib.tests.test_config',
1254
'bzrlib.tests.test_conflicts',
1255
'bzrlib.tests.test_decorators',
1256
'bzrlib.tests.test_diff',
1257
'bzrlib.tests.test_doc_generate',
1258
'bzrlib.tests.test_emptytree',
1259
'bzrlib.tests.test_errors',
1260
'bzrlib.tests.test_escaped_store',
1261
'bzrlib.tests.test_fetch',
1262
'bzrlib.tests.test_gpg',
1263
'bzrlib.tests.test_graph',
1264
'bzrlib.tests.test_hashcache',
1265
'bzrlib.tests.test_http',
1266
'bzrlib.tests.test_identitymap',
1267
'bzrlib.tests.test_inv',
1268
'bzrlib.tests.test_knit',
1269
'bzrlib.tests.test_lockdir',
1270
'bzrlib.tests.test_lockable_files',
1271
'bzrlib.tests.test_log',
1272
'bzrlib.tests.test_merge',
1273
'bzrlib.tests.test_merge3',
1274
'bzrlib.tests.test_merge_core',
1275
'bzrlib.tests.test_missing',
1276
'bzrlib.tests.test_msgeditor',
1277
'bzrlib.tests.test_nonascii',
1278
'bzrlib.tests.test_options',
1279
'bzrlib.tests.test_osutils',
1280
'bzrlib.tests.test_patch',
1281
'bzrlib.tests.test_patches',
1282
'bzrlib.tests.test_permissions',
1283
'bzrlib.tests.test_plugins',
1284
'bzrlib.tests.test_progress',
1285
'bzrlib.tests.test_reconcile',
1286
'bzrlib.tests.test_repository',
1287
'bzrlib.tests.test_revision',
1288
'bzrlib.tests.test_revisionnamespaces',
1289
'bzrlib.tests.test_revprops',
1290
'bzrlib.tests.test_revisiontree',
1291
'bzrlib.tests.test_rio',
1292
'bzrlib.tests.test_sampler',
1293
'bzrlib.tests.test_selftest',
1294
'bzrlib.tests.test_setup',
1295
'bzrlib.tests.test_sftp_transport',
1296
'bzrlib.tests.test_smart_add',
1297
'bzrlib.tests.test_source',
1298
'bzrlib.tests.test_status',
1299
'bzrlib.tests.test_store',
1300
'bzrlib.tests.test_symbol_versioning',
1301
'bzrlib.tests.test_testament',
1302
'bzrlib.tests.test_textfile',
1303
'bzrlib.tests.test_textmerge',
1304
'bzrlib.tests.test_trace',
1305
'bzrlib.tests.test_transactions',
1306
'bzrlib.tests.test_transform',
1307
'bzrlib.tests.test_transport',
1308
'bzrlib.tests.test_tsort',
1309
'bzrlib.tests.test_tuned_gzip',
1310
'bzrlib.tests.test_ui',
1311
'bzrlib.tests.test_upgrade',
1312
'bzrlib.tests.test_urlutils',
1313
'bzrlib.tests.test_versionedfile',
1314
'bzrlib.tests.test_weave',
1315
'bzrlib.tests.test_whitebox',
1316
'bzrlib.tests.test_workingtree',
1317
'bzrlib.tests.test_xml',
1319
test_transport_implementations = [
1320
'bzrlib.tests.test_transport_implementations',
1321
'bzrlib.tests.test_read_bundle',
1323
suite = TestUtil.TestSuite()
1324
3872
loader = TestUtil.TestLoader()
1325
from bzrlib.transport import TransportTestProviderAdapter
1326
adapter = TransportTestProviderAdapter()
1327
adapt_modules(test_transport_implementations, adapter, loader, suite)
1328
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1329
for package in packages_to_test():
1330
suite.addTest(package.test_suite())
1331
for m in MODULES_TO_TEST:
1332
suite.addTest(loader.loadTestsFromModule(m))
1333
for m in MODULES_TO_DOCTEST:
1334
suite.addTest(doctest.DocTestSuite(m))
1335
for name, plugin in bzrlib.plugin.all_plugins().items():
1336
if getattr(plugin, 'test_suite', None) is not None:
1337
suite.addTest(plugin.test_suite())
3874
if keep_only is not None:
3875
id_filter = TestIdList(keep_only)
3877
# We take precedence over keep_only because *at loading time* using
3878
# both options means we will load less tests for the same final result.
3879
def interesting_module(name):
3880
for start in starting_with:
3882
# Either the module name starts with the specified string
3883
name.startswith(start)
3884
# or it may contain tests starting with the specified string
3885
or start.startswith(name)
3889
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3891
elif keep_only is not None:
3892
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3893
def interesting_module(name):
3894
return id_filter.refers_to(name)
3897
loader = TestUtil.TestLoader()
3898
def interesting_module(name):
3899
# No filtering, all modules are interesting
3902
suite = loader.suiteClass()
3904
# modules building their suite with loadTestsFromModuleNames
3905
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3907
for mod in _test_suite_modules_to_doctest():
3908
if not interesting_module(mod):
3909
# No tests to keep here, move along
3912
# note that this really does mean "report only" -- doctest
3913
# still runs the rest of the examples
3914
doc_suite = doctest.DocTestSuite(mod,
3915
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
3916
except ValueError, e:
3917
print '**failed to get doctest for: %s\n%s' % (mod, e)
3919
if len(doc_suite._tests) == 0:
3920
raise errors.BzrError("no doctests found in %s" % (mod,))
3921
suite.addTest(doc_suite)
3923
default_encoding = sys.getdefaultencoding()
3924
for name, plugin in bzrlib.plugin.plugins().items():
3925
if not interesting_module(plugin.module.__name__):
3927
plugin_suite = plugin.test_suite()
3928
# We used to catch ImportError here and turn it into just a warning,
3929
# but really if you don't have --no-plugins this should be a failure.
3930
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3931
if plugin_suite is None:
3932
plugin_suite = plugin.load_plugin_tests(loader)
3933
if plugin_suite is not None:
3934
suite.addTest(plugin_suite)
3935
if default_encoding != sys.getdefaultencoding():
3936
bzrlib.trace.warning(
3937
'Plugin "%s" tried to reset default encoding to: %s', name,
3938
sys.getdefaultencoding())
3940
sys.setdefaultencoding(default_encoding)
3942
if keep_only is not None:
3943
# Now that the referred modules have loaded their tests, keep only the
3945
suite = filter_suite_by_id_list(suite, id_filter)
3946
# Do some sanity checks on the id_list filtering
3947
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3949
# The tester has used both keep_only and starting_with, so he is
3950
# already aware that some tests are excluded from the list, there
3951
# is no need to tell him which.
3954
# Some tests mentioned in the list are not in the test suite. The
3955
# list may be out of date, report to the tester.
3956
for id in not_found:
3957
bzrlib.trace.warning('"%s" not found in the test suite', id)
3958
for id in duplicates:
3959
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
1341
def adapt_modules(mods_list, adapter, loader, suite):
    """Load the tests of the named modules, adapt each one, and add them to suite.

    :param mods_list: Module names handed to loader.loadTestsFromModuleNames.
    :param adapter: Object whose adapt(test) result is passed to
        suite.addTests.
    :param loader: The test loader used to load the modules.
    :param suite: The suite that receives the adapted tests.
    """
    loaded = loader.loadTestsFromModuleNames(mods_list)
    for single_test in iter_suite_tests(loaded):
        adapted = adapter.adapt(single_test)
        suite.addTests(adapted)
3964
def multiply_scenarios(*scenarios):
    """Multiply two or more iterables of scenarios.

    It is safe to pass scenario generators or iterators: every argument is
    copied into a list before being combined.

    :param scenarios: Iterables of (scenario_name, scenario_param_dict)
        pairs.
    :returns: A list of compound scenarios: the cross-product of all
        scenarios, with the names concatenated and the parameters
        merged (see _multiply_two_scenarios).
    """
    scenario_lists = [list(scenario) for scenario in scenarios]
    return reduce(_multiply_two_scenarios, scenario_lists)
3976
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3977
"""Multiply two sets of scenarios.
3979
:returns: the cartesian product of the two sets of scenarios, that is
3980
a scenario for every possible combination of a left scenario and a
3984
('%s,%s' % (left_name, right_name),
3985
dict(left_dict.items() + right_dict.items()))
3986
for left_name, left_dict in scenarios_left
3987
for right_name, right_dict in scenarios_right]
3990
def multiply_tests(tests, scenarios, result):
3991
"""Multiply tests_list by scenarios into result.
3993
This is the core workhorse for test parameterisation.
3995
Typically the load_tests() method for a per-implementation test suite will
3996
call multiply_tests and return the result.
3998
:param tests: The tests to parameterise.
3999
:param scenarios: The scenarios to apply: pairs of (scenario_name,
4000
scenario_param_dict).
4001
:param result: A TestSuite to add created tests to.
4003
This returns the passed in result TestSuite with the cross product of all
4004
the tests repeated once for each scenario. Each test is adapted by adding
4005
the scenario name at the end of its id(), and updating the test object's
4006
__dict__ with the scenario_param_dict.
4008
>>> import bzrlib.tests.test_sampler
4009
>>> r = multiply_tests(
4010
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4011
... [('one', dict(param=1)),
4012
... ('two', dict(param=2))],
4013
... TestUtil.TestSuite())
4014
>>> tests = list(iter_suite_tests(r))
4018
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4024
for test in iter_suite_tests(tests):
4025
apply_scenarios(test, scenarios, result)
4029
def apply_scenarios(test, scenarios, result):
4030
"""Apply the scenarios in scenarios to test and add to result.
4032
:param test: The test to apply scenarios to.
4033
:param scenarios: An iterable of scenarios to apply to test.
4035
:seealso: apply_scenario
4037
for scenario in scenarios:
4038
result.addTest(apply_scenario(test, scenario))
4042
def apply_scenario(test, scenario):
    """Copy test and apply scenario to it.

    :param test: A test to adapt.
    :param scenario: A tuple describing the scenario.
        The first element of the tuple is the scenario name; it is appended
        in parentheses to the test's id to form the new test id.
        The second element is a dict containing attributes to set on the
        new test.
    :return: The adapted test.
    """
    new_id = "%s(%s)" % (test.id(), scenario[0])
    new_test = clone_test(test, new_id)
    for attr_name, attr_value in scenario[1].items():
        setattr(new_test, attr_name, attr_value)
    return new_test
4059
def clone_test(test, new_id):
    """Clone a test giving it a new id.

    :param test: The test to clone.
    :param new_id: The id to assign to it.
    :return: The new test: a shallow copy of test whose id() returns new_id.
    """
    new_test = copy.copy(test)
    new_test.id = lambda: new_id
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
    # causes cloned tests to share the 'details' dict.  This makes it hard to
    # read the test output for parameterized tests, because tracebacks will be
    # associated with irrelevant tests.
    try:
        # Only probing for the (name-mangled) attribute; the value is unused.
        new_test._TestCase__details
    except AttributeError:
        # must be a different version of testtools than expected.  Do nothing.
        pass
    else:
        # Reset the '__details' dict.
        new_test._TestCase__details = {}
    return new_test
4083
def permute_tests_for_extension(standard_tests, loader, py_module_name,
                                ext_module_name):
    """Helper for permuting tests against an extension module.

    This is meant to be used inside a module's 'load_tests()' function. It
    will create 2 scenarios, and cause all tests in the 'standard_tests' to be
    run against both implementations, setting 'test.module' to the appropriate
    module. See bzrlib.tests.test__chk_map.load_tests as an example.

    :param standard_tests: A test suite to permute
    :param loader: A TestLoader
    :param py_module_name: The python path to a python module that can always
        be loaded, and will be considered the 'python' implementation. (eg
        'bzrlib._chk_map_py')
    :param ext_module_name: The python path to an extension module. If the
        module cannot be loaded, a single test will be added, which notes that
        the module is not available. If it can be loaded, all standard_tests
        will be run against that module.
    :return: (suite, feature) suite is a test-suite that has all the permuted
        tests. feature is the Feature object that can be used to determine if
        the module is available.
    """
    python_scenario = ('python',
                       {'module': pyutils.get_named_object(py_module_name)})
    scenarios = [python_scenario]
    suite = loader.suiteClass()
    feature = ModuleAvailableFeature(ext_module_name)
    if not feature.available():
        # the compiled module isn't available, so we add a failing test
        class FailWithoutFeature(TestCase):
            def test_fail(self):
                self.requireFeature(feature)
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
    else:
        scenarios.append(('C', {'module': feature.module}))
    return multiply_tests(standard_tests, scenarios, suite), feature
4124
def _rmtree_temp_dir(dirname, test_id=None):
4125
# If LANG=C we probably have created some bogus paths
4126
# which rmtree(unicode) will fail to delete
4127
# so make sure we are using rmtree(str) to delete everything
4128
# except on win32, where rmtree(str) will fail
4129
# since it doesn't have the property of byte-stream paths
4130
# (they are either ascii or mbcs)
4131
if sys.platform == 'win32':
4132
# make sure we are using the unicode win32 api
4133
dirname = unicode(dirname)
4135
dirname = dirname.encode(sys.getfilesystemencoding())
4137
osutils.rmtree(dirname)
4139
# We don't want to fail here because some useful display will be lost
4140
# otherwise. Polluting the tmp dir is bad, but not giving all the
4141
# possible info to the test runner is even worse.
4143
ui.ui_factory.clear_term()
4144
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4145
# Ugly, but the last thing we want here is fail, so bear with it.
4146
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4147
).encode('ascii', 'replace')
4148
sys.stderr.write('Unable to remove testing dir %s\n%s'
4149
% (os.path.basename(dirname), printable_e))
4152
class Feature(object):
    """An operating system Feature."""

    def __init__(self):
        # None means "not probed yet"; available() fills this in lazily.
        self._available = None

    def available(self):
        """Is the feature available?

        The probe result is remembered, so a probe that produced a value
        other than None is not run again.

        :return: True if the feature is available.
        """
        cached = self._available
        if cached is None:
            cached = self._available = self._probe()
        return cached

    def _probe(self):
        """Implement this method in concrete features.

        :return: True if the feature is available.
        """
        raise NotImplementedError

    def __str__(self):
        # Prefer the descriptive name when the concrete feature offers one,
        # otherwise fall back to the class name.
        namer = getattr(self, 'feature_name', None)
        if namer:
            return namer()
        return self.__class__.__name__
4180
class _SymlinkFeature(Feature):
4183
return osutils.has_symlinks()
4185
def feature_name(self):
4188
SymlinkFeature = _SymlinkFeature()
4191
class _HardlinkFeature(Feature):
4194
return osutils.has_hardlinks()
4196
def feature_name(self):
4199
HardlinkFeature = _HardlinkFeature()
4202
class _OsFifoFeature(Feature):
    """Can this platform create filesystem fifos with os.mkfifo?"""

    def _probe(self):
        # Report a real boolean rather than the os.mkfifo function object or
        # None: available() documents a True/False result and uses None as its
        # "not probed yet" marker, so a None answer would never be cached.
        return getattr(os, 'mkfifo', None) is not None

    def feature_name(self):
        return 'filesystem fifos'

OsFifoFeature = _OsFifoFeature()
4213
class _UnicodeFilenameFeature(Feature):
    """Does the filesystem support Unicode filenames?"""

    def _probe(self):
        try:
            # Check for character combinations unlikely to be covered by any
            # single non-unicode encoding. We use the characters
            # - greek small letter alpha (U+03B1) and
            # - braille pattern dots-123456 (U+283F).
            os.stat(u'\u03b1\u283f')
        except UnicodeEncodeError:
            # The name could not even be encoded for the filesystem.
            return False
        except (IOError, OSError):
            # The filesystem allows the Unicode filename but the file doesn't
            # exist.
            pass
        # Either the stat failed only because the file is missing, or the
        # file exists for some reason; the filename itself was accepted.
        return True

UnicodeFilenameFeature = _UnicodeFilenameFeature()
4237
class _CompatabilityThunkFeature(Feature):
    """This feature is just a thunk to another feature.

    It issues a deprecation warning if it is accessed, to let you know that you
    should really use a different feature.
    """

    def __init__(self, dep_version, module, name,
                 replacement_name, replacement_module=None):
        """Remember where the deprecated and the replacement features live.

        :param dep_version: A format string taking the dotted name of the
            deprecated feature and producing the deprecation message.
        :param module: Dotted name of the module holding the deprecated
            feature.
        :param name: Name of the deprecated feature inside module.
        :param replacement_name: Name of the feature to use instead.
        :param replacement_module: Dotted name of the module holding the
            replacement; defaults to module.
        """
        super(_CompatabilityThunkFeature, self).__init__()
        self._dep_version = dep_version
        self._module = module
        self._name = name
        self._replacement_name = replacement_name
        if replacement_module is None:
            self._replacement_module = module
        else:
            self._replacement_module = replacement_module
        # The replacement feature is resolved lazily, on first probe.
        self._feature = None

    def _ensure(self):
        # Warn and resolve the replacement only once.
        if self._feature is not None:
            return
        deprecated_path = '%s.%s' % (self._module, self._name)
        depr_msg = self._dep_version % deprecated_path
        use_msg = ' Use %s.%s instead.' % (self._replacement_module,
                                           self._replacement_name)
        symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
        # Import the new feature and use it as a replacement for the
        # deprecated one.
        self._feature = pyutils.get_named_object(
            self._replacement_module, self._replacement_name)

    def _probe(self):
        self._ensure()
        return self._feature._probe()
4273
class ModuleAvailableFeature(Feature):
    """This is a feature that describes a module we want to be available.

    Declare the name of the module in __init__(), and then after probing, the
    module will be available as 'self.module'.

    :ivar module: The module if it is available, else None.
    """

    def __init__(self, module_name):
        super(ModuleAvailableFeature, self).__init__()
        self.module_name = module_name

    def _probe(self):
        try:
            # The non-empty fromlist makes __import__ hand back the named
            # (sub)module itself rather than the top-level package.
            imported = __import__(self.module_name, {}, {}, [''])
        except ImportError:
            return False
        else:
            self._module = imported
            return True

    @property
    def module(self):
        # Going through available() makes sure the probe has been done.
        return self._module if self.available() else None

    def feature_name(self):
        return self.module_name
4303
# This is kept here for compatibility; it is recommended to use
# 'bzrlib.tests.features.paramiko' instead.
ParamikoFeature = _CompatabilityThunkFeature(
    dep_version=deprecated_in((2, 1, 0)),
    module='bzrlib.tests.features',
    name='ParamikoFeature',
    replacement_name='paramiko')
4310
def probe_unicode_in_user_encoding():
    """Try to encode several unicode strings to use in unicode-aware tests.

    Return the first successful match.

    :return: (unicode value, encoded plain string value) or (None, None)
    """
    for candidate in (u'm\xb5', u'\xe1', u'\u0410'):
        try:
            return candidate, candidate.encode(osutils.get_user_encoding())
        except UnicodeEncodeError:
            # Try a different character
            pass
    return None, None
4328
def probe_bad_non_ascii(encoding):
4329
"""Try to find [bad] character with code [128..255]
4330
that cannot be decoded to unicode in some encoding.
4331
Return None if all non-ascii characters is valid
4334
for i in xrange(128, 256):
4337
char.decode(encoding)
4338
except UnicodeDecodeError:
4343
class _HTTPSServerFeature(Feature):
4344
"""Some tests want an https Server, check if one is available.
4346
Right now, the only way this is available is under python2.6 which provides
4357
def feature_name(self):
4358
return 'HTTPSServer'
4361
HTTPSServerFeature = _HTTPSServerFeature()
4364
class _UnicodeFilename(Feature):
4365
"""Does the filesystem support Unicode filenames?"""
4370
except UnicodeEncodeError:
4372
except (IOError, OSError):
4373
# The filesystem allows the Unicode filename but the file doesn't
4377
# The filesystem allows the Unicode filename and the file exists,
4381
UnicodeFilename = _UnicodeFilename()
4384
class _ByteStringNamedFilesystem(Feature):
    """Is the filesystem based on bytes?"""

    def _probe(self):
        # Answered purely from the platform name: only "posix" counts.
        return os.name == "posix"

ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4395
class _UTF8Filesystem(Feature):
    """Is the filesystem UTF-8?"""

    def _probe(self):
        # Compare case-insensitively and accept both spellings of the name.
        return osutils._fs_enc.upper() in ('UTF-8', 'UTF8')

UTF8Filesystem = _UTF8Filesystem()
4406
class _BreakinFeature(Feature):
4407
"""Does this platform support the breakin feature?"""
4410
from bzrlib import breakin
4411
if breakin.determine_signal() is None:
4413
if sys.platform == 'win32':
4414
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4415
# We trigger SIGBREAK via a Console api so we need ctypes to
4416
# access the function
4423
def feature_name(self):
4424
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4427
BreakinFeature = _BreakinFeature()
4430
class _CaseInsCasePresFilenameFeature(Feature):
4431
"""Is the file-system case insensitive, but case-preserving?"""
4434
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4436
# first check truly case-preserving for created files, then check
4437
# case insensitive when opening existing files.
4438
name = osutils.normpath(name)
4439
base, rel = osutils.split(name)
4440
found_rel = osutils.canonical_relpath(base, name)
4441
return (found_rel == rel
4442
and os.path.isfile(name.upper())
4443
and os.path.isfile(name.lower()))
4448
def feature_name(self):
4449
return "case-insensitive case-preserving filesystem"
4451
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4454
class _CaseInsensitiveFilesystemFeature(Feature):
4455
"""Check if underlying filesystem is case-insensitive but *not* case
4458
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4459
# more likely to be case preserving, so this case is rare.
4462
if CaseInsCasePresFilenameFeature.available():
4465
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4466
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4467
TestCaseWithMemoryTransport.TEST_ROOT = root
4469
root = TestCaseWithMemoryTransport.TEST_ROOT
4470
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4472
name_a = osutils.pathjoin(tdir, 'a')
4473
name_A = osutils.pathjoin(tdir, 'A')
4475
result = osutils.isdir(name_A)
4476
_rmtree_temp_dir(tdir)
4479
def feature_name(self):
4480
return 'case-insensitive filesystem'
4482
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4485
class _CaseSensitiveFilesystemFeature(Feature):
    """Is the filesystem case-sensitive?"""

    def _probe(self):
        # Case-sensitive means neither of the two case-insensitive flavours
        # (case-preserving or not) was detected.
        return not (CaseInsCasePresFilenameFeature.available()
                    or CaseInsensitiveFilesystemFeature.available())

    def feature_name(self):
        return 'case-sensitive filesystem'

# new coding style is for feature instances to be lowercase
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4502
# Kept for compatibility; use bzrlib.tests.features.subunit instead.
SubUnitFeature = _CompatabilityThunkFeature(
    dep_version=deprecated_in((2, 1, 0)),
    module='bzrlib.tests.features',
    name='SubUnitFeature',
    replacement_name='subunit')
4506
# Only define SubUnitBzrRunner if subunit is available.
4508
from subunit import TestProtocolClient
4509
from subunit.test_results import AutoTimingTestResultDecorator
4510
class SubUnitBzrProtocolClient(TestProtocolClient):
4512
def addSuccess(self, test, details=None):
4513
# The subunit client always includes the details in the subunit
4514
# stream, but we don't want to include it in ours.
4515
if details is not None and 'log' in details:
4517
return super(SubUnitBzrProtocolClient, self).addSuccess(
4520
class SubUnitBzrRunner(TextTestRunner):
4521
def run(self, test):
4522
result = AutoTimingTestResultDecorator(
4523
SubUnitBzrProtocolClient(self.stream))
4529
class _PosixPermissionsFeature(Feature):
4533
# create temporary file and check if specified perms are maintained.
4536
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4537
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4540
os.chmod(name, write_perms)
4542
read_perms = os.stat(name).st_mode & 0777
4544
return (write_perms == read_perms)
4546
return (os.name == 'posix') and has_perms()
4548
def feature_name(self):
4549
return 'POSIX permissions support'
4551
posix_permissions_feature = _PosixPermissionsFeature()