76
55
from bzrlib.merge import merge_inner
77
56
import bzrlib.merge3
58
import bzrlib.osutils as osutils
78
59
import bzrlib.plugin
60
import bzrlib.progress as progress
61
from bzrlib.revision import common_ancestor
79
62
import bzrlib.store
80
from bzrlib import symbol_versioning
81
from bzrlib.symbol_versioning import (
87
63
import bzrlib.trace
88
from bzrlib.transport import get_transport
64
from bzrlib.transport import urlescape, get_transport
89
65
import bzrlib.transport
90
from bzrlib.transport.local import LocalURLServer
91
from bzrlib.transport.memory import MemoryServer
66
from bzrlib.transport.local import LocalRelpathServer
92
67
from bzrlib.transport.readonly import ReadonlyServer
93
from bzrlib.trace import mutter, note
94
from bzrlib.tests import TestUtil
95
from bzrlib.tests.http_server import HttpServer
96
from bzrlib.tests.TestUtil import (
68
from bzrlib.trace import mutter
69
from bzrlib.tests.TestUtil import TestLoader, TestSuite
100
70
from bzrlib.tests.treeshape import build_tree_contents
101
import bzrlib.version_info_formats.format_custom
102
71
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
104
# Mark this python module as being part of the implementation
105
# of unittest: this gives us better tracebacks where the last
106
# shown frame is the test code, not our assertXYZ.
109
default_transport = LocalURLServer
112
class ExtendedTestResult(unittest._TextTestResult):
113
"""Accepts, reports and accumulates the results of running tests.
115
Compared to the unittest version this class adds support for
116
profiling, benchmarking, stopping as soon as a test fails, and
117
skipping tests. There are further-specialized subclasses for
118
different types of display.
120
When a test finishes, in whatever way, it calls one of the addSuccess,
121
addFailure or addError classes. These in turn may redirect to a more
122
specific case for the special test results supported by our extended
125
Note that just one of these objects is fed the results from many tests.
73
default_transport = LocalRelpathServer
76
MODULES_TO_DOCTEST = [
88
def packages_to_test():
89
"""Return a list of packages to test.
91
The packages are not globally imported so that import failures are
92
triggered when running selftest, not when importing the command.
95
import bzrlib.tests.blackbox
96
import bzrlib.tests.branch_implementations
97
import bzrlib.tests.bzrdir_implementations
98
import bzrlib.tests.interrepository_implementations
99
import bzrlib.tests.interversionedfile_implementations
100
import bzrlib.tests.repository_implementations
101
import bzrlib.tests.revisionstore_implementations
102
import bzrlib.tests.workingtree_implementations
105
bzrlib.tests.blackbox,
106
bzrlib.tests.branch_implementations,
107
bzrlib.tests.bzrdir_implementations,
108
bzrlib.tests.interrepository_implementations,
109
bzrlib.tests.interversionedfile_implementations,
110
bzrlib.tests.repository_implementations,
111
bzrlib.tests.revisionstore_implementations,
112
bzrlib.tests.workingtree_implementations,
116
class _MyResult(unittest._TextTestResult):
117
"""Custom TestResult.
119
Shows output in a different format, including displaying runtime for tests.
128
121
stop_early = False
130
def __init__(self, stream, descriptions, verbosity,
134
"""Construct new TestResult.
136
:param bench_history: Optionally, a writable file object to accumulate
123
def __init__(self, stream, descriptions, verbosity, pb=None):
139
124
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
140
if bench_history is not None:
141
from bzrlib.version import _get_bzr_source_tree
142
src_tree = _get_bzr_source_tree()
145
revision_id = src_tree.get_parent_ids()[0]
147
# XXX: if this is a brand new tree, do the same as if there
151
# XXX: If there's no branch, what should we do?
153
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
154
self._bench_history = bench_history
155
self.ui = ui.ui_factory
156
self.num_tests = num_tests
158
self.failure_count = 0
159
self.known_failure_count = 0
161
self.not_applicable_count = 0
162
self.unsupported = {}
164
self._overall_start_time = time.time()
166
def _extractBenchmarkTime(self, testCase):
127
def extractBenchmarkTime(self, testCase):
167
128
"""Add a benchmark time for the current test case."""
168
return getattr(testCase, "_benchtime", None)
129
self._benchmarkTime = getattr(testCase, "_benchtime", None)
170
131
def _elapsedTestTimeString(self):
171
132
"""Return a time string for the overall time the current test has taken."""
172
133
return self._formatTime(time.time() - self._start_time)
174
def _testTimeString(self, testCase):
175
benchmark_time = self._extractBenchmarkTime(testCase)
176
if benchmark_time is not None:
135
def _testTimeString(self):
136
if self._benchmarkTime is not None:
177
137
return "%s/%s" % (
178
self._formatTime(benchmark_time),
138
self._formatTime(self._benchmarkTime),
179
139
self._elapsedTestTimeString())
181
return " %s" % self._elapsedTestTimeString()
141
return " %s" % self._elapsedTestTimeString()
183
143
def _formatTime(self, seconds):
184
144
"""Format seconds as milliseconds with leading spaces."""
185
# some benchmarks can take thousands of seconds to run, so we need 8
187
return "%8dms" % (1000 * seconds)
145
return "%5dms" % (1000 * seconds)
189
def _shortened_test_description(self, test):
191
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
147
def _ellipsise_unimportant_words(self, a_string, final_width,
149
"""Add ellipses (sp?) for overly long strings.
151
:param keep_start: If true preserve the start of a_string rather
155
if len(a_string) > final_width:
156
result = a_string[:final_width-3] + '...'
160
if len(a_string) > final_width:
161
result = '...' + a_string[3-final_width:]
164
return result.ljust(final_width)
194
166
def startTest(self, test):
195
167
unittest.TestResult.startTest(self, test)
196
self.report_test_start(test)
197
test.number = self.count
168
# In a short description, the important words are in
169
# the beginning, but in an id, the important words are
171
SHOW_DESCRIPTIONS = False
173
if not self.showAll and self.dots and self.pb is not None:
176
final_width = osutils.terminal_width()
177
final_width = final_width - 15 - 8
179
if SHOW_DESCRIPTIONS:
180
what = test.shortDescription()
182
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
185
if what.startswith('bzrlib.tests.'):
187
what = self._ellipsise_unimportant_words(what, final_width)
189
self.stream.write(what)
190
elif self.dots and self.pb is not None:
191
self.pb.update(what, self.testsRun - 1, None)
198
193
self._recordTestStartTime()
200
195
def _recordTestStartTime(self):
201
196
"""Record that a test has started."""
202
197
self._start_time = time.time()
204
def _cleanupLogFile(self, test):
205
# We can only do this if we have one of our TestCases, not if
207
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
208
if setKeepLogfile is not None:
211
199
def addError(self, test, err):
212
"""Tell result that test finished with an error.
214
Called from the TestCase run() method when the test
215
fails with an unexpected error.
217
self._testConcluded(test)
218
200
if isinstance(err[1], TestSkipped):
219
return self._addSkipped(test, err)
220
elif isinstance(err[1], UnavailableFeature):
221
return self.addNotSupported(test, err[1].args[0])
223
unittest.TestResult.addError(self, test, err)
224
self.error_count += 1
225
self.report_error(test, err)
228
self._cleanupLogFile(test)
201
return self.addSkipped(test, err)
202
unittest.TestResult.addError(self, test, err)
203
self.extractBenchmarkTime(test)
205
self.stream.writeln("ERROR %s" % self._testTimeString())
206
elif self.dots and self.pb is None:
207
self.stream.write('E')
209
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
230
214
def addFailure(self, test, err):
231
"""Tell result that test failed.
233
Called from the TestCase run() method when the test
234
fails because e.g. an assert() method failed.
236
self._testConcluded(test)
237
if isinstance(err[1], KnownFailure):
238
return self._addKnownFailure(test, err)
240
unittest.TestResult.addFailure(self, test, err)
241
self.failure_count += 1
242
self.report_failure(test, err)
245
self._cleanupLogFile(test)
215
unittest.TestResult.addFailure(self, test, err)
216
self.extractBenchmarkTime(test)
218
self.stream.writeln(" FAIL %s" % self._testTimeString())
219
elif self.dots and self.pb is None:
220
self.stream.write('F')
222
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
247
227
def addSuccess(self, test):
248
"""Tell result that test completed successfully.
250
Called from the TestCase run()
252
self._testConcluded(test)
253
if self._bench_history is not None:
254
benchmark_time = self._extractBenchmarkTime(test)
255
if benchmark_time is not None:
256
self._bench_history.write("%s %s\n" % (
257
self._formatTime(benchmark_time),
259
self.report_success(test)
260
self._cleanupLogFile(test)
261
unittest.TestResult.addSuccess(self, test)
262
test._log_contents = ''
264
def _testConcluded(self, test):
265
"""Common code when a test has finished.
267
Called regardless of whether it succeded, failed, etc.
271
def _addKnownFailure(self, test, err):
272
self.known_failure_count += 1
273
self.report_known_failure(test, err)
275
def addNotSupported(self, test, feature):
276
"""The test will not be run because of a missing feature.
278
# this can be called in two different ways: it may be that the
279
# test started running, and then raised (through addError)
280
# UnavailableFeature. Alternatively this method can be called
281
# while probing for features before running the tests; in that
282
# case we will see startTest and stopTest, but the test will never
284
self.unsupported.setdefault(str(feature), 0)
285
self.unsupported[str(feature)] += 1
286
self.report_unsupported(test, feature)
288
def _addSkipped(self, test, skip_excinfo):
289
if isinstance(skip_excinfo[1], TestNotApplicable):
290
self.not_applicable_count += 1
291
self.report_not_applicable(test, skip_excinfo)
294
self.report_skip(test, skip_excinfo)
297
except KeyboardInterrupt:
300
self.addError(test, test._exc_info())
302
# seems best to treat this as success from point-of-view of unittest
303
# -- it actually does nothing so it barely matters :)
304
unittest.TestResult.addSuccess(self, test)
305
test._log_contents = ''
228
self.extractBenchmarkTime(test)
230
self.stream.writeln(' OK %s' % self._testTimeString())
231
for bench_called, stats in getattr(test, '_benchcalls', []):
232
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
233
stats.pprint(file=self.stream)
234
elif self.dots and self.pb is None:
235
self.stream.write('~')
237
self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
239
unittest.TestResult.addSuccess(self, test)
241
def addSkipped(self, test, skip_excinfo):
242
self.extractBenchmarkTime(test)
244
print >>self.stream, ' SKIP %s' % self._testTimeString()
245
print >>self.stream, ' %s' % skip_excinfo[1]
246
elif self.dots and self.pb is None:
247
self.stream.write('S')
249
self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
251
# seems best to treat this as success from point-of-view of unittest
252
# -- it actually does nothing so it barely matters :)
253
unittest.TestResult.addSuccess(self, test)
307
255
def printErrorList(self, flavour, errors):
308
256
for test, err in errors:
309
257
self.stream.writeln(self.separator1)
310
self.stream.write("%s: " % flavour)
311
self.stream.writeln(self.getDescription(test))
258
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
312
259
if getattr(test, '_get_log', None) is not None:
313
self.stream.write('\n')
315
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
316
self.stream.write('\n')
317
self.stream.write(test._get_log())
318
self.stream.write('\n')
320
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
321
self.stream.write('\n')
261
print >>self.stream, \
262
('vvvv[log from %s]' % test.id()).ljust(78,'-')
263
print >>self.stream, test._get_log()
264
print >>self.stream, \
265
('^^^^[log from %s]' % test.id()).ljust(78,'-')
322
266
self.stream.writeln(self.separator2)
323
267
self.stream.writeln("%s" % err)
328
def report_cleaning_up(self):
331
def report_success(self, test):
334
def wasStrictlySuccessful(self):
335
if self.unsupported or self.known_failure_count:
337
return self.wasSuccessful()
340
class TextTestResult(ExtendedTestResult):
341
"""Displays progress and results of tests in text form"""
343
def __init__(self, stream, descriptions, verbosity,
348
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
349
bench_history, num_tests)
351
self.pb = self.ui.nested_progress_bar()
352
self._supplied_pb = False
355
self._supplied_pb = True
356
self.pb.show_pct = False
357
self.pb.show_spinner = False
358
self.pb.show_eta = False,
359
self.pb.show_count = False
360
self.pb.show_bar = False
362
def report_starting(self):
363
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
365
def _progress_prefix_text(self):
366
# the longer this text, the less space we have to show the test
368
a = '[%d' % self.count # total that have been run
369
# tests skipped as known not to be relevant are not important enough
371
## if self.skip_count:
372
## a += ', %d skip' % self.skip_count
373
## if self.known_failure_count:
374
## a += '+%dX' % self.known_failure_count
375
if self.num_tests is not None:
376
a +='/%d' % self.num_tests
378
runtime = time.time() - self._overall_start_time
380
a += '%dm%ds' % (runtime / 60, runtime % 60)
384
a += ', %d err' % self.error_count
385
if self.failure_count:
386
a += ', %d fail' % self.failure_count
388
a += ', %d missing' % len(self.unsupported)
392
def report_test_start(self, test):
395
self._progress_prefix_text()
397
+ self._shortened_test_description(test))
399
def _test_description(self, test):
400
return self._shortened_test_description(test)
402
def report_error(self, test, err):
403
self.pb.note('ERROR: %s\n %s\n',
404
self._test_description(test),
408
def report_failure(self, test, err):
409
self.pb.note('FAIL: %s\n %s\n',
410
self._test_description(test),
414
def report_known_failure(self, test, err):
415
self.pb.note('XFAIL: %s\n%s\n',
416
self._test_description(test), err[1])
418
def report_skip(self, test, skip_excinfo):
421
def report_not_applicable(self, test, skip_excinfo):
424
def report_unsupported(self, test, feature):
425
"""test cannot be run because feature is missing."""
427
def report_cleaning_up(self):
428
self.pb.update('cleaning up...')
431
if not self._supplied_pb:
435
class VerboseTestResult(ExtendedTestResult):
436
"""Produce long output, with one line per test run plus times"""
438
def _ellipsize_to_right(self, a_string, final_width):
439
"""Truncate and pad a string, keeping the right hand side"""
440
if len(a_string) > final_width:
441
result = '...' + a_string[3-final_width:]
444
return result.ljust(final_width)
446
def report_starting(self):
447
self.stream.write('running %d tests...\n' % self.num_tests)
449
def report_test_start(self, test):
451
name = self._shortened_test_description(test)
452
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
453
# numbers, plus a trailing blank
454
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
455
self.stream.write(self._ellipsize_to_right(name,
456
osutils.terminal_width()-30))
459
def _error_summary(self, err):
461
return '%s%s' % (indent, err[1])
463
def report_error(self, test, err):
464
self.stream.writeln('ERROR %s\n%s'
465
% (self._testTimeString(test),
466
self._error_summary(err)))
468
def report_failure(self, test, err):
469
self.stream.writeln(' FAIL %s\n%s'
470
% (self._testTimeString(test),
471
self._error_summary(err)))
473
def report_known_failure(self, test, err):
474
self.stream.writeln('XFAIL %s\n%s'
475
% (self._testTimeString(test),
476
self._error_summary(err)))
478
def report_success(self, test):
479
self.stream.writeln(' OK %s' % self._testTimeString(test))
480
for bench_called, stats in getattr(test, '_benchcalls', []):
481
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
482
stats.pprint(file=self.stream)
483
# flush the stream so that we get smooth output. This verbose mode is
484
# used to show the output in PQM.
487
def report_skip(self, test, skip_excinfo):
488
self.stream.writeln(' SKIP %s\n%s'
489
% (self._testTimeString(test),
490
self._error_summary(skip_excinfo)))
492
def report_not_applicable(self, test, skip_excinfo):
493
self.stream.writeln(' N/A %s\n%s'
494
% (self._testTimeString(test),
495
self._error_summary(skip_excinfo)))
497
def report_unsupported(self, test, feature):
498
"""test cannot be run because feature is missing."""
499
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
500
%(self._testTimeString(test), feature))
503
270
class TextTestRunner(object):
504
271
stop_on_failure = False
1000
472
self.assertEqual(mode, actual_mode,
1001
473
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
1003
def assertIsSameRealPath(self, path1, path2):
1004
"""Fail if path1 and path2 points to different files"""
1005
self.assertEqual(osutils.realpath(path1),
1006
osutils.realpath(path2),
1007
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1009
475
def assertIsInstance(self, obj, kls):
1010
476
"""Fail if obj is not an instance of kls"""
1011
477
if not isinstance(obj, kls):
1012
478
self.fail("%r is an instance of %s rather than %s" % (
1013
479
obj, obj.__class__, kls))
1015
def expectFailure(self, reason, assertion, *args, **kwargs):
1016
"""Invoke a test, expecting it to fail for the given reason.
1018
This is for assertions that ought to succeed, but currently fail.
1019
(The failure is *expected* but not *wanted*.) Please be very precise
1020
about the failure you're expecting. If a new bug is introduced,
1021
AssertionError should be raised, not KnownFailure.
1023
Frequently, expectFailure should be followed by an opposite assertion.
1026
Intended to be used with a callable that raises AssertionError as the
1027
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1029
Raises KnownFailure if the test fails. Raises AssertionError if the
1034
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1036
self.assertEqual(42, dynamic_val)
1038
This means that a dynamic_val of 54 will cause the test to raise
1039
a KnownFailure. Once math is fixed and the expectFailure is removed,
1040
only a dynamic_val of 42 will allow the test to pass. Anything other
1041
than 54 or 42 will cause an AssertionError.
1044
assertion(*args, **kwargs)
1045
except AssertionError:
1046
raise KnownFailure(reason)
1048
self.fail('Unexpected success. Should have failed: %s' % reason)
1050
def assertFileEqual(self, content, path):
1051
"""Fail if path does not contain 'content'."""
1052
self.failUnlessExists(path)
1053
f = file(path, 'rb')
1058
self.assertEqualDiff(content, s)
1060
def failUnlessExists(self, path):
1061
"""Fail unless path or paths, which may be abs or relative, exist."""
1062
if not isinstance(path, basestring):
1064
self.failUnlessExists(p)
1066
self.failUnless(osutils.lexists(path),path+" does not exist")
1068
def failIfExists(self, path):
1069
"""Fail if path or paths, which may be abs or relative, exist."""
1070
if not isinstance(path, basestring):
1072
self.failIfExists(p)
1074
self.failIf(osutils.lexists(path),path+" exists")
1076
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1077
"""A helper for callDeprecated and applyDeprecated.
1079
:param a_callable: A callable to call.
1080
:param args: The positional arguments for the callable
1081
:param kwargs: The keyword arguments for the callable
1082
:return: A tuple (warnings, result). result is the result of calling
1083
a_callable(``*args``, ``**kwargs``).
1086
def capture_warnings(msg, cls=None, stacklevel=None):
1087
# we've hooked into a deprecation specific callpath,
1088
# only deprecations should getting sent via it.
1089
self.assertEqual(cls, DeprecationWarning)
1090
local_warnings.append(msg)
1091
original_warning_method = symbol_versioning.warn
1092
symbol_versioning.set_warning_method(capture_warnings)
1094
result = a_callable(*args, **kwargs)
1096
symbol_versioning.set_warning_method(original_warning_method)
1097
return (local_warnings, result)
1099
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1100
"""Call a deprecated callable without warning the user.
1102
Note that this only captures warnings raised by symbol_versioning.warn,
1103
not other callers that go direct to the warning module.
1105
To test that a deprecated method raises an error, do something like
1108
self.assertRaises(errors.ReservedId,
1109
self.applyDeprecated,
1110
deprecated_in((1, 5, 0)),
1114
:param deprecation_format: The deprecation format that the callable
1115
should have been deprecated with. This is the same type as the
1116
parameter to deprecated_method/deprecated_function. If the
1117
callable is not deprecated with this format, an assertion error
1119
:param a_callable: A callable to call. This may be a bound method or
1120
a regular function. It will be called with ``*args`` and
1122
:param args: The positional arguments for the callable
1123
:param kwargs: The keyword arguments for the callable
1124
:return: The result of a_callable(``*args``, ``**kwargs``)
1126
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1128
expected_first_warning = symbol_versioning.deprecation_string(
1129
a_callable, deprecation_format)
1130
if len(call_warnings) == 0:
1131
self.fail("No deprecation warning generated by call to %s" %
1133
self.assertEqual(expected_first_warning, call_warnings[0])
1136
def callCatchWarnings(self, fn, *args, **kw):
1137
"""Call a callable that raises python warnings.
1139
The caller's responsible for examining the returned warnings.
1141
If the callable raises an exception, the exception is not
1142
caught and propagates up to the caller. In that case, the list
1143
of warnings is not available.
1145
:returns: ([warning_object, ...], fn_result)
1147
# XXX: This is not perfect, because it completely overrides the
1148
# warnings filters, and some code may depend on suppressing particular
1149
# warnings. It's the easiest way to insulate ourselves from -Werror,
1150
# though. -- Andrew, 20071062
1152
def _catcher(message, category, filename, lineno, file=None, line=None):
1153
# despite the name, 'message' is normally(?) a Warning subclass
1155
wlist.append(message)
1156
saved_showwarning = warnings.showwarning
1157
saved_filters = warnings.filters
1159
warnings.showwarning = _catcher
1160
warnings.filters = []
1161
result = fn(*args, **kw)
1163
warnings.showwarning = saved_showwarning
1164
warnings.filters = saved_filters
1165
return wlist, result
1167
def callDeprecated(self, expected, callable, *args, **kwargs):
1168
"""Assert that a callable is deprecated in a particular way.
1170
This is a very precise test for unusual requirements. The
1171
applyDeprecated helper function is probably more suited for most tests
1172
as it allows you to simply specify the deprecation format being used
1173
and will ensure that that is issued for the function being called.
1175
Note that this only captures warnings raised by symbol_versioning.warn,
1176
not other callers that go direct to the warning module. To catch
1177
general warnings, use callCatchWarnings.
1179
:param expected: a list of the deprecation warnings expected, in order
1180
:param callable: The callable to call
1181
:param args: The positional arguments for the callable
1182
:param kwargs: The keyword arguments for the callable
1184
call_warnings, result = self._capture_deprecation_warnings(callable,
1186
self.assertEqual(expected, call_warnings)
1189
481
def _startLogFile(self):
1190
482
"""Send bzr and test log messages to a temporary file.
1192
484
The file is removed as the test is torn down.
1194
486
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1195
self._log_file = os.fdopen(fileno, 'w+')
1196
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
487
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
488
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
489
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
1197
490
self._log_file_name = name
1198
491
self.addCleanup(self._finishLogFile)
1200
493
def _finishLogFile(self):
1201
494
"""Finished with the log file.
1203
Close the file and delete it, unless setKeepLogfile was called.
496
Read contents into memory, close, and delete.
1205
if self._log_file is None:
1207
bzrlib.trace.pop_log_file(self._log_memento)
498
bzrlib.trace.disable_test_log(self._log_nonce)
499
self._log_file.seek(0)
500
self._log_contents = self._log_file.read()
1208
501
self._log_file.close()
1209
self._log_file = None
1210
if not self._keep_log_file:
1211
os.remove(self._log_file_name)
1212
self._log_file_name = None
1214
def setKeepLogfile(self):
1215
"""Make the logfile not be deleted when _finishLogFile is called."""
1216
self._keep_log_file = True
1218
def addCleanup(self, callable, *args, **kwargs):
502
os.remove(self._log_file_name)
503
self._log_file = self._log_file_name = None
505
def addCleanup(self, callable):
1219
506
"""Arrange to run a callable when this case is torn down.
1221
508
Callables are run in the reverse of the order they are registered,
1222
509
ie last-in first-out.
1224
self._cleanups.append((callable, args, kwargs))
511
if callable in self._cleanups:
512
raise ValueError("cleanup function %r already registered on %s"
514
self._cleanups.append(callable)
1226
516
def _cleanEnvironment(self):
1228
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1229
518
'HOME': os.getcwd(),
1230
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1231
# tests do check our impls match APPDATA
1232
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1234
'BZREMAIL': None, # may still be present in the environment
519
'APPDATA': os.getcwd(),
1236
'BZR_PROGRESS_BAR': None,
1239
'SSH_AUTH_SOCK': None,
1243
'https_proxy': None,
1244
'HTTPS_PROXY': None,
1249
# Nobody cares about these ones AFAIK. So far at
1250
# least. If you do (care), please update this comment
1254
'BZR_REMOTE_PATH': None,
1256
523
self.__old_env = {}
1257
524
self.addCleanup(self._restoreEnvironment)
1258
525
for name, value in new_env.iteritems():
1259
526
self._captureVar(name, value)
1261
529
def _captureVar(self, name, newvalue):
1262
"""Set an environment variable, and reset it when finished."""
1263
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
530
"""Set an environment variable, preparing it to be reset when finished."""
531
self.__old_env[name] = os.environ.get(name, None)
533
if name in os.environ:
536
os.environ[name] = newvalue
1265
def _restore_debug_flags(self):
1266
debug.debug_flags.clear()
1267
debug.debug_flags.update(self._preserved_debug_flags)
539
def _restoreVar(name, value):
541
if name in os.environ:
544
os.environ[name] = value
1269
546
def _restoreEnvironment(self):
1270
547
for name, value in self.__old_env.iteritems():
1271
osutils.set_or_unset_env(name, value)
1273
def _restoreHooks(self):
1274
for klass, hooks in self._preserved_hooks.items():
1275
setattr(klass, 'hooks', hooks)
1277
def knownFailure(self, reason):
1278
"""This test has failed for some known reason."""
1279
raise KnownFailure(reason)
1281
def run(self, result=None):
1282
if result is None: result = self.defaultTestResult()
1283
for feature in getattr(self, '_test_needs_features', []):
1284
if not feature.available():
1285
result.startTest(self)
1286
if getattr(result, 'addNotSupported', None):
1287
result.addNotSupported(self, feature)
1289
result.addSuccess(self)
1290
result.stopTest(self)
1293
return unittest.TestCase.run(self, result)
1296
absent_attr = object()
1297
for attr_name in self.attrs_to_keep:
1298
attr = getattr(self, attr_name, absent_attr)
1299
if attr is not absent_attr:
1300
saved_attrs[attr_name] = attr
1301
self.__dict__ = saved_attrs
548
self._restoreVar(name, value)
1303
550
def tearDown(self):
1304
551
self._runCleanups()
1469
662
This sends the stdout/stderr results into the test's log,
1470
663
where it may be useful for debugging. See also run_captured.
1472
:keyword stdin: A string to be used as stdin for the command.
1473
:keyword retcode: The status code the command should return;
1475
:keyword working_dir: The directory to run the command in
1476
:keyword error_regexes: A list of expected error messages. If
1477
specified they must be seen in the error output of the command.
1479
out, err = self._run_bzr_autosplit(
1484
working_dir=working_dir,
1486
for regex in error_regexes:
1487
self.assertContainsRe(err, regex)
1490
def run_bzr_error(self, error_regexes, *args, **kwargs):
1491
"""Run bzr, and check that stderr contains the supplied regexes
1493
:param error_regexes: Sequence of regular expressions which
1494
must each be found in the error output. The relative ordering
1496
:param args: command-line arguments for bzr
1497
:param kwargs: Keyword arguments which are interpreted by run_bzr
1498
This function changes the default value of retcode to be 3,
1499
since in most cases this is run when you expect bzr to fail.
1501
:return: (out, err) The actual output of running the command (in case
1502
you want to do more inspection)
1506
# Make sure that commit is failing because there is nothing to do
1507
self.run_bzr_error(['no changes to commit'],
1508
['commit', '-m', 'my commit comment'])
1509
# Make sure --strict is handling an unknown file, rather than
1510
# giving us the 'nothing to do' error
1511
self.build_tree(['unknown'])
1512
self.run_bzr_error(['Commit refused because there are unknown files'],
1513
['commit', --strict', '-m', 'my commit comment'])
1515
kwargs.setdefault('retcode', 3)
1516
kwargs['error_regexes'] = error_regexes
1517
out, err = self.run_bzr(*args, **kwargs)
1520
def run_bzr_subprocess(self, *args, **kwargs):
1521
"""Run bzr in a subprocess for testing.
1523
This starts a new Python interpreter and runs bzr in there.
1524
This should only be used for tests that have a justifiable need for
1525
this isolation: e.g. they are testing startup time, or signal
1526
handling, or early startup code, etc. Subprocess code can't be
1527
profiled or debugged so easily.
1529
:keyword retcode: The status code that is expected. Defaults to 0. If
1530
None is supplied, the status code is not checked.
1531
:keyword env_changes: A dictionary which lists changes to environment
1532
variables. A value of None will unset the env variable.
1533
The values must be strings. The change will only occur in the
1534
child, so you don't need to fix the environment after running.
1535
:keyword universal_newlines: Convert CRLF => LF
1536
:keyword allow_plugins: By default the subprocess is run with
1537
--no-plugins to ensure test reproducibility. Also, it is possible
1538
for system-wide plugins to create unexpected output on stderr,
1539
which can cause unnecessary test failures.
1541
env_changes = kwargs.get('env_changes', {})
1542
working_dir = kwargs.get('working_dir', None)
1543
allow_plugins = kwargs.get('allow_plugins', False)
1545
if isinstance(args[0], list):
1547
elif isinstance(args[0], basestring):
1548
args = list(shlex.split(args[0]))
1550
raise ValueError("passing varargs to run_bzr_subprocess")
1551
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1552
working_dir=working_dir,
1553
allow_plugins=allow_plugins)
1554
# We distinguish between retcode=None and retcode not passed.
1555
supplied_retcode = kwargs.get('retcode', 0)
1556
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1557
universal_newlines=kwargs.get('universal_newlines', False),
1560
def start_bzr_subprocess(self, process_args, env_changes=None,
1561
skip_if_plan_to_signal=False,
1563
allow_plugins=False):
1564
"""Start bzr in a subprocess for testing.
1566
This starts a new Python interpreter and runs bzr in there.
1567
This should only be used for tests that have a justifiable need for
1568
this isolation: e.g. they are testing startup time, or signal
1569
handling, or early startup code, etc. Subprocess code can't be
1570
profiled or debugged so easily.
1572
:param process_args: a list of arguments to pass to the bzr executable,
1573
for example ``['--version']``.
1574
:param env_changes: A dictionary which lists changes to environment
1575
variables. A value of None will unset the env variable.
1576
The values must be strings. The change will only occur in the
1577
child, so you don't need to fix the environment after running.
1578
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1580
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1582
:returns: Popen object for the started process.
1584
if skip_if_plan_to_signal:
1585
if not getattr(os, 'kill', None):
1586
raise TestSkipped("os.kill not available.")
1588
if env_changes is None:
1592
def cleanup_environment():
1593
for env_var, value in env_changes.iteritems():
1594
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1596
def restore_environment():
1597
for env_var, value in old_env.iteritems():
1598
osutils.set_or_unset_env(env_var, value)
1600
bzr_path = self.get_bzr_path()
1603
if working_dir is not None:
1604
cwd = osutils.getcwd()
1605
os.chdir(working_dir)
1608
# win32 subprocess doesn't support preexec_fn
1609
# so we will avoid using it on all platforms, just to
1610
# make sure the code path is used, and we don't break on win32
1611
cleanup_environment()
1612
command = [sys.executable]
1613
# frozen executables don't need the path to bzr
1614
if getattr(sys, "frozen", None) is None:
1615
command.append(bzr_path)
1616
if not allow_plugins:
1617
command.append('--no-plugins')
1618
command.extend(process_args)
1619
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1621
restore_environment()
1627
def _popen(self, *args, **kwargs):
1628
"""Place a call to Popen.
1630
Allows tests to override this method to intercept the calls made to
1631
Popen for introspection.
1633
return Popen(*args, **kwargs)
1635
def get_bzr_path(self):
1636
"""Return the path of the 'bzr' executable for this test suite."""
1637
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1638
if not os.path.isfile(bzr_path):
1639
# We are probably installed. Assume sys.argv is the right file
1640
bzr_path = sys.argv[0]
1643
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1644
universal_newlines=False, process_args=None):
1645
"""Finish the execution of process.
1647
:param process: the Popen object returned from start_bzr_subprocess.
1648
:param retcode: The status code that is expected. Defaults to 0. If
1649
None is supplied, the status code is not checked.
1650
:param send_signal: an optional signal to send to the process.
1651
:param universal_newlines: Convert CRLF => LF
1652
:returns: (stdout, stderr)
1654
if send_signal is not None:
1655
os.kill(process.pid, send_signal)
1656
out, err = process.communicate()
1658
if universal_newlines:
1659
out = out.replace('\r\n', '\n')
1660
err = err.replace('\r\n', '\n')
1662
if retcode is not None and retcode != process.returncode:
1663
if process_args is None:
1664
process_args = "(unknown args)"
1665
mutter('Output of bzr %s:\n%s', process_args, out)
1666
mutter('Error for bzr %s:\n%s', process_args, err)
1667
self.fail('Command bzr %s failed with retcode %s != %s'
1668
% (process_args, retcode, process.returncode))
665
:param stdin: A string to be used as stdin for the command.
667
retcode = kwargs.pop('retcode', 0)
668
stdin = kwargs.pop('stdin', None)
669
return self.run_bzr_captured(args, retcode, stdin)
1671
671
def check_inventory_shape(self, inv, shape):
1672
672
"""Compare an inventory to a list of expected names.
1720
720
sys.stderr = real_stderr
1721
721
sys.stdin = real_stdin
1723
def reduceLockdirTimeout(self):
1724
"""Reduce the default lock timeout for the duration of the test, so that
1725
if LockContention occurs during a test, it does so quickly.
1727
Tests that expect to provoke LockContention errors should call this.
1729
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1731
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1732
self.addCleanup(resetTimeout)
1733
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1735
def make_utf8_encoded_stringio(self, encoding_type=None):
1736
"""Return a StringIOWrapper instance, that will encode Unicode
1739
if encoding_type is None:
1740
encoding_type = 'strict'
1742
output_encoding = 'utf-8'
1743
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
1744
sio.encoding = output_encoding
1748
class TestCaseWithMemoryTransport(TestCase):
1749
"""Common test class for tests that do not need disk resources.
1751
Tests that need disk resources should derive from TestCaseWithTransport.
1753
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1755
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1756
a directory which does not exist. This serves to help ensure test isolation
1757
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1758
must exist. However, TestCaseWithMemoryTransport does not offer local
1759
file defaults for the transport in tests, nor does it obey the command line
1760
override, so tests that accidentally write to the common directory should
1763
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1764
a .bzr directory that stops us ascending higher into the filesystem.
1770
def __init__(self, methodName='runTest'):
1771
# allow test parameterization after test construction and before test
1772
# execution. Variables that the parameterizer sets need to be
1773
# ones that are not set by setUp, or setUp will trash them.
1774
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1775
self.vfs_transport_factory = default_transport
1776
self.transport_server = None
1777
self.transport_readonly_server = None
1778
self.__vfs_server = None
1780
def get_transport(self, relpath=None):
1781
"""Return a writeable transport.
1783
This transport is for the test scratch space relative to
1786
:param relpath: a path relative to the base url.
1788
t = get_transport(self.get_url(relpath))
1789
self.assertFalse(t.is_readonly())
1792
def get_readonly_transport(self, relpath=None):
1793
"""Return a readonly transport for the test scratch space
1795
This can be used to test that operations which should only need
1796
readonly access in fact do not try to write.
1798
:param relpath: a path relative to the base url.
1800
t = get_transport(self.get_readonly_url(relpath))
1801
self.assertTrue(t.is_readonly())
1804
def create_transport_readonly_server(self):
1805
"""Create a transport server from class defined at init.
1807
This is mostly a hook for daughter classes.
1809
return self.transport_readonly_server()
1811
def get_readonly_server(self):
1812
"""Get the server instance for the readonly transport
1814
This is useful for some tests with specific servers to do diagnostics.
1816
if self.__readonly_server is None:
1817
if self.transport_readonly_server is None:
1818
# readonly decorator requested
1819
# bring up the server
1820
self.__readonly_server = ReadonlyServer()
1821
self.__readonly_server.setUp(self.get_vfs_only_server())
1823
self.__readonly_server = self.create_transport_readonly_server()
1824
self.__readonly_server.setUp(self.get_vfs_only_server())
1825
self.addCleanup(self.__readonly_server.tearDown)
1826
return self.__readonly_server
1828
def get_readonly_url(self, relpath=None):
1829
"""Get a URL for the readonly transport.
1831
This will either be backed by '.' or a decorator to the transport
1832
used by self.get_url()
1833
relpath provides for clients to get a path relative to the base url.
1834
These should only be downwards relative, not upwards.
1836
base = self.get_readonly_server().get_url()
1837
return self._adjust_url(base, relpath)
1839
def get_vfs_only_server(self):
1840
"""Get the vfs only read/write server instance.
1842
This is useful for some tests with specific servers that need
1845
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1846
is no means to override it.
1848
if self.__vfs_server is None:
1849
self.__vfs_server = MemoryServer()
1850
self.__vfs_server.setUp()
1851
self.addCleanup(self.__vfs_server.tearDown)
1852
return self.__vfs_server
1854
def get_server(self):
1855
"""Get the read/write server instance.
1857
This is useful for some tests with specific servers that need
1860
This is built from the self.transport_server factory. If that is None,
1861
then the self.get_vfs_server is returned.
1863
if self.__server is None:
1864
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1865
return self.get_vfs_only_server()
1867
# bring up a decorated means of access to the vfs only server.
1868
self.__server = self.transport_server()
1870
self.__server.setUp(self.get_vfs_only_server())
1871
except TypeError, e:
1872
# This should never happen; the try:Except here is to assist
1873
# developers having to update code rather than seeing an
1874
# uninformative TypeError.
1875
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1876
self.addCleanup(self.__server.tearDown)
1877
return self.__server
1879
def _adjust_url(self, base, relpath):
1880
"""Get a URL (or maybe a path) for the readwrite transport.
1882
This will either be backed by '.' or to an equivalent non-file based
1884
relpath provides for clients to get a path relative to the base url.
1885
These should only be downwards relative, not upwards.
1887
if relpath is not None and relpath != '.':
1888
if not base.endswith('/'):
1890
# XXX: Really base should be a url; we did after all call
1891
# get_url()! But sometimes it's just a path (from
1892
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1893
# to a non-escaped local path.
1894
if base.startswith('./') or base.startswith('/'):
1897
base += urlutils.escape(relpath)
1900
def get_url(self, relpath=None):
1901
"""Get a URL (or maybe a path) for the readwrite transport.
1903
This will either be backed by '.' or to an equivalent non-file based
1905
relpath provides for clients to get a path relative to the base url.
1906
These should only be downwards relative, not upwards.
1908
base = self.get_server().get_url()
1909
return self._adjust_url(base, relpath)
1911
def get_vfs_only_url(self, relpath=None):
1912
"""Get a URL (or maybe a path for the plain old vfs transport.
1914
This will never be a smart protocol. It always has all the
1915
capabilities of the local filesystem, but it might actually be a
1916
MemoryTransport or some other similar virtual filesystem.
1918
This is the backing transport (if any) of the server returned by
1919
get_url and get_readonly_url.
1921
:param relpath: provides for clients to get a path relative to the base
1922
url. These should only be downwards relative, not upwards.
1925
base = self.get_vfs_only_server().get_url()
1926
return self._adjust_url(base, relpath)
1928
def _create_safety_net(self):
1929
"""Make a fake bzr directory.
1931
This prevents any tests propagating up onto the TEST_ROOT directory's
1934
root = TestCaseWithMemoryTransport.TEST_ROOT
1935
bzrdir.BzrDir.create_standalone_workingtree(root)
1937
def _check_safety_net(self):
1938
"""Check that the safety .bzr directory have not been touched.
1940
_make_test_root have created a .bzr directory to prevent tests from
1941
propagating. This method ensures than a test did not leaked.
1943
root = TestCaseWithMemoryTransport.TEST_ROOT
1944
wt = workingtree.WorkingTree.open(root)
1945
last_rev = wt.last_revision()
1946
if last_rev != 'null:':
1947
# The current test have modified the /bzr directory, we need to
1948
# recreate a new one or all the followng tests will fail.
1949
# If you need to inspect its content uncomment the following line
1950
# import pdb; pdb.set_trace()
1951
_rmtree_temp_dir(root + '/.bzr')
1952
self._create_safety_net()
1953
raise AssertionError('%s/.bzr should not be modified' % root)
1955
def _make_test_root(self):
1956
if TestCaseWithMemoryTransport.TEST_ROOT is None:
1957
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
1958
TestCaseWithMemoryTransport.TEST_ROOT = root
1960
self._create_safety_net()
1962
# The same directory is used by all tests, and we're not
1963
# specifically told when all tests are finished. This will do.
1964
atexit.register(_rmtree_temp_dir, root)
1966
self.addCleanup(self._check_safety_net)
1968
def makeAndChdirToTestDir(self):
1969
"""Create a temporary directories for this one test.
1971
This must set self.test_home_dir and self.test_dir and chdir to
1974
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1976
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1977
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1978
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1980
def make_branch(self, relpath, format=None):
1981
"""Create a branch on the transport at relpath."""
1982
repo = self.make_repository(relpath, format=format)
1983
return repo.bzrdir.create_branch()
1985
def make_bzrdir(self, relpath, format=None):
1987
# might be a relative or absolute path
1988
maybe_a_url = self.get_url(relpath)
1989
segments = maybe_a_url.rsplit('/', 1)
1990
t = get_transport(maybe_a_url)
1991
if len(segments) > 1 and segments[-1] not in ('', '.'):
1995
if isinstance(format, basestring):
1996
format = bzrdir.format_registry.make_bzrdir(format)
1997
return format.initialize_on_transport(t)
1998
except errors.UninitializableFormat:
1999
raise TestSkipped("Format %s is not initializable." % format)
2001
def make_repository(self, relpath, shared=False, format=None):
2002
"""Create a repository on our default transport at relpath.
2004
Note that relpath must be a relative path, not a full url.
2006
# FIXME: If you create a remoterepository this returns the underlying
2007
# real format, which is incorrect. Actually we should make sure that
2008
# RemoteBzrDir returns a RemoteRepository.
2009
# maybe mbp 20070410
2010
made_control = self.make_bzrdir(relpath, format=format)
2011
return made_control.create_repository(shared=shared)
2013
def make_branch_and_memory_tree(self, relpath, format=None):
2014
"""Create a branch on the default transport and a MemoryTree for it."""
2015
b = self.make_branch(relpath, format=format)
2016
return memorytree.MemoryTree.create_on_branch(b)
2018
def make_branch_builder(self, relpath, format=None):
2019
url = self.get_url(relpath)
2020
tran = get_transport(url)
2021
return branchbuilder.BranchBuilder(get_transport(url), format=format)
2023
def overrideEnvironmentForTesting(self):
2024
os.environ['HOME'] = self.test_home_dir
2025
os.environ['BZR_HOME'] = self.test_home_dir
2028
super(TestCaseWithMemoryTransport, self).setUp()
2029
self._make_test_root()
2030
_currentdir = os.getcwdu()
2031
def _leaveDirectory():
2032
os.chdir(_currentdir)
2033
self.addCleanup(_leaveDirectory)
2034
self.makeAndChdirToTestDir()
2035
self.overrideEnvironmentForTesting()
2036
self.__readonly_server = None
2037
self.__server = None
2038
self.reduceLockdirTimeout()
723
def merge(self, branch_from, wt_to):
724
"""A helper for tests to do a ui-less merge.
726
This should move to the main library when someone has time to integrate
729
# minimal ui-less merge.
730
wt_to.branch.fetch(branch_from)
731
base_rev = common_ancestor(branch_from.last_revision(),
732
wt_to.branch.last_revision(),
733
wt_to.branch.repository)
734
merge_inner(wt_to.branch, branch_from.basis_tree(),
735
wt_to.branch.repository.revision_tree(base_rev),
737
wt_to.add_pending_merge(branch_from.last_revision())
740
BzrTestBase = TestCase
2041
class TestCaseInTempDir(TestCaseWithMemoryTransport):
743
class TestCaseInTempDir(TestCase):
2042
744
"""Derived class that runs a test within a temporary directory.
2044
746
This is useful for tests that need to create a branch, etc.
2284
1040
def setUp(self):
2285
1041
super(ChrootedTestCase, self).setUp()
2286
if not self.vfs_transport_factory == MemoryServer:
2287
self.transport_readonly_server = HttpServer
2290
def condition_id_re(pattern):
2291
"""Create a condition filter which performs a re check on a test's id.
2293
:param pattern: A regular expression string.
2294
:return: A callable that returns True if the re matches.
1042
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1043
self.transport_readonly_server = bzrlib.transport.http.HttpServer
1046
def filter_suite_by_re(suite, pattern):
1047
result = TestSuite()
2296
1048
filter_re = re.compile(pattern)
2297
def condition(test):
2299
return filter_re.search(test_id)
2303
def condition_isinstance(klass_or_klass_list):
2304
"""Create a condition filter which returns isinstance(param, klass).
2306
:return: A callable which when called with one parameter obj return the
2307
result of isinstance(obj, klass_or_klass_list).
2310
return isinstance(obj, klass_or_klass_list)
2314
def condition_id_in_list(id_list):
2315
"""Create a condition filter which verify that test's id in a list.
2317
:param id_list: A TestIdList object.
2318
:return: A callable that returns True if the test's id appears in the list.
2320
def condition(test):
2321
return id_list.includes(test.id())
2325
def condition_id_startswith(starts):
2326
"""Create a condition filter verifying that test's id starts with a string.
2328
:param starts: A list of string.
2329
:return: A callable that returns True if the test's id starts with one of
2332
def condition(test):
2333
for start in starts:
2334
if test.id().startswith(start):
2340
def exclude_tests_by_condition(suite, condition):
2341
"""Create a test suite which excludes some tests from suite.
2343
:param suite: The suite to get tests from.
2344
:param condition: A callable whose result evaluates True when called with a
2345
test case which should be excluded from the result.
2346
:return: A suite which contains the tests found in suite that fail
2350
for test in iter_suite_tests(suite):
2351
if not condition(test):
2353
return TestUtil.TestSuite(result)
2356
def filter_suite_by_condition(suite, condition):
2357
"""Create a test suite by filtering another one.
2359
:param suite: The source suite.
2360
:param condition: A callable whose result evaluates True when called with a
2361
test case which should be included in the result.
2362
:return: A suite which contains the tests found in suite that pass
2366
for test in iter_suite_tests(suite):
2369
return TestUtil.TestSuite(result)
2372
def filter_suite_by_re(suite, pattern):
2373
"""Create a test suite by filtering another one.
2375
:param suite: the source suite
2376
:param pattern: pattern that names must match
2377
:returns: the newly created suite
2379
condition = condition_id_re(pattern)
2380
result_suite = filter_suite_by_condition(suite, condition)
2384
def filter_suite_by_id_list(suite, test_id_list):
2385
"""Create a test suite by filtering another one.
2387
:param suite: The source suite.
2388
:param test_id_list: A list of the test ids to keep as strings.
2389
:returns: the newly created suite
2391
condition = condition_id_in_list(test_id_list)
2392
result_suite = filter_suite_by_condition(suite, condition)
2396
def filter_suite_by_id_startswith(suite, start):
2397
"""Create a test suite by filtering another one.
2399
:param suite: The source suite.
2400
:param start: A list of string the test id must start with one of.
2401
:returns: the newly created suite
2403
condition = condition_id_startswith(start)
2404
result_suite = filter_suite_by_condition(suite, condition)
2408
def exclude_tests_by_re(suite, pattern):
2409
"""Create a test suite which excludes some tests from suite.
2411
:param suite: The suite to get tests from.
2412
:param pattern: A regular expression string. Test ids that match this
2413
pattern will be excluded from the result.
2414
:return: A TestSuite that contains all the tests from suite without the
2415
tests that matched pattern. The order of tests is the same as it was in
2418
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2421
def preserve_input(something):
2422
"""A helper for performing test suite transformation chains.
2424
:param something: Anything you want to preserve.
2430
def randomize_suite(suite):
2431
"""Return a new TestSuite with suite's tests in random order.
2433
The tests in the input suite are flattened into a single suite in order to
2434
accomplish this. Any nested TestSuites are removed to provide global
2437
tests = list(iter_suite_tests(suite))
2438
random.shuffle(tests)
2439
return TestUtil.TestSuite(tests)
2442
def split_suite_by_condition(suite, condition):
2443
"""Split a test suite into two by a condition.
2445
:param suite: The suite to split.
2446
:param condition: The condition to match on. Tests that match this
2447
condition are returned in the first test suite, ones that do not match
2448
are in the second suite.
2449
:return: A tuple of two test suites, where the first contains tests from
2450
suite matching the condition, and the second contains the remainder
2451
from suite. The order within each output suite is the same as it was in
2456
for test in iter_suite_tests(suite):
2458
matched.append(test)
2460
did_not_match.append(test)
2461
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2464
def split_suite_by_re(suite, pattern):
2465
"""Split a test suite into two by a regular expression.
2467
:param suite: The suite to split.
2468
:param pattern: A regular expression string. Test ids that match this
2469
pattern will be in the first test suite returned, and the others in the
2470
second test suite returned.
2471
:return: A tuple of two test suites, where the first contains tests from
2472
suite matching pattern, and the second contains the remainder from
2473
suite. The order within each output suite is the same as it was in
2476
return split_suite_by_condition(suite, condition_id_re(pattern))
1049
for test in iter_suite_tests(suite):
1050
if filter_re.search(test.id()):
1051
result.addTest(test)
2479
1055
def run_suite(suite, name='test', verbose=False, pattern=".*",
2480
stop_on_failure=False,
2481
transport=None, lsprof_timed=None, bench_history=None,
2482
matching_tests_first=None,
2485
exclude_pattern=None,
1056
stop_on_failure=False, keep_output=False,
1057
transport=None, lsprof_timed=None):
1058
TestCaseInTempDir._TEST_NAME = name
2487
1059
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1065
pb = progress.ProgressBar()
2492
1066
runner = TextTestRunner(stream=sys.stdout,
2493
1067
descriptions=0,
2494
1068
verbosity=verbosity,
2495
bench_history=bench_history,
2496
list_only=list_only,
1069
keep_output=keep_output,
2498
1071
runner.stop_on_failure=stop_on_failure
2499
# Initialise the random number generator and display the seed used.
2500
# We convert the seed to a long to make it reuseable across invocations.
2501
random_order = False
2502
if random_seed is not None:
2504
if random_seed == "now":
2505
random_seed = long(time.time())
2507
# Convert the seed to a long if we can
2509
random_seed = long(random_seed)
2512
runner.stream.writeln("Randomizing test order using seed %s\n" %
2514
random.seed(random_seed)
2515
# Customise the list of tests if requested
2516
if exclude_pattern is not None:
2517
suite = exclude_tests_by_re(suite, exclude_pattern)
2519
order_changer = randomize_suite
2521
order_changer = preserve_input
2522
if pattern != '.*' or random_order:
2523
if matching_tests_first:
2524
suites = map(order_changer, split_suite_by_re(suite, pattern))
2525
suite = TestUtil.TestSuite(suites)
2527
suite = order_changer(filter_suite_by_re(suite, pattern))
1073
suite = filter_suite_by_re(suite, pattern)
2529
1074
result = runner.run(suite)
2532
return result.wasStrictlySuccessful()
2534
1075
return result.wasSuccessful()
2537
# Controlled by "bzr selftest -E=..." option
2538
selftest_debug_flags = set()
2541
1078
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2542
1080
transport=None,
2543
1081
test_suite_factory=None,
2546
matching_tests_first=None,
2549
exclude_pattern=None,
2555
1083
"""Run the whole test suite under the enhanced runner"""
2556
# XXX: Very ugly way to do this...
2557
# Disable warning about old formats because we don't want it to disturb
2558
# any blackbox tests.
2559
from bzrlib import repository
2560
repository._deprecation_warning_done = True
2562
1084
global default_transport
2563
1085
if transport is None:
2564
1086
transport = default_transport
2565
1087
old_transport = default_transport
2566
1088
default_transport = transport
2567
global selftest_debug_flags
2568
old_debug_flags = selftest_debug_flags
2569
if debug_flags is not None:
2570
selftest_debug_flags = set(debug_flags)
2572
if load_list is None:
2575
keep_only = load_test_id_list(load_list)
2576
1090
if test_suite_factory is None:
2577
suite = test_suite(keep_only, starting_with)
1091
suite = test_suite()
2579
1093
suite = test_suite_factory()
2580
1094
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2581
stop_on_failure=stop_on_failure,
1095
stop_on_failure=stop_on_failure, keep_output=keep_output,
2582
1096
transport=transport,
2583
lsprof_timed=lsprof_timed,
2584
bench_history=bench_history,
2585
matching_tests_first=matching_tests_first,
2586
list_only=list_only,
2587
random_seed=random_seed,
2588
exclude_pattern=exclude_pattern,
1097
lsprof_timed=lsprof_timed)
2591
1099
default_transport = old_transport
2592
selftest_debug_flags = old_debug_flags
2595
def load_test_id_list(file_name):
2596
"""Load a test id list from a text file.
2598
The format is one test id by line. No special care is taken to impose
2599
strict rules, these test ids are used to filter the test suite so a test id
2600
that do not match an existing test will do no harm. This allows user to add
2601
comments, leave blank lines, etc.
2605
ftest = open(file_name, 'rt')
2607
if e.errno != errno.ENOENT:
2610
raise errors.NoSuchFile(file_name)
2612
for test_name in ftest.readlines():
2613
test_list.append(test_name.strip())
2618
def suite_matches_id_list(test_suite, id_list):
2619
"""Warns about tests not appearing or appearing more than once.
2621
:param test_suite: A TestSuite object.
2622
:param test_id_list: The list of test ids that should be found in
2625
:return: (absents, duplicates) absents is a list containing the test found
2626
in id_list but not in test_suite, duplicates is a list containing the
2627
test found multiple times in test_suite.
2629
When using a prefined test id list, it may occurs that some tests do not
2630
exist anymore or that some tests use the same id. This function warns the
2631
tester about potential problems in his workflow (test lists are volatile)
2632
or in the test suite itself (using the same id for several tests does not
2633
help to localize defects).
2635
# Build a dict counting id occurrences
2637
for test in iter_suite_tests(test_suite):
2639
tests[id] = tests.get(id, 0) + 1
2644
occurs = tests.get(id, 0)
2646
not_found.append(id)
2648
duplicates.append(id)
2650
return not_found, duplicates
2653
class TestIdList(object):
2654
"""Test id list to filter a test suite.
2656
Relying on the assumption that test ids are built as:
2657
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
2658
notation, this class offers methods to :
2659
- avoid building a test suite for modules not refered to in the test list,
2660
- keep only the tests listed from the module test suite.
2663
def __init__(self, test_id_list):
2664
# When a test suite needs to be filtered against us we compare test ids
2665
# for equality, so a simple dict offers a quick and simple solution.
2666
self.tests = dict().fromkeys(test_id_list, True)
2668
# While unittest.TestCase have ids like:
2669
# <module>.<class>.<method>[(<param+)],
2670
# doctest.DocTestCase can have ids like:
2673
# <module>.<function>
2674
# <module>.<class>.<method>
2676
# Since we can't predict a test class from its name only, we settle on
2677
# a simple constraint: a test id always begins with its module name.
2680
for test_id in test_id_list:
2681
parts = test_id.split('.')
2682
mod_name = parts.pop(0)
2683
modules[mod_name] = True
2685
mod_name += '.' + part
2686
modules[mod_name] = True
2687
self.modules = modules
2689
def refers_to(self, module_name):
2690
"""Is there tests for the module or one of its sub modules."""
2691
return self.modules.has_key(module_name)
2693
def includes(self, test_id):
2694
return self.tests.has_key(test_id)
2697
class TestPrefixAliasRegistry(registry.Registry):
2698
"""A registry for test prefix aliases.
2700
This helps implement shorcuts for the --starting-with selftest
2701
option. Overriding existing prefixes is not allowed but not fatal (a
2702
warning will be emitted).
2705
def register(self, key, obj, help=None, info=None,
2706
override_existing=False):
2707
"""See Registry.register.
2709
Trying to override an existing alias causes a warning to be emitted,
2710
not a fatal execption.
2713
super(TestPrefixAliasRegistry, self).register(
2714
key, obj, help=help, info=info, override_existing=False)
2716
actual = self.get(key)
2717
note('Test prefix alias %s is already used for %s, ignoring %s'
2718
% (key, actual, obj))
2720
def resolve_alias(self, id_start):
2721
"""Replace the alias by the prefix in the given string.
2723
Using an unknown prefix is an error to help catching typos.
2725
parts = id_start.split('.')
2727
parts[0] = self.get(parts[0])
2729
raise errors.BzrCommandError(
2730
'%s is not a known test prefix alias' % parts[0])
2731
return '.'.join(parts)
2734
test_prefix_alias_registry = TestPrefixAliasRegistry()
2735
"""Registry of test prefix aliases."""
2738
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
2739
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
2740
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
2742
# Obvious higest levels prefixes, feel free to add your own via a plugin
2743
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
2744
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
2745
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
2746
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
2747
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
2750
def test_suite(keep_only=None, starting_with=None):
2751
1103
"""Build and return TestSuite for the whole of bzrlib.
2753
:param keep_only: A list of test ids limiting the suite returned.
2755
:param starting_with: An id limiting the suite returned to the tests
2758
1105
This function can be replaced if you need to change the default test
2759
1106
suite on a global basis, but it is not encouraged.
2763
'bzrlib.tests.blackbox',
2764
'bzrlib.tests.branch_implementations',
2765
'bzrlib.tests.bzrdir_implementations',
2766
'bzrlib.tests.commands',
2767
'bzrlib.tests.interrepository_implementations',
2768
'bzrlib.tests.intertree_implementations',
2769
'bzrlib.tests.inventory_implementations',
2770
'bzrlib.tests.per_lock',
2771
'bzrlib.tests.per_repository',
2772
'bzrlib.tests.per_repository_reference',
2773
'bzrlib.tests.test__dirstate_helpers',
2774
'bzrlib.tests.test__walkdirs_win32',
1108
from doctest import DocTestSuite
1110
global MODULES_TO_DOCTEST
2775
1113
'bzrlib.tests.test_ancestry',
2776
'bzrlib.tests.test_annotate',
2777
1114
'bzrlib.tests.test_api',
2778
'bzrlib.tests.test_atomicfile',
2779
1115
'bzrlib.tests.test_bad_files',
2780
'bzrlib.tests.test_bisect_multi',
2781
1116
'bzrlib.tests.test_branch',
2782
'bzrlib.tests.test_branchbuilder',
2783
'bzrlib.tests.test_btree_index',
2784
'bzrlib.tests.test_bugtracker',
2785
'bzrlib.tests.test_bundle',
2786
1117
'bzrlib.tests.test_bzrdir',
2787
'bzrlib.tests.test_cache_utf8',
2788
'bzrlib.tests.test_chunk_writer',
2789
'bzrlib.tests.test_commands',
1118
'bzrlib.tests.test_command',
2790
1119
'bzrlib.tests.test_commit',
2791
1120
'bzrlib.tests.test_commit_merge',
2792
1121
'bzrlib.tests.test_config',
2793
1122
'bzrlib.tests.test_conflicts',
2794
'bzrlib.tests.test_counted_lock',
2795
1123
'bzrlib.tests.test_decorators',
2796
'bzrlib.tests.test_delta',
2797
'bzrlib.tests.test_deprecated_graph',
2798
1124
'bzrlib.tests.test_diff',
2799
'bzrlib.tests.test_directory_service',
2800
'bzrlib.tests.test_dirstate',
2801
'bzrlib.tests.test_email_message',
1125
'bzrlib.tests.test_doc_generate',
2802
1126
'bzrlib.tests.test_errors',
2803
'bzrlib.tests.test_extract',
1127
'bzrlib.tests.test_escaped_store',
2804
1128
'bzrlib.tests.test_fetch',
2805
'bzrlib.tests.test_ftp_transport',
2806
'bzrlib.tests.test_generate_docs',
2807
'bzrlib.tests.test_generate_ids',
2808
'bzrlib.tests.test_globbing',
2809
1129
'bzrlib.tests.test_gpg',
2810
1130
'bzrlib.tests.test_graph',
2811
1131
'bzrlib.tests.test_hashcache',
2812
'bzrlib.tests.test_help',
2813
'bzrlib.tests.test_hooks',
2814
1132
'bzrlib.tests.test_http',
2815
'bzrlib.tests.test_http_implementations',
2816
'bzrlib.tests.test_http_response',
2817
'bzrlib.tests.test_https_ca_bundle',
2818
1133
'bzrlib.tests.test_identitymap',
2819
'bzrlib.tests.test_ignores',
2820
'bzrlib.tests.test_index',
2821
'bzrlib.tests.test_info',
2822
1134
'bzrlib.tests.test_inv',
2823
1135
'bzrlib.tests.test_knit',
2824
'bzrlib.tests.test_lazy_import',
2825
'bzrlib.tests.test_lazy_regex',
1136
'bzrlib.tests.test_lockdir',
2826
1137
'bzrlib.tests.test_lockable_files',
2827
'bzrlib.tests.test_lockdir',
2828
1138
'bzrlib.tests.test_log',
2829
'bzrlib.tests.test_lru_cache',
2830
'bzrlib.tests.test_lsprof',
2831
'bzrlib.tests.test_mail_client',
2832
'bzrlib.tests.test_memorytree',
2833
1139
'bzrlib.tests.test_merge',
2834
1140
'bzrlib.tests.test_merge3',
2835
1141
'bzrlib.tests.test_merge_core',
2836
'bzrlib.tests.test_merge_directive',
2837
1142
'bzrlib.tests.test_missing',
2838
1143
'bzrlib.tests.test_msgeditor',
2839
'bzrlib.tests.test_multiparent',
2840
'bzrlib.tests.test_mutabletree',
2841
1144
'bzrlib.tests.test_nonascii',
2842
1145
'bzrlib.tests.test_options',
2843
1146
'bzrlib.tests.test_osutils',
2844
'bzrlib.tests.test_osutils_encodings',
2845
'bzrlib.tests.test_pack',
2846
'bzrlib.tests.test_pack_repository',
2847
1147
'bzrlib.tests.test_patch',
2848
'bzrlib.tests.test_patches',
2849
1148
'bzrlib.tests.test_permissions',
2850
1149
'bzrlib.tests.test_plugins',
2851
1150
'bzrlib.tests.test_progress',
2852
'bzrlib.tests.test_read_bundle',
2853
1151
'bzrlib.tests.test_reconcile',
2854
'bzrlib.tests.test_reconfigure',
2855
'bzrlib.tests.test_registry',
2856
'bzrlib.tests.test_remote',
2857
1152
'bzrlib.tests.test_repository',
2858
'bzrlib.tests.test_revert',
2859
1153
'bzrlib.tests.test_revision',
2860
'bzrlib.tests.test_revisionspec',
2861
'bzrlib.tests.test_revisiontree',
1154
'bzrlib.tests.test_revisionnamespaces',
1155
'bzrlib.tests.test_revprops',
2862
1156
'bzrlib.tests.test_rio',
2863
'bzrlib.tests.test_rules',
2864
1157
'bzrlib.tests.test_sampler',
2865
1158
'bzrlib.tests.test_selftest',
2866
1159
'bzrlib.tests.test_setup',
2867
1160
'bzrlib.tests.test_sftp_transport',
2868
'bzrlib.tests.test_smart',
2869
1161
'bzrlib.tests.test_smart_add',
2870
'bzrlib.tests.test_smart_transport',
2871
'bzrlib.tests.test_smtp_connection',
2872
1162
'bzrlib.tests.test_source',
2873
'bzrlib.tests.test_ssh_transport',
2874
1163
'bzrlib.tests.test_status',
2875
1164
'bzrlib.tests.test_store',
2876
'bzrlib.tests.test_strace',
2877
'bzrlib.tests.test_subsume',
2878
'bzrlib.tests.test_switch',
2879
1165
'bzrlib.tests.test_symbol_versioning',
2880
'bzrlib.tests.test_tag',
2881
1166
'bzrlib.tests.test_testament',
2882
1167
'bzrlib.tests.test_textfile',
2883
1168
'bzrlib.tests.test_textmerge',
2884
'bzrlib.tests.test_timestamp',
2885
1169
'bzrlib.tests.test_trace',
2886
1170
'bzrlib.tests.test_transactions',
2887
1171
'bzrlib.tests.test_transform',
2888
1172
'bzrlib.tests.test_transport',
2889
'bzrlib.tests.test_transport_implementations',
2890
'bzrlib.tests.test_transport_log',
2891
'bzrlib.tests.test_tree',
2892
'bzrlib.tests.test_treebuilder',
2893
1173
'bzrlib.tests.test_tsort',
2894
1174
'bzrlib.tests.test_tuned_gzip',
2895
1175
'bzrlib.tests.test_ui',
2896
'bzrlib.tests.test_uncommit',
2897
1176
'bzrlib.tests.test_upgrade',
2898
'bzrlib.tests.test_upgrade_stacked',
2899
'bzrlib.tests.test_urlutils',
2900
'bzrlib.tests.test_version',
2901
'bzrlib.tests.test_version_info',
2902
1177
'bzrlib.tests.test_versionedfile',
2903
1178
'bzrlib.tests.test_weave',
2904
1179
'bzrlib.tests.test_whitebox',
2905
'bzrlib.tests.test_win32utils',
2906
1180
'bzrlib.tests.test_workingtree',
2907
'bzrlib.tests.test_workingtree_4',
2908
'bzrlib.tests.test_wsgi',
2909
1181
'bzrlib.tests.test_xml',
2910
'bzrlib.tests.tree_implementations',
2911
'bzrlib.tests.workingtree_implementations',
2912
'bzrlib.util.tests.test_bencode',
1183
test_transport_implementations = [
1184
'bzrlib.tests.test_transport_implementations']
2915
1187
loader = TestUtil.TestLoader()
2918
starting_with = [test_prefix_alias_registry.resolve_alias(start)
2919
for start in starting_with]
2920
# We take precedence over keep_only because *at loading time* using
2921
# both options means we will load less tests for the same final result.
2922
def interesting_module(name):
2923
for start in starting_with:
2925
# Either the module name starts with the specified string
2926
name.startswith(start)
2927
# or it may contain tests starting with the specified string
2928
or start.startswith(name)
2932
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
2934
elif keep_only is not None:
2935
id_filter = TestIdList(keep_only)
2936
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
2937
def interesting_module(name):
2938
return id_filter.refers_to(name)
2941
loader = TestUtil.TestLoader()
2942
def interesting_module(name):
2943
# No filtering, all modules are interesting
2946
suite = loader.suiteClass()
2948
# modules building their suite with loadTestsFromModuleNames
1188
from bzrlib.transport import TransportTestProviderAdapter
1189
adapter = TransportTestProviderAdapter()
1190
adapt_modules(test_transport_implementations, adapter, loader, suite)
2949
1191
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2951
modules_to_doctest = [
2956
'bzrlib.iterablefile',
2961
'bzrlib.symbol_versioning',
2964
'bzrlib.version_info_formats.format_custom',
2967
for mod in modules_to_doctest:
2968
if not interesting_module(mod):
2969
# No tests to keep here, move along
2972
doc_suite = doctest.DocTestSuite(mod)
2973
except ValueError, e:
2974
print '**failed to get doctest for: %s\n%s' % (mod, e)
2976
suite.addTest(doc_suite)
2978
default_encoding = sys.getdefaultencoding()
2979
for name, plugin in bzrlib.plugin.plugins().items():
2980
if not interesting_module(plugin.module.__name__):
2982
plugin_suite = plugin.test_suite()
2983
# We used to catch ImportError here and turn it into just a warning,
2984
# but really if you don't have --no-plugins this should be a failure.
2985
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
2986
if plugin_suite is None:
2987
plugin_suite = plugin.load_plugin_tests(loader)
2988
if plugin_suite is not None:
2989
suite.addTest(plugin_suite)
2990
if default_encoding != sys.getdefaultencoding():
2991
bzrlib.trace.warning(
2992
'Plugin "%s" tried to reset default encoding to: %s', name,
2993
sys.getdefaultencoding())
2995
sys.setdefaultencoding(default_encoding)
2998
suite = filter_suite_by_id_startswith(suite, starting_with)
3000
if keep_only is not None:
3001
# Now that the referred modules have loaded their tests, keep only the
3003
suite = filter_suite_by_id_list(suite, id_filter)
3004
# Do some sanity checks on the id_list filtering
3005
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3007
# The tester has used both keep_only and starting_with, so he is
3008
# already aware that some tests are excluded from the list, there
3009
# is no need to tell him which.
3012
# Some tests mentioned in the list are not in the test suite. The
3013
# list may be out of date, report to the tester.
3014
for id in not_found:
3015
bzrlib.trace.warning('"%s" not found in the test suite', id)
3016
for id in duplicates:
3017
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
3022
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
3023
"""Adapt all tests in some given modules to given scenarios.
3025
This is the recommended public interface for test parameterization.
3026
Typically the test_suite() method for a per-implementation test
3027
suite will call multiply_tests_from_modules and return the
3030
:param module_name_list: List of fully-qualified names of test
3032
:param scenario_iter: Iterable of pairs of (scenario_name,
3033
scenario_param_dict).
3034
:param loader: If provided, will be used instead of a new
3035
bzrlib.tests.TestLoader() instance.
3037
This returns a new TestSuite containing the cross product of
3038
all the tests in all the modules, each repeated for each scenario.
3039
Each test is adapted by adding the scenario name at the end
3040
of its name, and updating the test object's __dict__ with the
3041
scenario_param_dict.
3043
>>> r = multiply_tests_from_modules(
3044
... ['bzrlib.tests.test_sampler'],
3045
... [('one', dict(param=1)),
3046
... ('two', dict(param=2))])
3047
>>> tests = list(iter_suite_tests(r))
3051
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3057
# XXX: Isn't load_tests() a better way to provide the same functionality
3058
# without forcing a predefined TestScenarioApplier ? --vila 080215
3060
loader = TestUtil.TestLoader()
3062
suite = loader.suiteClass()
3064
adapter = TestScenarioApplier()
3065
adapter.scenarios = list(scenario_iter)
3066
adapt_modules(module_name_list, adapter, loader, suite)
3070
def multiply_scenarios(scenarios_left, scenarios_right):
3071
"""Multiply two sets of scenarios.
3073
:returns: the cartesian product of the two sets of scenarios, that is
3074
a scenario for every possible combination of a left scenario and a
3078
('%s,%s' % (left_name, right_name),
3079
dict(left_dict.items() + right_dict.items()))
3080
for left_name, left_dict in scenarios_left
3081
for right_name, right_dict in scenarios_right]
1192
for package in packages_to_test():
1193
suite.addTest(package.test_suite())
1194
for m in MODULES_TO_TEST:
1195
suite.addTest(loader.loadTestsFromModule(m))
1196
for m in (MODULES_TO_DOCTEST):
1197
suite.addTest(DocTestSuite(m))
1198
for name, plugin in bzrlib.plugin.all_plugins().items():
1199
if getattr(plugin, 'test_suite', None) is not None:
1200
suite.addTest(plugin.test_suite())
3085
1204
def adapt_modules(mods_list, adapter, loader, suite):
3086
1205
"""Adapt the modules in mods_list using adapter and add to suite."""
3087
tests = loader.loadTestsFromModuleNames(mods_list)
3088
adapt_tests(tests, adapter, suite)
3091
def adapt_tests(tests_list, adapter, suite):
3092
"""Adapt the tests in tests_list using adapter and add to suite."""
3093
for test in iter_suite_tests(tests_list):
1206
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
3094
1207
suite.addTests(adapter.adapt(test))
3097
def _rmtree_temp_dir(dirname):
3098
# If LANG=C we probably have created some bogus paths
3099
# which rmtree(unicode) will fail to delete
3100
# so make sure we are using rmtree(str) to delete everything
3101
# except on win32, where rmtree(str) will fail
3102
# since it doesn't have the property of byte-stream paths
3103
# (they are either ascii or mbcs)
3104
if sys.platform == 'win32':
3105
# make sure we are using the unicode win32 api
3106
dirname = unicode(dirname)
3108
dirname = dirname.encode(sys.getfilesystemencoding())
3110
osutils.rmtree(dirname)
3112
if sys.platform == 'win32' and e.errno == errno.EACCES:
3113
sys.stderr.write(('Permission denied: '
3114
'unable to remove testing dir '
3115
'%s\n' % os.path.basename(dirname)))
3120
class Feature(object):
3121
"""An operating system Feature."""
3124
self._available = None
3126
def available(self):
3127
"""Is the feature available?
3129
:return: True if the feature is available.
3131
if self._available is None:
3132
self._available = self._probe()
3133
return self._available
3136
"""Implement this method in concrete features.
3138
:return: True if the feature is available.
3140
raise NotImplementedError
3143
if getattr(self, 'feature_name', None):
3144
return self.feature_name()
3145
return self.__class__.__name__
3148
class _SymlinkFeature(Feature):
3151
return osutils.has_symlinks()
3153
def feature_name(self):
3156
SymlinkFeature = _SymlinkFeature()
3159
class _HardlinkFeature(Feature):
3162
return osutils.has_hardlinks()
3164
def feature_name(self):
3167
HardlinkFeature = _HardlinkFeature()
3170
class _OsFifoFeature(Feature):
3173
return getattr(os, 'mkfifo', None)
3175
def feature_name(self):
3176
return 'filesystem fifos'
3178
OsFifoFeature = _OsFifoFeature()
3181
class _UnicodeFilenameFeature(Feature):
3182
"""Does the filesystem support Unicode filenames?"""
3186
# Check for character combinations unlikely to be covered by any
3187
# single non-unicode encoding. We use the characters
3188
# - greek small letter alpha (U+03B1) and
3189
# - braille pattern dots-123456 (U+283F).
3190
os.stat(u'\u03b1\u283f')
3191
except UnicodeEncodeError:
3193
except (IOError, OSError):
3194
# The filesystem allows the Unicode filename but the file doesn't
3198
# The filesystem allows the Unicode filename and the file exists,
3202
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3205
class TestScenarioApplier(object):
3206
"""A tool to apply scenarios to tests."""
3208
def adapt(self, test):
3209
"""Return a TestSuite containing a copy of test for each scenario."""
3210
result = unittest.TestSuite()
3211
for scenario in self.scenarios:
3212
result.addTest(self.adapt_test_to_scenario(test, scenario))
3215
def adapt_test_to_scenario(self, test, scenario):
3216
"""Copy test and apply scenario to it.
3218
:param test: A test to adapt.
3219
:param scenario: A tuple describing the scenarion.
3220
The first element of the tuple is the new test id.
3221
The second element is a dict containing attributes to set on the
3223
:return: The adapted test.
3225
from copy import deepcopy
3226
new_test = deepcopy(test)
3227
for name, value in scenario[1].items():
3228
setattr(new_test, name, value)
3229
new_id = "%s(%s)" % (new_test.id(), scenario[0])
3230
new_test.id = lambda: new_id
3234
def probe_unicode_in_user_encoding():
3235
"""Try to encode several unicode strings to use in unicode-aware tests.
3236
Return first successfull match.
3238
:return: (unicode value, encoded plain string value) or (None, None)
3240
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3241
for uni_val in possible_vals:
3243
str_val = uni_val.encode(osutils.get_user_encoding())
3244
except UnicodeEncodeError:
3245
# Try a different character
3248
return uni_val, str_val
3252
def probe_bad_non_ascii(encoding):
3253
"""Try to find [bad] character with code [128..255]
3254
that cannot be decoded to unicode in some encoding.
3255
Return None if all non-ascii characters is valid
3258
for i in xrange(128, 256):
3261
char.decode(encoding)
3262
except UnicodeDecodeError:
3267
class _FTPServerFeature(Feature):
3268
"""Some tests want an FTP Server, check if one is available.
3270
Right now, the only way this is available is if 'medusa' is installed.
3271
http://www.amk.ca/python/code/medusa.html
3276
import bzrlib.tests.ftp_server
3281
def feature_name(self):
3284
FTPServerFeature = _FTPServerFeature()
3287
class _UnicodeFilename(Feature):
3288
"""Does the filesystem support Unicode filenames?"""
3293
except UnicodeEncodeError:
3295
except (IOError, OSError):
3296
# The filesystem allows the Unicode filename but the file doesn't
3300
# The filesystem allows the Unicode filename and the file exists,
3304
UnicodeFilename = _UnicodeFilename()
3307
class _UTF8Filesystem(Feature):
3308
"""Is the filesystem UTF-8?"""
3311
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
3315
UTF8Filesystem = _UTF8Filesystem()
3318
class _CaseInsensitiveFilesystemFeature(Feature):
3319
"""Check if underlying filesystem is case-insensitive
3320
(e.g. on Windows, Cygwin, MacOS)
3324
if TestCaseWithMemoryTransport.TEST_ROOT is None:
3325
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3326
TestCaseWithMemoryTransport.TEST_ROOT = root
3328
root = TestCaseWithMemoryTransport.TEST_ROOT
3329
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3331
name_a = osutils.pathjoin(tdir, 'a')
3332
name_A = osutils.pathjoin(tdir, 'A')
3334
result = osutils.isdir(name_A)
3335
_rmtree_temp_dir(tdir)
3338
def feature_name(self):
3339
return 'case-insensitive filesystem'
3341
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()