26
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
27
# new assertFoo() methods.
30
31
from cStringIO import StringIO
37
from pprint import pformat
42
from subprocess import Popen, PIPE
44
64
import bzrlib.branch
45
import bzrlib.bzrdir as bzrdir
46
65
import bzrlib.commands
47
import bzrlib.errors as errors
66
import bzrlib.timestamp
48
68
import bzrlib.inventory
49
69
import bzrlib.iterablefile
50
70
import bzrlib.lockdir
74
# lsprof not available
76
from bzrlib.merge import merge_inner
51
77
import bzrlib.merge3
53
import bzrlib.osutils as osutils
54
78
import bzrlib.plugin
55
79
import bzrlib.store
80
from bzrlib import symbol_versioning
81
from bzrlib.symbol_versioning import (
56
87
import bzrlib.trace
57
from bzrlib.transport import urlescape, get_transport
88
from bzrlib.transport import get_transport
58
89
import bzrlib.transport
59
from bzrlib.transport.local import LocalRelpathServer
90
from bzrlib.transport.local import LocalURLServer
91
from bzrlib.transport.memory import MemoryServer
60
92
from bzrlib.transport.readonly import ReadonlyServer
61
from bzrlib.trace import mutter
62
from bzrlib.tests.TestUtil import TestLoader, TestSuite
93
from bzrlib.trace import mutter, note
94
from bzrlib.tests import TestUtil
95
from bzrlib.tests.http_server import HttpServer
96
from bzrlib.tests.TestUtil import (
63
100
from bzrlib.tests.treeshape import build_tree_contents
101
import bzrlib.version_info_formats.format_custom
64
102
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
66
default_transport = LocalRelpathServer
69
MODULES_TO_DOCTEST = [
81
def packages_to_test():
82
"""Return a list of packages to test.
84
The packages are not globally imported so that import failures are
85
triggered when running selftest, not when importing the command.
88
import bzrlib.tests.blackbox
89
import bzrlib.tests.branch_implementations
90
import bzrlib.tests.bzrdir_implementations
91
import bzrlib.tests.interrepository_implementations
92
import bzrlib.tests.interversionedfile_implementations
93
import bzrlib.tests.repository_implementations
94
import bzrlib.tests.revisionstore_implementations
95
import bzrlib.tests.workingtree_implementations
98
bzrlib.tests.blackbox,
99
bzrlib.tests.branch_implementations,
100
bzrlib.tests.bzrdir_implementations,
101
bzrlib.tests.interrepository_implementations,
102
bzrlib.tests.interversionedfile_implementations,
103
bzrlib.tests.repository_implementations,
104
bzrlib.tests.revisionstore_implementations,
105
bzrlib.tests.workingtree_implementations,
109
class _MyResult(unittest._TextTestResult):
110
"""Custom TestResult.
112
Shows output in a different format, including displaying runtime for tests.
104
# Mark this python module as being part of the implementation
105
# of unittest: this gives us better tracebacks where the last
106
# shown frame is the test code, not our assertXYZ.
109
default_transport = LocalURLServer
112
class ExtendedTestResult(unittest._TextTestResult):
113
"""Accepts, reports and accumulates the results of running tests.
115
Compared to the unittest version this class adds support for
116
profiling, benchmarking, stopping as soon as a test fails, and
117
skipping tests. There are further-specialized subclasses for
118
different types of display.
120
When a test finishes, in whatever way, it calls one of the addSuccess,
121
addFailure or addError methods. These in turn may redirect to a more
122
specific case for the special test results supported by our extended
125
Note that just one of these objects is fed the results from many tests.
114
128
stop_early = False
116
def _elapsedTime(self):
117
return "%5dms" % (1000 * (time.time() - self._start_time))
130
def __init__(self, stream, descriptions, verbosity,
134
"""Construct new TestResult.
136
:param bench_history: Optionally, a writable file object to accumulate
139
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
140
if bench_history is not None:
141
from bzrlib.version import _get_bzr_source_tree
142
src_tree = _get_bzr_source_tree()
145
revision_id = src_tree.get_parent_ids()[0]
147
# XXX: if this is a brand new tree, do the same as if there
151
# XXX: If there's no branch, what should we do?
153
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
154
self._bench_history = bench_history
155
self.ui = ui.ui_factory
156
self.num_tests = num_tests
158
self.failure_count = 0
159
self.known_failure_count = 0
161
self.not_applicable_count = 0
162
self.unsupported = {}
164
self._overall_start_time = time.time()
166
def _extractBenchmarkTime(self, testCase):
    """Return the benchmark time recorded on testCase, or None.

    The value is read from the test's ``_benchtime`` attribute; a test
    that has no such attribute yields None.
    """
    return getattr(testCase, "_benchtime", None)
170
def _elapsedTestTimeString(self):
171
"""Return a time string for the overall time the current test has taken."""
172
return self._formatTime(time.time() - self._start_time)
174
def _testTimeString(self, testCase):
175
benchmark_time = self._extractBenchmarkTime(testCase)
176
if benchmark_time is not None:
178
self._formatTime(benchmark_time),
179
self._elapsedTestTimeString())
181
return " %s" % self._elapsedTestTimeString()
183
def _formatTime(self, seconds):
    """Format seconds as milliseconds with leading spaces."""
    # Some benchmarks can take thousands of seconds to run, so the
    # millisecond count is right-aligned in a field 8 characters wide.
    return "%8dms" % (1000 * seconds)
189
def _shortened_test_description(self, test):
191
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
119
194
def startTest(self, test):
120
195
unittest.TestResult.startTest(self, test)
121
# In a short description, the important words are in
122
# the beginning, but in an id, the important words are
124
SHOW_DESCRIPTIONS = False
126
width = osutils.terminal_width()
127
name_width = width - 15
129
if SHOW_DESCRIPTIONS:
130
what = test.shortDescription()
132
if len(what) > name_width:
133
what = what[:name_width-3] + '...'
136
if what.startswith('bzrlib.tests.'):
138
if len(what) > name_width:
139
what = '...' + what[3-name_width:]
140
what = what.ljust(name_width)
141
self.stream.write(what)
196
self.report_test_start(test)
197
test.number = self.count
198
self._recordTestStartTime()
200
def _recordTestStartTime(self):
201
"""Record that a test has started."""
143
202
self._start_time = time.time()
204
def _cleanupLogFile(self, test):
205
# We can only do this if we have one of our TestCases, not if
207
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
208
if setKeepLogfile is not None:
145
211
def addError(self, test, err):
212
"""Tell result that test finished with an error.
214
Called from the TestCase run() method when the test
215
fails with an unexpected error.
217
self._testConcluded(test)
146
218
if isinstance(err[1], TestSkipped):
147
return self.addSkipped(test, err)
148
unittest.TestResult.addError(self, test, err)
150
self.stream.writeln("ERROR %s" % self._elapsedTime())
152
self.stream.write('E')
219
return self._addSkipped(test, err)
220
elif isinstance(err[1], UnavailableFeature):
221
return self.addNotSupported(test, err[1].args[0])
223
unittest.TestResult.addError(self, test, err)
224
self.error_count += 1
225
self.report_error(test, err)
228
self._cleanupLogFile(test)
157
230
def addFailure(self, test, err):
158
unittest.TestResult.addFailure(self, test, err)
160
self.stream.writeln(" FAIL %s" % self._elapsedTime())
162
self.stream.write('F')
231
"""Tell result that test failed.
233
Called from the TestCase run() method when the test
234
fails because e.g. an assert() method failed.
236
self._testConcluded(test)
237
if isinstance(err[1], KnownFailure):
238
return self._addKnownFailure(test, err)
240
unittest.TestResult.addFailure(self, test, err)
241
self.failure_count += 1
242
self.report_failure(test, err)
245
self._cleanupLogFile(test)
167
247
def addSuccess(self, test):
169
self.stream.writeln(' OK %s' % self._elapsedTime())
171
self.stream.write('~')
173
unittest.TestResult.addSuccess(self, test)
175
def addSkipped(self, test, skip_excinfo):
177
print >>self.stream, ' SKIP %s' % self._elapsedTime()
178
print >>self.stream, ' %s' % skip_excinfo[1]
180
self.stream.write('S')
182
# seems best to treat this as success from point-of-view of unittest
183
# -- it actually does nothing so it barely matters :)
184
unittest.TestResult.addSuccess(self, test)
248
"""Tell result that test completed successfully.
250
Called from the TestCase run()
252
self._testConcluded(test)
253
if self._bench_history is not None:
254
benchmark_time = self._extractBenchmarkTime(test)
255
if benchmark_time is not None:
256
self._bench_history.write("%s %s\n" % (
257
self._formatTime(benchmark_time),
259
self.report_success(test)
260
self._cleanupLogFile(test)
261
unittest.TestResult.addSuccess(self, test)
262
test._log_contents = ''
264
def _testConcluded(self, test):
265
"""Common code when a test has finished.
267
Called regardless of whether it succeeded, failed, etc.
271
def _addKnownFailure(self, test, err):
272
self.known_failure_count += 1
273
self.report_known_failure(test, err)
275
def addNotSupported(self, test, feature):
276
"""The test will not be run because of a missing feature.
278
# this can be called in two different ways: it may be that the
279
# test started running, and then raised (through addError)
280
# UnavailableFeature. Alternatively this method can be called
281
# while probing for features before running the tests; in that
282
# case we will see startTest and stopTest, but the test will never
284
self.unsupported.setdefault(str(feature), 0)
285
self.unsupported[str(feature)] += 1
286
self.report_unsupported(test, feature)
288
def _addSkipped(self, test, skip_excinfo):
289
if isinstance(skip_excinfo[1], TestNotApplicable):
290
self.not_applicable_count += 1
291
self.report_not_applicable(test, skip_excinfo)
294
self.report_skip(test, skip_excinfo)
297
except KeyboardInterrupt:
300
self.addError(test, test._exc_info())
302
# seems best to treat this as success from point-of-view of unittest
303
# -- it actually does nothing so it barely matters :)
304
unittest.TestResult.addSuccess(self, test)
305
test._log_contents = ''
186
307
def printErrorList(self, flavour, errors):
187
308
for test, err in errors:
188
309
self.stream.writeln(self.separator1)
189
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
310
self.stream.write("%s: " % flavour)
311
self.stream.writeln(self.getDescription(test))
190
312
if getattr(test, '_get_log', None) is not None:
192
print >>self.stream, \
193
('vvvv[log from %s]' % test.id()).ljust(78,'-')
194
print >>self.stream, test._get_log()
195
print >>self.stream, \
196
('^^^^[log from %s]' % test.id()).ljust(78,'-')
313
self.stream.write('\n')
315
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
316
self.stream.write('\n')
317
self.stream.write(test._get_log())
318
self.stream.write('\n')
320
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
321
self.stream.write('\n')
197
322
self.stream.writeln(self.separator2)
198
323
self.stream.writeln("%s" % err)
201
class TextTestRunner(unittest.TextTestRunner):
328
def report_cleaning_up(self):
331
def report_success(self, test):
334
def wasStrictlySuccessful(self):
335
if self.unsupported or self.known_failure_count:
337
return self.wasSuccessful()
340
class TextTestResult(ExtendedTestResult):
341
"""Displays progress and results of tests in text form"""
343
def __init__(self, stream, descriptions, verbosity,
348
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
349
bench_history, num_tests)
351
self.pb = self.ui.nested_progress_bar()
352
self._supplied_pb = False
355
self._supplied_pb = True
356
self.pb.show_pct = False
357
self.pb.show_spinner = False
358
self.pb.show_eta = False,
359
self.pb.show_count = False
360
self.pb.show_bar = False
362
def report_starting(self):
    """Show the initial '[test 0/N] starting...' progress message."""
    self.pb.update('[test 0/%d] starting...' % (self.num_tests))
365
def _progress_prefix_text(self):
366
# the longer this text, the less space we have to show the test
368
a = '[%d' % self.count # total that have been run
369
# tests skipped as known not to be relevant are not important enough
371
## if self.skip_count:
372
## a += ', %d skip' % self.skip_count
373
## if self.known_failure_count:
374
## a += '+%dX' % self.known_failure_count
375
if self.num_tests is not None:
376
a +='/%d' % self.num_tests
378
runtime = time.time() - self._overall_start_time
380
a += '%dm%ds' % (runtime / 60, runtime % 60)
384
a += ', %d err' % self.error_count
385
if self.failure_count:
386
a += ', %d fail' % self.failure_count
388
a += ', %d missing' % len(self.unsupported)
392
def report_test_start(self, test):
395
self._progress_prefix_text()
397
+ self._shortened_test_description(test))
399
def _test_description(self, test):
400
return self._shortened_test_description(test)
402
def report_error(self, test, err):
403
self.pb.note('ERROR: %s\n %s\n',
404
self._test_description(test),
408
def report_failure(self, test, err):
409
self.pb.note('FAIL: %s\n %s\n',
410
self._test_description(test),
414
def report_known_failure(self, test, err):
415
self.pb.note('XFAIL: %s\n%s\n',
416
self._test_description(test), err[1])
418
def report_skip(self, test, skip_excinfo):
421
def report_not_applicable(self, test, skip_excinfo):
424
def report_unsupported(self, test, feature):
425
"""test cannot be run because feature is missing."""
427
def report_cleaning_up(self):
    """Show a 'cleaning up...' message on the progress bar."""
    self.pb.update('cleaning up...')
431
if not self._supplied_pb:
435
class VerboseTestResult(ExtendedTestResult):
436
"""Produce long output, with one line per test run plus times"""
438
def _ellipsize_to_right(self, a_string, final_width):
439
"""Truncate and pad a string, keeping the right hand side"""
440
if len(a_string) > final_width:
441
result = '...' + a_string[3-final_width:]
444
return result.ljust(final_width)
446
def report_starting(self):
    """Write the 'running N tests...' banner to the output stream."""
    self.stream.write('running %d tests...\n' % self.num_tests)
449
def report_test_start(self, test):
451
name = self._shortened_test_description(test)
452
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
453
# numbers, plus a trailing blank
454
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
455
self.stream.write(self._ellipsize_to_right(name,
456
osutils.terminal_width()-30))
459
def _error_summary(self, err):
461
return '%s%s' % (indent, err[1])
463
def report_error(self, test, err):
    """Write an ERROR line for test: its time string, then the error summary."""
    self.stream.writeln('ERROR %s\n%s'
                        % (self._testTimeString(test),
                           self._error_summary(err)))
468
def report_failure(self, test, err):
    """Write a FAIL line for test: its time string, then the error summary."""
    self.stream.writeln(' FAIL %s\n%s'
                        % (self._testTimeString(test),
                           self._error_summary(err)))
473
def report_known_failure(self, test, err):
    """Write an XFAIL line for test: its time string, then the error summary."""
    self.stream.writeln('XFAIL %s\n%s'
                        % (self._testTimeString(test),
                           self._error_summary(err)))
478
def report_success(self, test):
479
self.stream.writeln(' OK %s' % self._testTimeString(test))
480
for bench_called, stats in getattr(test, '_benchcalls', []):
481
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
482
stats.pprint(file=self.stream)
483
# flush the stream so that we get smooth output. This verbose mode is
484
# used to show the output in PQM.
487
def report_skip(self, test, skip_excinfo):
    """Write a SKIP line for test: its time string, then the skip summary."""
    self.stream.writeln(' SKIP %s\n%s'
                        % (self._testTimeString(test),
                           self._error_summary(skip_excinfo)))
492
def report_not_applicable(self, test, skip_excinfo):
    """Write an N/A line for test: its time string, then the skip summary."""
    self.stream.writeln(' N/A %s\n%s'
                        % (self._testTimeString(test),
                           self._error_summary(skip_excinfo)))
497
def report_unsupported(self, test, feature):
    """Write a NODEP line: test cannot be run because feature is missing."""
    self.stream.writeln("NODEP %s\n The feature '%s' is not available."
                        % (self._testTimeString(test), feature))
503
class TextTestRunner(object):
202
504
stop_on_failure = False
204
def _makeResult(self):
205
result = _MyResult(self.stream, self.descriptions, self.verbosity)
513
self.stream = unittest._WritelnDecorator(stream)
514
self.descriptions = descriptions
515
self.verbosity = verbosity
516
self._bench_history = bench_history
517
self.list_only = list_only
520
"Run the given test case or test suite."
521
startTime = time.time()
522
if self.verbosity == 1:
523
result_class = TextTestResult
524
elif self.verbosity >= 2:
525
result_class = VerboseTestResult
526
result = result_class(self.stream,
529
bench_history=self._bench_history,
530
num_tests=test.countTestCases(),
206
532
result.stop_early = self.stop_on_failure
533
result.report_starting()
535
if self.verbosity >= 2:
536
self.stream.writeln("Listing tests only ...\n")
538
for t in iter_suite_tests(test):
539
self.stream.writeln("%s" % (t.id()))
541
actionTaken = "Listed"
544
run = result.testsRun
546
stopTime = time.time()
547
timeTaken = stopTime - startTime
549
self.stream.writeln(result.separator2)
550
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
551
run, run != 1 and "s" or "", timeTaken))
552
self.stream.writeln()
553
if not result.wasSuccessful():
554
self.stream.write("FAILED (")
555
failed, errored = map(len, (result.failures, result.errors))
557
self.stream.write("failures=%d" % failed)
559
if failed: self.stream.write(", ")
560
self.stream.write("errors=%d" % errored)
561
if result.known_failure_count:
562
if failed or errored: self.stream.write(", ")
563
self.stream.write("known_failure_count=%d" %
564
result.known_failure_count)
565
self.stream.writeln(")")
567
if result.known_failure_count:
568
self.stream.writeln("OK (known_failures=%d)" %
569
result.known_failure_count)
571
self.stream.writeln("OK")
572
if result.skip_count > 0:
573
skipped = result.skip_count
574
self.stream.writeln('%d test%s skipped' %
575
(skipped, skipped != 1 and "s" or ""))
576
if result.unsupported:
577
for feature, count in sorted(result.unsupported.items()):
578
self.stream.writeln("Missing feature '%s' skipped %d tests." %
290
867
if message is None:
291
868
message = "texts not equal:\n"
292
raise AssertionError(message +
293
self._ndiff_strings(a, b))
870
message = 'first string is missing a final newline.\n'
872
message = 'second string is missing a final newline.\n'
873
raise AssertionError(message +
874
self._ndiff_strings(a, b))
295
876
def assertEqualMode(self, mode, mode_test):
    """Assert that two file modes are equal, reporting both in octal."""
    self.assertEqual(mode, mode_test,
                     'mode mismatch %o != %o' % (mode, mode_test))
880
def assertEqualStat(self, expected, actual):
881
"""assert that expected and actual are the same stat result.
883
:param expected: A stat result.
884
:param actual: A stat result.
885
:raises AssertionError: If the expected and actual stat values differ
888
self.assertEqual(expected.st_size, actual.st_size)
889
self.assertEqual(expected.st_mtime, actual.st_mtime)
890
self.assertEqual(expected.st_ctime, actual.st_ctime)
891
self.assertEqual(expected.st_dev, actual.st_dev)
892
self.assertEqual(expected.st_ino, actual.st_ino)
893
self.assertEqual(expected.st_mode, actual.st_mode)
895
def assertPositive(self, val):
    """Assert that val is greater than 0."""
    self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
899
def assertNegative(self, val):
    """Assert that val is less than 0."""
    self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
299
903
def assertStartsWith(self, s, prefix):
    """Asserts that s starts with prefix."""
    if not s.startswith(prefix):
        raise AssertionError('string %r does not start with %r' % (s, prefix))
303
907
def assertEndsWith(self, s, suffix):
    """Asserts that s ends with suffix."""
    # The check must use the ``suffix`` parameter; an earlier revision
    # tested an undefined name ``prefix`` here.
    if not s.endswith(suffix):
        raise AssertionError('string %r does not end with %r' % (s, suffix))
307
912
def assertContainsRe(self, haystack, needle_re):
308
913
"""Assert that haystack contains something matching a regular expression."""
309
914
if not re.search(needle_re, haystack):
310
raise AssertionError('pattern "%s" not found in "%s"'
915
if '\n' in haystack or len(haystack) > 60:
916
# a long string, format it in a more readable way
917
raise AssertionError(
918
'pattern "%s" not found in\n"""\\\n%s"""\n'
919
% (needle_re, haystack))
921
raise AssertionError('pattern "%s" not found in "%s"'
922
% (needle_re, haystack))
924
def assertNotContainsRe(self, haystack, needle_re):
    """Assert that haystack does not match the regular expression needle_re.

    :param haystack: The string to search.
    :param needle_re: A regular expression, searched for with re.search.
    :raises AssertionError: If needle_re is found anywhere in haystack.
    """
    if re.search(needle_re, haystack):
        raise AssertionError('pattern "%s" found in "%s"'
                             % (needle_re, haystack))
313
930
def assertSubset(self, sublist, superlist):
    """Assert that every entry in sublist is present in superlist.

    The comparison is done with sets, so entries must be hashable and
    duplicates are ignored.

    :raises AssertionError: If any entry of sublist is not in superlist.
    """
    missing = set(sublist) - set(superlist)
    if len(missing) > 0:
        raise AssertionError("value(s) %r not present in container %r" %
                             (missing, superlist))
323
def assertIs(self, left, right):
937
def assertListRaises(self, excClass, func, *args, **kwargs):
938
"""Fail unless excClass is raised when the iterator from func is used.
940
Many functions can return generators; this makes sure
941
to wrap them in a list() call to make sure the whole generator
942
is run, and that the proper exception is raised.
945
list(func(*args, **kwargs))
949
if getattr(excClass,'__name__', None) is not None:
950
excName = excClass.__name__
952
excName = str(excClass)
953
raise self.failureException, "%s not raised" % excName
955
def assertRaises(self, excClass, callableObj, *args, **kwargs):
956
"""Assert that a callable raises a particular exception.
958
:param excClass: As for the except statement, this may be either an
959
exception class, or a tuple of classes.
960
:param callableObj: A callable, will be passed ``*args`` and
963
Returns the exception so that you can examine it.
966
callableObj(*args, **kwargs)
970
if getattr(excClass,'__name__', None) is not None:
971
excName = excClass.__name__
974
excName = str(excClass)
975
raise self.failureException, "%s not raised" % excName
977
def assertIs(self, left, right, message=None):
324
978
if not (left is right):
325
raise AssertionError("%r is not %r." % (left, right))
979
if message is not None:
980
raise AssertionError(message)
982
raise AssertionError("%r is not %r." % (left, right))
984
def assertIsNot(self, left, right, message=None):
986
if message is not None:
987
raise AssertionError(message)
989
raise AssertionError("%r is %r." % (left, right))
327
991
def assertTransportMode(self, transport, path, mode):
328
992
"""Fail if a path does not have mode mode.
330
If modes are not supported on this platform, the test is skipped.
994
If modes are not supported on this transport, the assertion is ignored.
332
if sys.platform == 'win32':
996
if not transport._can_roundtrip_unix_modebits():
334
998
path_stat = transport.stat(path)
335
999
actual_mode = stat.S_IMODE(path_stat.st_mode)
336
1000
self.assertEqual(mode, actual_mode,
337
1001
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
1003
def assertIsSameRealPath(self, path1, path2):
1004
"""Fail if path1 and path2 point to different files"""
1005
self.assertEqual(osutils.realpath(path1),
1006
osutils.realpath(path2),
1007
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
339
1009
def assertIsInstance(self, obj, kls):
    """Fail if obj is not an instance of kls"""
    if not isinstance(obj, kls):
        # Report the actual class as well as the expected one.
        self.fail("%r is an instance of %s rather than %s" % (
            obj, obj.__class__, kls))
1015
def expectFailure(self, reason, assertion, *args, **kwargs):
1016
"""Invoke a test, expecting it to fail for the given reason.
1018
This is for assertions that ought to succeed, but currently fail.
1019
(The failure is *expected* but not *wanted*.) Please be very precise
1020
about the failure you're expecting. If a new bug is introduced,
1021
AssertionError should be raised, not KnownFailure.
1023
Frequently, expectFailure should be followed by an opposite assertion.
1026
Intended to be used with a callable that raises AssertionError as the
1027
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1029
Raises KnownFailure if the test fails. Raises AssertionError if the
1034
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1036
self.assertEqual(42, dynamic_val)
1038
This means that a dynamic_val of 54 will cause the test to raise
1039
a KnownFailure. Once math is fixed and the expectFailure is removed,
1040
only a dynamic_val of 42 will allow the test to pass. Anything other
1041
than 54 or 42 will cause an AssertionError.
1044
assertion(*args, **kwargs)
1045
except AssertionError:
1046
raise KnownFailure(reason)
1048
self.fail('Unexpected success. Should have failed: %s' % reason)
1050
def assertFileEqual(self, content, path):
1051
"""Fail if path does not contain 'content'."""
1052
self.failUnlessExists(path)
1053
f = file(path, 'rb')
1058
self.assertEqualDiff(content, s)
1060
def failUnlessExists(self, path):
1061
"""Fail unless path or paths, which may be abs or relative, exist."""
1062
if not isinstance(path, basestring):
1064
self.failUnlessExists(p)
1066
self.failUnless(osutils.lexists(path),path+" does not exist")
1068
def failIfExists(self, path):
1069
"""Fail if path or paths, which may be abs or relative, exist."""
1070
if not isinstance(path, basestring):
1072
self.failIfExists(p)
1074
self.failIf(osutils.lexists(path),path+" exists")
1076
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1077
"""A helper for callDeprecated and applyDeprecated.
1079
:param a_callable: A callable to call.
1080
:param args: The positional arguments for the callable
1081
:param kwargs: The keyword arguments for the callable
1082
:return: A tuple (warnings, result). result is the result of calling
1083
a_callable(``*args``, ``**kwargs``).
1086
def capture_warnings(msg, cls=None, stacklevel=None):
1087
# we've hooked into a deprecation specific callpath,
1088
# only deprecations should be getting sent via it.
1089
self.assertEqual(cls, DeprecationWarning)
1090
local_warnings.append(msg)
1091
original_warning_method = symbol_versioning.warn
1092
symbol_versioning.set_warning_method(capture_warnings)
1094
result = a_callable(*args, **kwargs)
1096
symbol_versioning.set_warning_method(original_warning_method)
1097
return (local_warnings, result)
1099
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1100
"""Call a deprecated callable without warning the user.
1102
Note that this only captures warnings raised by symbol_versioning.warn,
1103
not other callers that go direct to the warning module.
1105
To test that a deprecated method raises an error, do something like
1108
self.assertRaises(errors.ReservedId,
1109
self.applyDeprecated,
1110
deprecated_in((1, 5, 0)),
1114
:param deprecation_format: The deprecation format that the callable
1115
should have been deprecated with. This is the same type as the
1116
parameter to deprecated_method/deprecated_function. If the
1117
callable is not deprecated with this format, an assertion error
1119
:param a_callable: A callable to call. This may be a bound method or
1120
a regular function. It will be called with ``*args`` and
1122
:param args: The positional arguments for the callable
1123
:param kwargs: The keyword arguments for the callable
1124
:return: The result of a_callable(``*args``, ``**kwargs``)
1126
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1128
expected_first_warning = symbol_versioning.deprecation_string(
1129
a_callable, deprecation_format)
1130
if len(call_warnings) == 0:
1131
self.fail("No deprecation warning generated by call to %s" %
1133
self.assertEqual(expected_first_warning, call_warnings[0])
1136
def callCatchWarnings(self, fn, *args, **kw):
1137
"""Call a callable that raises python warnings.
1139
The caller's responsible for examining the returned warnings.
1141
If the callable raises an exception, the exception is not
1142
caught and propagates up to the caller. In that case, the list
1143
of warnings is not available.
1145
:returns: ([warning_object, ...], fn_result)
1147
# XXX: This is not perfect, because it completely overrides the
1148
# warnings filters, and some code may depend on suppressing particular
1149
# warnings. It's the easiest way to insulate ourselves from -Werror,
1150
# though. -- Andrew, 20071062
1152
def _catcher(message, category, filename, lineno, file=None, line=None):
1153
# despite the name, 'message' is normally(?) a Warning subclass
1155
wlist.append(message)
1156
saved_showwarning = warnings.showwarning
1157
saved_filters = warnings.filters
1159
warnings.showwarning = _catcher
1160
warnings.filters = []
1161
result = fn(*args, **kw)
1163
warnings.showwarning = saved_showwarning
1164
warnings.filters = saved_filters
1165
return wlist, result
1167
def callDeprecated(self, expected, callable, *args, **kwargs):
1168
"""Assert that a callable is deprecated in a particular way.
1170
This is a very precise test for unusual requirements. The
1171
applyDeprecated helper function is probably more suited for most tests
1172
as it allows you to simply specify the deprecation format being used
1173
and will ensure that that is issued for the function being called.
1175
Note that this only captures warnings raised by symbol_versioning.warn,
1176
not other callers that go direct to the warning module. To catch
1177
general warnings, use callCatchWarnings.
1179
:param expected: a list of the deprecation warnings expected, in order
1180
:param callable: The callable to call
1181
:param args: The positional arguments for the callable
1182
:param kwargs: The keyword arguments for the callable
1184
call_warnings, result = self._capture_deprecation_warnings(callable,
1186
self.assertEqual(expected, call_warnings)
344
1189
def _startLogFile(self):
345
1190
"""Send bzr and test log messages to a temporary file.
422
1334
# TODO: Perhaps this should keep running cleanups even if
423
1335
# one of them fails?
424
for cleanup_fn in reversed(self._cleanups):
1337
# Actually pop the cleanups from the list so tearDown running
1338
# twice is safe (this happens for skipped tests).
1339
while self._cleanups:
1340
cleanup, args, kwargs = self._cleanups.pop()
1341
cleanup(*args, **kwargs)
427
1343
def log(self, *args):
431
"""Return as a string the log for this test"""
432
if self._log_file_name:
433
return open(self._log_file_name).read()
1346
def _get_log(self, keep_log_file=False):
1347
"""Get the log from bzrlib.trace calls from this test.
1349
:param keep_log_file: When True, if the log is still a file on disk
1350
leave it as a file on disk. When False, if the log is still a file
1351
on disk, the log file is deleted and the log preserved as
1353
:return: A string containing the log.
1355
# flush the log file, to get all content
1357
bzrlib.trace._trace_file.flush()
1358
if self._log_contents:
1359
# XXX: this can hardly contain the content flushed above --vila
435
1361
return self._log_contents
436
# TODO: Delete the log after it's been read in
438
def capture(self, cmd, retcode=0):
439
"""Shortcut that splits cmd into words, runs, and returns stdout"""
440
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
442
def run_bzr_captured(self, argv, retcode=0):
443
"""Invoke bzr and return (stdout, stderr).
445
Useful for code that wants to check the contents of the
446
output, the way error messages are presented, etc.
448
This should be the main method for tests that want to exercise the
449
overall behavior of the bzr application (rather than a unit test
450
or a functional test of the library.)
452
Much of the old code runs bzr by forking a new copy of Python, but
453
that is slower, harder to debug, and generally not necessary.
455
This runs bzr through the interface that catches and reports
456
errors, and with logging set to something approximating the
457
default, so that error reporting can be checked.
459
argv -- arguments to invoke bzr
460
retcode -- expected return code, or None for don't-care.
1362
if self._log_file_name is not None:
1363
logfile = open(self._log_file_name)
1365
log_contents = logfile.read()
1368
if not keep_log_file:
1369
self._log_contents = log_contents
1371
os.remove(self._log_file_name)
1373
if sys.platform == 'win32' and e.errno == errno.EACCES:
1374
sys.stderr.write(('Unable to delete log file '
1375
' %r\n' % self._log_file_name))
1380
return "DELETED log file to reduce memory footprint"
1382
def requireFeature(self, feature):
1383
"""This test requires a specific feature is available.
1385
:raises UnavailableFeature: When feature is not available.
464
self.log('run bzr: %s', ' '.join(argv))
1387
if not feature.available():
1388
raise UnavailableFeature(feature)
1390
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1392
"""Run bazaar command line, splitting up a string command line."""
1393
if isinstance(args, basestring):
1394
# shlex don't understand unicode strings,
1395
# so args should be plain string (bialix 20070906)
1396
args = list(shlex.split(str(args)))
1397
return self._run_bzr_core(args, retcode=retcode,
1398
encoding=encoding, stdin=stdin, working_dir=working_dir,
1401
def _run_bzr_core(self, args, retcode, encoding, stdin,
1403
if encoding is None:
1404
encoding = osutils.get_user_encoding()
1405
stdout = StringIOWrapper()
1406
stderr = StringIOWrapper()
1407
stdout.encoding = encoding
1408
stderr.encoding = encoding
1410
self.log('run bzr: %r', args)
465
1411
# FIXME: don't call into logging here
466
1412
handler = logging.StreamHandler(stderr)
467
handler.setFormatter(bzrlib.trace.QuietFormatter())
468
1413
handler.setLevel(logging.INFO)
469
1414
logger = logging.getLogger('')
470
1415
logger.addHandler(handler)
1416
old_ui_factory = ui.ui_factory
1417
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1420
if working_dir is not None:
1421
cwd = osutils.getcwd()
1422
os.chdir(working_dir)
472
result = self.apply_redirected(None, stdout, stderr,
473
bzrlib.commands.run_bzr_catch_errors,
1425
result = self.apply_redirected(ui.ui_factory.stdin,
1427
bzrlib.commands.run_bzr_catch_user_errors,
476
1430
logger.removeHandler(handler)
1431
ui.ui_factory = old_ui_factory
477
1435
out = stdout.getvalue()
478
1436
err = stderr.getvalue()
480
self.log('output:\n%s', out)
1438
self.log('output:\n%r', out)
482
self.log('errors:\n%s', err)
1440
self.log('errors:\n%r', err)
483
1441
if retcode is not None:
484
self.assertEquals(result, retcode)
1442
self.assertEquals(retcode, result,
1443
message='Unexpected return code')
487
def run_bzr(self, *args, **kwargs):
1446
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1447
working_dir=None, error_regexes=[], output_encoding=None):
488
1448
"""Invoke bzr, as if it were run from the command line.
1450
The argument list should not include the bzr program name - the
1451
first argument is normally the bzr command. Arguments may be
1452
passed in three ways:
1454
1- A list of strings, eg ["commit", "a"]. This is recommended
1455
when the command contains whitespace or metacharacters, or
1456
is built up at run time.
1458
2- A single string, eg "add a". This is the most convenient
1459
for hardcoded commands.
1461
This runs bzr through the interface that catches and reports
1462
errors, and with logging set to something approximating the
1463
default, so that error reporting can be checked.
490
1465
This should be the main method for tests that want to exercise the
491
1466
overall behavior of the bzr application (rather than a unit test
492
1467
or a functional test of the library.)
494
1469
This sends the stdout/stderr results into the test's log,
495
1470
where it may be useful for debugging. See also run_captured.
497
retcode = kwargs.pop('retcode', 0)
498
return self.run_bzr_captured(args, retcode)
1472
:keyword stdin: A string to be used as stdin for the command.
1473
:keyword retcode: The status code the command should return;
1475
:keyword working_dir: The directory to run the command in
1476
:keyword error_regexes: A list of expected error messages. If
1477
specified they must be seen in the error output of the command.
1479
out, err = self._run_bzr_autosplit(
1484
working_dir=working_dir,
1486
for regex in error_regexes:
1487
self.assertContainsRe(err, regex)
1490
def run_bzr_error(self, error_regexes, *args, **kwargs):
1491
"""Run bzr, and check that stderr contains the supplied regexes
1493
:param error_regexes: Sequence of regular expressions which
1494
must each be found in the error output. The relative ordering
1496
:param args: command-line arguments for bzr
1497
:param kwargs: Keyword arguments which are interpreted by run_bzr
1498
This function changes the default value of retcode to be 3,
1499
since in most cases this is run when you expect bzr to fail.
1501
:return: (out, err) The actual output of running the command (in case
1502
you want to do more inspection)
1506
# Make sure that commit is failing because there is nothing to do
1507
self.run_bzr_error(['no changes to commit'],
1508
['commit', '-m', 'my commit comment'])
1509
# Make sure --strict is handling an unknown file, rather than
1510
# giving us the 'nothing to do' error
1511
self.build_tree(['unknown'])
1512
self.run_bzr_error(['Commit refused because there are unknown files'],
1513
['commit', --strict', '-m', 'my commit comment'])
1515
kwargs.setdefault('retcode', 3)
1516
kwargs['error_regexes'] = error_regexes
1517
out, err = self.run_bzr(*args, **kwargs)
1520
def run_bzr_subprocess(self, *args, **kwargs):
1521
"""Run bzr in a subprocess for testing.
1523
This starts a new Python interpreter and runs bzr in there.
1524
This should only be used for tests that have a justifiable need for
1525
this isolation: e.g. they are testing startup time, or signal
1526
handling, or early startup code, etc. Subprocess code can't be
1527
profiled or debugged so easily.
1529
:keyword retcode: The status code that is expected. Defaults to 0. If
1530
None is supplied, the status code is not checked.
1531
:keyword env_changes: A dictionary which lists changes to environment
1532
variables. A value of None will unset the env variable.
1533
The values must be strings. The change will only occur in the
1534
child, so you don't need to fix the environment after running.
1535
:keyword universal_newlines: Convert CRLF => LF
1536
:keyword allow_plugins: By default the subprocess is run with
1537
--no-plugins to ensure test reproducibility. Also, it is possible
1538
for system-wide plugins to create unexpected output on stderr,
1539
which can cause unnecessary test failures.
1541
env_changes = kwargs.get('env_changes', {})
1542
working_dir = kwargs.get('working_dir', None)
1543
allow_plugins = kwargs.get('allow_plugins', False)
1545
if isinstance(args[0], list):
1547
elif isinstance(args[0], basestring):
1548
args = list(shlex.split(args[0]))
1550
raise ValueError("passing varargs to run_bzr_subprocess")
1551
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1552
working_dir=working_dir,
1553
allow_plugins=allow_plugins)
1554
# We distinguish between retcode=None and retcode not passed.
1555
supplied_retcode = kwargs.get('retcode', 0)
1556
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1557
universal_newlines=kwargs.get('universal_newlines', False),
1560
def start_bzr_subprocess(self, process_args, env_changes=None,
1561
skip_if_plan_to_signal=False,
1563
allow_plugins=False):
1564
"""Start bzr in a subprocess for testing.
1566
This starts a new Python interpreter and runs bzr in there.
1567
This should only be used for tests that have a justifiable need for
1568
this isolation: e.g. they are testing startup time, or signal
1569
handling, or early startup code, etc. Subprocess code can't be
1570
profiled or debugged so easily.
1572
:param process_args: a list of arguments to pass to the bzr executable,
1573
for example ``['--version']``.
1574
:param env_changes: A dictionary which lists changes to environment
1575
variables. A value of None will unset the env variable.
1576
The values must be strings. The change will only occur in the
1577
child, so you don't need to fix the environment after running.
1578
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1580
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1582
:returns: Popen object for the started process.
1584
if skip_if_plan_to_signal:
1585
if not getattr(os, 'kill', None):
1586
raise TestSkipped("os.kill not available.")
1588
if env_changes is None:
1592
def cleanup_environment():
1593
for env_var, value in env_changes.iteritems():
1594
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1596
def restore_environment():
1597
for env_var, value in old_env.iteritems():
1598
osutils.set_or_unset_env(env_var, value)
1600
bzr_path = self.get_bzr_path()
1603
if working_dir is not None:
1604
cwd = osutils.getcwd()
1605
os.chdir(working_dir)
1608
# win32 subprocess doesn't support preexec_fn
1609
# so we will avoid using it on all platforms, just to
1610
# make sure the code path is used, and we don't break on win32
1611
cleanup_environment()
1612
command = [sys.executable]
1613
# frozen executables don't need the path to bzr
1614
if getattr(sys, "frozen", None) is None:
1615
command.append(bzr_path)
1616
if not allow_plugins:
1617
command.append('--no-plugins')
1618
command.extend(process_args)
1619
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1621
restore_environment()
1627
def _popen(self, *args, **kwargs):
1628
"""Place a call to Popen.
1630
Allows tests to override this method to intercept the calls made to
1631
Popen for introspection.
1633
return Popen(*args, **kwargs)
1635
def get_bzr_path(self):
1636
"""Return the path of the 'bzr' executable for this test suite."""
1637
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1638
if not os.path.isfile(bzr_path):
1639
# We are probably installed. Assume sys.argv is the right file
1640
bzr_path = sys.argv[0]
1643
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1644
universal_newlines=False, process_args=None):
1645
"""Finish the execution of process.
1647
:param process: the Popen object returned from start_bzr_subprocess.
1648
:param retcode: The status code that is expected. Defaults to 0. If
1649
None is supplied, the status code is not checked.
1650
:param send_signal: an optional signal to send to the process.
1651
:param universal_newlines: Convert CRLF => LF
1652
:returns: (stdout, stderr)
1654
if send_signal is not None:
1655
os.kill(process.pid, send_signal)
1656
out, err = process.communicate()
1658
if universal_newlines:
1659
out = out.replace('\r\n', '\n')
1660
err = err.replace('\r\n', '\n')
1662
if retcode is not None and retcode != process.returncode:
1663
if process_args is None:
1664
process_args = "(unknown args)"
1665
mutter('Output of bzr %s:\n%s', process_args, out)
1666
mutter('Error for bzr %s:\n%s', process_args, err)
1667
self.fail('Command bzr %s failed with retcode %s != %s'
1668
% (process_args, retcode, process.returncode))
500
1671
def check_inventory_shape(self, inv, shape):
501
1672
"""Compare an inventory to a list of expected names.
549
1720
sys.stderr = real_stderr
550
1721
sys.stdin = real_stdin
553
BzrTestBase = TestCase
1723
def reduceLockdirTimeout(self):
1724
"""Reduce the default lock timeout for the duration of the test, so that
1725
if LockContention occurs during a test, it does so quickly.
1727
Tests that expect to provoke LockContention errors should call this.
1729
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1731
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1732
self.addCleanup(resetTimeout)
1733
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1735
def make_utf8_encoded_stringio(self, encoding_type=None):
1736
"""Return a StringIOWrapper instance, that will encode Unicode
1739
if encoding_type is None:
1740
encoding_type = 'strict'
1742
output_encoding = 'utf-8'
1743
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
1744
sio.encoding = output_encoding
1748
class TestCaseWithMemoryTransport(TestCase):
1749
"""Common test class for tests that do not need disk resources.
1751
Tests that need disk resources should derive from TestCaseWithTransport.
1753
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1755
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1756
a directory which does not exist. This serves to help ensure test isolation
1757
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1758
must exist. However, TestCaseWithMemoryTransport does not offer local
1759
file defaults for the transport in tests, nor does it obey the command line
1760
override, so tests that accidentally write to the common directory should
1763
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1764
a .bzr directory that stops us ascending higher into the filesystem.
1770
def __init__(self, methodName='runTest'):
1771
# allow test parameterization after test construction and before test
1772
# execution. Variables that the parameterizer sets need to be
1773
# ones that are not set by setUp, or setUp will trash them.
1774
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1775
self.vfs_transport_factory = default_transport
1776
self.transport_server = None
1777
self.transport_readonly_server = None
1778
self.__vfs_server = None
1780
def get_transport(self, relpath=None):
1781
"""Return a writeable transport.
1783
This transport is for the test scratch space relative to
1786
:param relpath: a path relative to the base url.
1788
t = get_transport(self.get_url(relpath))
1789
self.assertFalse(t.is_readonly())
1792
def get_readonly_transport(self, relpath=None):
1793
"""Return a readonly transport for the test scratch space
1795
This can be used to test that operations which should only need
1796
readonly access in fact do not try to write.
1798
:param relpath: a path relative to the base url.
1800
t = get_transport(self.get_readonly_url(relpath))
1801
self.assertTrue(t.is_readonly())
1804
def create_transport_readonly_server(self):
1805
"""Create a transport server from class defined at init.
1807
This is mostly a hook for daughter classes.
1809
return self.transport_readonly_server()
1811
def get_readonly_server(self):
1812
"""Get the server instance for the readonly transport
1814
This is useful for some tests with specific servers to do diagnostics.
1816
if self.__readonly_server is None:
1817
if self.transport_readonly_server is None:
1818
# readonly decorator requested
1819
# bring up the server
1820
self.__readonly_server = ReadonlyServer()
1821
self.__readonly_server.setUp(self.get_vfs_only_server())
1823
self.__readonly_server = self.create_transport_readonly_server()
1824
self.__readonly_server.setUp(self.get_vfs_only_server())
1825
self.addCleanup(self.__readonly_server.tearDown)
1826
return self.__readonly_server
1828
def get_readonly_url(self, relpath=None):
1829
"""Get a URL for the readonly transport.
1831
This will either be backed by '.' or a decorator to the transport
1832
used by self.get_url()
1833
relpath provides for clients to get a path relative to the base url.
1834
These should only be downwards relative, not upwards.
1836
base = self.get_readonly_server().get_url()
1837
return self._adjust_url(base, relpath)
1839
def get_vfs_only_server(self):
1840
"""Get the vfs only read/write server instance.
1842
This is useful for some tests with specific servers that need
1845
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1846
is no means to override it.
1848
if self.__vfs_server is None:
1849
self.__vfs_server = MemoryServer()
1850
self.__vfs_server.setUp()
1851
self.addCleanup(self.__vfs_server.tearDown)
1852
return self.__vfs_server
1854
def get_server(self):
1855
"""Get the read/write server instance.
1857
This is useful for some tests with specific servers that need
1860
This is built from the self.transport_server factory. If that is None,
1861
then the self.get_vfs_server is returned.
1863
if self.__server is None:
1864
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1865
return self.get_vfs_only_server()
1867
# bring up a decorated means of access to the vfs only server.
1868
self.__server = self.transport_server()
1870
self.__server.setUp(self.get_vfs_only_server())
1871
except TypeError, e:
1872
# This should never happen; the try:Except here is to assist
1873
# developers having to update code rather than seeing an
1874
# uninformative TypeError.
1875
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1876
self.addCleanup(self.__server.tearDown)
1877
return self.__server
1879
def _adjust_url(self, base, relpath):
1880
"""Get a URL (or maybe a path) for the readwrite transport.
1882
This will either be backed by '.' or to an equivalent non-file based
1884
relpath provides for clients to get a path relative to the base url.
1885
These should only be downwards relative, not upwards.
1887
if relpath is not None and relpath != '.':
1888
if not base.endswith('/'):
1890
# XXX: Really base should be a url; we did after all call
1891
# get_url()! But sometimes it's just a path (from
1892
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1893
# to a non-escaped local path.
1894
if base.startswith('./') or base.startswith('/'):
1897
base += urlutils.escape(relpath)
1900
def get_url(self, relpath=None):
1901
"""Get a URL (or maybe a path) for the readwrite transport.
1903
This will either be backed by '.' or to an equivalent non-file based
1905
relpath provides for clients to get a path relative to the base url.
1906
These should only be downwards relative, not upwards.
1908
base = self.get_server().get_url()
1909
return self._adjust_url(base, relpath)
1911
def get_vfs_only_url(self, relpath=None):
1912
"""Get a URL (or maybe a path for the plain old vfs transport.
1914
This will never be a smart protocol. It always has all the
1915
capabilities of the local filesystem, but it might actually be a
1916
MemoryTransport or some other similar virtual filesystem.
1918
This is the backing transport (if any) of the server returned by
1919
get_url and get_readonly_url.
1921
:param relpath: provides for clients to get a path relative to the base
1922
url. These should only be downwards relative, not upwards.
1925
base = self.get_vfs_only_server().get_url()
1926
return self._adjust_url(base, relpath)
1928
def _create_safety_net(self):
1929
"""Make a fake bzr directory.
1931
This prevents any tests propagating up onto the TEST_ROOT directory's
1934
root = TestCaseWithMemoryTransport.TEST_ROOT
1935
bzrdir.BzrDir.create_standalone_workingtree(root)
1937
def _check_safety_net(self):
1938
"""Check that the safety .bzr directory have not been touched.
1940
_make_test_root have created a .bzr directory to prevent tests from
1941
propagating. This method ensures than a test did not leaked.
1943
root = TestCaseWithMemoryTransport.TEST_ROOT
1944
wt = workingtree.WorkingTree.open(root)
1945
last_rev = wt.last_revision()
1946
if last_rev != 'null:':
1947
# The current test have modified the /bzr directory, we need to
1948
# recreate a new one or all the followng tests will fail.
1949
# If you need to inspect its content uncomment the following line
1950
# import pdb; pdb.set_trace()
1951
_rmtree_temp_dir(root + '/.bzr')
1952
self._create_safety_net()
1953
raise AssertionError('%s/.bzr should not be modified' % root)
1955
def _make_test_root(self):
1956
if TestCaseWithMemoryTransport.TEST_ROOT is None:
1957
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
1958
TestCaseWithMemoryTransport.TEST_ROOT = root
1960
self._create_safety_net()
1962
# The same directory is used by all tests, and we're not
1963
# specifically told when all tests are finished. This will do.
1964
atexit.register(_rmtree_temp_dir, root)
1966
self.addCleanup(self._check_safety_net)
1968
def makeAndChdirToTestDir(self):
1969
"""Create a temporary directories for this one test.
1971
This must set self.test_home_dir and self.test_dir and chdir to
1974
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1976
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1977
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1978
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1980
def make_branch(self, relpath, format=None):
1981
"""Create a branch on the transport at relpath."""
1982
repo = self.make_repository(relpath, format=format)
1983
return repo.bzrdir.create_branch()
1985
def make_bzrdir(self, relpath, format=None):
1987
# might be a relative or absolute path
1988
maybe_a_url = self.get_url(relpath)
1989
segments = maybe_a_url.rsplit('/', 1)
1990
t = get_transport(maybe_a_url)
1991
if len(segments) > 1 and segments[-1] not in ('', '.'):
1995
if isinstance(format, basestring):
1996
format = bzrdir.format_registry.make_bzrdir(format)
1997
return format.initialize_on_transport(t)
1998
except errors.UninitializableFormat:
1999
raise TestSkipped("Format %s is not initializable." % format)
2001
def make_repository(self, relpath, shared=False, format=None):
2002
"""Create a repository on our default transport at relpath.
2004
Note that relpath must be a relative path, not a full url.
2006
# FIXME: If you create a remoterepository this returns the underlying
2007
# real format, which is incorrect. Actually we should make sure that
2008
# RemoteBzrDir returns a RemoteRepository.
2009
# maybe mbp 20070410
2010
made_control = self.make_bzrdir(relpath, format=format)
2011
return made_control.create_repository(shared=shared)
2013
def make_branch_and_memory_tree(self, relpath, format=None):
2014
"""Create a branch on the default transport and a MemoryTree for it."""
2015
b = self.make_branch(relpath, format=format)
2016
return memorytree.MemoryTree.create_on_branch(b)
2018
def make_branch_builder(self, relpath, format=None):
2019
url = self.get_url(relpath)
2020
tran = get_transport(url)
2021
return branchbuilder.BranchBuilder(get_transport(url), format=format)
2023
def overrideEnvironmentForTesting(self):
2024
os.environ['HOME'] = self.test_home_dir
2025
os.environ['BZR_HOME'] = self.test_home_dir
2028
super(TestCaseWithMemoryTransport, self).setUp()
2029
self._make_test_root()
2030
_currentdir = os.getcwdu()
2031
def _leaveDirectory():
2032
os.chdir(_currentdir)
2033
self.addCleanup(_leaveDirectory)
2034
self.makeAndChdirToTestDir()
2035
self.overrideEnvironmentForTesting()
2036
self.__readonly_server = None
2037
self.__server = None
2038
self.reduceLockdirTimeout()
556
class TestCaseInTempDir(TestCase):
2041
class TestCaseInTempDir(TestCaseWithMemoryTransport):
557
2042
"""Derived class that runs a test within a temporary directory.
559
2044
This is useful for tests that need to create a branch, etc.
849
2284
def setUp(self):
850
2285
super(ChrootedTestCase, self).setUp()
851
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
852
self.transport_readonly_server = bzrlib.transport.http.HttpServer
2286
if not self.vfs_transport_factory == MemoryServer:
2287
self.transport_readonly_server = HttpServer
2290
def condition_id_re(pattern):
2291
"""Create a condition filter which performs a re check on a test's id.
2293
:param pattern: A regular expression string.
2294
:return: A callable that returns True if the re matches.
2296
filter_re = re.compile(pattern)
2297
def condition(test):
2299
return filter_re.search(test_id)
2303
def condition_isinstance(klass_or_klass_list):
2304
"""Create a condition filter which returns isinstance(param, klass).
2306
:return: A callable which when called with one parameter obj return the
2307
result of isinstance(obj, klass_or_klass_list).
2310
return isinstance(obj, klass_or_klass_list)
2314
def condition_id_in_list(id_list):
2315
"""Create a condition filter which verify that test's id in a list.
2317
:param id_list: A TestIdList object.
2318
:return: A callable that returns True if the test's id appears in the list.
2320
def condition(test):
2321
return id_list.includes(test.id())
2325
def condition_id_startswith(starts):
2326
"""Create a condition filter verifying that test's id starts with a string.
2328
:param starts: A list of string.
2329
:return: A callable that returns True if the test's id starts with one of
2332
def condition(test):
2333
for start in starts:
2334
if test.id().startswith(start):
2340
def exclude_tests_by_condition(suite, condition):
2341
"""Create a test suite which excludes some tests from suite.
2343
:param suite: The suite to get tests from.
2344
:param condition: A callable whose result evaluates True when called with a
2345
test case which should be excluded from the result.
2346
:return: A suite which contains the tests found in suite that fail
2350
for test in iter_suite_tests(suite):
2351
if not condition(test):
2353
return TestUtil.TestSuite(result)
2356
def filter_suite_by_condition(suite, condition):
2357
"""Create a test suite by filtering another one.
2359
:param suite: The source suite.
2360
:param condition: A callable whose result evaluates True when called with a
2361
test case which should be included in the result.
2362
:return: A suite which contains the tests found in suite that pass
2366
for test in iter_suite_tests(suite):
2369
return TestUtil.TestSuite(result)
855
2372
def filter_suite_by_re(suite, pattern):
857
filter_re = re.compile(pattern)
2373
"""Create a test suite by filtering another one.
2375
:param suite: the source suite
2376
:param pattern: pattern that names must match
2377
:returns: the newly created suite
2379
condition = condition_id_re(pattern)
2380
result_suite = filter_suite_by_condition(suite, condition)
2384
def filter_suite_by_id_list(suite, test_id_list):
2385
"""Create a test suite by filtering another one.
2387
:param suite: The source suite.
2388
:param test_id_list: A list of the test ids to keep as strings.
2389
:returns: the newly created suite
2391
condition = condition_id_in_list(test_id_list)
2392
result_suite = filter_suite_by_condition(suite, condition)
2396
def filter_suite_by_id_startswith(suite, start):
2397
"""Create a test suite by filtering another one.
2399
:param suite: The source suite.
2400
:param start: A list of string the test id must start with one of.
2401
:returns: the newly created suite
2403
condition = condition_id_startswith(start)
2404
result_suite = filter_suite_by_condition(suite, condition)
2408
def exclude_tests_by_re(suite, pattern):
2409
"""Create a test suite which excludes some tests from suite.
2411
:param suite: The suite to get tests from.
2412
:param pattern: A regular expression string. Test ids that match this
2413
pattern will be excluded from the result.
2414
:return: A TestSuite that contains all the tests from suite without the
2415
tests that matched pattern. The order of tests is the same as it was in
2418
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2421
def preserve_input(something):
2422
"""A helper for performing test suite transformation chains.
2424
:param something: Anything you want to preserve.
2430
def randomize_suite(suite):
2431
"""Return a new TestSuite with suite's tests in random order.
2433
The tests in the input suite are flattened into a single suite in order to
2434
accomplish this. Any nested TestSuites are removed to provide global
2437
tests = list(iter_suite_tests(suite))
2438
random.shuffle(tests)
2439
return TestUtil.TestSuite(tests)
2442
def split_suite_by_condition(suite, condition):
2443
"""Split a test suite into two by a condition.
2445
:param suite: The suite to split.
2446
:param condition: The condition to match on. Tests that match this
2447
condition are returned in the first test suite, ones that do not match
2448
are in the second suite.
2449
:return: A tuple of two test suites, where the first contains tests from
2450
suite matching the condition, and the second contains the remainder
2451
from suite. The order within each output suite is the same as it was in
858
2456
for test in iter_suite_tests(suite):
859
if filter_re.search(test.id()):
2458
matched.append(test)
2460
did_not_match.append(test)
2461
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2464
def split_suite_by_re(suite, pattern):
2465
"""Split a test suite into two by a regular expression.
2467
:param suite: The suite to split.
2468
:param pattern: A regular expression string. Test ids that match this
2469
pattern will be in the first test suite returned, and the others in the
2470
second test suite returned.
2471
:return: A tuple of two test suites, where the first contains tests from
2472
suite matching pattern, and the second contains the remainder from
2473
suite. The order within each output suite is the same as it was in
2476
return split_suite_by_condition(suite, condition_id_re(pattern))
864
2479
def run_suite(suite, name='test', verbose=False, pattern=".*",
865
stop_on_failure=False, keep_output=False,
867
TestCaseInTempDir._TEST_NAME = name
2480
stop_on_failure=False,
2481
transport=None, lsprof_timed=None, bench_history=None,
2482
matching_tests_first=None,
2485
exclude_pattern=None,
2487
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
872
2492
runner = TextTestRunner(stream=sys.stdout,
2494
verbosity=verbosity,
2495
bench_history=bench_history,
2496
list_only=list_only,
875
2498
runner.stop_on_failure=stop_on_failure
877
suite = filter_suite_by_re(suite, pattern)
2499
# Initialise the random number generator and display the seed used.
2500
# We convert the seed to a long to make it reuseable across invocations.
2501
random_order = False
2502
if random_seed is not None:
2504
if random_seed == "now":
2505
random_seed = long(time.time())
2507
# Convert the seed to a long if we can
2509
random_seed = long(random_seed)
2512
runner.stream.writeln("Randomizing test order using seed %s\n" %
2514
random.seed(random_seed)
2515
# Customise the list of tests if requested
2516
if exclude_pattern is not None:
2517
suite = exclude_tests_by_re(suite, exclude_pattern)
2519
order_changer = randomize_suite
2521
order_changer = preserve_input
2522
if pattern != '.*' or random_order:
2523
if matching_tests_first:
2524
suites = map(order_changer, split_suite_by_re(suite, pattern))
2525
suite = TestUtil.TestSuite(suites)
2527
suite = order_changer(filter_suite_by_re(suite, pattern))
878
2529
result = runner.run(suite)
879
# This is still a little bogus,
880
# but only a little. Folk not using our testrunner will
881
# have to delete their temp directories themselves.
882
test_root = TestCaseInTempDir.TEST_ROOT
883
if result.wasSuccessful() or not keep_output:
884
if test_root is not None:
885
print 'Deleting test root %s...' % test_root
887
shutil.rmtree(test_root)
891
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
2532
return result.wasStrictlySuccessful()
892
2534
return result.wasSuccessful()
2537
# Controlled by "bzr selftest -E=..." option
2538
selftest_debug_flags = set()
895
2541
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2543
test_suite_factory=None,
2546
matching_tests_first=None,
2549
exclude_pattern=None,
898
2555
"""Run the whole test suite under the enhanced runner"""
2556
# XXX: Very ugly way to do this...
2557
# Disable warning about old formats because we don't want it to disturb
2558
# any blackbox tests.
2559
from bzrlib import repository
2560
repository._deprecation_warning_done = True
899
2562
global default_transport
900
2563
if transport is None:
901
2564
transport = default_transport
902
2565
old_transport = default_transport
903
2566
default_transport = transport
2567
global selftest_debug_flags
2568
old_debug_flags = selftest_debug_flags
2569
if debug_flags is not None:
2570
selftest_debug_flags = set(debug_flags)
2572
if load_list is None:
2575
keep_only = load_test_id_list(load_list)
2576
if test_suite_factory is None:
2577
suite = test_suite(keep_only, starting_with)
2579
suite = test_suite_factory()
906
2580
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
907
stop_on_failure=stop_on_failure, keep_output=keep_output,
2581
stop_on_failure=stop_on_failure,
2582
transport=transport,
2583
lsprof_timed=lsprof_timed,
2584
bench_history=bench_history,
2585
matching_tests_first=matching_tests_first,
2586
list_only=list_only,
2587
random_seed=random_seed,
2588
exclude_pattern=exclude_pattern,
910
2591
default_transport = old_transport
915
"""Build and return TestSuite for the whole program."""
916
from doctest import DocTestSuite
918
global MODULES_TO_DOCTEST
2592
selftest_debug_flags = old_debug_flags
2595
def load_test_id_list(file_name):
2596
"""Load a test id list from a text file.
2598
The format is one test id by line. No special care is taken to impose
2599
strict rules, these test ids are used to filter the test suite so a test id
2600
that does not match an existing test will do no harm. This allows users to add
2601
comments, leave blank lines, etc.
2605
ftest = open(file_name, 'rt')
2607
if e.errno != errno.ENOENT:
2610
raise errors.NoSuchFile(file_name)
2612
for test_name in ftest.readlines():
2613
test_list.append(test_name.strip())
2618
def suite_matches_id_list(test_suite, id_list):
2619
"""Warns about tests not appearing or appearing more than once.
2621
:param test_suite: A TestSuite object.
2622
:param test_id_list: The list of test ids that should be found in
2625
:return: (absents, duplicates) absents is a list containing the test found
2626
in id_list but not in test_suite, duplicates is a list containing the
2627
test found multiple times in test_suite.
2629
When using a predefined test id list, it may occur that some tests do not
2630
exist anymore or that some tests use the same id. This function warns the
2631
tester about potential problems in his workflow (test lists are volatile)
2632
or in the test suite itself (using the same id for several tests does not
2633
help to localize defects).
2635
# Build a dict counting id occurrences
2637
for test in iter_suite_tests(test_suite):
2639
tests[id] = tests.get(id, 0) + 1
2644
occurs = tests.get(id, 0)
2646
not_found.append(id)
2648
duplicates.append(id)
2650
return not_found, duplicates
2653
class TestIdList(object):
2654
"""Test id list to filter a test suite.
2656
Relying on the assumption that test ids are built as:
2657
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
2658
notation, this class offers methods to :
2659
- avoid building a test suite for modules not referred to in the test list,
2660
- keep only the tests listed from the module test suite.
2663
def __init__(self, test_id_list):
2664
# When a test suite needs to be filtered against us we compare test ids
2665
# for equality, so a simple dict offers a quick and simple solution.
2666
self.tests = dict().fromkeys(test_id_list, True)
2668
# While unittest.TestCase have ids like:
2669
# <module>.<class>.<method>[(<param+)],
2670
# doctest.DocTestCase can have ids like:
2673
# <module>.<function>
2674
# <module>.<class>.<method>
2676
# Since we can't predict a test class from its name only, we settle on
2677
# a simple constraint: a test id always begins with its module name.
2680
for test_id in test_id_list:
2681
parts = test_id.split('.')
2682
mod_name = parts.pop(0)
2683
modules[mod_name] = True
2685
mod_name += '.' + part
2686
modules[mod_name] = True
2687
self.modules = modules
2689
def refers_to(self, module_name):
    """Is there a test for the module or one of its sub modules?

    :param module_name: A module name in python dotted notation.
    :return: True if module_name is one of the keys of self.modules.
    """
    # 'in' is the supported membership test; dict.has_key() is deprecated
    # and no longer exists on Python 3 dictionaries.
    return module_name in self.modules
2693
def includes(self, test_id):
    """Is test_id one of the ids this list was built from?

    :param test_id: A test id, compared for equality with the listed ids.
    :return: True if test_id is one of the keys of self.tests.
    """
    # 'in' is the supported membership test; dict.has_key() is deprecated
    # and no longer exists on Python 3 dictionaries.
    return test_id in self.tests
2697
class TestPrefixAliasRegistry(registry.Registry):
2698
"""A registry for test prefix aliases.
2700
This helps implement shortcuts for the --starting-with selftest
2701
option. Overriding existing prefixes is not allowed but not fatal (a
2702
warning will be emitted).
2705
def register(self, key, obj, help=None, info=None,
2706
override_existing=False):
2707
"""See Registry.register.
2709
Trying to override an existing alias causes a warning to be emitted,
2710
not a fatal exception.
2713
super(TestPrefixAliasRegistry, self).register(
2714
key, obj, help=help, info=info, override_existing=False)
2716
actual = self.get(key)
2717
note('Test prefix alias %s is already used for %s, ignoring %s'
2718
% (key, actual, obj))
2720
def resolve_alias(self, id_start):
2721
"""Replace the alias by the prefix in the given string.
2723
Using an unknown prefix is an error to help catching typos.
2725
parts = id_start.split('.')
2727
parts[0] = self.get(parts[0])
2729
raise errors.BzrCommandError(
2730
'%s is not a known test prefix alias' % parts[0])
2731
return '.'.join(parts)
2734
test_prefix_alias_registry = TestPrefixAliasRegistry()
2735
"""Registry of test prefix aliases."""
2738
# This alias allows detecting typos ('bzrlin.') by making all valid test ids
2739
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
2740
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
2742
# Obvious highest-level prefixes; feel free to add your own via a plugin
2743
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
2744
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
2745
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
2746
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
2747
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
2750
def test_suite(keep_only=None, starting_with=None):
2751
"""Build and return TestSuite for the whole of bzrlib.
2753
:param keep_only: A list of test ids limiting the suite returned.
2755
:param starting_with: An id limiting the suite returned to the tests
2758
This function can be replaced if you need to change the default test
2759
suite on a global basis, but it is not encouraged.
2763
'bzrlib.tests.blackbox',
2764
'bzrlib.tests.branch_implementations',
2765
'bzrlib.tests.bzrdir_implementations',
2766
'bzrlib.tests.commands',
2767
'bzrlib.tests.interrepository_implementations',
2768
'bzrlib.tests.intertree_implementations',
2769
'bzrlib.tests.inventory_implementations',
2770
'bzrlib.tests.per_lock',
2771
'bzrlib.tests.per_repository',
2772
'bzrlib.tests.per_repository_reference',
2773
'bzrlib.tests.test__dirstate_helpers',
2774
'bzrlib.tests.test__walkdirs_win32',
921
2775
'bzrlib.tests.test_ancestry',
922
2776
'bzrlib.tests.test_annotate',
923
2777
'bzrlib.tests.test_api',
2778
'bzrlib.tests.test_atomicfile',
924
2779
'bzrlib.tests.test_bad_files',
925
'bzrlib.tests.test_basis_inventory',
2780
'bzrlib.tests.test_bisect_multi',
926
2781
'bzrlib.tests.test_branch',
2782
'bzrlib.tests.test_branchbuilder',
2783
'bzrlib.tests.test_btree_index',
2784
'bzrlib.tests.test_bugtracker',
2785
'bzrlib.tests.test_bundle',
927
2786
'bzrlib.tests.test_bzrdir',
928
'bzrlib.tests.test_command',
2787
'bzrlib.tests.test_cache_utf8',
2788
'bzrlib.tests.test_chunk_writer',
2789
'bzrlib.tests.test_commands',
929
2790
'bzrlib.tests.test_commit',
930
2791
'bzrlib.tests.test_commit_merge',
931
2792
'bzrlib.tests.test_config',
932
2793
'bzrlib.tests.test_conflicts',
2794
'bzrlib.tests.test_counted_lock',
933
2795
'bzrlib.tests.test_decorators',
2796
'bzrlib.tests.test_delta',
2797
'bzrlib.tests.test_deprecated_graph',
934
2798
'bzrlib.tests.test_diff',
935
'bzrlib.tests.test_doc_generate',
2799
'bzrlib.tests.test_directory_service',
2800
'bzrlib.tests.test_dirstate',
2801
'bzrlib.tests.test_email_message',
936
2802
'bzrlib.tests.test_errors',
2803
'bzrlib.tests.test_extract',
937
2804
'bzrlib.tests.test_fetch',
2805
'bzrlib.tests.test_ftp_transport',
2806
'bzrlib.tests.test_generate_docs',
2807
'bzrlib.tests.test_generate_ids',
2808
'bzrlib.tests.test_globbing',
938
2809
'bzrlib.tests.test_gpg',
939
2810
'bzrlib.tests.test_graph',
940
2811
'bzrlib.tests.test_hashcache',
2812
'bzrlib.tests.test_help',
2813
'bzrlib.tests.test_hooks',
941
2814
'bzrlib.tests.test_http',
2815
'bzrlib.tests.test_http_implementations',
2816
'bzrlib.tests.test_http_response',
2817
'bzrlib.tests.test_https_ca_bundle',
942
2818
'bzrlib.tests.test_identitymap',
2819
'bzrlib.tests.test_ignores',
2820
'bzrlib.tests.test_index',
2821
'bzrlib.tests.test_info',
943
2822
'bzrlib.tests.test_inv',
944
2823
'bzrlib.tests.test_knit',
2824
'bzrlib.tests.test_lazy_import',
2825
'bzrlib.tests.test_lazy_regex',
2826
'bzrlib.tests.test_lockable_files',
945
2827
'bzrlib.tests.test_lockdir',
946
'bzrlib.tests.test_lockable_files',
947
2828
'bzrlib.tests.test_log',
2829
'bzrlib.tests.test_lru_cache',
2830
'bzrlib.tests.test_lsprof',
2831
'bzrlib.tests.test_mail_client',
2832
'bzrlib.tests.test_memorytree',
948
2833
'bzrlib.tests.test_merge',
949
2834
'bzrlib.tests.test_merge3',
950
2835
'bzrlib.tests.test_merge_core',
2836
'bzrlib.tests.test_merge_directive',
951
2837
'bzrlib.tests.test_missing',
952
2838
'bzrlib.tests.test_msgeditor',
2839
'bzrlib.tests.test_multiparent',
2840
'bzrlib.tests.test_mutabletree',
953
2841
'bzrlib.tests.test_nonascii',
954
2842
'bzrlib.tests.test_options',
955
2843
'bzrlib.tests.test_osutils',
2844
'bzrlib.tests.test_osutils_encodings',
2845
'bzrlib.tests.test_pack',
2846
'bzrlib.tests.test_pack_repository',
2847
'bzrlib.tests.test_patch',
2848
'bzrlib.tests.test_patches',
956
2849
'bzrlib.tests.test_permissions',
957
2850
'bzrlib.tests.test_plugins',
958
2851
'bzrlib.tests.test_progress',
2852
'bzrlib.tests.test_read_bundle',
959
2853
'bzrlib.tests.test_reconcile',
2854
'bzrlib.tests.test_reconfigure',
2855
'bzrlib.tests.test_registry',
2856
'bzrlib.tests.test_remote',
960
2857
'bzrlib.tests.test_repository',
2858
'bzrlib.tests.test_revert',
961
2859
'bzrlib.tests.test_revision',
962
'bzrlib.tests.test_revisionnamespaces',
963
'bzrlib.tests.test_revprops',
2860
'bzrlib.tests.test_revisionspec',
2861
'bzrlib.tests.test_revisiontree',
964
2862
'bzrlib.tests.test_rio',
2863
'bzrlib.tests.test_rules',
965
2864
'bzrlib.tests.test_sampler',
966
2865
'bzrlib.tests.test_selftest',
967
2866
'bzrlib.tests.test_setup',
968
2867
'bzrlib.tests.test_sftp_transport',
2868
'bzrlib.tests.test_smart',
969
2869
'bzrlib.tests.test_smart_add',
2870
'bzrlib.tests.test_smart_transport',
2871
'bzrlib.tests.test_smtp_connection',
970
2872
'bzrlib.tests.test_source',
2873
'bzrlib.tests.test_ssh_transport',
2874
'bzrlib.tests.test_status',
971
2875
'bzrlib.tests.test_store',
2876
'bzrlib.tests.test_strace',
2877
'bzrlib.tests.test_subsume',
2878
'bzrlib.tests.test_switch',
972
2879
'bzrlib.tests.test_symbol_versioning',
2880
'bzrlib.tests.test_tag',
973
2881
'bzrlib.tests.test_testament',
2882
'bzrlib.tests.test_textfile',
2883
'bzrlib.tests.test_textmerge',
2884
'bzrlib.tests.test_timestamp',
974
2885
'bzrlib.tests.test_trace',
975
2886
'bzrlib.tests.test_transactions',
976
2887
'bzrlib.tests.test_transform',
977
2888
'bzrlib.tests.test_transport',
2889
'bzrlib.tests.test_transport_implementations',
2890
'bzrlib.tests.test_transport_log',
2891
'bzrlib.tests.test_tree',
2892
'bzrlib.tests.test_treebuilder',
978
2893
'bzrlib.tests.test_tsort',
2894
'bzrlib.tests.test_tuned_gzip',
979
2895
'bzrlib.tests.test_ui',
980
2896
'bzrlib.tests.test_uncommit',
981
2897
'bzrlib.tests.test_upgrade',
2898
'bzrlib.tests.test_upgrade_stacked',
2899
'bzrlib.tests.test_urlutils',
2900
'bzrlib.tests.test_version',
2901
'bzrlib.tests.test_version_info',
982
2902
'bzrlib.tests.test_versionedfile',
983
2903
'bzrlib.tests.test_weave',
984
2904
'bzrlib.tests.test_whitebox',
2905
'bzrlib.tests.test_win32utils',
985
2906
'bzrlib.tests.test_workingtree',
2907
'bzrlib.tests.test_workingtree_4',
2908
'bzrlib.tests.test_wsgi',
986
2909
'bzrlib.tests.test_xml',
2910
'bzrlib.tests.tree_implementations',
2911
'bzrlib.tests.workingtree_implementations',
2912
'bzrlib.util.tests.test_bencode',
988
test_transport_implementations = [
989
'bzrlib.tests.test_transport_implementations']
991
TestCase.BZRPATH = osutils.pathjoin(
992
osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
993
print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
994
print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
997
# python2.4's TestLoader.loadTestsFromNames gives very poor
998
# errors if it fails to load a named module - no indication of what's
999
# actually wrong, just "no such module". We should probably override that
1000
# class, but for the moment just load them ourselves. (mbp 20051202)
1001
loader = TestLoader()
1002
from bzrlib.transport import TransportTestProviderAdapter
1003
adapter = TransportTestProviderAdapter()
1004
adapt_modules(test_transport_implementations, adapter, loader, suite)
1005
for mod_name in testmod_names:
1006
mod = _load_module_by_name(mod_name)
1007
suite.addTest(loader.loadTestsFromModule(mod))
1008
for package in packages_to_test():
1009
suite.addTest(package.test_suite())
1010
for m in MODULES_TO_TEST:
1011
suite.addTest(loader.loadTestsFromModule(m))
1012
for m in (MODULES_TO_DOCTEST):
1013
suite.addTest(DocTestSuite(m))
1014
for name, plugin in bzrlib.plugin.all_plugins().items():
1015
if getattr(plugin, 'test_suite', None) is not None:
1016
suite.addTest(plugin.test_suite())
2915
loader = TestUtil.TestLoader()
2918
starting_with = [test_prefix_alias_registry.resolve_alias(start)
2919
for start in starting_with]
2920
# We take precedence over keep_only because *at loading time* using
2921
# both options means we will load fewer tests for the same final result.
2922
def interesting_module(name):
2923
for start in starting_with:
2925
# Either the module name starts with the specified string
2926
name.startswith(start)
2927
# or it may contain tests starting with the specified string
2928
or start.startswith(name)
2932
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
2934
elif keep_only is not None:
2935
id_filter = TestIdList(keep_only)
2936
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
2937
def interesting_module(name):
2938
return id_filter.refers_to(name)
2941
loader = TestUtil.TestLoader()
2942
def interesting_module(name):
2943
# No filtering, all modules are interesting
2946
suite = loader.suiteClass()
2948
# modules building their suite with loadTestsFromModuleNames
2949
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2951
modules_to_doctest = [
2956
'bzrlib.iterablefile',
2961
'bzrlib.symbol_versioning',
2964
'bzrlib.version_info_formats.format_custom',
2967
for mod in modules_to_doctest:
2968
if not interesting_module(mod):
2969
# No tests to keep here, move along
2972
doc_suite = doctest.DocTestSuite(mod)
2973
except ValueError, e:
2974
print '**failed to get doctest for: %s\n%s' % (mod, e)
2976
suite.addTest(doc_suite)
2978
default_encoding = sys.getdefaultencoding()
2979
for name, plugin in bzrlib.plugin.plugins().items():
2980
if not interesting_module(plugin.module.__name__):
2982
plugin_suite = plugin.test_suite()
2983
# We used to catch ImportError here and turn it into just a warning,
2984
# but really if you don't have --no-plugins this should be a failure.
2985
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
2986
if plugin_suite is None:
2987
plugin_suite = plugin.load_plugin_tests(loader)
2988
if plugin_suite is not None:
2989
suite.addTest(plugin_suite)
2990
if default_encoding != sys.getdefaultencoding():
2991
bzrlib.trace.warning(
2992
'Plugin "%s" tried to reset default encoding to: %s', name,
2993
sys.getdefaultencoding())
2995
sys.setdefaultencoding(default_encoding)
2998
suite = filter_suite_by_id_startswith(suite, starting_with)
3000
if keep_only is not None:
3001
# Now that the referred modules have loaded their tests, keep only the
3003
suite = filter_suite_by_id_list(suite, id_filter)
3004
# Do some sanity checks on the id_list filtering
3005
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3007
# The tester has used both keep_only and starting_with, so he is
3008
# already aware that some tests are excluded from the list, there
3009
# is no need to tell him which.
3012
# Some tests mentioned in the list are not in the test suite. The
3013
# list may be out of date, report to the tester.
3014
for id in not_found:
3015
bzrlib.trace.warning('"%s" not found in the test suite', id)
3016
for id in duplicates:
3017
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
3022
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
3023
"""Adapt all tests in some given modules to given scenarios.
3025
This is the recommended public interface for test parameterization.
3026
Typically the test_suite() method for a per-implementation test
3027
suite will call multiply_tests_from_modules and return the
3030
:param module_name_list: List of fully-qualified names of test
3032
:param scenario_iter: Iterable of pairs of (scenario_name,
3033
scenario_param_dict).
3034
:param loader: If provided, will be used instead of a new
3035
bzrlib.tests.TestLoader() instance.
3037
This returns a new TestSuite containing the cross product of
3038
all the tests in all the modules, each repeated for each scenario.
3039
Each test is adapted by adding the scenario name at the end
3040
of its name, and updating the test object's __dict__ with the
3041
scenario_param_dict.
3043
>>> r = multiply_tests_from_modules(
3044
... ['bzrlib.tests.test_sampler'],
3045
... [('one', dict(param=1)),
3046
... ('two', dict(param=2))])
3047
>>> tests = list(iter_suite_tests(r))
3051
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3057
# XXX: Isn't load_tests() a better way to provide the same functionality
3058
# without forcing a predefined TestScenarioApplier ? --vila 080215
3060
loader = TestUtil.TestLoader()
3062
suite = loader.suiteClass()
3064
adapter = TestScenarioApplier()
3065
adapter.scenarios = list(scenario_iter)
3066
adapt_modules(module_name_list, adapter, loader, suite)
3070
def multiply_scenarios(scenarios_left, scenarios_right):
3071
"""Multiply two sets of scenarios.
3073
:returns: the cartesian product of the two sets of scenarios, that is
3074
a scenario for every possible combination of a left scenario and a
3078
('%s,%s' % (left_name, right_name),
3079
dict(left_dict.items() + right_dict.items()))
3080
for left_name, left_dict in scenarios_left
3081
for right_name, right_dict in scenarios_right]
1020
3085
def adapt_modules(mods_list, adapter, loader, suite):
1021
3086
"""Adapt the modules in mods_list using adapter and add to suite."""
1022
for mod_name in mods_list:
1023
mod = _load_module_by_name(mod_name)
1024
for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
1025
suite.addTests(adapter.adapt(test))
1028
def _load_module_by_name(mod_name):
1029
parts = mod_name.split('.')
1030
module = __import__(mod_name)
1032
# for historical reasons python returns the top-level module even though
1033
# it loads the submodule; we need to walk down to get the one we want.
1035
module = getattr(module, parts.pop(0))
3087
tests = loader.loadTestsFromModuleNames(mods_list)
3088
adapt_tests(tests, adapter, suite)
3091
def adapt_tests(tests_list, adapter, suite):
    """Add to suite the adapted form of every test found in tests_list.

    :param tests_list: The tests to adapt; walked with iter_suite_tests().
    :param adapter: An object whose adapt(test) result is handed to
        suite.addTests().
    :param suite: The suite receiving the adapted tests.
    """
    for single_test in iter_suite_tests(tests_list):
        adapted = adapter.adapt(single_test)
        suite.addTests(adapted)
3097
def _rmtree_temp_dir(dirname):
3098
# If LANG=C we probably have created some bogus paths
3099
# which rmtree(unicode) will fail to delete
3100
# so make sure we are using rmtree(str) to delete everything
3101
# except on win32, where rmtree(str) will fail
3102
# since it doesn't have the property of byte-stream paths
3103
# (they are either ascii or mbcs)
3104
if sys.platform == 'win32':
3105
# make sure we are using the unicode win32 api
3106
dirname = unicode(dirname)
3108
dirname = dirname.encode(sys.getfilesystemencoding())
3110
osutils.rmtree(dirname)
3112
if sys.platform == 'win32' and e.errno == errno.EACCES:
3113
sys.stderr.write(('Permission denied: '
3114
'unable to remove testing dir '
3115
'%s\n' % os.path.basename(dirname)))
3120
class Feature(object):
3121
"""An operating system Feature."""
3124
self._available = None
3126
def available(self):
    """Tell whether the feature can be used.

    The answer of self._probe() is remembered in self._available, so the
    probe is not repeated while a non-None answer is cached.

    :return: True if the feature is available.
    """
    cached = self._available
    if cached is not None:
        return cached
    self._available = self._probe()
    return self._available
3136
"""Implement this method in concrete features.
3138
:return: True if the feature is available.
3140
raise NotImplementedError
3143
if getattr(self, 'feature_name', None):
3144
return self.feature_name()
3145
return self.__class__.__name__
3148
class _SymlinkFeature(Feature):
3151
return osutils.has_symlinks()
3153
def feature_name(self):
3156
SymlinkFeature = _SymlinkFeature()
3159
class _HardlinkFeature(Feature):
3162
return osutils.has_hardlinks()
3164
def feature_name(self):
3167
HardlinkFeature = _HardlinkFeature()
3170
class _OsFifoFeature(Feature):
3173
return getattr(os, 'mkfifo', None)
3175
def feature_name(self):
3176
return 'filesystem fifos'
3178
OsFifoFeature = _OsFifoFeature()
3181
class _UnicodeFilenameFeature(Feature):
3182
"""Does the filesystem support Unicode filenames?"""
3186
# Check for character combinations unlikely to be covered by any
3187
# single non-unicode encoding. We use the characters
3188
# - greek small letter alpha (U+03B1) and
3189
# - braille pattern dots-123456 (U+283F).
3190
os.stat(u'\u03b1\u283f')
3191
except UnicodeEncodeError:
3193
except (IOError, OSError):
3194
# The filesystem allows the Unicode filename but the file doesn't
3198
# The filesystem allows the Unicode filename and the file exists,
3202
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3205
class TestScenarioApplier(object):
3206
"""A tool to apply scenarios to tests."""
3208
def adapt(self, test):
3209
"""Return a TestSuite containing a copy of test for each scenario."""
3210
result = unittest.TestSuite()
3211
for scenario in self.scenarios:
3212
result.addTest(self.adapt_test_to_scenario(test, scenario))
3215
def adapt_test_to_scenario(self, test, scenario):
3216
"""Copy test and apply scenario to it.
3218
:param test: A test to adapt.
3219
:param scenario: A tuple describing the scenario.
3220
The first element of the tuple is the new test id.
3221
The second element is a dict containing attributes to set on the
3223
:return: The adapted test.
3225
from copy import deepcopy
3226
new_test = deepcopy(test)
3227
for name, value in scenario[1].items():
3228
setattr(new_test, name, value)
3229
new_id = "%s(%s)" % (new_test.id(), scenario[0])
3230
new_test.id = lambda: new_id
3234
def probe_unicode_in_user_encoding():
3235
"""Try to encode several unicode strings to use in unicode-aware tests.
3236
Return the first successful match.
3238
:return: (unicode value, encoded plain string value) or (None, None)
3240
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3241
for uni_val in possible_vals:
3243
str_val = uni_val.encode(osutils.get_user_encoding())
3244
except UnicodeEncodeError:
3245
# Try a different character
3248
return uni_val, str_val
3252
def probe_bad_non_ascii(encoding):
3253
"""Try to find [bad] character with code [128..255]
3254
that cannot be decoded to unicode in some encoding.
3255
Return None if all non-ascii characters are valid
3258
for i in xrange(128, 256):
3261
char.decode(encoding)
3262
except UnicodeDecodeError:
3267
class _FTPServerFeature(Feature):
3268
"""Some tests want an FTP Server, check if one is available.
3270
Right now, the only way this is available is if 'medusa' is installed.
3271
http://www.amk.ca/python/code/medusa.html
3276
import bzrlib.tests.ftp_server
3281
def feature_name(self):
3284
FTPServerFeature = _FTPServerFeature()
3287
class _UnicodeFilename(Feature):
3288
"""Does the filesystem support Unicode filenames?"""
3293
except UnicodeEncodeError:
3295
except (IOError, OSError):
3296
# The filesystem allows the Unicode filename but the file doesn't
3300
# The filesystem allows the Unicode filename and the file exists,
3304
UnicodeFilename = _UnicodeFilename()
3307
class _UTF8Filesystem(Feature):
3308
"""Is the filesystem UTF-8?"""
3311
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
3315
UTF8Filesystem = _UTF8Filesystem()
3318
class _CaseInsensitiveFilesystemFeature(Feature):
3319
"""Check if underlying filesystem is case-insensitive
3320
(e.g. on Windows, Cygwin, MacOS)
3324
if TestCaseWithMemoryTransport.TEST_ROOT is None:
3325
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3326
TestCaseWithMemoryTransport.TEST_ROOT = root
3328
root = TestCaseWithMemoryTransport.TEST_ROOT
3329
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3331
name_a = osutils.pathjoin(tdir, 'a')
3332
name_A = osutils.pathjoin(tdir, 'A')
3334
result = osutils.isdir(name_A)
3335
_rmtree_temp_dir(tdir)
3338
def feature_name(self):
3339
return 'case-insensitive filesystem'
3341
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()