21
21
# little as possible, so this should be used rarely if it's added at all.
22
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
31
25
from cStringIO import StringIO
37
from pprint import pformat
42
from subprocess import Popen, PIPE
64
39
import bzrlib.branch
40
import bzrlib.bzrdir as bzrdir
65
41
import bzrlib.commands
66
import bzrlib.timestamp
42
import bzrlib.errors as errors
68
43
import bzrlib.inventory
69
44
import bzrlib.iterablefile
70
45
import bzrlib.lockdir
74
# lsprof not available
76
from bzrlib.merge import merge_inner
77
46
import bzrlib.merge3
48
import bzrlib.osutils as osutils
78
49
import bzrlib.plugin
79
50
import bzrlib.store
80
from bzrlib import symbol_versioning
81
from bzrlib.symbol_versioning import (
87
51
import bzrlib.trace
88
from bzrlib.transport import get_transport
52
from bzrlib.transport import urlescape, get_transport
89
53
import bzrlib.transport
90
from bzrlib.transport.local import LocalURLServer
91
from bzrlib.transport.memory import MemoryServer
54
from bzrlib.transport.local import LocalRelpathServer
92
55
from bzrlib.transport.readonly import ReadonlyServer
93
from bzrlib.trace import mutter, note
94
from bzrlib.tests import TestUtil
95
from bzrlib.tests.http_server import HttpServer
96
from bzrlib.tests.TestUtil import (
56
from bzrlib.trace import mutter
57
from bzrlib.tests.TestUtil import TestLoader, TestSuite
100
58
from bzrlib.tests.treeshape import build_tree_contents
101
import bzrlib.version_info_formats.format_custom
102
59
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
104
# Mark this python module as being part of the implementation
105
# of unittest: this gives us better tracebacks where the last
106
# shown frame is the test code, not our assertXYZ.
109
default_transport = LocalURLServer
112
class ExtendedTestResult(unittest._TextTestResult):
113
"""Accepts, reports and accumulates the results of running tests.
115
Compared to the unittest version this class adds support for
116
profiling, benchmarking, stopping as soon as a test fails, and
117
skipping tests. There are further-specialized subclasses for
118
different types of display.
120
When a test finishes, in whatever way, it calls one of the addSuccess,
121
addFailure or addError classes. These in turn may redirect to a more
122
specific case for the special test results supported by our extended
125
Note that just one of these objects is fed the results from many tests.
61
default_transport = LocalRelpathServer
64
MODULES_TO_DOCTEST = [
76
def packages_to_test():
77
"""Return a list of packages to test.
79
The packages are not globally imported so that import failures are
80
triggered when running selftest, not when importing the command.
83
import bzrlib.tests.blackbox
84
import bzrlib.tests.branch_implementations
85
import bzrlib.tests.bzrdir_implementations
86
import bzrlib.tests.interrepository_implementations
87
import bzrlib.tests.repository_implementations
88
import bzrlib.tests.workingtree_implementations
91
bzrlib.tests.blackbox,
92
bzrlib.tests.branch_implementations,
93
bzrlib.tests.bzrdir_implementations,
94
bzrlib.tests.interrepository_implementations,
95
bzrlib.tests.repository_implementations,
96
bzrlib.tests.workingtree_implementations,
100
class _MyResult(unittest._TextTestResult):
101
"""Custom TestResult.
103
Shows output in a different format, including displaying runtime for tests.
128
105
stop_early = False
130
def __init__(self, stream, descriptions, verbosity,
134
"""Construct new TestResult.
136
:param bench_history: Optionally, a writable file object to accumulate
139
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
140
if bench_history is not None:
141
from bzrlib.version import _get_bzr_source_tree
142
src_tree = _get_bzr_source_tree()
145
revision_id = src_tree.get_parent_ids()[0]
147
# XXX: if this is a brand new tree, do the same as if there
151
# XXX: If there's no branch, what should we do?
153
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
154
self._bench_history = bench_history
155
self.ui = ui.ui_factory
156
self.num_tests = num_tests
158
self.failure_count = 0
159
self.known_failure_count = 0
161
self.not_applicable_count = 0
162
self.unsupported = {}
164
self._overall_start_time = time.time()
166
def _extractBenchmarkTime(self, testCase):
167
"""Add a benchmark time for the current test case."""
168
return getattr(testCase, "_benchtime", None)
170
def _elapsedTestTimeString(self):
171
"""Return a time string for the overall time the current test has taken."""
172
return self._formatTime(time.time() - self._start_time)
174
def _testTimeString(self, testCase):
175
benchmark_time = self._extractBenchmarkTime(testCase)
176
if benchmark_time is not None:
178
self._formatTime(benchmark_time),
179
self._elapsedTestTimeString())
181
return " %s" % self._elapsedTestTimeString()
183
def _formatTime(self, seconds):
184
"""Format seconds as milliseconds with leading spaces."""
185
# some benchmarks can take thousands of seconds to run, so we need 8
187
return "%8dms" % (1000 * seconds)
189
def _shortened_test_description(self, test):
191
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
107
def _elapsedTime(self):
108
return "%5dms" % (1000 * (time.time() - self._start_time))
194
110
def startTest(self, test):
195
111
unittest.TestResult.startTest(self, test)
196
self.report_test_start(test)
197
test.number = self.count
198
self._recordTestStartTime()
200
def _recordTestStartTime(self):
201
"""Record that a test has started."""
112
# In a short description, the important words are in
113
# the beginning, but in an id, the important words are
115
SHOW_DESCRIPTIONS = False
117
width = osutils.terminal_width()
118
name_width = width - 15
120
if SHOW_DESCRIPTIONS:
121
what = test.shortDescription()
123
if len(what) > name_width:
124
what = what[:name_width-3] + '...'
127
if what.startswith('bzrlib.tests.'):
129
if len(what) > name_width:
130
what = '...' + what[3-name_width:]
131
what = what.ljust(name_width)
132
self.stream.write(what)
202
134
self._start_time = time.time()
204
def _cleanupLogFile(self, test):
205
# We can only do this if we have one of our TestCases, not if
207
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
208
if setKeepLogfile is not None:
211
136
def addError(self, test, err):
212
"""Tell result that test finished with an error.
214
Called from the TestCase run() method when the test
215
fails with an unexpected error.
217
self._testConcluded(test)
218
137
if isinstance(err[1], TestSkipped):
219
return self._addSkipped(test, err)
220
elif isinstance(err[1], UnavailableFeature):
221
return self.addNotSupported(test, err[1].args[0])
223
unittest.TestResult.addError(self, test, err)
224
self.error_count += 1
225
self.report_error(test, err)
228
self._cleanupLogFile(test)
138
return self.addSkipped(test, err)
139
unittest.TestResult.addError(self, test, err)
141
self.stream.writeln("ERROR %s" % self._elapsedTime())
143
self.stream.write('E')
230
148
def addFailure(self, test, err):
231
"""Tell result that test failed.
233
Called from the TestCase run() method when the test
234
fails because e.g. an assert() method failed.
236
self._testConcluded(test)
237
if isinstance(err[1], KnownFailure):
238
return self._addKnownFailure(test, err)
240
unittest.TestResult.addFailure(self, test, err)
241
self.failure_count += 1
242
self.report_failure(test, err)
245
self._cleanupLogFile(test)
149
unittest.TestResult.addFailure(self, test, err)
151
self.stream.writeln(" FAIL %s" % self._elapsedTime())
153
self.stream.write('F')
247
158
def addSuccess(self, test):
248
"""Tell result that test completed successfully.
250
Called from the TestCase run()
252
self._testConcluded(test)
253
if self._bench_history is not None:
254
benchmark_time = self._extractBenchmarkTime(test)
255
if benchmark_time is not None:
256
self._bench_history.write("%s %s\n" % (
257
self._formatTime(benchmark_time),
259
self.report_success(test)
260
self._cleanupLogFile(test)
261
unittest.TestResult.addSuccess(self, test)
262
test._log_contents = ''
264
def _testConcluded(self, test):
265
"""Common code when a test has finished.
267
Called regardless of whether it succeded, failed, etc.
271
def _addKnownFailure(self, test, err):
272
self.known_failure_count += 1
273
self.report_known_failure(test, err)
275
def addNotSupported(self, test, feature):
276
"""The test will not be run because of a missing feature.
278
# this can be called in two different ways: it may be that the
279
# test started running, and then raised (through addError)
280
# UnavailableFeature. Alternatively this method can be called
281
# while probing for features before running the tests; in that
282
# case we will see startTest and stopTest, but the test will never
284
self.unsupported.setdefault(str(feature), 0)
285
self.unsupported[str(feature)] += 1
286
self.report_unsupported(test, feature)
288
def _addSkipped(self, test, skip_excinfo):
289
if isinstance(skip_excinfo[1], TestNotApplicable):
290
self.not_applicable_count += 1
291
self.report_not_applicable(test, skip_excinfo)
294
self.report_skip(test, skip_excinfo)
297
except KeyboardInterrupt:
300
self.addError(test, test._exc_info())
302
# seems best to treat this as success from point-of-view of unittest
303
# -- it actually does nothing so it barely matters :)
304
unittest.TestResult.addSuccess(self, test)
305
test._log_contents = ''
160
self.stream.writeln(' OK %s' % self._elapsedTime())
162
self.stream.write('~')
164
unittest.TestResult.addSuccess(self, test)
166
def addSkipped(self, test, skip_excinfo):
168
print >>self.stream, ' SKIP %s' % self._elapsedTime()
169
print >>self.stream, ' %s' % skip_excinfo[1]
171
self.stream.write('S')
173
# seems best to treat this as success from point-of-view of unittest
174
# -- it actually does nothing so it barely matters :)
175
unittest.TestResult.addSuccess(self, test)
307
177
def printErrorList(self, flavour, errors):
308
178
for test, err in errors:
309
179
self.stream.writeln(self.separator1)
310
self.stream.write("%s: " % flavour)
311
self.stream.writeln(self.getDescription(test))
180
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
312
181
if getattr(test, '_get_log', None) is not None:
313
self.stream.write('\n')
315
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
316
self.stream.write('\n')
317
self.stream.write(test._get_log())
318
self.stream.write('\n')
320
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
321
self.stream.write('\n')
183
print >>self.stream, \
184
('vvvv[log from %s]' % test.id()).ljust(78,'-')
185
print >>self.stream, test._get_log()
186
print >>self.stream, \
187
('^^^^[log from %s]' % test.id()).ljust(78,'-')
322
188
self.stream.writeln(self.separator2)
323
189
self.stream.writeln("%s" % err)
328
def report_cleaning_up(self):
331
def report_success(self, test):
334
def wasStrictlySuccessful(self):
335
if self.unsupported or self.known_failure_count:
337
return self.wasSuccessful()
340
class TextTestResult(ExtendedTestResult):
341
"""Displays progress and results of tests in text form"""
343
def __init__(self, stream, descriptions, verbosity,
348
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
349
bench_history, num_tests)
351
self.pb = self.ui.nested_progress_bar()
352
self._supplied_pb = False
355
self._supplied_pb = True
356
self.pb.show_pct = False
357
self.pb.show_spinner = False
358
self.pb.show_eta = False,
359
self.pb.show_count = False
360
self.pb.show_bar = False
362
def report_starting(self):
363
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
365
def _progress_prefix_text(self):
366
# the longer this text, the less space we have to show the test
368
a = '[%d' % self.count # total that have been run
369
# tests skipped as known not to be relevant are not important enough
371
## if self.skip_count:
372
## a += ', %d skip' % self.skip_count
373
## if self.known_failure_count:
374
## a += '+%dX' % self.known_failure_count
375
if self.num_tests is not None:
376
a +='/%d' % self.num_tests
378
runtime = time.time() - self._overall_start_time
380
a += '%dm%ds' % (runtime / 60, runtime % 60)
384
a += ', %d err' % self.error_count
385
if self.failure_count:
386
a += ', %d fail' % self.failure_count
388
a += ', %d missing' % len(self.unsupported)
392
def report_test_start(self, test):
395
self._progress_prefix_text()
397
+ self._shortened_test_description(test))
399
def _test_description(self, test):
400
return self._shortened_test_description(test)
402
def report_error(self, test, err):
403
self.pb.note('ERROR: %s\n %s\n',
404
self._test_description(test),
408
def report_failure(self, test, err):
409
self.pb.note('FAIL: %s\n %s\n',
410
self._test_description(test),
414
def report_known_failure(self, test, err):
415
self.pb.note('XFAIL: %s\n%s\n',
416
self._test_description(test), err[1])
418
def report_skip(self, test, skip_excinfo):
421
def report_not_applicable(self, test, skip_excinfo):
424
def report_unsupported(self, test, feature):
425
"""test cannot be run because feature is missing."""
427
def report_cleaning_up(self):
428
self.pb.update('cleaning up...')
431
if not self._supplied_pb:
435
class VerboseTestResult(ExtendedTestResult):
436
"""Produce long output, with one line per test run plus times"""
438
def _ellipsize_to_right(self, a_string, final_width):
439
"""Truncate and pad a string, keeping the right hand side"""
440
if len(a_string) > final_width:
441
result = '...' + a_string[3-final_width:]
444
return result.ljust(final_width)
446
def report_starting(self):
447
self.stream.write('running %d tests...\n' % self.num_tests)
449
def report_test_start(self, test):
451
name = self._shortened_test_description(test)
452
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
453
# numbers, plus a trailing blank
454
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
455
self.stream.write(self._ellipsize_to_right(name,
456
osutils.terminal_width()-30))
459
def _error_summary(self, err):
461
return '%s%s' % (indent, err[1])
463
def report_error(self, test, err):
464
self.stream.writeln('ERROR %s\n%s'
465
% (self._testTimeString(test),
466
self._error_summary(err)))
468
def report_failure(self, test, err):
469
self.stream.writeln(' FAIL %s\n%s'
470
% (self._testTimeString(test),
471
self._error_summary(err)))
473
def report_known_failure(self, test, err):
474
self.stream.writeln('XFAIL %s\n%s'
475
% (self._testTimeString(test),
476
self._error_summary(err)))
478
def report_success(self, test):
479
self.stream.writeln(' OK %s' % self._testTimeString(test))
480
for bench_called, stats in getattr(test, '_benchcalls', []):
481
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
482
stats.pprint(file=self.stream)
483
# flush the stream so that we get smooth output. This verbose mode is
484
# used to show the output in PQM.
487
def report_skip(self, test, skip_excinfo):
488
self.stream.writeln(' SKIP %s\n%s'
489
% (self._testTimeString(test),
490
self._error_summary(skip_excinfo)))
492
def report_not_applicable(self, test, skip_excinfo):
493
self.stream.writeln(' N/A %s\n%s'
494
% (self._testTimeString(test),
495
self._error_summary(skip_excinfo)))
497
def report_unsupported(self, test, feature):
498
"""test cannot be run because feature is missing."""
499
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
500
%(self._testTimeString(test), feature))
503
class TextTestRunner(object):
192
class TextTestRunner(unittest.TextTestRunner):
504
193
stop_on_failure = False
513
self.stream = unittest._WritelnDecorator(stream)
514
self.descriptions = descriptions
515
self.verbosity = verbosity
516
self._bench_history = bench_history
517
self.list_only = list_only
520
"Run the given test case or test suite."
521
startTime = time.time()
522
if self.verbosity == 1:
523
result_class = TextTestResult
524
elif self.verbosity >= 2:
525
result_class = VerboseTestResult
526
result = result_class(self.stream,
529
bench_history=self._bench_history,
530
num_tests=test.countTestCases(),
195
def _makeResult(self):
196
result = _MyResult(self.stream, self.descriptions, self.verbosity)
532
197
result.stop_early = self.stop_on_failure
533
result.report_starting()
535
if self.verbosity >= 2:
536
self.stream.writeln("Listing tests only ...\n")
538
for t in iter_suite_tests(test):
539
self.stream.writeln("%s" % (t.id()))
541
actionTaken = "Listed"
544
run = result.testsRun
546
stopTime = time.time()
547
timeTaken = stopTime - startTime
549
self.stream.writeln(result.separator2)
550
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
551
run, run != 1 and "s" or "", timeTaken))
552
self.stream.writeln()
553
if not result.wasSuccessful():
554
self.stream.write("FAILED (")
555
failed, errored = map(len, (result.failures, result.errors))
557
self.stream.write("failures=%d" % failed)
559
if failed: self.stream.write(", ")
560
self.stream.write("errors=%d" % errored)
561
if result.known_failure_count:
562
if failed or errored: self.stream.write(", ")
563
self.stream.write("known_failure_count=%d" %
564
result.known_failure_count)
565
self.stream.writeln(")")
567
if result.known_failure_count:
568
self.stream.writeln("OK (known_failures=%d)" %
569
result.known_failure_count)
571
self.stream.writeln("OK")
572
if result.skip_count > 0:
573
skipped = result.skip_count
574
self.stream.writeln('%d test%s skipped' %
575
(skipped, skipped != 1 and "s" or ""))
576
if result.unsupported:
577
for feature, count in sorted(result.unsupported.items()):
578
self.stream.writeln("Missing feature '%s' skipped %d tests." %
867
281
if message is None:
868
282
message = "texts not equal:\n"
870
message = 'first string is missing a final newline.\n'
872
message = 'second string is missing a final newline.\n'
873
raise AssertionError(message +
874
self._ndiff_strings(a, b))
283
raise AssertionError(message +
284
self._ndiff_strings(a, b))
876
286
def assertEqualMode(self, mode, mode_test):
877
287
self.assertEqual(mode, mode_test,
878
288
'mode mismatch %o != %o' % (mode, mode_test))
880
def assertEqualStat(self, expected, actual):
881
"""assert that expected and actual are the same stat result.
883
:param expected: A stat result.
884
:param actual: A stat result.
885
:raises AssertionError: If the expected and actual stat values differ
888
self.assertEqual(expected.st_size, actual.st_size)
889
self.assertEqual(expected.st_mtime, actual.st_mtime)
890
self.assertEqual(expected.st_ctime, actual.st_ctime)
891
self.assertEqual(expected.st_dev, actual.st_dev)
892
self.assertEqual(expected.st_ino, actual.st_ino)
893
self.assertEqual(expected.st_mode, actual.st_mode)
895
def assertPositive(self, val):
896
"""Assert that val is greater than 0."""
897
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
899
def assertNegative(self, val):
900
"""Assert that val is less than 0."""
901
self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
903
290
def assertStartsWith(self, s, prefix):
904
291
if not s.startswith(prefix):
905
292
raise AssertionError('string %r does not start with %r' % (s, prefix))
907
294
def assertEndsWith(self, s, suffix):
908
"""Asserts that s ends with suffix."""
909
if not s.endswith(suffix):
295
if not s.endswith(prefix):
910
296
raise AssertionError('string %r does not end with %r' % (s, suffix))
912
298
def assertContainsRe(self, haystack, needle_re):
913
299
"""Assert that a contains something matching a regular expression."""
914
300
if not re.search(needle_re, haystack):
915
if '\n' in haystack or len(haystack) > 60:
916
# a long string, format it in a more readable way
917
raise AssertionError(
918
'pattern "%s" not found in\n"""\\\n%s"""\n'
919
% (needle_re, haystack))
921
raise AssertionError('pattern "%s" not found in "%s"'
922
% (needle_re, haystack))
924
def assertNotContainsRe(self, haystack, needle_re):
925
"""Assert that a does not match a regular expression"""
926
if re.search(needle_re, haystack):
927
raise AssertionError('pattern "%s" found in "%s"'
301
raise AssertionError('pattern "%s" not found in "%s"'
928
302
% (needle_re, haystack))
930
304
def assertSubset(self, sublist, superlist):
931
305
"""Assert that every entry in sublist is present in superlist."""
932
missing = set(sublist) - set(superlist)
307
for entry in sublist:
308
if entry not in superlist:
309
missing.append(entry)
933
310
if len(missing) > 0:
934
raise AssertionError("value(s) %r not present in container %r" %
311
raise AssertionError("value(s) %r not present in container %r" %
935
312
(missing, superlist))
937
def assertListRaises(self, excClass, func, *args, **kwargs):
938
"""Fail unless excClass is raised when the iterator from func is used.
940
Many functions can return generators this makes sure
941
to wrap them in a list() call to make sure the whole generator
942
is run, and that the proper exception is raised.
945
list(func(*args, **kwargs))
949
if getattr(excClass,'__name__', None) is not None:
950
excName = excClass.__name__
952
excName = str(excClass)
953
raise self.failureException, "%s not raised" % excName
955
def assertRaises(self, excClass, callableObj, *args, **kwargs):
956
"""Assert that a callable raises a particular exception.
958
:param excClass: As for the except statement, this may be either an
959
exception class, or a tuple of classes.
960
:param callableObj: A callable, will be passed ``*args`` and
963
Returns the exception so that you can examine it.
966
callableObj(*args, **kwargs)
970
if getattr(excClass,'__name__', None) is not None:
971
excName = excClass.__name__
974
excName = str(excClass)
975
raise self.failureException, "%s not raised" % excName
977
def assertIs(self, left, right, message=None):
314
def assertIs(self, left, right):
978
315
if not (left is right):
979
if message is not None:
980
raise AssertionError(message)
982
raise AssertionError("%r is not %r." % (left, right))
984
def assertIsNot(self, left, right, message=None):
986
if message is not None:
987
raise AssertionError(message)
989
raise AssertionError("%r is %r." % (left, right))
316
raise AssertionError("%r is not %r." % (left, right))
991
318
def assertTransportMode(self, transport, path, mode):
992
319
"""Fail if a path does not have mode mode.
994
If modes are not supported on this transport, the assertion is ignored.
321
If modes are not supported on this platform, the test is skipped.
996
if not transport._can_roundtrip_unix_modebits():
323
if sys.platform == 'win32':
998
325
path_stat = transport.stat(path)
999
326
actual_mode = stat.S_IMODE(path_stat.st_mode)
1000
327
self.assertEqual(mode, actual_mode,
1001
328
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
1003
def assertIsSameRealPath(self, path1, path2):
1004
"""Fail if path1 and path2 points to different files"""
1005
self.assertEqual(osutils.realpath(path1),
1006
osutils.realpath(path2),
1007
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1009
def assertIsInstance(self, obj, kls):
1010
"""Fail if obj is not an instance of kls"""
1011
if not isinstance(obj, kls):
1012
self.fail("%r is an instance of %s rather than %s" % (
1013
obj, obj.__class__, kls))
1015
def expectFailure(self, reason, assertion, *args, **kwargs):
1016
"""Invoke a test, expecting it to fail for the given reason.
1018
This is for assertions that ought to succeed, but currently fail.
1019
(The failure is *expected* but not *wanted*.) Please be very precise
1020
about the failure you're expecting. If a new bug is introduced,
1021
AssertionError should be raised, not KnownFailure.
1023
Frequently, expectFailure should be followed by an opposite assertion.
1026
Intended to be used with a callable that raises AssertionError as the
1027
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1029
Raises KnownFailure if the test fails. Raises AssertionError if the
1034
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1036
self.assertEqual(42, dynamic_val)
1038
This means that a dynamic_val of 54 will cause the test to raise
1039
a KnownFailure. Once math is fixed and the expectFailure is removed,
1040
only a dynamic_val of 42 will allow the test to pass. Anything other
1041
than 54 or 42 will cause an AssertionError.
1044
assertion(*args, **kwargs)
1045
except AssertionError:
1046
raise KnownFailure(reason)
1048
self.fail('Unexpected success. Should have failed: %s' % reason)
1050
def assertFileEqual(self, content, path):
1051
"""Fail if path does not contain 'content'."""
1052
self.failUnlessExists(path)
1053
f = file(path, 'rb')
1058
self.assertEqualDiff(content, s)
1060
def failUnlessExists(self, path):
1061
"""Fail unless path or paths, which may be abs or relative, exist."""
1062
if not isinstance(path, basestring):
1064
self.failUnlessExists(p)
1066
self.failUnless(osutils.lexists(path),path+" does not exist")
1068
def failIfExists(self, path):
1069
"""Fail if path or paths, which may be abs or relative, exist."""
1070
if not isinstance(path, basestring):
1072
self.failIfExists(p)
1074
self.failIf(osutils.lexists(path),path+" exists")
1076
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1077
"""A helper for callDeprecated and applyDeprecated.
1079
:param a_callable: A callable to call.
1080
:param args: The positional arguments for the callable
1081
:param kwargs: The keyword arguments for the callable
1082
:return: A tuple (warnings, result). result is the result of calling
1083
a_callable(``*args``, ``**kwargs``).
1086
def capture_warnings(msg, cls=None, stacklevel=None):
1087
# we've hooked into a deprecation specific callpath,
1088
# only deprecations should getting sent via it.
1089
self.assertEqual(cls, DeprecationWarning)
1090
local_warnings.append(msg)
1091
original_warning_method = symbol_versioning.warn
1092
symbol_versioning.set_warning_method(capture_warnings)
1094
result = a_callable(*args, **kwargs)
1096
symbol_versioning.set_warning_method(original_warning_method)
1097
return (local_warnings, result)
1099
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1100
"""Call a deprecated callable without warning the user.
1102
Note that this only captures warnings raised by symbol_versioning.warn,
1103
not other callers that go direct to the warning module.
1105
To test that a deprecated method raises an error, do something like
1108
self.assertRaises(errors.ReservedId,
1109
self.applyDeprecated,
1110
deprecated_in((1, 5, 0)),
1114
:param deprecation_format: The deprecation format that the callable
1115
should have been deprecated with. This is the same type as the
1116
parameter to deprecated_method/deprecated_function. If the
1117
callable is not deprecated with this format, an assertion error
1119
:param a_callable: A callable to call. This may be a bound method or
1120
a regular function. It will be called with ``*args`` and
1122
:param args: The positional arguments for the callable
1123
:param kwargs: The keyword arguments for the callable
1124
:return: The result of a_callable(``*args``, ``**kwargs``)
1126
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1128
expected_first_warning = symbol_versioning.deprecation_string(
1129
a_callable, deprecation_format)
1130
if len(call_warnings) == 0:
1131
self.fail("No deprecation warning generated by call to %s" %
1133
self.assertEqual(expected_first_warning, call_warnings[0])
1136
def callCatchWarnings(self, fn, *args, **kw):
1137
"""Call a callable that raises python warnings.
1139
The caller's responsible for examining the returned warnings.
1141
If the callable raises an exception, the exception is not
1142
caught and propagates up to the caller. In that case, the list
1143
of warnings is not available.
1145
:returns: ([warning_object, ...], fn_result)
1147
# XXX: This is not perfect, because it completely overrides the
1148
# warnings filters, and some code may depend on suppressing particular
1149
# warnings. It's the easiest way to insulate ourselves from -Werror,
1150
# though. -- Andrew, 20071062
1152
def _catcher(message, category, filename, lineno, file=None, line=None):
1153
# despite the name, 'message' is normally(?) a Warning subclass
1155
wlist.append(message)
1156
saved_showwarning = warnings.showwarning
1157
saved_filters = warnings.filters
1159
warnings.showwarning = _catcher
1160
warnings.filters = []
1161
result = fn(*args, **kw)
1163
warnings.showwarning = saved_showwarning
1164
warnings.filters = saved_filters
1165
return wlist, result
1167
def callDeprecated(self, expected, callable, *args, **kwargs):
1168
"""Assert that a callable is deprecated in a particular way.
1170
This is a very precise test for unusual requirements. The
1171
applyDeprecated helper function is probably more suited for most tests
1172
as it allows you to simply specify the deprecation format being used
1173
and will ensure that that is issued for the function being called.
1175
Note that this only captures warnings raised by symbol_versioning.warn,
1176
not other callers that go direct to the warning module. To catch
1177
general warnings, use callCatchWarnings.
1179
:param expected: a list of the deprecation warnings expected, in order
1180
:param callable: The callable to call
1181
:param args: The positional arguments for the callable
1182
:param kwargs: The keyword arguments for the callable
1184
call_warnings, result = self._capture_deprecation_warnings(callable,
1186
self.assertEqual(expected, call_warnings)
1189
330
def _startLogFile(self):
1190
331
"""Send bzr and test log messages to a temporary file.
1192
333
The file is removed as the test is torn down.
1194
335
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1195
self._log_file = os.fdopen(fileno, 'w+')
1196
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
336
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
337
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
338
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
1197
339
self._log_file_name = name
1198
340
self.addCleanup(self._finishLogFile)
1200
342
def _finishLogFile(self):
1201
343
"""Finished with the log file.
1203
Close the file and delete it, unless setKeepLogfile was called.
345
Read contents into memory, close, and delete.
1205
if self._log_file is None:
1207
bzrlib.trace.pop_log_file(self._log_memento)
347
bzrlib.trace.disable_test_log(self._log_nonce)
348
self._log_file.seek(0)
349
self._log_contents = self._log_file.read()
1208
350
self._log_file.close()
1209
self._log_file = None
1210
if not self._keep_log_file:
1211
os.remove(self._log_file_name)
1212
self._log_file_name = None
1214
def setKeepLogfile(self):
1215
"""Make the logfile not be deleted when _finishLogFile is called."""
1216
self._keep_log_file = True
1218
def addCleanup(self, callable, *args, **kwargs):
351
os.remove(self._log_file_name)
352
self._log_file = self._log_file_name = None
354
def addCleanup(self, callable):
1219
355
"""Arrange to run a callable when this case is torn down.
1221
357
Callables are run in the reverse of the order they are registered,
1222
358
ie last-in first-out.
1224
self._cleanups.append((callable, args, kwargs))
360
if callable in self._cleanups:
361
raise ValueError("cleanup function %r already registered on %s"
363
self._cleanups.append(callable)
1226
365
def _cleanEnvironment(self):
1228
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1229
367
'HOME': os.getcwd(),
1230
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1231
# tests do check our impls match APPDATA
1232
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1234
'BZREMAIL': None, # may still be present in the environment
368
'APPDATA': os.getcwd(),
1236
'BZR_PROGRESS_BAR': None,
1238
'BZR_PLUGIN_PATH': None,
1240
'SSH_AUTH_SOCK': None,
1244
'https_proxy': None,
1245
'HTTPS_PROXY': None,
1250
# Nobody cares about these ones AFAIK. So far at
1251
# least. If you do (care), please update this comment
1255
'BZR_REMOTE_PATH': None,
1257
372
self.__old_env = {}
1258
373
self.addCleanup(self._restoreEnvironment)
1259
374
for name, value in new_env.iteritems():
1260
375
self._captureVar(name, value)
1262
378
def _captureVar(self, name, newvalue):
1263
"""Set an environment variable, and reset it when finished."""
1264
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
379
"""Set an environment variable, preparing it to be reset when finished."""
380
self.__old_env[name] = os.environ.get(name, None)
382
if name in os.environ:
385
os.environ[name] = newvalue
1266
def _restore_debug_flags(self):
1267
debug.debug_flags.clear()
1268
debug.debug_flags.update(self._preserved_debug_flags)
388
def _restoreVar(name, value):
390
if name in os.environ:
393
os.environ[name] = value
1270
395
def _restoreEnvironment(self):
1271
396
for name, value in self.__old_env.iteritems():
1272
osutils.set_or_unset_env(name, value)
1274
def _restoreHooks(self):
1275
for klass, hooks in self._preserved_hooks.items():
1276
setattr(klass, 'hooks', hooks)
1278
def knownFailure(self, reason):
1279
"""This test has failed for some known reason."""
1280
raise KnownFailure(reason)
1282
def run(self, result=None):
1283
if result is None: result = self.defaultTestResult()
1284
for feature in getattr(self, '_test_needs_features', []):
1285
if not feature.available():
1286
result.startTest(self)
1287
if getattr(result, 'addNotSupported', None):
1288
result.addNotSupported(self, feature)
1290
result.addSuccess(self)
1291
result.stopTest(self)
1294
return unittest.TestCase.run(self, result)
1297
absent_attr = object()
1298
for attr_name in self.attrs_to_keep:
1299
attr = getattr(self, attr_name, absent_attr)
1300
if attr is not absent_attr:
1301
saved_attrs[attr_name] = attr
1302
self.__dict__ = saved_attrs
397
self._restoreVar(name, value)
1304
399
def tearDown(self):
1305
400
self._runCleanups()
1306
401
unittest.TestCase.tearDown(self)
1308
def time(self, callable, *args, **kwargs):
1309
"""Run callable and accrue the time it takes to the benchmark time.
1311
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1312
this will cause lsprofile statistics to be gathered and stored in
1315
if self._benchtime is None:
1319
if not self._gather_lsprof_in_benchmarks:
1320
return callable(*args, **kwargs)
1322
# record this benchmark
1323
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1325
self._benchcalls.append(((callable, args, kwargs), stats))
1328
self._benchtime += time.time() - start
1330
403
def _runCleanups(self):
1331
404
"""Run registered cleanup functions.
1335
408
# TODO: Perhaps this should keep running cleanups even if
1336
409
# one of them fails?
1338
# Actually pop the cleanups from the list so tearDown running
1339
# twice is safe (this happens for skipped tests).
1340
while self._cleanups:
1341
cleanup, args, kwargs = self._cleanups.pop()
1342
cleanup(*args, **kwargs)
410
for cleanup_fn in reversed(self._cleanups):
1344
413
def log(self, *args):
1347
def _get_log(self, keep_log_file=False):
1348
"""Get the log from bzrlib.trace calls from this test.
1350
:param keep_log_file: When True, if the log is still a file on disk
1351
leave it as a file on disk. When False, if the log is still a file
1352
on disk, the log file is deleted and the log preserved as
1354
:return: A string containing the log.
1356
# flush the log file, to get all content
1358
if bzrlib.trace._trace_file:
1359
bzrlib.trace._trace_file.flush()
1360
if self._log_contents:
1361
# XXX: this can hardly contain the content flushed above --vila
417
"""Return as a string the log for this test"""
418
if self._log_file_name:
419
return open(self._log_file_name).read()
1363
421
return self._log_contents
1364
if self._log_file_name is not None:
1365
logfile = open(self._log_file_name)
1367
log_contents = logfile.read()
1370
if not keep_log_file:
1371
self._log_contents = log_contents
1373
os.remove(self._log_file_name)
1375
if sys.platform == 'win32' and e.errno == errno.EACCES:
1376
sys.stderr.write(('Unable to delete log file '
1377
' %r\n' % self._log_file_name))
1382
return "DELETED log file to reduce memory footprint"
1384
def requireFeature(self, feature):
1385
"""This test requires a specific feature is available.
1387
:raises UnavailableFeature: When feature is not available.
422
# TODO: Delete the log after it's been read in
424
def capture(self, cmd, retcode=0):
425
"""Shortcut that splits cmd into words, runs, and returns stdout"""
426
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
428
def run_bzr_captured(self, argv, retcode=0):
429
"""Invoke bzr and return (stdout, stderr).
431
Useful for code that wants to check the contents of the
432
output, the way error messages are presented, etc.
434
This should be the main method for tests that want to exercise the
435
overall behavior of the bzr application (rather than a unit test
436
or a functional test of the library.)
438
Much of the old code runs bzr by forking a new copy of Python, but
439
that is slower, harder to debug, and generally not necessary.
441
This runs bzr through the interface that catches and reports
442
errors, and with logging set to something approximating the
443
default, so that error reporting can be checked.
445
argv -- arguments to invoke bzr
446
retcode -- expected return code, or None for don't-care.
1389
if not feature.available():
1390
raise UnavailableFeature(feature)
1392
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1394
"""Run bazaar command line, splitting up a string command line."""
1395
if isinstance(args, basestring):
1396
# shlex don't understand unicode strings,
1397
# so args should be plain string (bialix 20070906)
1398
args = list(shlex.split(str(args)))
1399
return self._run_bzr_core(args, retcode=retcode,
1400
encoding=encoding, stdin=stdin, working_dir=working_dir,
1403
def _run_bzr_core(self, args, retcode, encoding, stdin,
1405
if encoding is None:
1406
encoding = osutils.get_user_encoding()
1407
stdout = StringIOWrapper()
1408
stderr = StringIOWrapper()
1409
stdout.encoding = encoding
1410
stderr.encoding = encoding
1412
self.log('run bzr: %r', args)
450
self.log('run bzr: %s', ' '.join(argv))
1413
451
# FIXME: don't call into logging here
1414
452
handler = logging.StreamHandler(stderr)
453
handler.setFormatter(bzrlib.trace.QuietFormatter())
1415
454
handler.setLevel(logging.INFO)
1416
455
logger = logging.getLogger('')
1417
456
logger.addHandler(handler)
1418
old_ui_factory = ui.ui_factory
1419
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1422
if working_dir is not None:
1423
cwd = osutils.getcwd()
1424
os.chdir(working_dir)
1427
result = self.apply_redirected(ui.ui_factory.stdin,
1429
bzrlib.commands.run_bzr_catch_user_errors,
458
result = self.apply_redirected(None, stdout, stderr,
459
bzrlib.commands.run_bzr_catch_errors,
1432
462
logger.removeHandler(handler)
1433
ui.ui_factory = old_ui_factory
1437
463
out = stdout.getvalue()
1438
464
err = stderr.getvalue()
1440
self.log('output:\n%r', out)
466
self.log('output:\n%s', out)
1442
self.log('errors:\n%r', err)
468
self.log('errors:\n%s', err)
1443
469
if retcode is not None:
1444
self.assertEquals(retcode, result,
1445
message='Unexpected return code')
470
self.assertEquals(result, retcode)
1448
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1449
working_dir=None, error_regexes=[], output_encoding=None):
473
def run_bzr(self, *args, **kwargs):
1450
474
"""Invoke bzr, as if it were run from the command line.
1452
The argument list should not include the bzr program name - the
1453
first argument is normally the bzr command. Arguments may be
1454
passed in three ways:
1456
1- A list of strings, eg ["commit", "a"]. This is recommended
1457
when the command contains whitespace or metacharacters, or
1458
is built up at run time.
1460
2- A single string, eg "add a". This is the most convenient
1461
for hardcoded commands.
1463
This runs bzr through the interface that catches and reports
1464
errors, and with logging set to something approximating the
1465
default, so that error reporting can be checked.
1467
476
This should be the main method for tests that want to exercise the
1468
477
overall behavior of the bzr application (rather than a unit test
1469
478
or a functional test of the library.)
1471
480
This sends the stdout/stderr results into the test's log,
1472
481
where it may be useful for debugging. See also run_captured.
1474
:keyword stdin: A string to be used as stdin for the command.
1475
:keyword retcode: The status code the command should return;
1477
:keyword working_dir: The directory to run the command in
1478
:keyword error_regexes: A list of expected error messages. If
1479
specified they must be seen in the error output of the command.
1481
out, err = self._run_bzr_autosplit(
1486
working_dir=working_dir,
1488
for regex in error_regexes:
1489
self.assertContainsRe(err, regex)
1492
def run_bzr_error(self, error_regexes, *args, **kwargs):
1493
"""Run bzr, and check that stderr contains the supplied regexes
1495
:param error_regexes: Sequence of regular expressions which
1496
must each be found in the error output. The relative ordering
1498
:param args: command-line arguments for bzr
1499
:param kwargs: Keyword arguments which are interpreted by run_bzr
1500
This function changes the default value of retcode to be 3,
1501
since in most cases this is run when you expect bzr to fail.
1503
:return: (out, err) The actual output of running the command (in case
1504
you want to do more inspection)
1508
# Make sure that commit is failing because there is nothing to do
1509
self.run_bzr_error(['no changes to commit'],
1510
['commit', '-m', 'my commit comment'])
1511
# Make sure --strict is handling an unknown file, rather than
1512
# giving us the 'nothing to do' error
1513
self.build_tree(['unknown'])
1514
self.run_bzr_error(['Commit refused because there are unknown files'],
1515
['commit', --strict', '-m', 'my commit comment'])
1517
kwargs.setdefault('retcode', 3)
1518
kwargs['error_regexes'] = error_regexes
1519
out, err = self.run_bzr(*args, **kwargs)
1522
def run_bzr_subprocess(self, *args, **kwargs):
1523
"""Run bzr in a subprocess for testing.
1525
This starts a new Python interpreter and runs bzr in there.
1526
This should only be used for tests that have a justifiable need for
1527
this isolation: e.g. they are testing startup time, or signal
1528
handling, or early startup code, etc. Subprocess code can't be
1529
profiled or debugged so easily.
1531
:keyword retcode: The status code that is expected. Defaults to 0. If
1532
None is supplied, the status code is not checked.
1533
:keyword env_changes: A dictionary which lists changes to environment
1534
variables. A value of None will unset the env variable.
1535
The values must be strings. The change will only occur in the
1536
child, so you don't need to fix the environment after running.
1537
:keyword universal_newlines: Convert CRLF => LF
1538
:keyword allow_plugins: By default the subprocess is run with
1539
--no-plugins to ensure test reproducibility. Also, it is possible
1540
for system-wide plugins to create unexpected output on stderr,
1541
which can cause unnecessary test failures.
1543
env_changes = kwargs.get('env_changes', {})
1544
working_dir = kwargs.get('working_dir', None)
1545
allow_plugins = kwargs.get('allow_plugins', False)
1547
if isinstance(args[0], list):
1549
elif isinstance(args[0], basestring):
1550
args = list(shlex.split(args[0]))
1552
raise ValueError("passing varargs to run_bzr_subprocess")
1553
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1554
working_dir=working_dir,
1555
allow_plugins=allow_plugins)
1556
# We distinguish between retcode=None and retcode not passed.
1557
supplied_retcode = kwargs.get('retcode', 0)
1558
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1559
universal_newlines=kwargs.get('universal_newlines', False),
1562
def start_bzr_subprocess(self, process_args, env_changes=None,
1563
skip_if_plan_to_signal=False,
1565
allow_plugins=False):
1566
"""Start bzr in a subprocess for testing.
1568
This starts a new Python interpreter and runs bzr in there.
1569
This should only be used for tests that have a justifiable need for
1570
this isolation: e.g. they are testing startup time, or signal
1571
handling, or early startup code, etc. Subprocess code can't be
1572
profiled or debugged so easily.
1574
:param process_args: a list of arguments to pass to the bzr executable,
1575
for example ``['--version']``.
1576
:param env_changes: A dictionary which lists changes to environment
1577
variables. A value of None will unset the env variable.
1578
The values must be strings. The change will only occur in the
1579
child, so you don't need to fix the environment after running.
1580
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1582
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1584
:returns: Popen object for the started process.
1586
if skip_if_plan_to_signal:
1587
if not getattr(os, 'kill', None):
1588
raise TestSkipped("os.kill not available.")
1590
if env_changes is None:
1594
def cleanup_environment():
1595
for env_var, value in env_changes.iteritems():
1596
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1598
def restore_environment():
1599
for env_var, value in old_env.iteritems():
1600
osutils.set_or_unset_env(env_var, value)
1602
bzr_path = self.get_bzr_path()
1605
if working_dir is not None:
1606
cwd = osutils.getcwd()
1607
os.chdir(working_dir)
1610
# win32 subprocess doesn't support preexec_fn
1611
# so we will avoid using it on all platforms, just to
1612
# make sure the code path is used, and we don't break on win32
1613
cleanup_environment()
1614
command = [sys.executable]
1615
# frozen executables don't need the path to bzr
1616
if getattr(sys, "frozen", None) is None:
1617
command.append(bzr_path)
1618
if not allow_plugins:
1619
command.append('--no-plugins')
1620
command.extend(process_args)
1621
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1623
restore_environment()
1629
def _popen(self, *args, **kwargs):
1630
"""Place a call to Popen.
1632
Allows tests to override this method to intercept the calls made to
1633
Popen for introspection.
1635
return Popen(*args, **kwargs)
1637
def get_bzr_path(self):
1638
"""Return the path of the 'bzr' executable for this test suite."""
1639
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1640
if not os.path.isfile(bzr_path):
1641
# We are probably installed. Assume sys.argv is the right file
1642
bzr_path = sys.argv[0]
1645
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1646
universal_newlines=False, process_args=None):
1647
"""Finish the execution of process.
1649
:param process: the Popen object returned from start_bzr_subprocess.
1650
:param retcode: The status code that is expected. Defaults to 0. If
1651
None is supplied, the status code is not checked.
1652
:param send_signal: an optional signal to send to the process.
1653
:param universal_newlines: Convert CRLF => LF
1654
:returns: (stdout, stderr)
1656
if send_signal is not None:
1657
os.kill(process.pid, send_signal)
1658
out, err = process.communicate()
1660
if universal_newlines:
1661
out = out.replace('\r\n', '\n')
1662
err = err.replace('\r\n', '\n')
1664
if retcode is not None and retcode != process.returncode:
1665
if process_args is None:
1666
process_args = "(unknown args)"
1667
mutter('Output of bzr %s:\n%s', process_args, out)
1668
mutter('Error for bzr %s:\n%s', process_args, err)
1669
self.fail('Command bzr %s failed with retcode %s != %s'
1670
% (process_args, retcode, process.returncode))
483
retcode = kwargs.pop('retcode', 0)
484
return self.run_bzr_captured(args, retcode)
1673
486
def check_inventory_shape(self, inv, shape):
1674
487
"""Compare an inventory to a list of expected names.
1722
535
sys.stderr = real_stderr
1723
536
sys.stdin = real_stdin
1725
def reduceLockdirTimeout(self):
1726
"""Reduce the default lock timeout for the duration of the test, so that
1727
if LockContention occurs during a test, it does so quickly.
1729
Tests that expect to provoke LockContention errors should call this.
1731
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1733
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1734
self.addCleanup(resetTimeout)
1735
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1737
def make_utf8_encoded_stringio(self, encoding_type=None):
1738
"""Return a StringIOWrapper instance, that will encode Unicode
1741
if encoding_type is None:
1742
encoding_type = 'strict'
1744
output_encoding = 'utf-8'
1745
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
1746
sio.encoding = output_encoding
1750
class TestCaseWithMemoryTransport(TestCase):
1751
"""Common test class for tests that do not need disk resources.
1753
Tests that need disk resources should derive from TestCaseWithTransport.
1755
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1757
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1758
a directory which does not exist. This serves to help ensure test isolation
1759
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1760
must exist. However, TestCaseWithMemoryTransport does not offer local
1761
file defaults for the transport in tests, nor does it obey the command line
1762
override, so tests that accidentally write to the common directory should
1765
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1766
a .bzr directory that stops us ascending higher into the filesystem.
1772
def __init__(self, methodName='runTest'):
1773
# allow test parameterization after test construction and before test
1774
# execution. Variables that the parameterizer sets need to be
1775
# ones that are not set by setUp, or setUp will trash them.
1776
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1777
self.vfs_transport_factory = default_transport
1778
self.transport_server = None
1779
self.transport_readonly_server = None
1780
self.__vfs_server = None
1782
def get_transport(self, relpath=None):
1783
"""Return a writeable transport.
1785
This transport is for the test scratch space relative to
1788
:param relpath: a path relative to the base url.
1790
t = get_transport(self.get_url(relpath))
1791
self.assertFalse(t.is_readonly())
1794
def get_readonly_transport(self, relpath=None):
1795
"""Return a readonly transport for the test scratch space
1797
This can be used to test that operations which should only need
1798
readonly access in fact do not try to write.
1800
:param relpath: a path relative to the base url.
1802
t = get_transport(self.get_readonly_url(relpath))
1803
self.assertTrue(t.is_readonly())
1806
def create_transport_readonly_server(self):
1807
"""Create a transport server from class defined at init.
1809
This is mostly a hook for daughter classes.
1811
return self.transport_readonly_server()
1813
def get_readonly_server(self):
1814
"""Get the server instance for the readonly transport
1816
This is useful for some tests with specific servers to do diagnostics.
1818
if self.__readonly_server is None:
1819
if self.transport_readonly_server is None:
1820
# readonly decorator requested
1821
# bring up the server
1822
self.__readonly_server = ReadonlyServer()
1823
self.__readonly_server.setUp(self.get_vfs_only_server())
1825
self.__readonly_server = self.create_transport_readonly_server()
1826
self.__readonly_server.setUp(self.get_vfs_only_server())
1827
self.addCleanup(self.__readonly_server.tearDown)
1828
return self.__readonly_server
1830
def get_readonly_url(self, relpath=None):
1831
"""Get a URL for the readonly transport.
1833
This will either be backed by '.' or a decorator to the transport
1834
used by self.get_url()
1835
relpath provides for clients to get a path relative to the base url.
1836
These should only be downwards relative, not upwards.
1838
base = self.get_readonly_server().get_url()
1839
return self._adjust_url(base, relpath)
1841
def get_vfs_only_server(self):
1842
"""Get the vfs only read/write server instance.
1844
This is useful for some tests with specific servers that need
1847
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1848
is no means to override it.
1850
if self.__vfs_server is None:
1851
self.__vfs_server = MemoryServer()
1852
self.__vfs_server.setUp()
1853
self.addCleanup(self.__vfs_server.tearDown)
1854
return self.__vfs_server
1856
def get_server(self):
1857
"""Get the read/write server instance.
1859
This is useful for some tests with specific servers that need
1862
This is built from the self.transport_server factory. If that is None,
1863
then the self.get_vfs_server is returned.
1865
if self.__server is None:
1866
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1867
return self.get_vfs_only_server()
1869
# bring up a decorated means of access to the vfs only server.
1870
self.__server = self.transport_server()
1872
self.__server.setUp(self.get_vfs_only_server())
1873
except TypeError, e:
1874
# This should never happen; the try:Except here is to assist
1875
# developers having to update code rather than seeing an
1876
# uninformative TypeError.
1877
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1878
self.addCleanup(self.__server.tearDown)
1879
return self.__server
1881
def _adjust_url(self, base, relpath):
1882
"""Get a URL (or maybe a path) for the readwrite transport.
1884
This will either be backed by '.' or to an equivalent non-file based
1886
relpath provides for clients to get a path relative to the base url.
1887
These should only be downwards relative, not upwards.
1889
if relpath is not None and relpath != '.':
1890
if not base.endswith('/'):
1892
# XXX: Really base should be a url; we did after all call
1893
# get_url()! But sometimes it's just a path (from
1894
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1895
# to a non-escaped local path.
1896
if base.startswith('./') or base.startswith('/'):
1899
base += urlutils.escape(relpath)
1902
def get_url(self, relpath=None):
1903
"""Get a URL (or maybe a path) for the readwrite transport.
1905
This will either be backed by '.' or to an equivalent non-file based
1907
relpath provides for clients to get a path relative to the base url.
1908
These should only be downwards relative, not upwards.
1910
base = self.get_server().get_url()
1911
return self._adjust_url(base, relpath)
1913
def get_vfs_only_url(self, relpath=None):
1914
"""Get a URL (or maybe a path for the plain old vfs transport.
1916
This will never be a smart protocol. It always has all the
1917
capabilities of the local filesystem, but it might actually be a
1918
MemoryTransport or some other similar virtual filesystem.
1920
This is the backing transport (if any) of the server returned by
1921
get_url and get_readonly_url.
1923
:param relpath: provides for clients to get a path relative to the base
1924
url. These should only be downwards relative, not upwards.
1927
base = self.get_vfs_only_server().get_url()
1928
return self._adjust_url(base, relpath)
1930
def _create_safety_net(self):
1931
"""Make a fake bzr directory.
1933
This prevents any tests propagating up onto the TEST_ROOT directory's
1936
root = TestCaseWithMemoryTransport.TEST_ROOT
1937
bzrdir.BzrDir.create_standalone_workingtree(root)
1939
def _check_safety_net(self):
1940
"""Check that the safety .bzr directory have not been touched.
1942
_make_test_root have created a .bzr directory to prevent tests from
1943
propagating. This method ensures than a test did not leaked.
1945
root = TestCaseWithMemoryTransport.TEST_ROOT
1946
wt = workingtree.WorkingTree.open(root)
1947
last_rev = wt.last_revision()
1948
if last_rev != 'null:':
1949
# The current test have modified the /bzr directory, we need to
1950
# recreate a new one or all the followng tests will fail.
1951
# If you need to inspect its content uncomment the following line
1952
# import pdb; pdb.set_trace()
1953
_rmtree_temp_dir(root + '/.bzr')
1954
self._create_safety_net()
1955
raise AssertionError('%s/.bzr should not be modified' % root)
1957
def _make_test_root(self):
1958
if TestCaseWithMemoryTransport.TEST_ROOT is None:
1959
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
1960
TestCaseWithMemoryTransport.TEST_ROOT = root
1962
self._create_safety_net()
1964
# The same directory is used by all tests, and we're not
1965
# specifically told when all tests are finished. This will do.
1966
atexit.register(_rmtree_temp_dir, root)
1968
self.addCleanup(self._check_safety_net)
1970
def makeAndChdirToTestDir(self):
1971
"""Create a temporary directories for this one test.
1973
This must set self.test_home_dir and self.test_dir and chdir to
1976
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1978
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1979
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1980
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1982
def make_branch(self, relpath, format=None):
1983
"""Create a branch on the transport at relpath."""
1984
repo = self.make_repository(relpath, format=format)
1985
return repo.bzrdir.create_branch()
1987
def make_bzrdir(self, relpath, format=None):
1989
# might be a relative or absolute path
1990
maybe_a_url = self.get_url(relpath)
1991
segments = maybe_a_url.rsplit('/', 1)
1992
t = get_transport(maybe_a_url)
1993
if len(segments) > 1 and segments[-1] not in ('', '.'):
1997
if isinstance(format, basestring):
1998
format = bzrdir.format_registry.make_bzrdir(format)
1999
return format.initialize_on_transport(t)
2000
except errors.UninitializableFormat:
2001
raise TestSkipped("Format %s is not initializable." % format)
2003
def make_repository(self, relpath, shared=False, format=None):
2004
"""Create a repository on our default transport at relpath.
2006
Note that relpath must be a relative path, not a full url.
2008
# FIXME: If you create a remoterepository this returns the underlying
2009
# real format, which is incorrect. Actually we should make sure that
2010
# RemoteBzrDir returns a RemoteRepository.
2011
# maybe mbp 20070410
2012
made_control = self.make_bzrdir(relpath, format=format)
2013
return made_control.create_repository(shared=shared)
2015
def make_branch_and_memory_tree(self, relpath, format=None):
2016
"""Create a branch on the default transport and a MemoryTree for it."""
2017
b = self.make_branch(relpath, format=format)
2018
return memorytree.MemoryTree.create_on_branch(b)
2020
def make_branch_builder(self, relpath, format=None):
2021
url = self.get_url(relpath)
2022
tran = get_transport(url)
2023
return branchbuilder.BranchBuilder(get_transport(url), format=format)
2025
def overrideEnvironmentForTesting(self):
2026
os.environ['HOME'] = self.test_home_dir
2027
os.environ['BZR_HOME'] = self.test_home_dir
2030
super(TestCaseWithMemoryTransport, self).setUp()
2031
self._make_test_root()
2032
_currentdir = os.getcwdu()
2033
def _leaveDirectory():
2034
os.chdir(_currentdir)
2035
self.addCleanup(_leaveDirectory)
2036
self.makeAndChdirToTestDir()
2037
self.overrideEnvironmentForTesting()
2038
self.__readonly_server = None
2039
self.__server = None
2040
self.reduceLockdirTimeout()
539
BzrTestBase = TestCase
2043
class TestCaseInTempDir(TestCaseWithMemoryTransport):
542
class TestCaseInTempDir(TestCase):
2044
543
"""Derived class that runs a test within a temporary directory.
2046
545
This is useful for tests that need to create a branch, etc.
2286
807
def setUp(self):
2287
808
super(ChrootedTestCase, self).setUp()
2288
if not self.vfs_transport_factory == MemoryServer:
2289
self.transport_readonly_server = HttpServer
2292
def condition_id_re(pattern):
2293
"""Create a condition filter which performs a re check on a test's id.
2295
:param pattern: A regular expression string.
2296
:return: A callable that returns True if the re matches.
809
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
810
self.transport_readonly_server = bzrlib.transport.http.HttpServer
813
def filter_suite_by_re(suite, pattern):
2298
815
filter_re = re.compile(pattern)
2299
def condition(test):
2301
return filter_re.search(test_id)
2305
def condition_isinstance(klass_or_klass_list):
2306
"""Create a condition filter which returns isinstance(param, klass).
2308
:return: A callable which when called with one parameter obj return the
2309
result of isinstance(obj, klass_or_klass_list).
2312
return isinstance(obj, klass_or_klass_list)
2316
def condition_id_in_list(id_list):
2317
"""Create a condition filter which verify that test's id in a list.
2319
:param id_list: A TestIdList object.
2320
:return: A callable that returns True if the test's id appears in the list.
2322
def condition(test):
2323
return id_list.includes(test.id())
2327
def condition_id_startswith(starts):
2328
"""Create a condition filter verifying that test's id starts with a string.
2330
:param starts: A list of string.
2331
:return: A callable that returns True if the test's id starts with one of
2334
def condition(test):
2335
for start in starts:
2336
if test.id().startswith(start):
2342
def exclude_tests_by_condition(suite, condition):
2343
"""Create a test suite which excludes some tests from suite.
2345
:param suite: The suite to get tests from.
2346
:param condition: A callable whose result evaluates True when called with a
2347
test case which should be excluded from the result.
2348
:return: A suite which contains the tests found in suite that fail
2352
for test in iter_suite_tests(suite):
2353
if not condition(test):
2355
return TestUtil.TestSuite(result)
2358
def filter_suite_by_condition(suite, condition):
2359
"""Create a test suite by filtering another one.
2361
:param suite: The source suite.
2362
:param condition: A callable whose result evaluates True when called with a
2363
test case which should be included in the result.
2364
:return: A suite which contains the tests found in suite that pass
2368
for test in iter_suite_tests(suite):
2371
return TestUtil.TestSuite(result)
2374
def filter_suite_by_re(suite, pattern):
2375
"""Create a test suite by filtering another one.
2377
:param suite: the source suite
2378
:param pattern: pattern that names must match
2379
:returns: the newly created suite
2381
condition = condition_id_re(pattern)
2382
result_suite = filter_suite_by_condition(suite, condition)
2386
def filter_suite_by_id_list(suite, test_id_list):
2387
"""Create a test suite by filtering another one.
2389
:param suite: The source suite.
2390
:param test_id_list: A list of the test ids to keep as strings.
2391
:returns: the newly created suite
2393
condition = condition_id_in_list(test_id_list)
2394
result_suite = filter_suite_by_condition(suite, condition)
2398
def filter_suite_by_id_startswith(suite, start):
2399
"""Create a test suite by filtering another one.
2401
:param suite: The source suite.
2402
:param start: A list of string the test id must start with one of.
2403
:returns: the newly created suite
2405
condition = condition_id_startswith(start)
2406
result_suite = filter_suite_by_condition(suite, condition)
2410
def exclude_tests_by_re(suite, pattern):
2411
"""Create a test suite which excludes some tests from suite.
2413
:param suite: The suite to get tests from.
2414
:param pattern: A regular expression string. Test ids that match this
2415
pattern will be excluded from the result.
2416
:return: A TestSuite that contains all the tests from suite without the
2417
tests that matched pattern. The order of tests is the same as it was in
2420
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2423
def preserve_input(something):
2424
"""A helper for performing test suite transformation chains.
2426
:param something: Anything you want to preserve.
2432
def randomize_suite(suite):
2433
"""Return a new TestSuite with suite's tests in random order.
2435
The tests in the input suite are flattened into a single suite in order to
2436
accomplish this. Any nested TestSuites are removed to provide global
2439
tests = list(iter_suite_tests(suite))
2440
random.shuffle(tests)
2441
return TestUtil.TestSuite(tests)
2444
def split_suite_by_condition(suite, condition):
2445
"""Split a test suite into two by a condition.
2447
:param suite: The suite to split.
2448
:param condition: The condition to match on. Tests that match this
2449
condition are returned in the first test suite, ones that do not match
2450
are in the second suite.
2451
:return: A tuple of two test suites, where the first contains tests from
2452
suite matching the condition, and the second contains the remainder
2453
from suite. The order within each output suite is the same as it was in
2458
for test in iter_suite_tests(suite):
2460
matched.append(test)
2462
did_not_match.append(test)
2463
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2466
def split_suite_by_re(suite, pattern):
2467
"""Split a test suite into two by a regular expression.
2469
:param suite: The suite to split.
2470
:param pattern: A regular expression string. Test ids that match this
2471
pattern will be in the first test suite returned, and the others in the
2472
second test suite returned.
2473
:return: A tuple of two test suites, where the first contains tests from
2474
suite matching pattern, and the second contains the remainder from
2475
suite. The order within each output suite is the same as it was in
2478
return split_suite_by_condition(suite, condition_id_re(pattern))
816
for test in iter_suite_tests(suite):
817
if filter_re.search(test.id()):
2481
822
def run_suite(suite, name='test', verbose=False, pattern=".*",
2482
stop_on_failure=False,
2483
transport=None, lsprof_timed=None, bench_history=None,
2484
matching_tests_first=None,
2487
exclude_pattern=None,
2489
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
823
stop_on_failure=False, keep_output=False,
825
TestCaseInTempDir._TEST_NAME = name
2494
830
runner = TextTestRunner(stream=sys.stdout,
2496
verbosity=verbosity,
2497
bench_history=bench_history,
2498
list_only=list_only,
2500
833
runner.stop_on_failure=stop_on_failure
2501
# Initialise the random number generator and display the seed used.
2502
# We convert the seed to a long to make it reuseable across invocations.
2503
random_order = False
2504
if random_seed is not None:
2506
if random_seed == "now":
2507
random_seed = long(time.time())
2509
# Convert the seed to a long if we can
2511
random_seed = long(random_seed)
2514
runner.stream.writeln("Randomizing test order using seed %s\n" %
2516
random.seed(random_seed)
2517
# Customise the list of tests if requested
2518
if exclude_pattern is not None:
2519
suite = exclude_tests_by_re(suite, exclude_pattern)
2521
order_changer = randomize_suite
2523
order_changer = preserve_input
2524
if pattern != '.*' or random_order:
2525
if matching_tests_first:
2526
suites = map(order_changer, split_suite_by_re(suite, pattern))
2527
suite = TestUtil.TestSuite(suites)
2529
suite = order_changer(filter_suite_by_re(suite, pattern))
835
suite = filter_suite_by_re(suite, pattern)
2531
836
result = runner.run(suite)
2534
return result.wasStrictlySuccessful()
837
# This is still a little bogus,
838
# but only a little. Folk not using our testrunner will
839
# have to delete their temp directories themselves.
840
if result.wasSuccessful() or not keep_output:
841
if TestCaseInTempDir.TEST_ROOT is not None:
842
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
844
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
2536
845
return result.wasSuccessful()
2539
# Controlled by "bzr selftest -E=..." option
2540
selftest_debug_flags = set()
2543
848
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2545
test_suite_factory=None,
2548
matching_tests_first=None,
2551
exclude_pattern=None,
2557
851
"""Run the whole test suite under the enhanced runner"""
2558
# XXX: Very ugly way to do this...
2559
# Disable warning about old formats because we don't want it to disturb
2560
# any blackbox tests.
2561
from bzrlib import repository
2562
repository._deprecation_warning_done = True
2564
852
global default_transport
2565
853
if transport is None:
2566
854
transport = default_transport
2567
855
old_transport = default_transport
2568
856
default_transport = transport
2569
global selftest_debug_flags
2570
old_debug_flags = selftest_debug_flags
2571
if debug_flags is not None:
2572
selftest_debug_flags = set(debug_flags)
2574
if load_list is None:
2577
keep_only = load_test_id_list(load_list)
2578
if test_suite_factory is None:
2579
suite = test_suite(keep_only, starting_with)
2581
suite = test_suite_factory()
2582
859
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2583
stop_on_failure=stop_on_failure,
2584
transport=transport,
2585
lsprof_timed=lsprof_timed,
2586
bench_history=bench_history,
2587
matching_tests_first=matching_tests_first,
2588
list_only=list_only,
2589
random_seed=random_seed,
2590
exclude_pattern=exclude_pattern,
860
stop_on_failure=stop_on_failure, keep_output=keep_output,
2593
863
default_transport = old_transport
2594
selftest_debug_flags = old_debug_flags
2597
def load_test_id_list(file_name):
2598
"""Load a test id list from a text file.
2600
The format is one test id by line. No special care is taken to impose
2601
strict rules, these test ids are used to filter the test suite so a test id
2602
that do not match an existing test will do no harm. This allows user to add
2603
comments, leave blank lines, etc.
2607
ftest = open(file_name, 'rt')
2609
if e.errno != errno.ENOENT:
2612
raise errors.NoSuchFile(file_name)
2614
for test_name in ftest.readlines():
2615
test_list.append(test_name.strip())
2620
def suite_matches_id_list(test_suite, id_list):
2621
"""Warns about tests not appearing or appearing more than once.
2623
:param test_suite: A TestSuite object.
2624
:param test_id_list: The list of test ids that should be found in
2627
:return: (absents, duplicates) absents is a list containing the test found
2628
in id_list but not in test_suite, duplicates is a list containing the
2629
test found multiple times in test_suite.
2631
When using a prefined test id list, it may occurs that some tests do not
2632
exist anymore or that some tests use the same id. This function warns the
2633
tester about potential problems in his workflow (test lists are volatile)
2634
or in the test suite itself (using the same id for several tests does not
2635
help to localize defects).
2637
# Build a dict counting id occurrences
2639
for test in iter_suite_tests(test_suite):
2641
tests[id] = tests.get(id, 0) + 1
2646
occurs = tests.get(id, 0)
2648
not_found.append(id)
2650
duplicates.append(id)
2652
return not_found, duplicates
2655
class TestIdList(object):
2656
"""Test id list to filter a test suite.
2658
Relying on the assumption that test ids are built as:
2659
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
2660
notation, this class offers methods to :
2661
- avoid building a test suite for modules not refered to in the test list,
2662
- keep only the tests listed from the module test suite.
2665
def __init__(self, test_id_list):
2666
# When a test suite needs to be filtered against us we compare test ids
2667
# for equality, so a simple dict offers a quick and simple solution.
2668
self.tests = dict().fromkeys(test_id_list, True)
2670
# While unittest.TestCase have ids like:
2671
# <module>.<class>.<method>[(<param+)],
2672
# doctest.DocTestCase can have ids like:
2675
# <module>.<function>
2676
# <module>.<class>.<method>
2678
# Since we can't predict a test class from its name only, we settle on
2679
# a simple constraint: a test id always begins with its module name.
2682
for test_id in test_id_list:
2683
parts = test_id.split('.')
2684
mod_name = parts.pop(0)
2685
modules[mod_name] = True
2687
mod_name += '.' + part
2688
modules[mod_name] = True
2689
self.modules = modules
2691
def refers_to(self, module_name):
2692
"""Is there tests for the module or one of its sub modules."""
2693
return self.modules.has_key(module_name)
2695
def includes(self, test_id):
2696
return self.tests.has_key(test_id)
2699
class TestPrefixAliasRegistry(registry.Registry):
2700
"""A registry for test prefix aliases.
2702
This helps implement shorcuts for the --starting-with selftest
2703
option. Overriding existing prefixes is not allowed but not fatal (a
2704
warning will be emitted).
2707
def register(self, key, obj, help=None, info=None,
2708
override_existing=False):
2709
"""See Registry.register.
2711
Trying to override an existing alias causes a warning to be emitted,
2712
not a fatal execption.
2715
super(TestPrefixAliasRegistry, self).register(
2716
key, obj, help=help, info=info, override_existing=False)
2718
actual = self.get(key)
2719
note('Test prefix alias %s is already used for %s, ignoring %s'
2720
% (key, actual, obj))
2722
def resolve_alias(self, id_start):
2723
"""Replace the alias by the prefix in the given string.
2725
Using an unknown prefix is an error to help catching typos.
2727
parts = id_start.split('.')
2729
parts[0] = self.get(parts[0])
2731
raise errors.BzrCommandError(
2732
'%s is not a known test prefix alias' % parts[0])
2733
return '.'.join(parts)
2736
test_prefix_alias_registry = TestPrefixAliasRegistry()
2737
"""Registry of test prefix aliases."""
2740
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
2741
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
2742
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
2744
# Obvious higest levels prefixes, feel free to add your own via a plugin
2745
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
2746
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
2747
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
2748
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
2749
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
2752
def test_suite(keep_only=None, starting_with=None):
2753
"""Build and return TestSuite for the whole of bzrlib.
2755
:param keep_only: A list of test ids limiting the suite returned.
2757
:param starting_with: An id limiting the suite returned to the tests
2760
This function can be replaced if you need to change the default test
2761
suite on a global basis, but it is not encouraged.
2765
'bzrlib.tests.blackbox',
2766
'bzrlib.tests.branch_implementations',
2767
'bzrlib.tests.bzrdir_implementations',
2768
'bzrlib.tests.commands',
2769
'bzrlib.tests.interrepository_implementations',
2770
'bzrlib.tests.intertree_implementations',
2771
'bzrlib.tests.inventory_implementations',
2772
'bzrlib.tests.per_lock',
2773
'bzrlib.tests.per_repository',
2774
'bzrlib.tests.per_repository_reference',
2775
'bzrlib.tests.test__dirstate_helpers',
2776
'bzrlib.tests.test__walkdirs_win32',
868
"""Build and return TestSuite for the whole program."""
869
from doctest import DocTestSuite
871
global MODULES_TO_DOCTEST
2777
874
'bzrlib.tests.test_ancestry',
2778
875
'bzrlib.tests.test_annotate',
2779
876
'bzrlib.tests.test_api',
2780
'bzrlib.tests.test_atomicfile',
2781
877
'bzrlib.tests.test_bad_files',
2782
'bzrlib.tests.test_bisect_multi',
878
'bzrlib.tests.test_basis_inventory',
2783
879
'bzrlib.tests.test_branch',
2784
'bzrlib.tests.test_branchbuilder',
2785
'bzrlib.tests.test_btree_index',
2786
'bzrlib.tests.test_bugtracker',
2787
'bzrlib.tests.test_bundle',
2788
880
'bzrlib.tests.test_bzrdir',
2789
'bzrlib.tests.test_cache_utf8',
2790
'bzrlib.tests.test_chunk_writer',
2791
'bzrlib.tests.test_commands',
881
'bzrlib.tests.test_command',
2792
882
'bzrlib.tests.test_commit',
2793
883
'bzrlib.tests.test_commit_merge',
2794
884
'bzrlib.tests.test_config',
2795
885
'bzrlib.tests.test_conflicts',
2796
'bzrlib.tests.test_counted_lock',
2797
886
'bzrlib.tests.test_decorators',
2798
'bzrlib.tests.test_delta',
2799
'bzrlib.tests.test_deprecated_graph',
2800
887
'bzrlib.tests.test_diff',
2801
'bzrlib.tests.test_directory_service',
2802
'bzrlib.tests.test_dirstate',
2803
'bzrlib.tests.test_email_message',
888
'bzrlib.tests.test_doc_generate',
2804
889
'bzrlib.tests.test_errors',
2805
'bzrlib.tests.test_extract',
2806
890
'bzrlib.tests.test_fetch',
2807
'bzrlib.tests.test_ftp_transport',
2808
'bzrlib.tests.test_foreign',
2809
'bzrlib.tests.test_generate_docs',
2810
'bzrlib.tests.test_generate_ids',
2811
'bzrlib.tests.test_globbing',
2812
891
'bzrlib.tests.test_gpg',
2813
892
'bzrlib.tests.test_graph',
2814
893
'bzrlib.tests.test_hashcache',
2815
'bzrlib.tests.test_help',
2816
'bzrlib.tests.test_hooks',
2817
894
'bzrlib.tests.test_http',
2818
'bzrlib.tests.test_http_implementations',
2819
'bzrlib.tests.test_http_response',
2820
'bzrlib.tests.test_https_ca_bundle',
2821
895
'bzrlib.tests.test_identitymap',
2822
'bzrlib.tests.test_ignores',
2823
'bzrlib.tests.test_index',
2824
'bzrlib.tests.test_info',
2825
896
'bzrlib.tests.test_inv',
2826
'bzrlib.tests.test_knit',
2827
'bzrlib.tests.test_lazy_import',
2828
'bzrlib.tests.test_lazy_regex',
897
'bzrlib.tests.test_lockdir',
2829
898
'bzrlib.tests.test_lockable_files',
2830
'bzrlib.tests.test_lockdir',
2831
899
'bzrlib.tests.test_log',
2832
'bzrlib.tests.test_lru_cache',
2833
'bzrlib.tests.test_lsprof',
2834
'bzrlib.tests.test_mail_client',
2835
'bzrlib.tests.test_memorytree',
2836
900
'bzrlib.tests.test_merge',
2837
901
'bzrlib.tests.test_merge3',
2838
902
'bzrlib.tests.test_merge_core',
2839
'bzrlib.tests.test_merge_directive',
2840
903
'bzrlib.tests.test_missing',
2841
904
'bzrlib.tests.test_msgeditor',
2842
'bzrlib.tests.test_multiparent',
2843
'bzrlib.tests.test_mutabletree',
2844
905
'bzrlib.tests.test_nonascii',
2845
906
'bzrlib.tests.test_options',
2846
907
'bzrlib.tests.test_osutils',
2847
'bzrlib.tests.test_osutils_encodings',
2848
'bzrlib.tests.test_pack',
2849
'bzrlib.tests.test_pack_repository',
2850
'bzrlib.tests.test_patch',
2851
'bzrlib.tests.test_patches',
2852
908
'bzrlib.tests.test_permissions',
2853
909
'bzrlib.tests.test_plugins',
2854
'bzrlib.tests.test_progress',
2855
'bzrlib.tests.test_read_bundle',
2856
910
'bzrlib.tests.test_reconcile',
2857
'bzrlib.tests.test_reconfigure',
2858
'bzrlib.tests.test_registry',
2859
'bzrlib.tests.test_remote',
2860
911
'bzrlib.tests.test_repository',
2861
'bzrlib.tests.test_revert',
2862
912
'bzrlib.tests.test_revision',
2863
'bzrlib.tests.test_revisionspec',
2864
'bzrlib.tests.test_revisiontree',
913
'bzrlib.tests.test_revisionnamespaces',
914
'bzrlib.tests.test_revprops',
915
'bzrlib.tests.test_reweave',
2865
916
'bzrlib.tests.test_rio',
2866
'bzrlib.tests.test_rules',
2867
917
'bzrlib.tests.test_sampler',
2868
918
'bzrlib.tests.test_selftest',
2869
919
'bzrlib.tests.test_setup',
2870
920
'bzrlib.tests.test_sftp_transport',
2871
'bzrlib.tests.test_shelf',
2872
'bzrlib.tests.test_shelf_ui',
2873
'bzrlib.tests.test_smart',
2874
921
'bzrlib.tests.test_smart_add',
2875
'bzrlib.tests.test_smart_transport',
2876
'bzrlib.tests.test_smtp_connection',
2877
922
'bzrlib.tests.test_source',
2878
'bzrlib.tests.test_ssh_transport',
2879
'bzrlib.tests.test_status',
2880
923
'bzrlib.tests.test_store',
2881
'bzrlib.tests.test_strace',
2882
'bzrlib.tests.test_subsume',
2883
'bzrlib.tests.test_switch',
2884
924
'bzrlib.tests.test_symbol_versioning',
2885
'bzrlib.tests.test_tag',
2886
925
'bzrlib.tests.test_testament',
2887
'bzrlib.tests.test_textfile',
2888
'bzrlib.tests.test_textmerge',
2889
'bzrlib.tests.test_timestamp',
2890
926
'bzrlib.tests.test_trace',
2891
927
'bzrlib.tests.test_transactions',
2892
928
'bzrlib.tests.test_transform',
2893
929
'bzrlib.tests.test_transport',
2894
'bzrlib.tests.test_transport_implementations',
2895
'bzrlib.tests.test_transport_log',
2896
'bzrlib.tests.test_tree',
2897
'bzrlib.tests.test_treebuilder',
2898
930
'bzrlib.tests.test_tsort',
2899
'bzrlib.tests.test_tuned_gzip',
2900
931
'bzrlib.tests.test_ui',
2901
932
'bzrlib.tests.test_uncommit',
2902
933
'bzrlib.tests.test_upgrade',
2903
'bzrlib.tests.test_upgrade_stacked',
2904
'bzrlib.tests.test_urlutils',
2905
'bzrlib.tests.test_version',
2906
'bzrlib.tests.test_version_info',
2907
'bzrlib.tests.test_versionedfile',
2908
934
'bzrlib.tests.test_weave',
2909
935
'bzrlib.tests.test_whitebox',
2910
'bzrlib.tests.test_win32utils',
2911
936
'bzrlib.tests.test_workingtree',
2912
'bzrlib.tests.test_workingtree_4',
2913
'bzrlib.tests.test_wsgi',
2914
937
'bzrlib.tests.test_xml',
2915
'bzrlib.tests.tree_implementations',
2916
'bzrlib.tests.workingtree_implementations',
2917
'bzrlib.util.tests.test_bencode',
2920
loader = TestUtil.TestLoader()
2923
starting_with = [test_prefix_alias_registry.resolve_alias(start)
2924
for start in starting_with]
2925
# We take precedence over keep_only because *at loading time* using
2926
# both options means we will load less tests for the same final result.
2927
def interesting_module(name):
2928
for start in starting_with:
2930
# Either the module name starts with the specified string
2931
name.startswith(start)
2932
# or it may contain tests starting with the specified string
2933
or start.startswith(name)
2937
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
2939
elif keep_only is not None:
2940
id_filter = TestIdList(keep_only)
2941
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
2942
def interesting_module(name):
2943
return id_filter.refers_to(name)
2946
loader = TestUtil.TestLoader()
2947
def interesting_module(name):
2948
# No filtering, all modules are interesting
2951
suite = loader.suiteClass()
2953
# modules building their suite with loadTestsFromModuleNames
2954
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2956
modules_to_doctest = [
2958
'bzrlib.branchbuilder',
2961
'bzrlib.iterablefile',
2965
'bzrlib.symbol_versioning',
2968
'bzrlib.version_info_formats.format_custom',
2971
for mod in modules_to_doctest:
2972
if not interesting_module(mod):
2973
# No tests to keep here, move along
2976
# note that this really does mean "report only" -- doctest
2977
# still runs the rest of the examples
2978
doc_suite = doctest.DocTestSuite(mod,
2979
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
2980
except ValueError, e:
2981
print '**failed to get doctest for: %s\n%s' % (mod, e)
2983
if len(doc_suite._tests) == 0:
2984
raise errors.BzrError("no doctests found in %s" % (mod,))
2985
suite.addTest(doc_suite)
2987
default_encoding = sys.getdefaultencoding()
2988
for name, plugin in bzrlib.plugin.plugins().items():
2989
if not interesting_module(plugin.module.__name__):
2991
plugin_suite = plugin.test_suite()
2992
# We used to catch ImportError here and turn it into just a warning,
2993
# but really if you don't have --no-plugins this should be a failure.
2994
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
2995
if plugin_suite is None:
2996
plugin_suite = plugin.load_plugin_tests(loader)
2997
if plugin_suite is not None:
2998
suite.addTest(plugin_suite)
2999
if default_encoding != sys.getdefaultencoding():
3000
bzrlib.trace.warning(
3001
'Plugin "%s" tried to reset default encoding to: %s', name,
3002
sys.getdefaultencoding())
3004
sys.setdefaultencoding(default_encoding)
3007
suite = filter_suite_by_id_startswith(suite, starting_with)
3009
if keep_only is not None:
3010
# Now that the referred modules have loaded their tests, keep only the
3012
suite = filter_suite_by_id_list(suite, id_filter)
3013
# Do some sanity checks on the id_list filtering
3014
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3016
# The tester has used both keep_only and starting_with, so he is
3017
# already aware that some tests are excluded from the list, there
3018
# is no need to tell him which.
3021
# Some tests mentioned in the list are not in the test suite. The
3022
# list may be out of date, report to the tester.
3023
for id in not_found:
3024
bzrlib.trace.warning('"%s" not found in the test suite', id)
3025
for id in duplicates:
3026
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
3031
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
3032
"""Adapt all tests in some given modules to given scenarios.
3034
This is the recommended public interface for test parameterization.
3035
Typically the test_suite() method for a per-implementation test
3036
suite will call multiply_tests_from_modules and return the
3039
:param module_name_list: List of fully-qualified names of test
3041
:param scenario_iter: Iterable of pairs of (scenario_name,
3042
scenario_param_dict).
3043
:param loader: If provided, will be used instead of a new
3044
bzrlib.tests.TestLoader() instance.
3046
This returns a new TestSuite containing the cross product of
3047
all the tests in all the modules, each repeated for each scenario.
3048
Each test is adapted by adding the scenario name at the end
3049
of its name, and updating the test object's __dict__ with the
3050
scenario_param_dict.
3052
>>> r = multiply_tests_from_modules(
3053
... ['bzrlib.tests.test_sampler'],
3054
... [('one', dict(param=1)),
3055
... ('two', dict(param=2))])
3056
>>> tests = list(iter_suite_tests(r))
3060
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3066
# XXX: Isn't load_tests() a better way to provide the same functionality
3067
# without forcing a predefined TestScenarioApplier ? --vila 080215
3069
loader = TestUtil.TestLoader()
3071
suite = loader.suiteClass()
3073
adapter = TestScenarioApplier()
3074
adapter.scenarios = list(scenario_iter)
3075
adapt_modules(module_name_list, adapter, loader, suite)
3079
def multiply_scenarios(scenarios_left, scenarios_right):
3080
"""Multiply two sets of scenarios.
3082
:returns: the cartesian product of the two sets of scenarios, that is
3083
a scenario for every possible combination of a left scenario and a
3087
('%s,%s' % (left_name, right_name),
3088
dict(left_dict.items() + right_dict.items()))
3089
for left_name, left_dict in scenarios_left
3090
for right_name, right_dict in scenarios_right]
939
test_transport_implementations = [
940
'bzrlib.tests.test_transport_implementations']
942
TestCase.BZRPATH = osutils.pathjoin(
943
osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
944
print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
945
print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
948
# python2.4's TestLoader.loadTestsFromNames gives very poor
949
# errors if it fails to load a named module - no indication of what's
950
# actually wrong, just "no such module". We should probably override that
951
# class, but for the moment just load them ourselves. (mbp 20051202)
952
loader = TestLoader()
953
from bzrlib.transport import TransportTestProviderAdapter
954
adapter = TransportTestProviderAdapter()
955
adapt_modules(test_transport_implementations, adapter, loader, suite)
956
for mod_name in testmod_names:
957
mod = _load_module_by_name(mod_name)
958
suite.addTest(loader.loadTestsFromModule(mod))
959
for package in packages_to_test():
960
suite.addTest(package.test_suite())
961
for m in MODULES_TO_TEST:
962
suite.addTest(loader.loadTestsFromModule(m))
963
for m in (MODULES_TO_DOCTEST):
964
suite.addTest(DocTestSuite(m))
965
for name, plugin in bzrlib.plugin.all_plugins().items():
966
if getattr(plugin, 'test_suite', None) is not None:
967
suite.addTest(plugin.test_suite())
3094
971
def adapt_modules(mods_list, adapter, loader, suite):
3095
972
"""Adapt the modules in mods_list using adapter and add to suite."""
3096
tests = loader.loadTestsFromModuleNames(mods_list)
3097
adapt_tests(tests, adapter, suite)
3100
def adapt_tests(tests_list, adapter, suite):
3101
"""Adapt the tests in tests_list using adapter and add to suite."""
3102
for test in iter_suite_tests(tests_list):
3103
suite.addTests(adapter.adapt(test))
3106
def _rmtree_temp_dir(dirname):
3107
# If LANG=C we probably have created some bogus paths
3108
# which rmtree(unicode) will fail to delete
3109
# so make sure we are using rmtree(str) to delete everything
3110
# except on win32, where rmtree(str) will fail
3111
# since it doesn't have the property of byte-stream paths
3112
# (they are either ascii or mbcs)
3113
if sys.platform == 'win32':
3114
# make sure we are using the unicode win32 api
3115
dirname = unicode(dirname)
3117
dirname = dirname.encode(sys.getfilesystemencoding())
3119
osutils.rmtree(dirname)
3121
if sys.platform == 'win32' and e.errno == errno.EACCES:
3122
sys.stderr.write(('Permission denied: '
3123
'unable to remove testing dir '
3124
'%s\n' % os.path.basename(dirname)))
3129
class Feature(object):
3130
"""An operating system Feature."""
3133
self._available = None
3135
def available(self):
3136
"""Is the feature available?
3138
:return: True if the feature is available.
3140
if self._available is None:
3141
self._available = self._probe()
3142
return self._available
3145
"""Implement this method in concrete features.
3147
:return: True if the feature is available.
3149
raise NotImplementedError
3152
if getattr(self, 'feature_name', None):
3153
return self.feature_name()
3154
return self.__class__.__name__
3157
class _SymlinkFeature(Feature):
3160
return osutils.has_symlinks()
3162
def feature_name(self):
3165
SymlinkFeature = _SymlinkFeature()
3168
class _HardlinkFeature(Feature):
3171
return osutils.has_hardlinks()
3173
def feature_name(self):
3176
HardlinkFeature = _HardlinkFeature()
3179
class _OsFifoFeature(Feature):
3182
return getattr(os, 'mkfifo', None)
3184
def feature_name(self):
3185
return 'filesystem fifos'
3187
OsFifoFeature = _OsFifoFeature()
3190
class _UnicodeFilenameFeature(Feature):
3191
"""Does the filesystem support Unicode filenames?"""
3195
# Check for character combinations unlikely to be covered by any
3196
# single non-unicode encoding. We use the characters
3197
# - greek small letter alpha (U+03B1) and
3198
# - braille pattern dots-123456 (U+283F).
3199
os.stat(u'\u03b1\u283f')
3200
except UnicodeEncodeError:
3202
except (IOError, OSError):
3203
# The filesystem allows the Unicode filename but the file doesn't
3207
# The filesystem allows the Unicode filename and the file exists,
3211
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3214
class TestScenarioApplier(object):
3215
"""A tool to apply scenarios to tests."""
3217
def adapt(self, test):
3218
"""Return a TestSuite containing a copy of test for each scenario."""
3219
result = unittest.TestSuite()
3220
for scenario in self.scenarios:
3221
result.addTest(self.adapt_test_to_scenario(test, scenario))
3224
def adapt_test_to_scenario(self, test, scenario):
3225
"""Copy test and apply scenario to it.
3227
:param test: A test to adapt.
3228
:param scenario: A tuple describing the scenarion.
3229
The first element of the tuple is the new test id.
3230
The second element is a dict containing attributes to set on the
3232
:return: The adapted test.
3234
from copy import deepcopy
3235
new_test = deepcopy(test)
3236
for name, value in scenario[1].items():
3237
setattr(new_test, name, value)
3238
new_id = "%s(%s)" % (new_test.id(), scenario[0])
3239
new_test.id = lambda: new_id
3243
def probe_unicode_in_user_encoding():
3244
"""Try to encode several unicode strings to use in unicode-aware tests.
3245
Return first successfull match.
3247
:return: (unicode value, encoded plain string value) or (None, None)
3249
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3250
for uni_val in possible_vals:
3252
str_val = uni_val.encode(osutils.get_user_encoding())
3253
except UnicodeEncodeError:
3254
# Try a different character
3257
return uni_val, str_val
3261
def probe_bad_non_ascii(encoding):
3262
"""Try to find [bad] character with code [128..255]
3263
that cannot be decoded to unicode in some encoding.
3264
Return None if all non-ascii characters is valid
3267
for i in xrange(128, 256):
3270
char.decode(encoding)
3271
except UnicodeDecodeError:
3276
class _FTPServerFeature(Feature):
3277
"""Some tests want an FTP Server, check if one is available.
3279
Right now, the only way this is available is if 'medusa' is installed.
3280
http://www.amk.ca/python/code/medusa.html
3285
import bzrlib.tests.ftp_server
3290
def feature_name(self):
3293
FTPServerFeature = _FTPServerFeature()
3296
class _UnicodeFilename(Feature):
3297
"""Does the filesystem support Unicode filenames?"""
3302
except UnicodeEncodeError:
3304
except (IOError, OSError):
3305
# The filesystem allows the Unicode filename but the file doesn't
3309
# The filesystem allows the Unicode filename and the file exists,
3313
UnicodeFilename = _UnicodeFilename()
3316
class _UTF8Filesystem(Feature):
3317
"""Is the filesystem UTF-8?"""
3320
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
3324
UTF8Filesystem = _UTF8Filesystem()
3327
class _CaseInsensitiveFilesystemFeature(Feature):
3328
"""Check if underlying filesystem is case-insensitive
3329
(e.g. on Windows, Cygwin, MacOS)
3333
if TestCaseWithMemoryTransport.TEST_ROOT is None:
3334
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3335
TestCaseWithMemoryTransport.TEST_ROOT = root
3337
root = TestCaseWithMemoryTransport.TEST_ROOT
3338
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3340
name_a = osutils.pathjoin(tdir, 'a')
3341
name_A = osutils.pathjoin(tdir, 'A')
3343
result = osutils.isdir(name_A)
3344
_rmtree_temp_dir(tdir)
3347
def feature_name(self):
3348
return 'case-insensitive filesystem'
3350
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
973
for mod_name in mods_list:
974
mod = _load_module_by_name(mod_name)
975
for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
976
suite.addTests(adapter.adapt(test))
979
def _load_module_by_name(mod_name):
980
parts = mod_name.split('.')
981
module = __import__(mod_name)
983
# for historical reasons python returns the top-level module even though
984
# it loads the submodule; we need to walk down to get the one we want.
986
module = getattr(module, parts.pop(0))