21
21
# little as possible, so this should be used rarely if it's added at all.
22
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
31
24
from cStringIO import StringIO
37
from pprint import pformat
42
from subprocess import Popen, PIPE
64
38
import bzrlib.branch
65
39
import bzrlib.commands
66
import bzrlib.timestamp
40
from bzrlib.errors import (BzrError,
42
UninitializableFormat,
68
44
import bzrlib.inventory
69
45
import bzrlib.iterablefile
70
46
import bzrlib.lockdir
74
# lsprof not available
76
from bzrlib.merge import merge_inner
77
47
import bzrlib.merge3
49
import bzrlib.osutils as osutils
78
50
import bzrlib.plugin
79
51
import bzrlib.store
80
from bzrlib import symbol_versioning
81
from bzrlib.symbol_versioning import (
87
52
import bzrlib.trace
88
from bzrlib.transport import get_transport
53
from bzrlib.transport import urlescape, get_transport
89
54
import bzrlib.transport
90
from bzrlib.transport.local import LocalURLServer
91
from bzrlib.transport.memory import MemoryServer
55
from bzrlib.transport.local import LocalRelpathServer
92
56
from bzrlib.transport.readonly import ReadonlyServer
93
from bzrlib.trace import mutter, note
94
from bzrlib.tests import TestUtil
95
from bzrlib.tests.http_server import HttpServer
96
from bzrlib.tests.TestUtil import (
57
from bzrlib.trace import mutter
58
from bzrlib.tests.TestUtil import TestLoader, TestSuite
100
59
from bzrlib.tests.treeshape import build_tree_contents
101
import bzrlib.version_info_formats.format_custom
102
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
104
# Mark this python module as being part of the implementation
105
# of unittest: this gives us better tracebacks where the last
106
# shown frame is the test code, not our assertXYZ.
109
default_transport = LocalURLServer
112
class ExtendedTestResult(unittest._TextTestResult):
113
"""Accepts, reports and accumulates the results of running tests.
115
Compared to the unittest version this class adds support for
116
profiling, benchmarking, stopping as soon as a test fails, and
117
skipping tests. There are further-specialized subclasses for
118
different types of display.
120
When a test finishes, in whatever way, it calls one of the addSuccess,
121
addFailure or addError classes. These in turn may redirect to a more
122
specific case for the special test results supported by our extended
125
Note that just one of these objects is fed the results from many tests.
60
from bzrlib.workingtree import WorkingTree
62
default_transport = LocalRelpathServer
65
MODULES_TO_DOCTEST = [
77
def packages_to_test():
78
"""Return a list of packages to test.
80
The packages are not globally imported so that import failures are
81
triggered when running selftest, not when importing the command.
84
import bzrlib.tests.blackbox
85
import bzrlib.tests.branch_implementations
88
bzrlib.tests.branch_implementations,
92
class _MyResult(unittest._TextTestResult):
95
Shows output in a different format, including displaying runtime for tests.
128
97
stop_early = False
130
def __init__(self, stream, descriptions, verbosity,
134
"""Construct new TestResult.
136
:param bench_history: Optionally, a writable file object to accumulate
139
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
140
if bench_history is not None:
141
from bzrlib.version import _get_bzr_source_tree
142
src_tree = _get_bzr_source_tree()
145
revision_id = src_tree.get_parent_ids()[0]
147
# XXX: if this is a brand new tree, do the same as if there
151
# XXX: If there's no branch, what should we do?
153
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
154
self._bench_history = bench_history
155
self.ui = ui.ui_factory
156
self.num_tests = num_tests
158
self.failure_count = 0
159
self.known_failure_count = 0
161
self.not_applicable_count = 0
162
self.unsupported = {}
164
self._overall_start_time = time.time()
166
def _extractBenchmarkTime(self, testCase):
167
"""Add a benchmark time for the current test case."""
168
return getattr(testCase, "_benchtime", None)
170
def _elapsedTestTimeString(self):
171
"""Return a time string for the overall time the current test has taken."""
172
return self._formatTime(time.time() - self._start_time)
174
def _testTimeString(self, testCase):
175
benchmark_time = self._extractBenchmarkTime(testCase)
176
if benchmark_time is not None:
178
self._formatTime(benchmark_time),
179
self._elapsedTestTimeString())
181
return " %s" % self._elapsedTestTimeString()
183
def _formatTime(self, seconds):
184
"""Format seconds as milliseconds with leading spaces."""
185
# some benchmarks can take thousands of seconds to run, so we need 8
187
return "%8dms" % (1000 * seconds)
189
def _shortened_test_description(self, test):
191
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
99
def _elapsedTime(self):
100
return "%5dms" % (1000 * (time.time() - self._start_time))
194
102
def startTest(self, test):
195
103
unittest.TestResult.startTest(self, test)
196
self.report_test_start(test)
197
test.number = self.count
198
self._recordTestStartTime()
200
def _recordTestStartTime(self):
201
"""Record that a test has started."""
104
# In a short description, the important words are in
105
# the beginning, but in an id, the important words are
107
SHOW_DESCRIPTIONS = False
109
width = osutils.terminal_width()
110
name_width = width - 15
112
if SHOW_DESCRIPTIONS:
113
what = test.shortDescription()
115
if len(what) > name_width:
116
what = what[:name_width-3] + '...'
119
if what.startswith('bzrlib.tests.'):
121
if len(what) > name_width:
122
what = '...' + what[3-name_width:]
123
what = what.ljust(name_width)
124
self.stream.write(what)
202
126
self._start_time = time.time()
204
def _cleanupLogFile(self, test):
205
# We can only do this if we have one of our TestCases, not if
207
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
208
if setKeepLogfile is not None:
211
128
def addError(self, test, err):
212
"""Tell result that test finished with an error.
214
Called from the TestCase run() method when the test
215
fails with an unexpected error.
217
self._testConcluded(test)
218
129
if isinstance(err[1], TestSkipped):
219
return self._addSkipped(test, err)
220
elif isinstance(err[1], UnavailableFeature):
221
return self.addNotSupported(test, err[1].args[0])
223
unittest.TestResult.addError(self, test, err)
224
self.error_count += 1
225
self.report_error(test, err)
228
self._cleanupLogFile(test)
130
return self.addSkipped(test, err)
131
unittest.TestResult.addError(self, test, err)
133
self.stream.writeln("ERROR %s" % self._elapsedTime())
135
self.stream.write('E')
230
140
def addFailure(self, test, err):
231
"""Tell result that test failed.
233
Called from the TestCase run() method when the test
234
fails because e.g. an assert() method failed.
236
self._testConcluded(test)
237
if isinstance(err[1], KnownFailure):
238
return self._addKnownFailure(test, err)
240
unittest.TestResult.addFailure(self, test, err)
241
self.failure_count += 1
242
self.report_failure(test, err)
245
self._cleanupLogFile(test)
141
unittest.TestResult.addFailure(self, test, err)
143
self.stream.writeln(" FAIL %s" % self._elapsedTime())
145
self.stream.write('F')
247
150
def addSuccess(self, test):
248
"""Tell result that test completed successfully.
250
Called from the TestCase run()
252
self._testConcluded(test)
253
if self._bench_history is not None:
254
benchmark_time = self._extractBenchmarkTime(test)
255
if benchmark_time is not None:
256
self._bench_history.write("%s %s\n" % (
257
self._formatTime(benchmark_time),
259
self.report_success(test)
260
self._cleanupLogFile(test)
261
unittest.TestResult.addSuccess(self, test)
262
test._log_contents = ''
264
def _testConcluded(self, test):
265
"""Common code when a test has finished.
267
Called regardless of whether it succeeded, failed, etc.
271
def _addKnownFailure(self, test, err):
272
self.known_failure_count += 1
273
self.report_known_failure(test, err)
275
def addNotSupported(self, test, feature):
    """The test will not be run because of a missing feature.

    :param test: The test that cannot be run.
    :param feature: The missing feature. Its ``str()`` form is the key
        under which the occurrence is counted in ``self.unsupported``.
    """
    # There are two routes into this method: either the test started
    # running and raised UnavailableFeature (arriving here through
    # addError), or it is called while probing for features before running
    # the tests, in which case startTest and stopTest are still seen.
    feature_name = str(feature)
    self.unsupported[feature_name] = self.unsupported.get(feature_name, 0) + 1
    self.report_unsupported(test, feature)
288
def _addSkipped(self, test, skip_excinfo):
289
if isinstance(skip_excinfo[1], TestNotApplicable):
290
self.not_applicable_count += 1
291
self.report_not_applicable(test, skip_excinfo)
294
self.report_skip(test, skip_excinfo)
297
except KeyboardInterrupt:
300
self.addError(test, test._exc_info())
302
# seems best to treat this as success from point-of-view of unittest
303
# -- it actually does nothing so it barely matters :)
304
unittest.TestResult.addSuccess(self, test)
305
test._log_contents = ''
152
self.stream.writeln(' OK %s' % self._elapsedTime())
154
self.stream.write('~')
156
unittest.TestResult.addSuccess(self, test)
158
def addSkipped(self, test, skip_excinfo):
160
print >>self.stream, ' SKIP %s' % self._elapsedTime()
161
print >>self.stream, ' %s' % skip_excinfo[1]
163
self.stream.write('S')
165
# seems best to treat this as success from point-of-view of unittest
166
# -- it actually does nothing so it barely matters :)
167
unittest.TestResult.addSuccess(self, test)
307
169
def printErrorList(self, flavour, errors):
308
170
for test, err in errors:
309
171
self.stream.writeln(self.separator1)
310
self.stream.write("%s: " % flavour)
311
self.stream.writeln(self.getDescription(test))
172
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
312
173
if getattr(test, '_get_log', None) is not None:
313
self.stream.write('\n')
315
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
316
self.stream.write('\n')
317
self.stream.write(test._get_log())
318
self.stream.write('\n')
320
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
321
self.stream.write('\n')
175
print >>self.stream, \
176
('vvvv[log from %s]' % test.id()).ljust(78,'-')
177
print >>self.stream, test._get_log()
178
print >>self.stream, \
179
('^^^^[log from %s]' % test.id()).ljust(78,'-')
322
180
self.stream.writeln(self.separator2)
323
181
self.stream.writeln("%s" % err)
328
def report_cleaning_up(self):
331
def report_success(self, test):
334
def wasStrictlySuccessful(self):
335
if self.unsupported or self.known_failure_count:
337
return self.wasSuccessful()
340
class TextTestResult(ExtendedTestResult):
341
"""Displays progress and results of tests in text form"""
343
def __init__(self, stream, descriptions, verbosity,
348
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
349
bench_history, num_tests)
351
self.pb = self.ui.nested_progress_bar()
352
self._supplied_pb = False
355
self._supplied_pb = True
356
self.pb.show_pct = False
357
self.pb.show_spinner = False
358
self.pb.show_eta = False,
359
self.pb.show_count = False
360
self.pb.show_bar = False
362
def report_starting(self):
363
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
365
def _progress_prefix_text(self):
366
# the longer this text, the less space we have to show the test
368
a = '[%d' % self.count # total that have been run
369
# tests skipped as known not to be relevant are not important enough
371
## if self.skip_count:
372
## a += ', %d skip' % self.skip_count
373
## if self.known_failure_count:
374
## a += '+%dX' % self.known_failure_count
375
if self.num_tests is not None:
376
a +='/%d' % self.num_tests
378
runtime = time.time() - self._overall_start_time
380
a += '%dm%ds' % (runtime / 60, runtime % 60)
384
a += ', %d err' % self.error_count
385
if self.failure_count:
386
a += ', %d fail' % self.failure_count
388
a += ', %d missing' % len(self.unsupported)
392
def report_test_start(self, test):
395
self._progress_prefix_text()
397
+ self._shortened_test_description(test))
399
def _test_description(self, test):
400
return self._shortened_test_description(test)
402
def report_error(self, test, err):
403
self.pb.note('ERROR: %s\n %s\n',
404
self._test_description(test),
408
def report_failure(self, test, err):
409
self.pb.note('FAIL: %s\n %s\n',
410
self._test_description(test),
414
def report_known_failure(self, test, err):
    """Note an expected failure on the progress bar.

    :param test: The test that failed in an expected way.
    :param err: The error information; only ``err[1]`` is shown.
    """
    description = self._test_description(test)
    failure = err[1]
    self.pb.note('XFAIL: %s\n%s\n', description, failure)
418
def report_skip(self, test, skip_excinfo):
421
def report_not_applicable(self, test, skip_excinfo):
424
def report_unsupported(self, test, feature):
425
"""test cannot be run because feature is missing."""
427
def report_cleaning_up(self):
428
self.pb.update('cleaning up...')
431
if not self._supplied_pb:
435
class VerboseTestResult(ExtendedTestResult):
436
"""Produce long output, with one line per test run plus times"""
438
def _ellipsize_to_right(self, a_string, final_width):
439
"""Truncate and pad a string, keeping the right hand side"""
440
if len(a_string) > final_width:
441
result = '...' + a_string[3-final_width:]
444
return result.ljust(final_width)
446
def report_starting(self):
447
self.stream.write('running %d tests...\n' % self.num_tests)
449
def report_test_start(self, test):
451
name = self._shortened_test_description(test)
452
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
453
# numbers, plus a trailing blank
454
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
455
self.stream.write(self._ellipsize_to_right(name,
456
osutils.terminal_width()-30))
459
def _error_summary(self, err):
461
return '%s%s' % (indent, err[1])
463
def report_error(self, test, err):
    """Write an ERROR line for test, followed by the error summary.

    :param test: The test that raised the error.
    :param err: The error information passed to ``_error_summary``.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('ERROR %s\n%s' % (elapsed, summary))
468
def report_failure(self, test, err):
    """Write a FAIL line for test, followed by the error summary.

    :param test: The test that failed.
    :param err: The error information passed to ``_error_summary``.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln(' FAIL %s\n%s' % (elapsed, summary))
473
def report_known_failure(self, test, err):
    """Write an XFAIL line for test, followed by the error summary.

    :param test: The test that failed in an expected way.
    :param err: The error information passed to ``_error_summary``.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('XFAIL %s\n%s' % (elapsed, summary))
478
def report_success(self, test):
479
self.stream.writeln(' OK %s' % self._testTimeString(test))
480
for bench_called, stats in getattr(test, '_benchcalls', []):
481
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
482
stats.pprint(file=self.stream)
483
# flush the stream so that we get smooth output. This verbose mode is
484
# used to show the output in PQM.
487
def report_skip(self, test, skip_excinfo):
    """Write a SKIP line for test, followed by the skip summary.

    :param test: The test that was skipped.
    :param skip_excinfo: The skip information passed to
        ``_error_summary``.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(skip_excinfo)
    self.stream.writeln(' SKIP %s\n%s' % (elapsed, summary))
492
def report_not_applicable(self, test, skip_excinfo):
    """Write an N/A line for test, followed by the skip summary.

    :param test: The test that does not apply.
    :param skip_excinfo: The skip information passed to
        ``_error_summary``.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(skip_excinfo)
    self.stream.writeln(' N/A %s\n%s' % (elapsed, summary))
497
def report_unsupported(self, test, feature):
    """Write a NODEP line: test cannot be run because feature is missing.

    :param test: The test that cannot be run.
    :param feature: The missing feature, interpolated into the message.
    """
    elapsed = self._testTimeString(test)
    message = "NODEP %s\n The feature '%s' is not available." % (
        elapsed, feature)
    self.stream.writeln(message)
503
class TextTestRunner(object):
184
class TextTestRunner(unittest.TextTestRunner):
504
185
stop_on_failure = False
513
self.stream = unittest._WritelnDecorator(stream)
514
self.descriptions = descriptions
515
self.verbosity = verbosity
516
self._bench_history = bench_history
517
self.list_only = list_only
520
"Run the given test case or test suite."
521
startTime = time.time()
522
if self.verbosity == 1:
523
result_class = TextTestResult
524
elif self.verbosity >= 2:
525
result_class = VerboseTestResult
526
result = result_class(self.stream,
529
bench_history=self._bench_history,
530
num_tests=test.countTestCases(),
187
def _makeResult(self):
188
result = _MyResult(self.stream, self.descriptions, self.verbosity)
532
189
result.stop_early = self.stop_on_failure
533
result.report_starting()
535
if self.verbosity >= 2:
536
self.stream.writeln("Listing tests only ...\n")
538
for t in iter_suite_tests(test):
539
self.stream.writeln("%s" % (t.id()))
541
actionTaken = "Listed"
544
run = result.testsRun
546
stopTime = time.time()
547
timeTaken = stopTime - startTime
549
self.stream.writeln(result.separator2)
550
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
551
run, run != 1 and "s" or "", timeTaken))
552
self.stream.writeln()
553
if not result.wasSuccessful():
554
self.stream.write("FAILED (")
555
failed, errored = map(len, (result.failures, result.errors))
557
self.stream.write("failures=%d" % failed)
559
if failed: self.stream.write(", ")
560
self.stream.write("errors=%d" % errored)
561
if result.known_failure_count:
562
if failed or errored: self.stream.write(", ")
563
self.stream.write("known_failure_count=%d" %
564
result.known_failure_count)
565
self.stream.writeln(")")
567
if result.known_failure_count:
568
self.stream.writeln("OK (known_failures=%d)" %
569
result.known_failure_count)
571
self.stream.writeln("OK")
572
if result.skip_count > 0:
573
skipped = result.skip_count
574
self.stream.writeln('%d test%s skipped' %
575
(skipped, skipped != 1 and "s" or ""))
576
if result.unsupported:
577
for feature, count in sorted(result.unsupported.items()):
578
self.stream.writeln("Missing feature '%s' skipped %d tests." %
864
270
# TODO: perhaps override assertEquals to call this for strings?
868
message = "texts not equal:\n"
870
message = 'first string is missing a final newline.\n'
872
message = 'second string is missing a final newline.\n'
873
raise AssertionError(message +
874
self._ndiff_strings(a, b))
273
raise AssertionError("texts not equal:\n" +
274
self._ndiff_strings(a, b))
876
def assertEqualMode(self, mode, mode_test):
    """Assert that two mode values are equal, reporting both in octal.

    :param mode: The first mode value.
    :param mode_test: The second mode value.
    """
    mismatch_text = 'mode mismatch %o != %o' % (mode, mode_test)
    self.assertEqual(mode, mode_test, mismatch_text)
880
def assertEqualStat(self, expected, actual):
    """Assert that expected and actual are the same stat result.

    Only st_size, st_mtime, st_ctime, st_dev, st_ino and st_mode are
    compared, in that order; any other fields are ignored.

    :param expected: A stat result.
    :param actual: A stat result.
    :raises AssertionError: If any of the compared fields differ.
    """
    compared_fields = (
        'st_size',
        'st_mtime',
        'st_ctime',
        'st_dev',
        'st_ino',
        'st_mode',
        )
    for field in compared_fields:
        self.assertEqual(getattr(expected, field), getattr(actual, field))
895
def assertPositive(self, val):
    """Assert that val is greater than 0."""
    is_positive = val > 0
    failure_text = 'expected a positive value, but got %s' % val
    self.assertTrue(is_positive, failure_text)
899
def assertNegative(self, val):
    """Assert that val is less than 0."""
    is_negative = val < 0
    failure_text = 'expected a negative value, but got %s' % val
    self.assertTrue(is_negative, failure_text)
903
276
def assertStartsWith(self, s, prefix):
    """Assert that s starts with prefix.

    :raises AssertionError: If ``s.startswith(prefix)`` is false.
    """
    if s.startswith(prefix):
        return
    raise AssertionError('string %r does not start with %r' % (s, prefix))
907
280
def assertEndsWith(self, s, suffix):
    """Asserts that s ends with suffix.

    :param s: The string to check.
    :param suffix: The ending that s is expected to have.
    :raises AssertionError: If ``s.endswith(suffix)`` is false.
    """
    # The check must use the 'suffix' parameter; a leftover test against
    # the undefined name 'prefix' has been removed.
    if not s.endswith(suffix):
        raise AssertionError('string %r does not end with %r' % (s, suffix))
912
284
def assertContainsRe(self, haystack, needle_re):
    """Assert that haystack contains something matching needle_re.

    :param haystack: The string to search.
    :param needle_re: The regular expression to look for.
    :raises AssertionError: If ``re.search`` finds no match.
    """
    if re.search(needle_re, haystack):
        return
    is_long = '\n' in haystack or len(haystack) > 60
    if is_long:
        # a long string, format it in a more readable way
        template = 'pattern "%s" not found in\n"""\\\n%s"""\n'
    else:
        template = 'pattern "%s" not found in "%s"'
    raise AssertionError(template % (needle_re, haystack))
924
def assertNotContainsRe(self, haystack, needle_re):
    """Assert that haystack does not match a regular expression.

    :param haystack: The string to search.
    :param needle_re: The regular expression that must not be found.
    :raises AssertionError: If ``re.search`` finds a match.
    """
    if re.search(needle_re, haystack):
        # Report that the pattern WAS found; the leftover "not found"
        # message belonged to assertContainsRe and has been dropped.
        raise AssertionError('pattern "%s" found in "%s"'
                             % (needle_re, haystack))
930
290
def assertSubset(self, sublist, superlist):
    """Assert that every entry in sublist is present in superlist.

    :param sublist: An iterable of hashable entries that must all occur
        in superlist.
    :param superlist: An iterable of hashable entries to check against.
    :raises AssertionError: If any entry of sublist is not in superlist;
        the message lists the missing entries.
    """
    # The set difference is the whole check. The leftover element-by-element
    # loop called .append() on this set, which would raise AttributeError.
    missing = set(sublist) - set(superlist)
    if missing:
        raise AssertionError("value(s) %r not present in container %r" %
                             (missing, superlist))
937
def assertListRaises(self, excClass, func, *args, **kwargs):
938
"""Fail unless excClass is raised when the iterator from func is used.
940
Many functions can return generators this makes sure
941
to wrap them in a list() call to make sure the whole generator
942
is run, and that the proper exception is raised.
945
list(func(*args, **kwargs))
949
if getattr(excClass,'__name__', None) is not None:
950
excName = excClass.__name__
952
excName = str(excClass)
953
raise self.failureException, "%s not raised" % excName
955
def assertRaises(self, excClass, callableObj, *args, **kwargs):
956
"""Assert that a callable raises a particular exception.
958
:param excClass: As for the except statement, this may be either an
959
exception class, or a tuple of classes.
960
:param callableObj: A callable, will be passed ``*args`` and
963
Returns the exception so that you can examine it.
966
callableObj(*args, **kwargs)
970
if getattr(excClass,'__name__', None) is not None:
971
excName = excClass.__name__
974
excName = str(excClass)
975
raise self.failureException, "%s not raised" % excName
977
def assertIs(self, left, right, message=None):
    """Assert that left and right are the same object.

    :param left: The first object.
    :param right: The second object.
    :param message: Optional text for the AssertionError; when None a
        default message showing both reprs is used.
    :raises AssertionError: If ``left is right`` is false.
    """
    # Only the signature that accepts 'message' is kept; the leftover
    # two-argument signature would have made 'message' undefined below.
    if left is right:
        return
    if message is None:
        message = "%r is not %r." % (left, right)
    raise AssertionError(message)
984
def assertIsNot(self, left, right, message=None):
986
if message is not None:
987
raise AssertionError(message)
989
raise AssertionError("%r is %r." % (left, right))
302
raise AssertionError("%r is not %r." % (left, right))
991
304
def assertTransportMode(self, transport, path, mode):
992
305
"""Fail if a path does not have mode mode.
994
If modes are not supported on this transport, the assertion is ignored.
307
If modes are not supported on this platform, the test is skipped.
996
if not transport._can_roundtrip_unix_modebits():
309
if sys.platform == 'win32':
998
311
path_stat = transport.stat(path)
999
312
actual_mode = stat.S_IMODE(path_stat.st_mode)
1000
313
self.assertEqual(mode, actual_mode,
1001
314
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
1003
def assertIsSameRealPath(self, path1, path2):
    """Fail if path1 and path2 point to different files.

    Both paths are passed through ``osutils.realpath`` before being
    compared; the failure message shows the paths as originally given.
    """
    real1 = osutils.realpath(path1)
    real2 = osutils.realpath(path2)
    apparent = "apparent paths:\na = %s\nb = %s\n," % (path1, path2)
    self.assertEqual(real1, real2, apparent)
1009
def assertIsInstance(self, obj, kls):
    """Fail if obj is not an instance of kls.

    :param obj: The object to check.
    :param kls: The class, or tuple of classes, obj should be an instance
        of (as accepted by ``isinstance``).
    """
    if isinstance(obj, kls):
        return
    actual_class = obj.__class__
    self.fail("%r is an instance of %s rather than %s" % (
        obj, actual_class, kls))
1015
def expectFailure(self, reason, assertion, *args, **kwargs):
1016
"""Invoke a test, expecting it to fail for the given reason.
1018
This is for assertions that ought to succeed, but currently fail.
1019
(The failure is *expected* but not *wanted*.) Please be very precise
1020
about the failure you're expecting. If a new bug is introduced,
1021
AssertionError should be raised, not KnownFailure.
1023
Frequently, expectFailure should be followed by an opposite assertion.
1026
Intended to be used with a callable that raises AssertionError as the
1027
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1029
Raises KnownFailure if the test fails. Raises AssertionError if the
1034
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1036
self.assertEqual(42, dynamic_val)
1038
This means that a dynamic_val of 54 will cause the test to raise
1039
a KnownFailure. Once math is fixed and the expectFailure is removed,
1040
only a dynamic_val of 42 will allow the test to pass. Anything other
1041
than 54 or 42 will cause an AssertionError.
1044
assertion(*args, **kwargs)
1045
except AssertionError:
1046
raise KnownFailure(reason)
1048
self.fail('Unexpected success. Should have failed: %s' % reason)
1050
def assertFileEqual(self, content, path):
1051
"""Fail if path does not contain 'content'."""
1052
self.failUnlessExists(path)
1053
f = file(path, 'rb')
1058
self.assertEqualDiff(content, s)
1060
def failUnlessExists(self, path):
1061
"""Fail unless path or paths, which may be abs or relative, exist."""
1062
if not isinstance(path, basestring):
1064
self.failUnlessExists(p)
1066
self.failUnless(osutils.lexists(path),path+" does not exist")
1068
def failIfExists(self, path):
1069
"""Fail if path or paths, which may be abs or relative, exist."""
1070
if not isinstance(path, basestring):
1072
self.failIfExists(p)
1074
self.failIf(osutils.lexists(path),path+" exists")
1076
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1077
"""A helper for callDeprecated and applyDeprecated.
1079
:param a_callable: A callable to call.
1080
:param args: The positional arguments for the callable
1081
:param kwargs: The keyword arguments for the callable
1082
:return: A tuple (warnings, result). result is the result of calling
1083
a_callable(``*args``, ``**kwargs``).
1086
def capture_warnings(msg, cls=None, stacklevel=None):
1087
# we've hooked into a deprecation specific callpath,
1088
# only deprecations should be getting sent via it.
1089
self.assertEqual(cls, DeprecationWarning)
1090
local_warnings.append(msg)
1091
original_warning_method = symbol_versioning.warn
1092
symbol_versioning.set_warning_method(capture_warnings)
1094
result = a_callable(*args, **kwargs)
1096
symbol_versioning.set_warning_method(original_warning_method)
1097
return (local_warnings, result)
1099
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1100
"""Call a deprecated callable without warning the user.
1102
Note that this only captures warnings raised by symbol_versioning.warn,
1103
not other callers that go direct to the warning module.
1105
To test that a deprecated method raises an error, do something like
1108
self.assertRaises(errors.ReservedId,
1109
self.applyDeprecated,
1110
deprecated_in((1, 5, 0)),
1114
:param deprecation_format: The deprecation format that the callable
1115
should have been deprecated with. This is the same type as the
1116
parameter to deprecated_method/deprecated_function. If the
1117
callable is not deprecated with this format, an assertion error
1119
:param a_callable: A callable to call. This may be a bound method or
1120
a regular function. It will be called with ``*args`` and
1122
:param args: The positional arguments for the callable
1123
:param kwargs: The keyword arguments for the callable
1124
:return: The result of a_callable(``*args``, ``**kwargs``)
1126
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1128
expected_first_warning = symbol_versioning.deprecation_string(
1129
a_callable, deprecation_format)
1130
if len(call_warnings) == 0:
1131
self.fail("No deprecation warning generated by call to %s" %
1133
self.assertEqual(expected_first_warning, call_warnings[0])
1136
def callCatchWarnings(self, fn, *args, **kw):
1137
"""Call a callable that raises python warnings.
1139
The caller's responsible for examining the returned warnings.
1141
If the callable raises an exception, the exception is not
1142
caught and propagates up to the caller. In that case, the list
1143
of warnings is not available.
1145
:returns: ([warning_object, ...], fn_result)
1147
# XXX: This is not perfect, because it completely overrides the
1148
# warnings filters, and some code may depend on suppressing particular
1149
# warnings. It's the easiest way to insulate ourselves from -Werror,
1150
# though. -- Andrew, 20071062
1152
def _catcher(message, category, filename, lineno, file=None, line=None):
1153
# despite the name, 'message' is normally(?) a Warning subclass
1155
wlist.append(message)
1156
saved_showwarning = warnings.showwarning
1157
saved_filters = warnings.filters
1159
warnings.showwarning = _catcher
1160
warnings.filters = []
1161
result = fn(*args, **kw)
1163
warnings.showwarning = saved_showwarning
1164
warnings.filters = saved_filters
1165
return wlist, result
1167
def callDeprecated(self, expected, callable, *args, **kwargs):
1168
"""Assert that a callable is deprecated in a particular way.
1170
This is a very precise test for unusual requirements. The
1171
applyDeprecated helper function is probably more suited for most tests
1172
as it allows you to simply specify the deprecation format being used
1173
and will ensure that that is issued for the function being called.
1175
Note that this only captures warnings raised by symbol_versioning.warn,
1176
not other callers that go direct to the warning module. To catch
1177
general warnings, use callCatchWarnings.
1179
:param expected: a list of the deprecation warnings expected, in order
1180
:param callable: The callable to call
1181
:param args: The positional arguments for the callable
1182
:param kwargs: The keyword arguments for the callable
1184
call_warnings, result = self._capture_deprecation_warnings(callable,
1186
self.assertEqual(expected, call_warnings)
1189
316
def _startLogFile(self):
1190
317
"""Send bzr and test log messages to a temporary file.
1192
319
The file is removed as the test is torn down.
1194
321
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1195
self._log_file = os.fdopen(fileno, 'w+')
1196
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
322
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
323
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
324
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
1197
325
self._log_file_name = name
1198
326
self.addCleanup(self._finishLogFile)
1200
328
def _finishLogFile(self):
1201
329
"""Finished with the log file.
1203
Close the file and delete it, unless setKeepLogfile was called.
331
Read contents into memory, close, and delete.
1205
if self._log_file is None:
1207
bzrlib.trace.pop_log_file(self._log_memento)
333
bzrlib.trace.disable_test_log(self._log_nonce)
334
self._log_file.seek(0)
335
self._log_contents = self._log_file.read()
1208
336
self._log_file.close()
1209
self._log_file = None
1210
if not self._keep_log_file:
1211
os.remove(self._log_file_name)
1212
self._log_file_name = None
1214
def setKeepLogfile(self):
1215
"""Make the logfile not be deleted when _finishLogFile is called."""
1216
self._keep_log_file = True
1218
def addCleanup(self, callable, *args, **kwargs):
337
os.remove(self._log_file_name)
338
self._log_file = self._log_file_name = None
340
def addCleanup(self, callable):
1219
341
"""Arrange to run a callable when this case is torn down.
1221
343
Callables are run in the reverse of the order they are registered,
1222
344
ie last-in first-out.
1224
self._cleanups.append((callable, args, kwargs))
346
if callable in self._cleanups:
347
raise ValueError("cleanup function %r already registered on %s"
349
self._cleanups.append(callable)
1226
351
def _cleanEnvironment(self):
1228
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1229
353
'HOME': os.getcwd(),
1230
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1231
# tests do check our impls match APPDATA
1232
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1234
'BZREMAIL': None, # may still be present in the environment
354
'APPDATA': os.getcwd(),
1236
'BZR_PROGRESS_BAR': None,
1239
'SSH_AUTH_SOCK': None,
1243
'https_proxy': None,
1244
'HTTPS_PROXY': None,
1249
# Nobody cares about these ones AFAIK. So far at
1250
# least. If you do (care), please update this comment
1254
'BZR_REMOTE_PATH': None,
1256
358
self.__old_env = {}
1257
359
self.addCleanup(self._restoreEnvironment)
1258
360
for name, value in new_env.iteritems():
1259
361
self._captureVar(name, value)
1261
364
def _captureVar(self, name, newvalue):
1262
"""Set an environment variable, and reset it when finished."""
1263
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
365
"""Set an environment variable, preparing it to be reset when finished."""
366
self.__old_env[name] = os.environ.get(name, None)
368
if name in os.environ:
371
os.environ[name] = newvalue
1265
def _restore_debug_flags(self):
1266
debug.debug_flags.clear()
1267
debug.debug_flags.update(self._preserved_debug_flags)
374
def _restoreVar(name, value):
376
if name in os.environ:
379
os.environ[name] = value
1269
381
def _restoreEnvironment(self):
1270
382
for name, value in self.__old_env.iteritems():
1271
osutils.set_or_unset_env(name, value)
1273
def _restoreHooks(self):
1274
for klass, hooks in self._preserved_hooks.items():
1275
setattr(klass, 'hooks', hooks)
1277
def knownFailure(self, reason):
1278
"""This test has failed for some known reason."""
1279
raise KnownFailure(reason)
1281
def run(self, result=None):
1282
if result is None: result = self.defaultTestResult()
1283
for feature in getattr(self, '_test_needs_features', []):
1284
if not feature.available():
1285
result.startTest(self)
1286
if getattr(result, 'addNotSupported', None):
1287
result.addNotSupported(self, feature)
1289
result.addSuccess(self)
1290
result.stopTest(self)
1293
return unittest.TestCase.run(self, result)
1296
absent_attr = object()
1297
for attr_name in self.attrs_to_keep:
1298
attr = getattr(self, attr_name, absent_attr)
1299
if attr is not absent_attr:
1300
saved_attrs[attr_name] = attr
1301
self.__dict__ = saved_attrs
383
self._restoreVar(name, value)
1303
385
def tearDown(self):
1304
386
self._runCleanups()
1305
387
unittest.TestCase.tearDown(self)
1307
def time(self, callable, *args, **kwargs):
1308
"""Run callable and accrue the time it takes to the benchmark time.
1310
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1311
this will cause lsprofile statistics to be gathered and stored in
1314
if self._benchtime is None:
1318
if not self._gather_lsprof_in_benchmarks:
1319
return callable(*args, **kwargs)
1321
# record this benchmark
1322
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1324
self._benchcalls.append(((callable, args, kwargs), stats))
1327
self._benchtime += time.time() - start
1329
389
def _runCleanups(self):
1330
390
"""Run registered cleanup functions.
1334
394
# TODO: Perhaps this should keep running cleanups even if
1335
395
# one of them fails?
1337
# Actually pop the cleanups from the list so tearDown running
1338
# twice is safe (this happens for skipped tests).
1339
while self._cleanups:
1340
cleanup, args, kwargs = self._cleanups.pop()
1341
cleanup(*args, **kwargs)
396
for cleanup_fn in reversed(self._cleanups):
1343
399
def log(self, *args):
1346
def _get_log(self, keep_log_file=False):
1347
"""Get the log from bzrlib.trace calls from this test.
1349
:param keep_log_file: When True, if the log is still a file on disk
1350
leave it as a file on disk. When False, if the log is still a file
1351
on disk, the log file is deleted and the log preserved as
1353
:return: A string containing the log.
1355
# flush the log file, to get all content
1357
bzrlib.trace._trace_file.flush()
1358
if self._log_contents:
1359
# XXX: this can hardly contain the content flushed above --vila
403
"""Return as a string the log for this test"""
404
if self._log_file_name:
405
return open(self._log_file_name).read()
1361
407
return self._log_contents
1362
if self._log_file_name is not None:
1363
logfile = open(self._log_file_name)
1365
log_contents = logfile.read()
1368
if not keep_log_file:
1369
self._log_contents = log_contents
1371
os.remove(self._log_file_name)
1373
if sys.platform == 'win32' and e.errno == errno.EACCES:
1374
sys.stderr.write(('Unable to delete log file '
1375
' %r\n' % self._log_file_name))
1380
return "DELETED log file to reduce memory footprint"
1382
def requireFeature(self, feature):
1383
"""This test requires a specific feature is available.
1385
:raises UnavailableFeature: When feature is not available.
408
# TODO: Delete the log after it's been read in
410
def capture(self, cmd, retcode=0):
411
"""Shortcut that splits cmd into words, runs, and returns stdout"""
412
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
414
def run_bzr_captured(self, argv, retcode=0):
415
"""Invoke bzr and return (stdout, stderr).
417
Useful for code that wants to check the contents of the
418
output, the way error messages are presented, etc.
420
This should be the main method for tests that want to exercise the
421
overall behavior of the bzr application (rather than a unit test
422
or a functional test of the library.)
424
Much of the old code runs bzr by forking a new copy of Python, but
425
that is slower, harder to debug, and generally not necessary.
427
This runs bzr through the interface that catches and reports
428
errors, and with logging set to something approximating the
429
default, so that error reporting can be checked.
431
argv -- arguments to invoke bzr
432
retcode -- expected return code, or None for don't-care.
1387
if not feature.available():
1388
raise UnavailableFeature(feature)
1390
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1392
"""Run bazaar command line, splitting up a string command line."""
1393
if isinstance(args, basestring):
1394
# shlex don't understand unicode strings,
1395
# so args should be plain string (bialix 20070906)
1396
args = list(shlex.split(str(args)))
1397
return self._run_bzr_core(args, retcode=retcode,
1398
encoding=encoding, stdin=stdin, working_dir=working_dir,
1401
def _run_bzr_core(self, args, retcode, encoding, stdin,
1403
if encoding is None:
1404
encoding = osutils.get_user_encoding()
1405
stdout = StringIOWrapper()
1406
stderr = StringIOWrapper()
1407
stdout.encoding = encoding
1408
stderr.encoding = encoding
1410
self.log('run bzr: %r', args)
436
self.log('run bzr: %s', ' '.join(argv))
1411
437
# FIXME: don't call into logging here
1412
438
handler = logging.StreamHandler(stderr)
439
handler.setFormatter(bzrlib.trace.QuietFormatter())
1413
440
handler.setLevel(logging.INFO)
1414
441
logger = logging.getLogger('')
1415
442
logger.addHandler(handler)
1416
old_ui_factory = ui.ui_factory
1417
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1420
if working_dir is not None:
1421
cwd = osutils.getcwd()
1422
os.chdir(working_dir)
1425
result = self.apply_redirected(ui.ui_factory.stdin,
1427
bzrlib.commands.run_bzr_catch_user_errors,
444
result = self.apply_redirected(None, stdout, stderr,
445
bzrlib.commands.run_bzr_catch_errors,
1430
448
logger.removeHandler(handler)
1431
ui.ui_factory = old_ui_factory
1435
449
out = stdout.getvalue()
1436
450
err = stderr.getvalue()
1438
self.log('output:\n%r', out)
452
self.log('output:\n%s', out)
1440
self.log('errors:\n%r', err)
454
self.log('errors:\n%s', err)
1441
455
if retcode is not None:
1442
self.assertEquals(retcode, result,
1443
message='Unexpected return code')
456
self.assertEquals(result, retcode)
1446
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1447
working_dir=None, error_regexes=[], output_encoding=None):
459
def run_bzr(self, *args, **kwargs):
1448
460
"""Invoke bzr, as if it were run from the command line.
1450
The argument list should not include the bzr program name - the
1451
first argument is normally the bzr command. Arguments may be
1452
passed in three ways:
1454
1- A list of strings, eg ["commit", "a"]. This is recommended
1455
when the command contains whitespace or metacharacters, or
1456
is built up at run time.
1458
2- A single string, eg "add a". This is the most convenient
1459
for hardcoded commands.
1461
This runs bzr through the interface that catches and reports
1462
errors, and with logging set to something approximating the
1463
default, so that error reporting can be checked.
1465
462
This should be the main method for tests that want to exercise the
1466
463
overall behavior of the bzr application (rather than a unit test
1467
464
or a functional test of the library.)
1469
466
This sends the stdout/stderr results into the test's log,
1470
467
where it may be useful for debugging. See also run_captured.
1472
:keyword stdin: A string to be used as stdin for the command.
1473
:keyword retcode: The status code the command should return;
1475
:keyword working_dir: The directory to run the command in
1476
:keyword error_regexes: A list of expected error messages. If
1477
specified they must be seen in the error output of the command.
1479
out, err = self._run_bzr_autosplit(
1484
working_dir=working_dir,
1486
for regex in error_regexes:
1487
self.assertContainsRe(err, regex)
1490
def run_bzr_error(self, error_regexes, *args, **kwargs):
1491
"""Run bzr, and check that stderr contains the supplied regexes
1493
:param error_regexes: Sequence of regular expressions which
1494
must each be found in the error output. The relative ordering
1496
:param args: command-line arguments for bzr
1497
:param kwargs: Keyword arguments which are interpreted by run_bzr
1498
This function changes the default value of retcode to be 3,
1499
since in most cases this is run when you expect bzr to fail.
1501
:return: (out, err) The actual output of running the command (in case
1502
you want to do more inspection)
1506
# Make sure that commit is failing because there is nothing to do
1507
self.run_bzr_error(['no changes to commit'],
1508
['commit', '-m', 'my commit comment'])
1509
# Make sure --strict is handling an unknown file, rather than
1510
# giving us the 'nothing to do' error
1511
self.build_tree(['unknown'])
1512
self.run_bzr_error(['Commit refused because there are unknown files'],
1513
['commit', --strict', '-m', 'my commit comment'])
1515
kwargs.setdefault('retcode', 3)
1516
kwargs['error_regexes'] = error_regexes
1517
out, err = self.run_bzr(*args, **kwargs)
1520
def run_bzr_subprocess(self, *args, **kwargs):
1521
"""Run bzr in a subprocess for testing.
1523
This starts a new Python interpreter and runs bzr in there.
1524
This should only be used for tests that have a justifiable need for
1525
this isolation: e.g. they are testing startup time, or signal
1526
handling, or early startup code, etc. Subprocess code can't be
1527
profiled or debugged so easily.
1529
:keyword retcode: The status code that is expected. Defaults to 0. If
1530
None is supplied, the status code is not checked.
1531
:keyword env_changes: A dictionary which lists changes to environment
1532
variables. A value of None will unset the env variable.
1533
The values must be strings. The change will only occur in the
1534
child, so you don't need to fix the environment after running.
1535
:keyword universal_newlines: Convert CRLF => LF
1536
:keyword allow_plugins: By default the subprocess is run with
1537
--no-plugins to ensure test reproducibility. Also, it is possible
1538
for system-wide plugins to create unexpected output on stderr,
1539
which can cause unnecessary test failures.
1541
env_changes = kwargs.get('env_changes', {})
1542
working_dir = kwargs.get('working_dir', None)
1543
allow_plugins = kwargs.get('allow_plugins', False)
1545
if isinstance(args[0], list):
1547
elif isinstance(args[0], basestring):
1548
args = list(shlex.split(args[0]))
1550
raise ValueError("passing varargs to run_bzr_subprocess")
1551
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1552
working_dir=working_dir,
1553
allow_plugins=allow_plugins)
1554
# We distinguish between retcode=None and retcode not passed.
1555
supplied_retcode = kwargs.get('retcode', 0)
1556
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1557
universal_newlines=kwargs.get('universal_newlines', False),
1560
def start_bzr_subprocess(self, process_args, env_changes=None,
1561
skip_if_plan_to_signal=False,
1563
allow_plugins=False):
1564
"""Start bzr in a subprocess for testing.
1566
This starts a new Python interpreter and runs bzr in there.
1567
This should only be used for tests that have a justifiable need for
1568
this isolation: e.g. they are testing startup time, or signal
1569
handling, or early startup code, etc. Subprocess code can't be
1570
profiled or debugged so easily.
1572
:param process_args: a list of arguments to pass to the bzr executable,
1573
for example ``['--version']``.
1574
:param env_changes: A dictionary which lists changes to environment
1575
variables. A value of None will unset the env variable.
1576
The values must be strings. The change will only occur in the
1577
child, so you don't need to fix the environment after running.
1578
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1580
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1582
:returns: Popen object for the started process.
1584
if skip_if_plan_to_signal:
1585
if not getattr(os, 'kill', None):
1586
raise TestSkipped("os.kill not available.")
1588
if env_changes is None:
1592
def cleanup_environment():
1593
for env_var, value in env_changes.iteritems():
1594
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1596
def restore_environment():
1597
for env_var, value in old_env.iteritems():
1598
osutils.set_or_unset_env(env_var, value)
1600
bzr_path = self.get_bzr_path()
1603
if working_dir is not None:
1604
cwd = osutils.getcwd()
1605
os.chdir(working_dir)
1608
# win32 subprocess doesn't support preexec_fn
1609
# so we will avoid using it on all platforms, just to
1610
# make sure the code path is used, and we don't break on win32
1611
cleanup_environment()
1612
command = [sys.executable]
1613
# frozen executables don't need the path to bzr
1614
if getattr(sys, "frozen", None) is None:
1615
command.append(bzr_path)
1616
if not allow_plugins:
1617
command.append('--no-plugins')
1618
command.extend(process_args)
1619
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1621
restore_environment()
1627
def _popen(self, *args, **kwargs):
1628
"""Place a call to Popen.
1630
Allows tests to override this method to intercept the calls made to
1631
Popen for introspection.
1633
return Popen(*args, **kwargs)
1635
def get_bzr_path(self):
1636
"""Return the path of the 'bzr' executable for this test suite."""
1637
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1638
if not os.path.isfile(bzr_path):
1639
# We are probably installed. Assume sys.argv is the right file
1640
bzr_path = sys.argv[0]
1643
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1644
universal_newlines=False, process_args=None):
1645
"""Finish the execution of process.
1647
:param process: the Popen object returned from start_bzr_subprocess.
1648
:param retcode: The status code that is expected. Defaults to 0. If
1649
None is supplied, the status code is not checked.
1650
:param send_signal: an optional signal to send to the process.
1651
:param universal_newlines: Convert CRLF => LF
1652
:returns: (stdout, stderr)
1654
if send_signal is not None:
1655
os.kill(process.pid, send_signal)
1656
out, err = process.communicate()
1658
if universal_newlines:
1659
out = out.replace('\r\n', '\n')
1660
err = err.replace('\r\n', '\n')
1662
if retcode is not None and retcode != process.returncode:
1663
if process_args is None:
1664
process_args = "(unknown args)"
1665
mutter('Output of bzr %s:\n%s', process_args, out)
1666
mutter('Error for bzr %s:\n%s', process_args, err)
1667
self.fail('Command bzr %s failed with retcode %s != %s'
1668
% (process_args, retcode, process.returncode))
469
retcode = kwargs.pop('retcode', 0)
470
return self.run_bzr_captured(args, retcode)
1671
472
def check_inventory_shape(self, inv, shape):
1672
473
"""Compare an inventory to a list of expected names.
1720
521
sys.stderr = real_stderr
1721
522
sys.stdin = real_stdin
1723
def reduceLockdirTimeout(self):
1724
"""Reduce the default lock timeout for the duration of the test, so that
1725
if LockContention occurs during a test, it does so quickly.
1727
Tests that expect to provoke LockContention errors should call this.
1729
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1731
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1732
self.addCleanup(resetTimeout)
1733
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1735
def make_utf8_encoded_stringio(self, encoding_type=None):
1736
"""Return a StringIOWrapper instance, that will encode Unicode
1739
if encoding_type is None:
1740
encoding_type = 'strict'
1742
output_encoding = 'utf-8'
1743
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
1744
sio.encoding = output_encoding
1748
class TestCaseWithMemoryTransport(TestCase):
1749
"""Common test class for tests that do not need disk resources.
1751
Tests that need disk resources should derive from TestCaseWithTransport.
1753
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1755
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1756
a directory which does not exist. This serves to help ensure test isolation
1757
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1758
must exist. However, TestCaseWithMemoryTransport does not offer local
1759
file defaults for the transport in tests, nor does it obey the command line
1760
override, so tests that accidentally write to the common directory should
1763
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1764
a .bzr directory that stops us ascending higher into the filesystem.
1770
def __init__(self, methodName='runTest'):
1771
# allow test parameterization after test construction and before test
1772
# execution. Variables that the parameterizer sets need to be
1773
# ones that are not set by setUp, or setUp will trash them.
1774
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1775
self.vfs_transport_factory = default_transport
1776
self.transport_server = None
1777
self.transport_readonly_server = None
1778
self.__vfs_server = None
1780
def get_transport(self, relpath=None):
1781
"""Return a writeable transport.
1783
This transport is for the test scratch space relative to
1786
:param relpath: a path relative to the base url.
1788
t = get_transport(self.get_url(relpath))
1789
self.assertFalse(t.is_readonly())
1792
def get_readonly_transport(self, relpath=None):
1793
"""Return a readonly transport for the test scratch space
1795
This can be used to test that operations which should only need
1796
readonly access in fact do not try to write.
1798
:param relpath: a path relative to the base url.
1800
t = get_transport(self.get_readonly_url(relpath))
1801
self.assertTrue(t.is_readonly())
1804
def create_transport_readonly_server(self):
1805
"""Create a transport server from class defined at init.
1807
This is mostly a hook for daughter classes.
1809
return self.transport_readonly_server()
1811
def get_readonly_server(self):
1812
"""Get the server instance for the readonly transport
1814
This is useful for some tests with specific servers to do diagnostics.
1816
if self.__readonly_server is None:
1817
if self.transport_readonly_server is None:
1818
# readonly decorator requested
1819
# bring up the server
1820
self.__readonly_server = ReadonlyServer()
1821
self.__readonly_server.setUp(self.get_vfs_only_server())
1823
self.__readonly_server = self.create_transport_readonly_server()
1824
self.__readonly_server.setUp(self.get_vfs_only_server())
1825
self.addCleanup(self.__readonly_server.tearDown)
1826
return self.__readonly_server
1828
def get_readonly_url(self, relpath=None):
1829
"""Get a URL for the readonly transport.
1831
This will either be backed by '.' or a decorator to the transport
1832
used by self.get_url()
1833
relpath provides for clients to get a path relative to the base url.
1834
These should only be downwards relative, not upwards.
1836
base = self.get_readonly_server().get_url()
1837
return self._adjust_url(base, relpath)
1839
def get_vfs_only_server(self):
1840
"""Get the vfs only read/write server instance.
1842
This is useful for some tests with specific servers that need
1845
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1846
is no means to override it.
1848
if self.__vfs_server is None:
1849
self.__vfs_server = MemoryServer()
1850
self.__vfs_server.setUp()
1851
self.addCleanup(self.__vfs_server.tearDown)
1852
return self.__vfs_server
1854
def get_server(self):
1855
"""Get the read/write server instance.
1857
This is useful for some tests with specific servers that need
1860
This is built from the self.transport_server factory. If that is None,
1861
then the self.get_vfs_server is returned.
1863
if self.__server is None:
1864
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1865
return self.get_vfs_only_server()
1867
# bring up a decorated means of access to the vfs only server.
1868
self.__server = self.transport_server()
1870
self.__server.setUp(self.get_vfs_only_server())
1871
except TypeError, e:
1872
# This should never happen; the try:Except here is to assist
1873
# developers having to update code rather than seeing an
1874
# uninformative TypeError.
1875
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1876
self.addCleanup(self.__server.tearDown)
1877
return self.__server
1879
def _adjust_url(self, base, relpath):
1880
"""Get a URL (or maybe a path) for the readwrite transport.
1882
This will either be backed by '.' or to an equivalent non-file based
1884
relpath provides for clients to get a path relative to the base url.
1885
These should only be downwards relative, not upwards.
1887
if relpath is not None and relpath != '.':
1888
if not base.endswith('/'):
1890
# XXX: Really base should be a url; we did after all call
1891
# get_url()! But sometimes it's just a path (from
1892
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1893
# to a non-escaped local path.
1894
if base.startswith('./') or base.startswith('/'):
1897
base += urlutils.escape(relpath)
1900
def get_url(self, relpath=None):
1901
"""Get a URL (or maybe a path) for the readwrite transport.
1903
This will either be backed by '.' or to an equivalent non-file based
1905
relpath provides for clients to get a path relative to the base url.
1906
These should only be downwards relative, not upwards.
1908
base = self.get_server().get_url()
1909
return self._adjust_url(base, relpath)
1911
def get_vfs_only_url(self, relpath=None):
1912
"""Get a URL (or maybe a path for the plain old vfs transport.
1914
This will never be a smart protocol. It always has all the
1915
capabilities of the local filesystem, but it might actually be a
1916
MemoryTransport or some other similar virtual filesystem.
1918
This is the backing transport (if any) of the server returned by
1919
get_url and get_readonly_url.
1921
:param relpath: provides for clients to get a path relative to the base
1922
url. These should only be downwards relative, not upwards.
1925
base = self.get_vfs_only_server().get_url()
1926
return self._adjust_url(base, relpath)
1928
def _create_safety_net(self):
1929
"""Make a fake bzr directory.
1931
This prevents any tests propagating up onto the TEST_ROOT directory's
1934
root = TestCaseWithMemoryTransport.TEST_ROOT
1935
bzrdir.BzrDir.create_standalone_workingtree(root)
1937
def _check_safety_net(self):
1938
"""Check that the safety .bzr directory have not been touched.
1940
_make_test_root have created a .bzr directory to prevent tests from
1941
propagating. This method ensures than a test did not leaked.
1943
root = TestCaseWithMemoryTransport.TEST_ROOT
1944
wt = workingtree.WorkingTree.open(root)
1945
last_rev = wt.last_revision()
1946
if last_rev != 'null:':
1947
# The current test have modified the /bzr directory, we need to
1948
# recreate a new one or all the followng tests will fail.
1949
# If you need to inspect its content uncomment the following line
1950
# import pdb; pdb.set_trace()
1951
_rmtree_temp_dir(root + '/.bzr')
1952
self._create_safety_net()
1953
raise AssertionError('%s/.bzr should not be modified' % root)
1955
def _make_test_root(self):
1956
if TestCaseWithMemoryTransport.TEST_ROOT is None:
1957
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
1958
TestCaseWithMemoryTransport.TEST_ROOT = root
1960
self._create_safety_net()
1962
# The same directory is used by all tests, and we're not
1963
# specifically told when all tests are finished. This will do.
1964
atexit.register(_rmtree_temp_dir, root)
1966
self.addCleanup(self._check_safety_net)
1968
def makeAndChdirToTestDir(self):
1969
"""Create a temporary directories for this one test.
1971
This must set self.test_home_dir and self.test_dir and chdir to
1974
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1976
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1977
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1978
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1980
def make_branch(self, relpath, format=None):
1981
"""Create a branch on the transport at relpath."""
1982
repo = self.make_repository(relpath, format=format)
1983
return repo.bzrdir.create_branch()
1985
def make_bzrdir(self, relpath, format=None):
1987
# might be a relative or absolute path
1988
maybe_a_url = self.get_url(relpath)
1989
segments = maybe_a_url.rsplit('/', 1)
1990
t = get_transport(maybe_a_url)
1991
if len(segments) > 1 and segments[-1] not in ('', '.'):
1995
if isinstance(format, basestring):
1996
format = bzrdir.format_registry.make_bzrdir(format)
1997
return format.initialize_on_transport(t)
1998
except errors.UninitializableFormat:
1999
raise TestSkipped("Format %s is not initializable." % format)
2001
def make_repository(self, relpath, shared=False, format=None):
2002
"""Create a repository on our default transport at relpath.
2004
Note that relpath must be a relative path, not a full url.
2006
# FIXME: If you create a remoterepository this returns the underlying
2007
# real format, which is incorrect. Actually we should make sure that
2008
# RemoteBzrDir returns a RemoteRepository.
2009
# maybe mbp 20070410
2010
made_control = self.make_bzrdir(relpath, format=format)
2011
return made_control.create_repository(shared=shared)
2013
def make_branch_and_memory_tree(self, relpath, format=None):
2014
"""Create a branch on the default transport and a MemoryTree for it."""
2015
b = self.make_branch(relpath, format=format)
2016
return memorytree.MemoryTree.create_on_branch(b)
2018
def make_branch_builder(self, relpath, format=None):
2019
url = self.get_url(relpath)
2020
tran = get_transport(url)
2021
return branchbuilder.BranchBuilder(get_transport(url), format=format)
2023
def overrideEnvironmentForTesting(self):
2024
os.environ['HOME'] = self.test_home_dir
2025
os.environ['BZR_HOME'] = self.test_home_dir
2028
super(TestCaseWithMemoryTransport, self).setUp()
2029
self._make_test_root()
2030
_currentdir = os.getcwdu()
2031
def _leaveDirectory():
2032
os.chdir(_currentdir)
2033
self.addCleanup(_leaveDirectory)
2034
self.makeAndChdirToTestDir()
2035
self.overrideEnvironmentForTesting()
2036
self.__readonly_server = None
2037
self.__server = None
2038
self.reduceLockdirTimeout()
525
BzrTestBase = TestCase
2041
class TestCaseInTempDir(TestCaseWithMemoryTransport):
528
class TestCaseInTempDir(TestCase):
2042
529
"""Derived class that runs a test within a temporary directory.
2044
531
This is useful for tests that need to create a branch, etc.
2284
758
def setUp(self):
2285
759
super(ChrootedTestCase, self).setUp()
2286
if not self.vfs_transport_factory == MemoryServer:
2287
self.transport_readonly_server = HttpServer
2290
def condition_id_re(pattern):
2291
"""Create a condition filter which performs a re check on a test's id.
2293
:param pattern: A regular expression string.
2294
:return: A callable that returns True if the re matches.
760
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
761
self.transport_readonly_server = bzrlib.transport.http.HttpServer
764
def filter_suite_by_re(suite, pattern):
2296
766
filter_re = re.compile(pattern)
2297
def condition(test):
2299
return filter_re.search(test_id)
2303
def condition_isinstance(klass_or_klass_list):
2304
"""Create a condition filter which returns isinstance(param, klass).
2306
:return: A callable which when called with one parameter obj return the
2307
result of isinstance(obj, klass_or_klass_list).
2310
return isinstance(obj, klass_or_klass_list)
2314
def condition_id_in_list(id_list):
2315
"""Create a condition filter which verify that test's id in a list.
2317
:param id_list: A TestIdList object.
2318
:return: A callable that returns True if the test's id appears in the list.
2320
def condition(test):
2321
return id_list.includes(test.id())
2325
def condition_id_startswith(starts):
2326
"""Create a condition filter verifying that test's id starts with a string.
2328
:param starts: A list of string.
2329
:return: A callable that returns True if the test's id starts with one of
2332
def condition(test):
2333
for start in starts:
2334
if test.id().startswith(start):
2340
def exclude_tests_by_condition(suite, condition):
2341
"""Create a test suite which excludes some tests from suite.
2343
:param suite: The suite to get tests from.
2344
:param condition: A callable whose result evaluates True when called with a
2345
test case which should be excluded from the result.
2346
:return: A suite which contains the tests found in suite that fail
2350
for test in iter_suite_tests(suite):
2351
if not condition(test):
2353
return TestUtil.TestSuite(result)
2356
def filter_suite_by_condition(suite, condition):
2357
"""Create a test suite by filtering another one.
2359
:param suite: The source suite.
2360
:param condition: A callable whose result evaluates True when called with a
2361
test case which should be included in the result.
2362
:return: A suite which contains the tests found in suite that pass
2366
for test in iter_suite_tests(suite):
2369
return TestUtil.TestSuite(result)
2372
def filter_suite_by_re(suite, pattern):
2373
"""Create a test suite by filtering another one.
2375
:param suite: the source suite
2376
:param pattern: pattern that names must match
2377
:returns: the newly created suite
2379
condition = condition_id_re(pattern)
2380
result_suite = filter_suite_by_condition(suite, condition)
2384
def filter_suite_by_id_list(suite, test_id_list):
2385
"""Create a test suite by filtering another one.
2387
:param suite: The source suite.
2388
:param test_id_list: A list of the test ids to keep as strings.
2389
:returns: the newly created suite
2391
condition = condition_id_in_list(test_id_list)
2392
result_suite = filter_suite_by_condition(suite, condition)
2396
def filter_suite_by_id_startswith(suite, start):
2397
"""Create a test suite by filtering another one.
2399
:param suite: The source suite.
2400
:param start: A list of string the test id must start with one of.
2401
:returns: the newly created suite
2403
condition = condition_id_startswith(start)
2404
result_suite = filter_suite_by_condition(suite, condition)
2408
def exclude_tests_by_re(suite, pattern):
2409
"""Create a test suite which excludes some tests from suite.
2411
:param suite: The suite to get tests from.
2412
:param pattern: A regular expression string. Test ids that match this
2413
pattern will be excluded from the result.
2414
:return: A TestSuite that contains all the tests from suite without the
2415
tests that matched pattern. The order of tests is the same as it was in
2418
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2421
def preserve_input(something):
2422
"""A helper for performing test suite transformation chains.
2424
:param something: Anything you want to preserve.
2430
def randomize_suite(suite):
2431
"""Return a new TestSuite with suite's tests in random order.
2433
The tests in the input suite are flattened into a single suite in order to
2434
accomplish this. Any nested TestSuites are removed to provide global
2437
tests = list(iter_suite_tests(suite))
2438
random.shuffle(tests)
2439
return TestUtil.TestSuite(tests)
2442
def split_suite_by_condition(suite, condition):
2443
"""Split a test suite into two by a condition.
2445
:param suite: The suite to split.
2446
:param condition: The condition to match on. Tests that match this
2447
condition are returned in the first test suite, ones that do not match
2448
are in the second suite.
2449
:return: A tuple of two test suites, where the first contains tests from
2450
suite matching the condition, and the second contains the remainder
2451
from suite. The order within each output suite is the same as it was in
2456
for test in iter_suite_tests(suite):
2458
matched.append(test)
2460
did_not_match.append(test)
2461
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2464
def split_suite_by_re(suite, pattern):
2465
"""Split a test suite into two by a regular expression.
2467
:param suite: The suite to split.
2468
:param pattern: A regular expression string. Test ids that match this
2469
pattern will be in the first test suite returned, and the others in the
2470
second test suite returned.
2471
:return: A tuple of two test suites, where the first contains tests from
2472
suite matching pattern, and the second contains the remainder from
2473
suite. The order within each output suite is the same as it was in
2476
return split_suite_by_condition(suite, condition_id_re(pattern))
767
for test in iter_suite_tests(suite):
768
if filter_re.search(test.id()):
2479
773
def run_suite(suite, name='test', verbose=False, pattern=".*",
2480
stop_on_failure=False,
2481
transport=None, lsprof_timed=None, bench_history=None,
2482
matching_tests_first=None,
2485
exclude_pattern=None,
2487
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
774
stop_on_failure=False, keep_output=False,
776
TestCaseInTempDir._TEST_NAME = name
2492
781
runner = TextTestRunner(stream=sys.stdout,
2494
verbosity=verbosity,
2495
bench_history=bench_history,
2496
list_only=list_only,
2498
784
runner.stop_on_failure=stop_on_failure
2499
# Initialise the random number generator and display the seed used.
2500
# We convert the seed to a long to make it reuseable across invocations.
2501
random_order = False
2502
if random_seed is not None:
2504
if random_seed == "now":
2505
random_seed = long(time.time())
2507
# Convert the seed to a long if we can
2509
random_seed = long(random_seed)
2512
runner.stream.writeln("Randomizing test order using seed %s\n" %
2514
random.seed(random_seed)
2515
# Customise the list of tests if requested
2516
if exclude_pattern is not None:
2517
suite = exclude_tests_by_re(suite, exclude_pattern)
2519
order_changer = randomize_suite
2521
order_changer = preserve_input
2522
if pattern != '.*' or random_order:
2523
if matching_tests_first:
2524
suites = map(order_changer, split_suite_by_re(suite, pattern))
2525
suite = TestUtil.TestSuite(suites)
2527
suite = order_changer(filter_suite_by_re(suite, pattern))
786
suite = filter_suite_by_re(suite, pattern)
2529
787
result = runner.run(suite)
2532
return result.wasStrictlySuccessful()
788
# This is still a little bogus,
789
# but only a little. Folk not using our testrunner will
790
# have to delete their temp directories themselves.
791
if result.wasSuccessful() or not keep_output:
792
if TestCaseInTempDir.TEST_ROOT is not None:
793
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
795
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
2534
796
return result.wasSuccessful()
2537
# Controlled by "bzr selftest -E=..." option
2538
selftest_debug_flags = set()
2541
799
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2543
test_suite_factory=None,
2546
matching_tests_first=None,
2549
exclude_pattern=None,
2555
802
"""Run the whole test suite under the enhanced runner"""
2556
# XXX: Very ugly way to do this...
2557
# Disable warning about old formats because we don't want it to disturb
2558
# any blackbox tests.
2559
from bzrlib import repository
2560
repository._deprecation_warning_done = True
2562
803
global default_transport
2563
804
if transport is None:
2564
805
transport = default_transport
2565
806
old_transport = default_transport
2566
807
default_transport = transport
2567
global selftest_debug_flags
2568
old_debug_flags = selftest_debug_flags
2569
if debug_flags is not None:
2570
selftest_debug_flags = set(debug_flags)
2572
if load_list is None:
2575
keep_only = load_test_id_list(load_list)
2576
if test_suite_factory is None:
2577
suite = test_suite(keep_only, starting_with)
2579
suite = test_suite_factory()
2580
810
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2581
stop_on_failure=stop_on_failure,
2582
transport=transport,
2583
lsprof_timed=lsprof_timed,
2584
bench_history=bench_history,
2585
matching_tests_first=matching_tests_first,
2586
list_only=list_only,
2587
random_seed=random_seed,
2588
exclude_pattern=exclude_pattern,
811
stop_on_failure=stop_on_failure, keep_output=keep_output,
2591
814
default_transport = old_transport
2592
selftest_debug_flags = old_debug_flags
2595
def load_test_id_list(file_name):
2596
"""Load a test id list from a text file.
2598
The format is one test id by line. No special care is taken to impose
2599
strict rules, these test ids are used to filter the test suite so a test id
2600
that do not match an existing test will do no harm. This allows user to add
2601
comments, leave blank lines, etc.
2605
ftest = open(file_name, 'rt')
2607
if e.errno != errno.ENOENT:
2610
raise errors.NoSuchFile(file_name)
2612
for test_name in ftest.readlines():
2613
test_list.append(test_name.strip())
2618
def suite_matches_id_list(test_suite, id_list):
2619
"""Warns about tests not appearing or appearing more than once.
2621
:param test_suite: A TestSuite object.
2622
:param test_id_list: The list of test ids that should be found in
2625
:return: (absents, duplicates) absents is a list containing the test found
2626
in id_list but not in test_suite, duplicates is a list containing the
2627
test found multiple times in test_suite.
2629
When using a prefined test id list, it may occurs that some tests do not
2630
exist anymore or that some tests use the same id. This function warns the
2631
tester about potential problems in his workflow (test lists are volatile)
2632
or in the test suite itself (using the same id for several tests does not
2633
help to localize defects).
2635
# Build a dict counting id occurrences
2637
for test in iter_suite_tests(test_suite):
2639
tests[id] = tests.get(id, 0) + 1
2644
occurs = tests.get(id, 0)
2646
not_found.append(id)
2648
duplicates.append(id)
2650
return not_found, duplicates
2653
class TestIdList(object):
2654
"""Test id list to filter a test suite.
2656
Relying on the assumption that test ids are built as:
2657
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
2658
notation, this class offers methods to :
2659
- avoid building a test suite for modules not refered to in the test list,
2660
- keep only the tests listed from the module test suite.
2663
def __init__(self, test_id_list):
2664
# When a test suite needs to be filtered against us we compare test ids
2665
# for equality, so a simple dict offers a quick and simple solution.
2666
self.tests = dict().fromkeys(test_id_list, True)
2668
# While unittest.TestCase have ids like:
2669
# <module>.<class>.<method>[(<param+)],
2670
# doctest.DocTestCase can have ids like:
2673
# <module>.<function>
2674
# <module>.<class>.<method>
2676
# Since we can't predict a test class from its name only, we settle on
2677
# a simple constraint: a test id always begins with its module name.
2680
for test_id in test_id_list:
2681
parts = test_id.split('.')
2682
mod_name = parts.pop(0)
2683
modules[mod_name] = True
2685
mod_name += '.' + part
2686
modules[mod_name] = True
2687
self.modules = modules
2689
def refers_to(self, module_name):
2690
"""Is there tests for the module or one of its sub modules."""
2691
return self.modules.has_key(module_name)
2693
def includes(self, test_id):
2694
return self.tests.has_key(test_id)
2697
class TestPrefixAliasRegistry(registry.Registry):
2698
"""A registry for test prefix aliases.
2700
This helps implement shorcuts for the --starting-with selftest
2701
option. Overriding existing prefixes is not allowed but not fatal (a
2702
warning will be emitted).
2705
def register(self, key, obj, help=None, info=None,
2706
override_existing=False):
2707
"""See Registry.register.
2709
Trying to override an existing alias causes a warning to be emitted,
2710
not a fatal execption.
2713
super(TestPrefixAliasRegistry, self).register(
2714
key, obj, help=help, info=info, override_existing=False)
2716
actual = self.get(key)
2717
note('Test prefix alias %s is already used for %s, ignoring %s'
2718
% (key, actual, obj))
2720
def resolve_alias(self, id_start):
2721
"""Replace the alias by the prefix in the given string.
2723
Using an unknown prefix is an error to help catching typos.
2725
parts = id_start.split('.')
2727
parts[0] = self.get(parts[0])
2729
raise errors.BzrCommandError(
2730
'%s is not a known test prefix alias' % parts[0])
2731
return '.'.join(parts)
2734
test_prefix_alias_registry = TestPrefixAliasRegistry()
2735
"""Registry of test prefix aliases."""
2738
# This alias makes it possible to detect typos ('bzrlin.') by making all valid test ids
2739
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
2740
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
2742
# Obvious highest-level prefixes; feel free to add your own via a plugin
2743
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
2744
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
2745
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
2746
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
2747
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
2750
def test_suite(keep_only=None, starting_with=None):
2751
"""Build and return TestSuite for the whole of bzrlib.
2753
:param keep_only: A list of test ids limiting the suite returned.
2755
:param starting_with: An id limiting the suite returned to the tests
2758
This function can be replaced if you need to change the default test
2759
suite on a global basis, but it is not encouraged.
2763
'bzrlib.tests.blackbox',
2764
'bzrlib.tests.branch_implementations',
2765
'bzrlib.tests.bzrdir_implementations',
2766
'bzrlib.tests.commands',
2767
'bzrlib.tests.interrepository_implementations',
2768
'bzrlib.tests.intertree_implementations',
2769
'bzrlib.tests.inventory_implementations',
2770
'bzrlib.tests.per_lock',
2771
'bzrlib.tests.per_repository',
2772
'bzrlib.tests.per_repository_reference',
2773
'bzrlib.tests.test__dirstate_helpers',
2774
'bzrlib.tests.test__walkdirs_win32',
819
"""Build and return TestSuite for the whole program."""
820
from doctest import DocTestSuite
822
global MODULES_TO_DOCTEST
2775
825
'bzrlib.tests.test_ancestry',
2776
826
'bzrlib.tests.test_annotate',
2777
827
'bzrlib.tests.test_api',
2778
'bzrlib.tests.test_atomicfile',
2779
828
'bzrlib.tests.test_bad_files',
2780
'bzrlib.tests.test_bisect_multi',
829
'bzrlib.tests.test_basis_inventory',
2781
830
'bzrlib.tests.test_branch',
2782
'bzrlib.tests.test_branchbuilder',
2783
'bzrlib.tests.test_btree_index',
2784
'bzrlib.tests.test_bugtracker',
2785
'bzrlib.tests.test_bundle',
2786
'bzrlib.tests.test_bzrdir',
2787
'bzrlib.tests.test_cache_utf8',
2788
'bzrlib.tests.test_chunk_writer',
2789
'bzrlib.tests.test_commands',
831
'bzrlib.tests.test_command',
2790
832
'bzrlib.tests.test_commit',
2791
833
'bzrlib.tests.test_commit_merge',
2792
834
'bzrlib.tests.test_config',
2793
835
'bzrlib.tests.test_conflicts',
2794
'bzrlib.tests.test_counted_lock',
2795
836
'bzrlib.tests.test_decorators',
2796
'bzrlib.tests.test_delta',
2797
'bzrlib.tests.test_deprecated_graph',
2798
837
'bzrlib.tests.test_diff',
2799
'bzrlib.tests.test_directory_service',
2800
'bzrlib.tests.test_dirstate',
2801
'bzrlib.tests.test_email_message',
2802
'bzrlib.tests.test_errors',
2803
'bzrlib.tests.test_extract',
838
'bzrlib.tests.test_doc_generate',
2804
839
'bzrlib.tests.test_fetch',
2805
'bzrlib.tests.test_ftp_transport',
2806
'bzrlib.tests.test_generate_docs',
2807
'bzrlib.tests.test_generate_ids',
2808
'bzrlib.tests.test_globbing',
840
'bzrlib.tests.test_fileid_involved',
2809
841
'bzrlib.tests.test_gpg',
2810
842
'bzrlib.tests.test_graph',
2811
843
'bzrlib.tests.test_hashcache',
2812
'bzrlib.tests.test_help',
2813
'bzrlib.tests.test_hooks',
2814
844
'bzrlib.tests.test_http',
2815
'bzrlib.tests.test_http_implementations',
2816
'bzrlib.tests.test_http_response',
2817
'bzrlib.tests.test_https_ca_bundle',
2818
845
'bzrlib.tests.test_identitymap',
2819
'bzrlib.tests.test_ignores',
2820
'bzrlib.tests.test_index',
2821
'bzrlib.tests.test_info',
2822
846
'bzrlib.tests.test_inv',
2823
'bzrlib.tests.test_knit',
2824
'bzrlib.tests.test_lazy_import',
2825
'bzrlib.tests.test_lazy_regex',
847
'bzrlib.tests.test_lockdir',
2826
848
'bzrlib.tests.test_lockable_files',
2827
'bzrlib.tests.test_lockdir',
2828
849
'bzrlib.tests.test_log',
2829
'bzrlib.tests.test_lru_cache',
2830
'bzrlib.tests.test_lsprof',
2831
'bzrlib.tests.test_mail_client',
2832
'bzrlib.tests.test_memorytree',
2833
850
'bzrlib.tests.test_merge',
2834
851
'bzrlib.tests.test_merge3',
2835
852
'bzrlib.tests.test_merge_core',
2836
'bzrlib.tests.test_merge_directive',
2837
853
'bzrlib.tests.test_missing',
2838
854
'bzrlib.tests.test_msgeditor',
2839
'bzrlib.tests.test_multiparent',
2840
'bzrlib.tests.test_mutabletree',
2841
855
'bzrlib.tests.test_nonascii',
2842
856
'bzrlib.tests.test_options',
2843
857
'bzrlib.tests.test_osutils',
2844
'bzrlib.tests.test_osutils_encodings',
2845
'bzrlib.tests.test_pack',
2846
'bzrlib.tests.test_pack_repository',
2847
'bzrlib.tests.test_patch',
2848
'bzrlib.tests.test_patches',
858
'bzrlib.tests.test_parent',
2849
859
'bzrlib.tests.test_permissions',
2850
860
'bzrlib.tests.test_plugins',
2851
'bzrlib.tests.test_progress',
2852
'bzrlib.tests.test_read_bundle',
2853
'bzrlib.tests.test_reconcile',
2854
'bzrlib.tests.test_reconfigure',
2855
'bzrlib.tests.test_registry',
2856
'bzrlib.tests.test_remote',
2857
'bzrlib.tests.test_repository',
2858
'bzrlib.tests.test_revert',
2859
861
'bzrlib.tests.test_revision',
2860
'bzrlib.tests.test_revisionspec',
2861
'bzrlib.tests.test_revisiontree',
862
'bzrlib.tests.test_revisionnamespaces',
863
'bzrlib.tests.test_revprops',
864
'bzrlib.tests.test_reweave',
2862
865
'bzrlib.tests.test_rio',
2863
'bzrlib.tests.test_rules',
2864
866
'bzrlib.tests.test_sampler',
2865
867
'bzrlib.tests.test_selftest',
2866
868
'bzrlib.tests.test_setup',
2867
869
'bzrlib.tests.test_sftp_transport',
2868
'bzrlib.tests.test_smart',
2869
870
'bzrlib.tests.test_smart_add',
2870
'bzrlib.tests.test_smart_transport',
2871
'bzrlib.tests.test_smtp_connection',
2872
871
'bzrlib.tests.test_source',
2873
'bzrlib.tests.test_ssh_transport',
2874
'bzrlib.tests.test_status',
2875
872
'bzrlib.tests.test_store',
2876
'bzrlib.tests.test_strace',
2877
'bzrlib.tests.test_subsume',
2878
'bzrlib.tests.test_switch',
2879
873
'bzrlib.tests.test_symbol_versioning',
2880
'bzrlib.tests.test_tag',
2881
874
'bzrlib.tests.test_testament',
2882
'bzrlib.tests.test_textfile',
2883
'bzrlib.tests.test_textmerge',
2884
'bzrlib.tests.test_timestamp',
2885
875
'bzrlib.tests.test_trace',
2886
876
'bzrlib.tests.test_transactions',
2887
'bzrlib.tests.test_transform',
2888
877
'bzrlib.tests.test_transport',
2889
'bzrlib.tests.test_transport_implementations',
2890
'bzrlib.tests.test_transport_log',
2891
'bzrlib.tests.test_tree',
2892
'bzrlib.tests.test_treebuilder',
2893
878
'bzrlib.tests.test_tsort',
2894
'bzrlib.tests.test_tuned_gzip',
2895
879
'bzrlib.tests.test_ui',
2896
880
'bzrlib.tests.test_uncommit',
2897
881
'bzrlib.tests.test_upgrade',
2898
'bzrlib.tests.test_upgrade_stacked',
2899
'bzrlib.tests.test_urlutils',
2900
'bzrlib.tests.test_version',
2901
'bzrlib.tests.test_version_info',
2902
'bzrlib.tests.test_versionedfile',
2903
882
'bzrlib.tests.test_weave',
2904
883
'bzrlib.tests.test_whitebox',
2905
'bzrlib.tests.test_win32utils',
2906
884
'bzrlib.tests.test_workingtree',
2907
'bzrlib.tests.test_workingtree_4',
2908
'bzrlib.tests.test_wsgi',
2909
885
'bzrlib.tests.test_xml',
2910
'bzrlib.tests.tree_implementations',
2911
'bzrlib.tests.workingtree_implementations',
2912
'bzrlib.util.tests.test_bencode',
2915
loader = TestUtil.TestLoader()
2918
starting_with = [test_prefix_alias_registry.resolve_alias(start)
2919
for start in starting_with]
2920
# We take precedence over keep_only because *at loading time* using
2921
# both options means we will load less tests for the same final result.
2922
def interesting_module(name):
2923
for start in starting_with:
2925
# Either the module name starts with the specified string
2926
name.startswith(start)
2927
# or it may contain tests starting with the specified string
2928
or start.startswith(name)
2932
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
2934
elif keep_only is not None:
2935
id_filter = TestIdList(keep_only)
2936
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
2937
def interesting_module(name):
2938
return id_filter.refers_to(name)
2941
loader = TestUtil.TestLoader()
2942
def interesting_module(name):
2943
# No filtering, all modules are interesting
2946
suite = loader.suiteClass()
2948
# modules building their suite with loadTestsFromModuleNames
2949
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2951
modules_to_doctest = [
2956
'bzrlib.iterablefile',
2961
'bzrlib.symbol_versioning',
2964
'bzrlib.version_info_formats.format_custom',
2967
for mod in modules_to_doctest:
2968
if not interesting_module(mod):
2969
# No tests to keep here, move along
2972
doc_suite = doctest.DocTestSuite(mod)
2973
except ValueError, e:
2974
print '**failed to get doctest for: %s\n%s' % (mod, e)
2976
suite.addTest(doc_suite)
2978
default_encoding = sys.getdefaultencoding()
2979
for name, plugin in bzrlib.plugin.plugins().items():
2980
if not interesting_module(plugin.module.__name__):
2982
plugin_suite = plugin.test_suite()
2983
# We used to catch ImportError here and turn it into just a warning,
2984
# but really if you don't have --no-plugins this should be a failure.
2985
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
2986
if plugin_suite is None:
2987
plugin_suite = plugin.load_plugin_tests(loader)
2988
if plugin_suite is not None:
2989
suite.addTest(plugin_suite)
2990
if default_encoding != sys.getdefaultencoding():
2991
bzrlib.trace.warning(
2992
'Plugin "%s" tried to reset default encoding to: %s', name,
2993
sys.getdefaultencoding())
2995
sys.setdefaultencoding(default_encoding)
2998
suite = filter_suite_by_id_startswith(suite, starting_with)
3000
if keep_only is not None:
3001
# Now that the referred modules have loaded their tests, keep only the
3003
suite = filter_suite_by_id_list(suite, id_filter)
3004
# Do some sanity checks on the id_list filtering
3005
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3007
# The tester has used both keep_only and starting_with, so he is
3008
# already aware that some tests are excluded from the list, there
3009
# is no need to tell him which.
3012
# Some tests mentioned in the list are not in the test suite. The
3013
# list may be out of date, report to the tester.
3014
for id in not_found:
3015
bzrlib.trace.warning('"%s" not found in the test suite', id)
3016
for id in duplicates:
3017
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
3022
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
3023
"""Adapt all tests in some given modules to given scenarios.
3025
This is the recommended public interface for test parameterization.
3026
Typically the test_suite() method for a per-implementation test
3027
suite will call multiply_tests_from_modules and return the
3030
:param module_name_list: List of fully-qualified names of test
3032
:param scenario_iter: Iterable of pairs of (scenario_name,
3033
scenario_param_dict).
3034
:param loader: If provided, will be used instead of a new
3035
bzrlib.tests.TestLoader() instance.
3037
This returns a new TestSuite containing the cross product of
3038
all the tests in all the modules, each repeated for each scenario.
3039
Each test is adapted by adding the scenario name at the end
3040
of its name, and updating the test object's __dict__ with the
3041
scenario_param_dict.
3043
>>> r = multiply_tests_from_modules(
3044
... ['bzrlib.tests.test_sampler'],
3045
... [('one', dict(param=1)),
3046
... ('two', dict(param=2))])
3047
>>> tests = list(iter_suite_tests(r))
3051
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3057
# XXX: Isn't load_tests() a better way to provide the same functionality
3058
# without forcing a predefined TestScenarioApplier ? --vila 080215
3060
loader = TestUtil.TestLoader()
3062
suite = loader.suiteClass()
3064
adapter = TestScenarioApplier()
3065
adapter.scenarios = list(scenario_iter)
3066
adapt_modules(module_name_list, adapter, loader, suite)
3070
def multiply_scenarios(scenarios_left, scenarios_right):
3071
"""Multiply two sets of scenarios.
3073
:returns: the cartesian product of the two sets of scenarios, that is
3074
a scenario for every possible combination of a left scenario and a
3078
('%s,%s' % (left_name, right_name),
3079
dict(left_dict.items() + right_dict.items()))
3080
for left_name, left_dict in scenarios_left
3081
for right_name, right_dict in scenarios_right]
887
test_transport_implementations = [
888
'bzrlib.tests.test_transport_implementations']
890
TestCase.BZRPATH = osutils.pathjoin(
891
osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
892
print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
893
print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
896
# python2.4's TestLoader.loadTestsFromNames gives very poor
897
# errors if it fails to load a named module - no indication of what's
898
# actually wrong, just "no such module". We should probably override that
899
# class, but for the moment just load them ourselves. (mbp 20051202)
900
loader = TestLoader()
901
from bzrlib.transport import TransportTestProviderAdapter
902
adapter = TransportTestProviderAdapter()
903
adapt_modules(test_transport_implementations, adapter, loader, suite)
904
for mod_name in testmod_names:
905
mod = _load_module_by_name(mod_name)
906
suite.addTest(loader.loadTestsFromModule(mod))
907
for package in packages_to_test():
908
suite.addTest(package.test_suite())
909
for m in MODULES_TO_TEST:
910
suite.addTest(loader.loadTestsFromModule(m))
911
for m in (MODULES_TO_DOCTEST):
912
suite.addTest(DocTestSuite(m))
913
for name, plugin in bzrlib.plugin.all_plugins().items():
914
if getattr(plugin, 'test_suite', None) is not None:
915
suite.addTest(plugin.test_suite())
3085
919
def adapt_modules(mods_list, adapter, loader, suite):
3086
920
"""Adapt the modules in mods_list using adapter and add to suite."""
3087
tests = loader.loadTestsFromModuleNames(mods_list)
3088
adapt_tests(tests, adapter, suite)
3091
def adapt_tests(tests_list, adapter, suite):
3092
"""Adapt the tests in tests_list using adapter and add to suite."""
3093
for test in iter_suite_tests(tests_list):
3094
suite.addTests(adapter.adapt(test))
3097
def _rmtree_temp_dir(dirname):
3098
# If LANG=C we probably have created some bogus paths
3099
# which rmtree(unicode) will fail to delete
3100
# so make sure we are using rmtree(str) to delete everything
3101
# except on win32, where rmtree(str) will fail
3102
# since it doesn't have the property of byte-stream paths
3103
# (they are either ascii or mbcs)
3104
if sys.platform == 'win32':
3105
# make sure we are using the unicode win32 api
3106
dirname = unicode(dirname)
3108
dirname = dirname.encode(sys.getfilesystemencoding())
3110
osutils.rmtree(dirname)
3112
if sys.platform == 'win32' and e.errno == errno.EACCES:
3113
sys.stderr.write(('Permission denied: '
3114
'unable to remove testing dir '
3115
'%s\n' % os.path.basename(dirname)))
3120
class Feature(object):
3121
"""An operating system Feature."""
3124
self._available = None
3126
def available(self):
3127
"""Is the feature available?
3129
:return: True if the feature is available.
3131
if self._available is None:
3132
self._available = self._probe()
3133
return self._available
3136
"""Implement this method in concrete features.
3138
:return: True if the feature is available.
3140
raise NotImplementedError
3143
if getattr(self, 'feature_name', None):
3144
return self.feature_name()
3145
return self.__class__.__name__
3148
class _SymlinkFeature(Feature):
3151
return osutils.has_symlinks()
3153
def feature_name(self):
3156
SymlinkFeature = _SymlinkFeature()
3159
class _HardlinkFeature(Feature):
3162
return osutils.has_hardlinks()
3164
def feature_name(self):
3167
HardlinkFeature = _HardlinkFeature()
3170
class _OsFifoFeature(Feature):
3173
return getattr(os, 'mkfifo', None)
3175
def feature_name(self):
3176
return 'filesystem fifos'
3178
OsFifoFeature = _OsFifoFeature()
3181
class _UnicodeFilenameFeature(Feature):
3182
"""Does the filesystem support Unicode filenames?"""
3186
# Check for character combinations unlikely to be covered by any
3187
# single non-unicode encoding. We use the characters
3188
# - greek small letter alpha (U+03B1) and
3189
# - braille pattern dots-123456 (U+283F).
3190
os.stat(u'\u03b1\u283f')
3191
except UnicodeEncodeError:
3193
except (IOError, OSError):
3194
# The filesystem allows the Unicode filename but the file doesn't
3198
# The filesystem allows the Unicode filename and the file exists,
3202
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3205
class TestScenarioApplier(object):
3206
"""A tool to apply scenarios to tests."""
3208
def adapt(self, test):
3209
"""Return a TestSuite containing a copy of test for each scenario."""
3210
result = unittest.TestSuite()
3211
for scenario in self.scenarios:
3212
result.addTest(self.adapt_test_to_scenario(test, scenario))
3215
def adapt_test_to_scenario(self, test, scenario):
3216
"""Copy test and apply scenario to it.
3218
:param test: A test to adapt.
3219
:param scenario: A tuple describing the scenarion.
3220
The first element of the tuple is the new test id.
3221
The second element is a dict containing attributes to set on the
3223
:return: The adapted test.
3225
from copy import deepcopy
3226
new_test = deepcopy(test)
3227
for name, value in scenario[1].items():
3228
setattr(new_test, name, value)
3229
new_id = "%s(%s)" % (new_test.id(), scenario[0])
3230
new_test.id = lambda: new_id
3234
def probe_unicode_in_user_encoding():
3235
"""Try to encode several unicode strings to use in unicode-aware tests.
3236
Return first successfull match.
3238
:return: (unicode value, encoded plain string value) or (None, None)
3240
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3241
for uni_val in possible_vals:
3243
str_val = uni_val.encode(osutils.get_user_encoding())
3244
except UnicodeEncodeError:
3245
# Try a different character
3248
return uni_val, str_val
3252
def probe_bad_non_ascii(encoding):
3253
"""Try to find [bad] character with code [128..255]
3254
that cannot be decoded to unicode in some encoding.
3255
Return None if all non-ascii characters is valid
3258
for i in xrange(128, 256):
3261
char.decode(encoding)
3262
except UnicodeDecodeError:
3267
class _FTPServerFeature(Feature):
3268
"""Some tests want an FTP Server, check if one is available.
3270
Right now, the only way this is available is if 'medusa' is installed.
3271
http://www.amk.ca/python/code/medusa.html
3276
import bzrlib.tests.ftp_server
3281
def feature_name(self):
3284
FTPServerFeature = _FTPServerFeature()
3287
class _UnicodeFilename(Feature):
3288
"""Does the filesystem support Unicode filenames?"""
3293
except UnicodeEncodeError:
3295
except (IOError, OSError):
3296
# The filesystem allows the Unicode filename but the file doesn't
3300
# The filesystem allows the Unicode filename and the file exists,
3304
UnicodeFilename = _UnicodeFilename()
3307
class _UTF8Filesystem(Feature):
3308
"""Is the filesystem UTF-8?"""
3311
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
3315
UTF8Filesystem = _UTF8Filesystem()
3318
class _CaseInsensitiveFilesystemFeature(Feature):
3319
"""Check if underlying filesystem is case-insensitive
3320
(e.g. on Windows, Cygwin, MacOS)
3324
if TestCaseWithMemoryTransport.TEST_ROOT is None:
3325
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3326
TestCaseWithMemoryTransport.TEST_ROOT = root
3328
root = TestCaseWithMemoryTransport.TEST_ROOT
3329
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3331
name_a = osutils.pathjoin(tdir, 'a')
3332
name_A = osutils.pathjoin(tdir, 'A')
3334
result = osutils.isdir(name_A)
3335
_rmtree_temp_dir(tdir)
3338
def feature_name(self):
3339
return 'case-insensitive filesystem'
3341
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
921
for mod_name in mods_list:
922
mod = _load_module_by_name(mod_name)
923
for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
924
suite.addTests(adapter.adapt(test))
927
def _load_module_by_name(mod_name):
928
parts = mod_name.split('.')
929
module = __import__(mod_name)
931
# for historical reasons python returns the top-level module even though
932
# it loads the submodule; we need to walk down to get the one we want.
934
module = getattr(module, parts.pop(0))