128
88
import bzrlib.tests.blackbox
129
89
import bzrlib.tests.branch_implementations
130
90
import bzrlib.tests.bzrdir_implementations
131
import bzrlib.tests.commands
132
91
import bzrlib.tests.interrepository_implementations
133
92
import bzrlib.tests.interversionedfile_implementations
134
import bzrlib.tests.intertree_implementations
135
import bzrlib.tests.per_lock
136
93
import bzrlib.tests.repository_implementations
137
94
import bzrlib.tests.revisionstore_implementations
138
import bzrlib.tests.tree_implementations
139
95
import bzrlib.tests.workingtree_implementations
142
98
bzrlib.tests.blackbox,
143
99
bzrlib.tests.branch_implementations,
144
100
bzrlib.tests.bzrdir_implementations,
145
bzrlib.tests.commands,
146
101
bzrlib.tests.interrepository_implementations,
147
102
bzrlib.tests.interversionedfile_implementations,
148
bzrlib.tests.intertree_implementations,
149
bzrlib.tests.per_lock,
150
103
bzrlib.tests.repository_implementations,
151
104
bzrlib.tests.revisionstore_implementations,
152
bzrlib.tests.tree_implementations,
153
105
bzrlib.tests.workingtree_implementations,
157
class ExtendedTestResult(unittest._TextTestResult):
158
"""Accepts, reports and accumulates the results of running tests.
109
class _MyResult(unittest._TextTestResult):
110
"""Custom TestResult.
160
Compared to this unittest version this class adds support for profiling,
161
benchmarking, stopping as soon as a test fails, and skipping tests.
162
There are further-specialized subclasses for different types of display.
112
Shows output in a different format, including displaying runtime for tests.
165
114
stop_early = False
167
def __init__(self, stream, descriptions, verbosity,
171
"""Construct new TestResult.
173
:param bench_history: Optionally, a writable file object to accumulate
176
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
177
if bench_history is not None:
178
from bzrlib.version import _get_bzr_source_tree
179
src_tree = _get_bzr_source_tree()
182
revision_id = src_tree.get_parent_ids()[0]
184
# XXX: if this is a brand new tree, do the same as if there
188
# XXX: If there's no branch, what should we do?
190
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
191
self._bench_history = bench_history
192
self.ui = ui.ui_factory
193
self.num_tests = num_tests
195
self.failure_count = 0
196
self.known_failure_count = 0
198
self.unsupported = {}
200
self._overall_start_time = time.time()
202
def extractBenchmarkTime(self, testCase):
203
"""Add a benchmark time for the current test case."""
204
self._benchmarkTime = getattr(testCase, "_benchtime", None)
206
def _elapsedTestTimeString(self):
207
"""Return a time string for the overall time the current test has taken."""
208
return self._formatTime(time.time() - self._start_time)
210
def _testTimeString(self):
211
if self._benchmarkTime is not None:
213
self._formatTime(self._benchmarkTime),
214
self._elapsedTestTimeString())
216
return " %s" % self._elapsedTestTimeString()
218
def _formatTime(self, seconds):
219
"""Format seconds as milliseconds with leading spaces."""
220
# some benchmarks can take thousands of seconds to run, so we need 8
222
return "%8dms" % (1000 * seconds)
224
def _shortened_test_description(self, test):
226
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
116
def _elapsedTime(self):
117
return "%5dms" % (1000 * (time.time() - self._start_time))
229
119
def startTest(self, test):
230
120
unittest.TestResult.startTest(self, test)
231
self.report_test_start(test)
232
test.number = self.count
233
self._recordTestStartTime()
235
def _recordTestStartTime(self):
236
"""Record that a test has started."""
121
# In a short description, the important words are in
122
# the beginning, but in an id, the important words are
124
SHOW_DESCRIPTIONS = False
126
width = osutils.terminal_width()
127
name_width = width - 15
129
if SHOW_DESCRIPTIONS:
130
what = test.shortDescription()
132
if len(what) > name_width:
133
what = what[:name_width-3] + '...'
136
if what.startswith('bzrlib.tests.'):
138
if len(what) > name_width:
139
what = '...' + what[3-name_width:]
140
what = what.ljust(name_width)
141
self.stream.write(what)
237
143
self._start_time = time.time()
239
def _cleanupLogFile(self, test):
240
# We can only do this if we have one of our TestCases, not if
242
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
243
if setKeepLogfile is not None:
246
145
def addError(self, test, err):
247
self.extractBenchmarkTime(test)
248
self._cleanupLogFile(test)
249
146
if isinstance(err[1], TestSkipped):
250
return self.addSkipped(test, err)
251
elif isinstance(err[1], UnavailableFeature):
252
return self.addNotSupported(test, err[1].args[0])
147
return self.addSkipped(test, err)
253
148
unittest.TestResult.addError(self, test, err)
254
self.error_count += 1
255
self.report_error(test, err)
150
self.stream.writeln("ERROR %s" % self._elapsedTime())
152
self.stream.write('E')
256
154
if self.stop_early:
259
157
def addFailure(self, test, err):
260
self._cleanupLogFile(test)
261
self.extractBenchmarkTime(test)
262
if isinstance(err[1], KnownFailure):
263
return self.addKnownFailure(test, err)
264
158
unittest.TestResult.addFailure(self, test, err)
265
self.failure_count += 1
266
self.report_failure(test, err)
160
self.stream.writeln(" FAIL %s" % self._elapsedTime())
162
self.stream.write('F')
267
164
if self.stop_early:
270
def addKnownFailure(self, test, err):
271
self.known_failure_count += 1
272
self.report_known_failure(test, err)
274
def addNotSupported(self, test, feature):
275
self.unsupported.setdefault(str(feature), 0)
276
self.unsupported[str(feature)] += 1
277
self.report_unsupported(test, feature)
279
167
def addSuccess(self, test):
280
self.extractBenchmarkTime(test)
281
if self._bench_history is not None:
282
if self._benchmarkTime is not None:
283
self._bench_history.write("%s %s\n" % (
284
self._formatTime(self._benchmarkTime),
286
self.report_success(test)
169
self.stream.writeln(' OK %s' % self._elapsedTime())
171
self.stream.write('~')
287
173
unittest.TestResult.addSuccess(self, test)
289
175
def addSkipped(self, test, skip_excinfo):
290
self.report_skip(test, skip_excinfo)
177
print >>self.stream, ' SKIP %s' % self._elapsedTime()
178
print >>self.stream, ' %s' % skip_excinfo[1]
180
self.stream.write('S')
291
182
# seems best to treat this as success from point-of-view of unittest
292
183
# -- it actually does nothing so it barely matters :)
295
except KeyboardInterrupt:
298
self.addError(test, test.__exc_info())
300
unittest.TestResult.addSuccess(self, test)
184
unittest.TestResult.addSuccess(self, test)
302
186
def printErrorList(self, flavour, errors):
303
187
for test, err in errors:
304
188
self.stream.writeln(self.separator1)
305
self.stream.write("%s: " % flavour)
306
self.stream.writeln(self.getDescription(test))
189
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
307
190
if getattr(test, '_get_log', None) is not None:
308
191
print >>self.stream
309
192
print >>self.stream, \
314
197
self.stream.writeln(self.separator2)
315
198
self.stream.writeln("%s" % err)
320
def report_cleaning_up(self):
323
def report_success(self, test):
327
class TextTestResult(ExtendedTestResult):
328
"""Displays progress and results of tests in text form"""
330
def __init__(self, stream, descriptions, verbosity,
335
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
336
bench_history, num_tests)
338
self.pb = self.ui.nested_progress_bar()
339
self._supplied_pb = False
342
self._supplied_pb = True
343
self.pb.show_pct = False
344
self.pb.show_spinner = False
345
self.pb.show_eta = False,
346
self.pb.show_count = False
347
self.pb.show_bar = False
349
def report_starting(self):
350
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
352
def _progress_prefix_text(self):
353
a = '[%d' % self.count
354
if self.num_tests is not None:
355
a +='/%d' % self.num_tests
356
a += ' in %ds' % (time.time() - self._overall_start_time)
358
a += ', %d errors' % self.error_count
359
if self.failure_count:
360
a += ', %d failed' % self.failure_count
361
if self.known_failure_count:
362
a += ', %d known failures' % self.known_failure_count
364
a += ', %d skipped' % self.skip_count
366
a += ', %d missing features' % len(self.unsupported)
370
def report_test_start(self, test):
373
self._progress_prefix_text()
375
+ self._shortened_test_description(test))
377
def _test_description(self, test):
378
return self._shortened_test_description(test)
380
def report_error(self, test, err):
381
self.pb.note('ERROR: %s\n %s\n',
382
self._test_description(test),
386
def report_failure(self, test, err):
387
self.pb.note('FAIL: %s\n %s\n',
388
self._test_description(test),
392
def report_known_failure(self, test, err):
393
self.pb.note('XFAIL: %s\n%s\n',
394
self._test_description(test), err[1])
396
def report_skip(self, test, skip_excinfo):
399
# at the moment these are mostly not things we can fix
400
# and so they just produce stipple; use the verbose reporter
403
# show test and reason for skip
404
self.pb.note('SKIP: %s\n %s\n',
405
self._shortened_test_description(test),
408
# since the class name was left behind in the still-visible
410
self.pb.note('SKIP: %s', skip_excinfo[1])
412
def report_unsupported(self, test, feature):
413
"""test cannot be run because feature is missing."""
415
def report_cleaning_up(self):
416
self.pb.update('cleaning up...')
419
if not self._supplied_pb:
423
class VerboseTestResult(ExtendedTestResult):
424
"""Produce long output, with one line per test run plus times"""
426
def _ellipsize_to_right(self, a_string, final_width):
427
"""Truncate and pad a string, keeping the right hand side"""
428
if len(a_string) > final_width:
429
result = '...' + a_string[3-final_width:]
432
return result.ljust(final_width)
434
def report_starting(self):
435
self.stream.write('running %d tests...\n' % self.num_tests)
437
def report_test_start(self, test):
439
name = self._shortened_test_description(test)
440
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
441
# numbers, plus a trailing blank
442
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
443
self.stream.write(self._ellipsize_to_right(name,
444
osutils.terminal_width()-30))
447
def _error_summary(self, err):
449
return '%s%s' % (indent, err[1])
451
def report_error(self, test, err):
452
self.stream.writeln('ERROR %s\n%s'
453
% (self._testTimeString(),
454
self._error_summary(err)))
456
def report_failure(self, test, err):
457
self.stream.writeln(' FAIL %s\n%s'
458
% (self._testTimeString(),
459
self._error_summary(err)))
461
def report_known_failure(self, test, err):
462
self.stream.writeln('XFAIL %s\n%s'
463
% (self._testTimeString(),
464
self._error_summary(err)))
466
def report_success(self, test):
467
self.stream.writeln(' OK %s' % self._testTimeString())
468
for bench_called, stats in getattr(test, '_benchcalls', []):
469
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
470
stats.pprint(file=self.stream)
471
# flush the stream so that we get smooth output. This verbose mode is
472
# used to show the output in PQM.
475
def report_skip(self, test, skip_excinfo):
477
self.stream.writeln(' SKIP %s\n%s'
478
% (self._testTimeString(),
479
self._error_summary(skip_excinfo)))
481
def report_unsupported(self, test, feature):
482
"""test cannot be run because feature is missing."""
483
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
484
%(self._testTimeString(), feature))
488
class TextTestRunner(object):
201
class TextTestRunner(unittest.TextTestRunner):
489
202
stop_on_failure = False
498
self.stream = unittest._WritelnDecorator(stream)
499
self.descriptions = descriptions
500
self.verbosity = verbosity
501
self._bench_history = bench_history
502
self.list_only = list_only
505
"Run the given test case or test suite."
506
startTime = time.time()
507
if self.verbosity == 1:
508
result_class = TextTestResult
509
elif self.verbosity >= 2:
510
result_class = VerboseTestResult
511
result = result_class(self.stream,
514
bench_history=self._bench_history,
515
num_tests=test.countTestCases(),
204
def _makeResult(self):
205
result = _MyResult(self.stream, self.descriptions, self.verbosity)
517
206
result.stop_early = self.stop_on_failure
518
result.report_starting()
520
if self.verbosity >= 2:
521
self.stream.writeln("Listing tests only ...\n")
523
for t in iter_suite_tests(test):
524
self.stream.writeln("%s" % (t.id()))
526
actionTaken = "Listed"
529
run = result.testsRun
531
stopTime = time.time()
532
timeTaken = stopTime - startTime
534
self.stream.writeln(result.separator2)
535
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
536
run, run != 1 and "s" or "", timeTaken))
537
self.stream.writeln()
538
if not result.wasSuccessful():
539
self.stream.write("FAILED (")
540
failed, errored = map(len, (result.failures, result.errors))
542
self.stream.write("failures=%d" % failed)
544
if failed: self.stream.write(", ")
545
self.stream.write("errors=%d" % errored)
546
if result.known_failure_count:
547
if failed or errored: self.stream.write(", ")
548
self.stream.write("known_failure_count=%d" %
549
result.known_failure_count)
550
self.stream.writeln(")")
552
if result.known_failure_count:
553
self.stream.writeln("OK (known_failures=%d)" %
554
result.known_failure_count)
556
self.stream.writeln("OK")
557
if result.skip_count > 0:
558
skipped = result.skip_count
559
self.stream.writeln('%d test%s skipped' %
560
(skipped, skipped != 1 and "s" or ""))
561
if result.unsupported:
562
for feature, count in sorted(result.unsupported.items()):
563
self.stream.writeln("Missing feature '%s' skipped %d tests." %
924
339
def assertIsInstance(self, obj, kls):
925
340
"""Fail if obj is not an instance of kls"""
926
341
if not isinstance(obj, kls):
927
self.fail("%r is an instance of %s rather than %s" % (
928
obj, obj.__class__, kls))
930
def expectFailure(self, reason, assertion, *args, **kwargs):
931
"""Invoke a test, expecting it to fail for the given reason.
933
This is for assertions that ought to succeed, but currently fail.
934
(The failure is *expected* but not *wanted*.) Please be very precise
935
about the failure you're expecting. If a new bug is introduced,
936
AssertionError should be raised, not KnownFailure.
938
Frequently, expectFailure should be followed by an opposite assertion.
941
Intended to be used with a callable that raises AssertionError as the
942
'assertion' parameter. args and kwargs are passed to the 'assertion'.
944
Raises KnownFailure if the test fails. Raises AssertionError if the
949
self.expectFailure('Math is broken', self.assertNotEqual, 54,
951
self.assertEqual(42, dynamic_val)
953
This means that a dynamic_val of 54 will cause the test to raise
954
a KnownFailure. Once math is fixed and the expectFailure is removed,
955
only a dynamic_val of 42 will allow the test to pass. Anything other
956
than 54 or 42 will cause an AssertionError.
959
assertion(*args, **kwargs)
960
except AssertionError:
961
raise KnownFailure(reason)
963
self.fail('Unexpected success. Should have failed: %s' % reason)
965
def _capture_warnings(self, a_callable, *args, **kwargs):
966
"""A helper for callDeprecated and applyDeprecated.
968
:param a_callable: A callable to call.
969
:param args: The positional arguments for the callable
970
:param kwargs: The keyword arguments for the callable
971
:return: A tuple (warnings, result). result is the result of calling
972
a_callable(``*args``, ``**kwargs``).
975
def capture_warnings(msg, cls=None, stacklevel=None):
976
# we've hooked into a deprecation specific callpath,
977
# only deprecations should getting sent via it.
978
self.assertEqual(cls, DeprecationWarning)
979
local_warnings.append(msg)
980
original_warning_method = symbol_versioning.warn
981
symbol_versioning.set_warning_method(capture_warnings)
983
result = a_callable(*args, **kwargs)
985
symbol_versioning.set_warning_method(original_warning_method)
986
return (local_warnings, result)
988
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
989
"""Call a deprecated callable without warning the user.
991
Note that this only captures warnings raised by symbol_versioning.warn,
992
not other callers that go direct to the warning module.
994
:param deprecation_format: The deprecation format that the callable
995
should have been deprecated with. This is the same type as the
996
parameter to deprecated_method/deprecated_function. If the
997
callable is not deprecated with this format, an assertion error
999
:param a_callable: A callable to call. This may be a bound method or
1000
a regular function. It will be called with ``*args`` and
1002
:param args: The positional arguments for the callable
1003
:param kwargs: The keyword arguments for the callable
1004
:return: The result of a_callable(``*args``, ``**kwargs``)
1006
call_warnings, result = self._capture_warnings(a_callable,
1008
expected_first_warning = symbol_versioning.deprecation_string(
1009
a_callable, deprecation_format)
1010
if len(call_warnings) == 0:
1011
self.fail("No deprecation warning generated by call to %s" %
1013
self.assertEqual(expected_first_warning, call_warnings[0])
1016
def callDeprecated(self, expected, callable, *args, **kwargs):
1017
"""Assert that a callable is deprecated in a particular way.
1019
This is a very precise test for unusual requirements. The
1020
applyDeprecated helper function is probably more suited for most tests
1021
as it allows you to simply specify the deprecation format being used
1022
and will ensure that that is issued for the function being called.
1024
Note that this only captures warnings raised by symbol_versioning.warn,
1025
not other callers that go direct to the warning module.
1027
:param expected: a list of the deprecation warnings expected, in order
1028
:param callable: The callable to call
1029
:param args: The positional arguments for the callable
1030
:param kwargs: The keyword arguments for the callable
1032
call_warnings, result = self._capture_warnings(callable,
1034
self.assertEqual(expected, call_warnings)
342
self.fail("%r is not an instance of %s" % (obj, kls))
1037
344
def _startLogFile(self):
1038
345
"""Send bzr and test log messages to a temporary file.
1171
422
# TODO: Perhaps this should keep running cleanups even if
1172
423
# one of them fails?
1174
# Actually pop the cleanups from the list so tearDown running
1175
# twice is safe (this happens for skipped tests).
1176
while self._cleanups:
1177
self._cleanups.pop()()
424
for cleanup_fn in reversed(self._cleanups):
1179
427
def log(self, *args):
1182
def _get_log(self, keep_log_file=False):
1183
"""Return as a string the log for this test. If the file is still
1184
on disk and keep_log_file=False, delete the log file and store the
1185
content in self._log_contents."""
1186
# flush the log file, to get all content
1188
bzrlib.trace._trace_file.flush()
1189
if self._log_contents:
431
"""Return as a string the log for this test"""
432
if self._log_file_name:
433
return open(self._log_file_name).read()
1190
435
return self._log_contents
1191
if self._log_file_name is not None:
1192
logfile = open(self._log_file_name)
1194
log_contents = logfile.read()
1197
if not keep_log_file:
1198
self._log_contents = log_contents
1200
os.remove(self._log_file_name)
1202
if sys.platform == 'win32' and e.errno == errno.EACCES:
1203
print >>sys.stderr, ('Unable to delete log file '
1204
' %r' % self._log_file_name)
1209
return "DELETED log file to reduce memory footprint"
436
# TODO: Delete the log after it's been read in
1211
@deprecated_method(zero_eighteen)
1212
438
def capture(self, cmd, retcode=0):
1213
439
"""Shortcut that splits cmd into words, runs, and returns stdout"""
1214
440
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
1216
def requireFeature(self, feature):
1217
"""This test requires a specific feature is available.
1219
:raises UnavailableFeature: When feature is not available.
1221
if not feature.available():
1222
raise UnavailableFeature(feature)
1224
@deprecated_method(zero_eighteen)
1225
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
442
def run_bzr_captured(self, argv, retcode=0):
1227
443
"""Invoke bzr and return (stdout, stderr).
1229
Don't call this method, just use run_bzr() which is equivalent.
1231
:param argv: Arguments to invoke bzr. This may be either a
1232
single string, in which case it is split by shlex into words,
1233
or a list of arguments.
1234
:param retcode: Expected return code, or None for don't-care.
1235
:param encoding: Encoding for sys.stdout and sys.stderr
1236
:param stdin: A string to be used as stdin for the command.
1237
:param working_dir: Change to this directory before running
445
Useful for code that wants to check the contents of the
446
output, the way error messages are presented, etc.
448
This should be the main method for tests that want to exercise the
449
overall behavior of the bzr application (rather than a unit test
450
or a functional test of the library.)
452
Much of the old code runs bzr by forking a new copy of Python, but
453
that is slower, harder to debug, and generally not necessary.
455
This runs bzr through the interface that catches and reports
456
errors, and with logging set to something approximating the
457
default, so that error reporting can be checked.
459
argv -- arguments to invoke bzr
460
retcode -- expected return code, or None for don't-care.
1239
return self._run_bzr_autosplit(argv, retcode=retcode,
1240
encoding=encoding, stdin=stdin, working_dir=working_dir,
1243
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1245
"""Run bazaar command line, splitting up a string command line."""
1246
if isinstance(args, basestring):
1247
args = list(shlex.split(args))
1248
return self._run_bzr_core(args, retcode=retcode,
1249
encoding=encoding, stdin=stdin, working_dir=working_dir,
1252
def _run_bzr_core(self, args, retcode, encoding, stdin,
1254
if encoding is None:
1255
encoding = bzrlib.user_encoding
1256
stdout = StringIOWrapper()
1257
stderr = StringIOWrapper()
1258
stdout.encoding = encoding
1259
stderr.encoding = encoding
1261
self.log('run bzr: %r', args)
464
self.log('run bzr: %s', ' '.join(argv))
1262
465
# FIXME: don't call into logging here
1263
466
handler = logging.StreamHandler(stderr)
467
handler.setFormatter(bzrlib.trace.QuietFormatter())
1264
468
handler.setLevel(logging.INFO)
1265
469
logger = logging.getLogger('')
1266
470
logger.addHandler(handler)
1267
old_ui_factory = ui.ui_factory
1268
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1271
if working_dir is not None:
1272
cwd = osutils.getcwd()
1273
os.chdir(working_dir)
1276
result = self.apply_redirected(ui.ui_factory.stdin,
1278
bzrlib.commands.run_bzr_catch_errors,
472
result = self.apply_redirected(None, stdout, stderr,
473
bzrlib.commands.run_bzr_catch_errors,
1281
476
logger.removeHandler(handler)
1282
ui.ui_factory = old_ui_factory
1286
477
out = stdout.getvalue()
1287
478
err = stderr.getvalue()
1289
self.log('output:\n%r', out)
480
self.log('output:\n%s', out)
1291
self.log('errors:\n%r', err)
482
self.log('errors:\n%s', err)
1292
483
if retcode is not None:
1293
self.assertEquals(retcode, result,
1294
message='Unexpected return code')
484
self.assertEquals(result, retcode)
1297
487
def run_bzr(self, *args, **kwargs):
1298
488
"""Invoke bzr, as if it were run from the command line.
1300
The argument list should not include the bzr program name - the
1301
first argument is normally the bzr command. Arguments may be
1302
passed in three ways:
1304
1- A list of strings, eg ["commit", "a"]. This is recommended
1305
when the command contains whitespace or metacharacters, or
1306
is built up at run time.
1308
2- A single string, eg "add a". This is the most convenient
1309
for hardcoded commands.
1311
3- Several varargs parameters, eg run_bzr("add", "a").
1312
This is not recommended for new code.
1314
This runs bzr through the interface that catches and reports
1315
errors, and with logging set to something approximating the
1316
default, so that error reporting can be checked.
1318
490
This should be the main method for tests that want to exercise the
1319
491
overall behavior of the bzr application (rather than a unit test
1320
492
or a functional test of the library.)
1322
494
This sends the stdout/stderr results into the test's log,
1323
495
where it may be useful for debugging. See also run_captured.
1325
:keyword stdin: A string to be used as stdin for the command.
1326
:keyword retcode: The status code the command should return;
1328
:keyword working_dir: The directory to run the command in
1329
:keyword error_regexes: A list of expected error messages. If
1330
specified they must be seen in the error output of the command.
1332
497
retcode = kwargs.pop('retcode', 0)
1333
encoding = kwargs.pop('encoding', None)
1334
stdin = kwargs.pop('stdin', None)
1335
working_dir = kwargs.pop('working_dir', None)
1336
error_regexes = kwargs.pop('error_regexes', [])
1339
if isinstance(args[0], (list, basestring)):
1342
symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
1343
DeprecationWarning, stacklevel=3)
1345
out, err = self._run_bzr_autosplit(args=args,
1347
encoding=encoding, stdin=stdin, working_dir=working_dir,
1350
for regex in error_regexes:
1351
self.assertContainsRe(err, regex)
1354
def run_bzr_decode(self, *args, **kwargs):
1355
if 'encoding' in kwargs:
1356
encoding = kwargs['encoding']
1358
encoding = bzrlib.user_encoding
1359
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1361
def run_bzr_error(self, error_regexes, *args, **kwargs):
1362
"""Run bzr, and check that stderr contains the supplied regexes
1364
:param error_regexes: Sequence of regular expressions which
1365
must each be found in the error output. The relative ordering
1367
:param args: command-line arguments for bzr
1368
:param kwargs: Keyword arguments which are interpreted by run_bzr
1369
This function changes the default value of retcode to be 3,
1370
since in most cases this is run when you expect bzr to fail.
1372
:return: (out, err) The actual output of running the command (in case
1373
you want to do more inspection)
1377
# Make sure that commit is failing because there is nothing to do
1378
self.run_bzr_error(['no changes to commit'],
1379
'commit', '-m', 'my commit comment')
1380
# Make sure --strict is handling an unknown file, rather than
1381
# giving us the 'nothing to do' error
1382
self.build_tree(['unknown'])
1383
self.run_bzr_error(['Commit refused because there are unknown files'],
1384
'commit', '--strict', '-m', 'my commit comment')
1386
kwargs.setdefault('retcode', 3)
1387
kwargs['error_regexes'] = error_regexes
1388
out, err = self.run_bzr(*args, **kwargs)
1391
def run_bzr_subprocess(self, *args, **kwargs):
1392
"""Run bzr in a subprocess for testing.
1394
This starts a new Python interpreter and runs bzr in there.
1395
This should only be used for tests that have a justifiable need for
1396
this isolation: e.g. they are testing startup time, or signal
1397
handling, or early startup code, etc. Subprocess code can't be
1398
profiled or debugged so easily.
1400
:keyword retcode: The status code that is expected. Defaults to 0. If
1401
None is supplied, the status code is not checked.
1402
:keyword env_changes: A dictionary which lists changes to environment
1403
variables. A value of None will unset the env variable.
1404
The values must be strings. The change will only occur in the
1405
child, so you don't need to fix the environment after running.
1406
:keyword universal_newlines: Convert CRLF => LF
1407
:keyword allow_plugins: By default the subprocess is run with
1408
--no-plugins to ensure test reproducibility. Also, it is possible
1409
for system-wide plugins to create unexpected output on stderr,
1410
which can cause unnecessary test failures.
1412
env_changes = kwargs.get('env_changes', {})
1413
working_dir = kwargs.get('working_dir', None)
1414
allow_plugins = kwargs.get('allow_plugins', False)
1415
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1416
working_dir=working_dir,
1417
allow_plugins=allow_plugins)
1418
# We distinguish between retcode=None and retcode not passed.
1419
supplied_retcode = kwargs.get('retcode', 0)
1420
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1421
universal_newlines=kwargs.get('universal_newlines', False),
1424
def start_bzr_subprocess(self, process_args, env_changes=None,
1425
skip_if_plan_to_signal=False,
1427
allow_plugins=False):
1428
"""Start bzr in a subprocess for testing.
1430
This starts a new Python interpreter and runs bzr in there.
1431
This should only be used for tests that have a justifiable need for
1432
this isolation: e.g. they are testing startup time, or signal
1433
handling, or early startup code, etc. Subprocess code can't be
1434
profiled or debugged so easily.
1436
:param process_args: a list of arguments to pass to the bzr executable,
1437
for example ``['--version']``.
1438
:param env_changes: A dictionary which lists changes to environment
1439
variables. A value of None will unset the env variable.
1440
The values must be strings. The change will only occur in the
1441
child, so you don't need to fix the environment after running.
1442
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1444
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1446
:returns: Popen object for the started process.
1448
if skip_if_plan_to_signal:
1449
if not getattr(os, 'kill', None):
1450
raise TestSkipped("os.kill not available.")
1452
if env_changes is None:
1456
def cleanup_environment():
1457
for env_var, value in env_changes.iteritems():
1458
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1460
def restore_environment():
1461
for env_var, value in old_env.iteritems():
1462
osutils.set_or_unset_env(env_var, value)
1464
bzr_path = self.get_bzr_path()
1467
if working_dir is not None:
1468
cwd = osutils.getcwd()
1469
os.chdir(working_dir)
1472
# win32 subprocess doesn't support preexec_fn
1473
# so we will avoid using it on all platforms, just to
1474
# make sure the code path is used, and we don't break on win32
1475
cleanup_environment()
1476
command = [sys.executable, bzr_path]
1477
if not allow_plugins:
1478
command.append('--no-plugins')
1479
command.extend(process_args)
1480
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1482
restore_environment()
1488
def _popen(self, *args, **kwargs):
1489
"""Place a call to Popen.
1491
Allows tests to override this method to intercept the calls made to
1492
Popen for introspection.
1494
return Popen(*args, **kwargs)
1496
def get_bzr_path(self):
1497
"""Return the path of the 'bzr' executable for this test suite."""
1498
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1499
if not os.path.isfile(bzr_path):
1500
# We are probably installed. Assume sys.argv is the right file
1501
bzr_path = sys.argv[0]
1504
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1505
universal_newlines=False, process_args=None):
1506
"""Finish the execution of process.
1508
:param process: the Popen object returned from start_bzr_subprocess.
1509
:param retcode: The status code that is expected. Defaults to 0. If
1510
None is supplied, the status code is not checked.
1511
:param send_signal: an optional signal to send to the process.
1512
:param universal_newlines: Convert CRLF => LF
1513
:returns: (stdout, stderr)
1515
if send_signal is not None:
1516
os.kill(process.pid, send_signal)
1517
out, err = process.communicate()
1519
if universal_newlines:
1520
out = out.replace('\r\n', '\n')
1521
err = err.replace('\r\n', '\n')
1523
if retcode is not None and retcode != process.returncode:
1524
if process_args is None:
1525
process_args = "(unknown args)"
1526
mutter('Output of bzr %s:\n%s', process_args, out)
1527
mutter('Error for bzr %s:\n%s', process_args, err)
1528
self.fail('Command bzr %s failed with retcode %s != %s'
1529
% (process_args, retcode, process.returncode))
498
return self.run_bzr_captured(args, retcode)
1532
500
def check_inventory_shape(self, inv, shape):
1533
501
"""Compare an inventory to a list of expected names.
1581
549
sys.stderr = real_stderr
1582
550
sys.stdin = real_stdin
1584
def reduceLockdirTimeout(self):
1585
"""Reduce the default lock timeout for the duration of the test, so that
1586
if LockContention occurs during a test, it does so quickly.
1588
Tests that expect to provoke LockContention errors should call this.
1590
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1592
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1593
self.addCleanup(resetTimeout)
1594
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1597
class TestCaseWithMemoryTransport(TestCase):
1598
"""Common test class for tests that do not need disk resources.
1600
Tests that need disk resources should derive from TestCaseWithTransport.
1602
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1604
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1605
a directory which does not exist. This serves to help ensure test isolation
1606
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1607
must exist. However, TestCaseWithMemoryTransport does not offer local
1608
file defaults for the transport in tests, nor does it obey the command line
1609
override, so tests that accidentally write to the common directory should
1612
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1613
a .bzr directory that stops us ascending higher into the filesystem.
1619
def __init__(self, methodName='runTest'):
1620
# allow test parameterisation after test construction and before test
1621
# execution. Variables that the parameteriser sets need to be
1622
# ones that are not set by setUp, or setUp will trash them.
1623
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1624
self.vfs_transport_factory = default_transport
1625
self.transport_server = None
1626
self.transport_readonly_server = None
1627
self.__vfs_server = None
1629
def get_transport(self, relpath=None):
1630
"""Return a writeable transport.
1632
This transport is for the test scratch space relative to
1635
:param relpath: a path relative to the base url.
1637
t = get_transport(self.get_url(relpath))
1638
self.assertFalse(t.is_readonly())
1641
def get_readonly_transport(self, relpath=None):
1642
"""Return a readonly transport for the test scratch space
1644
This can be used to test that operations which should only need
1645
readonly access in fact do not try to write.
1647
:param relpath: a path relative to the base url.
1649
t = get_transport(self.get_readonly_url(relpath))
1650
self.assertTrue(t.is_readonly())
1653
def create_transport_readonly_server(self):
1654
"""Create a transport server from class defined at init.
1656
This is mostly a hook for daughter classes.
1658
return self.transport_readonly_server()
1660
def get_readonly_server(self):
1661
"""Get the server instance for the readonly transport
1663
This is useful for some tests with specific servers to do diagnostics.
1665
if self.__readonly_server is None:
1666
if self.transport_readonly_server is None:
1667
# readonly decorator requested
1668
# bring up the server
1669
self.__readonly_server = ReadonlyServer()
1670
self.__readonly_server.setUp(self.get_vfs_only_server())
1672
self.__readonly_server = self.create_transport_readonly_server()
1673
self.__readonly_server.setUp(self.get_vfs_only_server())
1674
self.addCleanup(self.__readonly_server.tearDown)
1675
return self.__readonly_server
1677
def get_readonly_url(self, relpath=None):
1678
"""Get a URL for the readonly transport.
1680
This will either be backed by '.' or a decorator to the transport
1681
used by self.get_url()
1682
relpath provides for clients to get a path relative to the base url.
1683
These should only be downwards relative, not upwards.
1685
base = self.get_readonly_server().get_url()
1686
return self._adjust_url(base, relpath)
1688
def get_vfs_only_server(self):
1689
"""Get the vfs only read/write server instance.
1691
This is useful for some tests with specific servers that need
1694
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1695
is no means to override it.
1697
if self.__vfs_server is None:
1698
self.__vfs_server = MemoryServer()
1699
self.__vfs_server.setUp()
1700
self.addCleanup(self.__vfs_server.tearDown)
1701
return self.__vfs_server
1703
def get_server(self):
1704
"""Get the read/write server instance.
1706
This is useful for some tests with specific servers that need
1709
This is built from the self.transport_server factory. If that is None,
1710
then the self.get_vfs_server is returned.
1712
if self.__server is None:
1713
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1714
return self.get_vfs_only_server()
1716
# bring up a decorated means of access to the vfs only server.
1717
self.__server = self.transport_server()
1719
self.__server.setUp(self.get_vfs_only_server())
1720
except TypeError, e:
1721
# This should never happen; the try:Except here is to assist
1722
# developers having to update code rather than seeing an
1723
# uninformative TypeError.
1724
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1725
self.addCleanup(self.__server.tearDown)
1726
return self.__server
1728
def _adjust_url(self, base, relpath):
1729
"""Get a URL (or maybe a path) for the readwrite transport.
1731
This will either be backed by '.' or to an equivalent non-file based
1733
relpath provides for clients to get a path relative to the base url.
1734
These should only be downwards relative, not upwards.
1736
if relpath is not None and relpath != '.':
1737
if not base.endswith('/'):
1739
# XXX: Really base should be a url; we did after all call
1740
# get_url()! But sometimes it's just a path (from
1741
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1742
# to a non-escaped local path.
1743
if base.startswith('./') or base.startswith('/'):
1746
base += urlutils.escape(relpath)
1749
def get_url(self, relpath=None):
1750
"""Get a URL (or maybe a path) for the readwrite transport.
1752
This will either be backed by '.' or to an equivalent non-file based
1754
relpath provides for clients to get a path relative to the base url.
1755
These should only be downwards relative, not upwards.
1757
base = self.get_server().get_url()
1758
return self._adjust_url(base, relpath)
1760
def get_vfs_only_url(self, relpath=None):
1761
"""Get a URL (or maybe a path for the plain old vfs transport.
1763
This will never be a smart protocol. It always has all the
1764
capabilities of the local filesystem, but it might actually be a
1765
MemoryTransport or some other similar virtual filesystem.
1767
This is the backing transport (if any) of the server returned by
1768
get_url and get_readonly_url.
1770
:param relpath: provides for clients to get a path relative to the base
1771
url. These should only be downwards relative, not upwards.
1774
base = self.get_vfs_only_server().get_url()
1775
return self._adjust_url(base, relpath)
1777
def _make_test_root(self):
1778
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1780
root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
1781
TestCaseWithMemoryTransport.TEST_ROOT = root
1783
# make a fake bzr directory there to prevent any tests propagating
1784
# up onto the source directory's real branch
1785
bzrdir.BzrDir.create_standalone_workingtree(root)
1787
# The same directory is used by all tests, and we're not specifically
1788
# told when all tests are finished. This will do.
1789
atexit.register(_rmtree_temp_dir, root)
1791
def makeAndChdirToTestDir(self):
1792
"""Create a temporary directories for this one test.
1794
This must set self.test_home_dir and self.test_dir and chdir to
1797
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1799
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1800
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1801
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1803
def make_branch(self, relpath, format=None):
1804
"""Create a branch on the transport at relpath."""
1805
repo = self.make_repository(relpath, format=format)
1806
return repo.bzrdir.create_branch()
1808
def make_bzrdir(self, relpath, format=None):
1810
# might be a relative or absolute path
1811
maybe_a_url = self.get_url(relpath)
1812
segments = maybe_a_url.rsplit('/', 1)
1813
t = get_transport(maybe_a_url)
1814
if len(segments) > 1 and segments[-1] not in ('', '.'):
1818
if isinstance(format, basestring):
1819
format = bzrdir.format_registry.make_bzrdir(format)
1820
return format.initialize_on_transport(t)
1821
except errors.UninitializableFormat:
1822
raise TestSkipped("Format %s is not initializable." % format)
1824
def make_repository(self, relpath, shared=False, format=None):
1825
"""Create a repository on our default transport at relpath.
1827
Note that relpath must be a relative path, not a full url.
1829
# FIXME: If you create a remoterepository this returns the underlying
1830
# real format, which is incorrect. Actually we should make sure that
1831
# RemoteBzrDir returns a RemoteRepository.
1832
# maybe mbp 20070410
1833
made_control = self.make_bzrdir(relpath, format=format)
1834
return made_control.create_repository(shared=shared)
1836
def make_branch_and_memory_tree(self, relpath, format=None):
1837
"""Create a branch on the default transport and a MemoryTree for it."""
1838
b = self.make_branch(relpath, format=format)
1839
return memorytree.MemoryTree.create_on_branch(b)
1841
def overrideEnvironmentForTesting(self):
1842
os.environ['HOME'] = self.test_home_dir
1843
os.environ['BZR_HOME'] = self.test_home_dir
1846
super(TestCaseWithMemoryTransport, self).setUp()
1847
self._make_test_root()
1848
_currentdir = os.getcwdu()
1849
def _leaveDirectory():
1850
os.chdir(_currentdir)
1851
self.addCleanup(_leaveDirectory)
1852
self.makeAndChdirToTestDir()
1853
self.overrideEnvironmentForTesting()
1854
self.__readonly_server = None
1855
self.__server = None
1856
self.reduceLockdirTimeout()
553
BzrTestBase = TestCase
1859
class TestCaseInTempDir(TestCaseWithMemoryTransport):
556
class TestCaseInTempDir(TestCase):
1860
557
"""Derived class that runs a test within a temporary directory.
1862
559
This is useful for tests that need to create a branch, etc.
1886
578
self.log("actually: %r" % contents)
1887
579
self.fail("contents of %s not as expected" % filename)
1889
def makeAndChdirToTestDir(self):
1890
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
581
def _make_test_root(self):
582
if TestCaseInTempDir.TEST_ROOT is not None:
586
root = u'test%04d.tmp' % i
590
if e.errno == errno.EEXIST:
595
# successfully created
596
TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
598
# make a fake bzr directory there to prevent any tests propagating
599
# up onto the source directory's real branch
600
bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
603
super(TestCaseInTempDir, self).setUp()
604
self._make_test_root()
605
_currentdir = os.getcwdu()
606
short_id = self.id().replace('bzrlib.tests.', '') \
607
.replace('__main__.', '')
608
# it's possible the same test class is run several times for
609
# parameterized tests, so make sure the names don't collide.
613
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
615
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
616
if os.path.exists(candidate_dir):
620
self.test_dir = candidate_dir
621
os.mkdir(self.test_dir)
622
os.chdir(self.test_dir)
624
os.environ['HOME'] = self.test_dir
625
os.environ['APPDATA'] = self.test_dir
626
def _leaveDirectory():
627
os.chdir(_currentdir)
628
self.addCleanup(_leaveDirectory)
1892
For TestCaseInTempDir we create a temporary directory based on the test
1893
name and then create two subdirs - test and home under it.
1895
# create a directory within the top level test directory
1896
candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
1897
# now create test and home directories within this dir
1898
self.test_base_dir = candidate_dir
1899
self.test_home_dir = self.test_base_dir + '/home'
1900
os.mkdir(self.test_home_dir)
1901
self.test_dir = self.test_base_dir + '/work'
1902
os.mkdir(self.test_dir)
1903
os.chdir(self.test_dir)
1904
# put name of test inside
1905
f = file(self.test_base_dir + '/name', 'w')
1910
self.addCleanup(self.deleteTestDir)
1912
def deleteTestDir(self):
1913
os.chdir(self.TEST_ROOT)
1914
_rmtree_temp_dir(self.test_base_dir)
1916
def build_tree(self, shape, line_endings='binary', transport=None):
630
def build_tree(self, shape, line_endings='native', transport=None):
1917
631
"""Build a test tree according to a pattern.
1919
633
shape is a sequence of file specifications. If the final
1920
634
character is '/', a directory is created.
1922
This assumes that all the elements in the tree being built are new.
1924
636
This doesn't add anything to a branch.
1926
637
:param line_endings: Either 'binary' or 'native'
1927
in binary mode, exact contents are written in native mode, the
1928
line endings match the default platform endings.
1929
:param transport: A transport to write to, for building trees on VFS's.
1930
If the transport is readonly or None, "." is opened automatically.
638
in binary mode, exact contents are written
639
in native mode, the line endings match the
640
default platform endings.
642
:param transport: A transport to write to, for building trees on
643
VFS's. If the transport is readonly or None,
644
"." is opened automatically.
1933
# It's OK to just create them using forward slashes on windows.
646
# XXX: It's OK to just create them using forward slashes on windows?
1934
647
if transport is None or transport.is_readonly():
1935
648
transport = get_transport(".")
1936
649
for name in shape:
1937
650
self.assert_(isinstance(name, basestring))
1938
651
if name[-1] == '/':
1939
transport.mkdir(urlutils.escape(name[:-1]))
652
transport.mkdir(urlescape(name[:-1]))
1941
654
if line_endings == 'binary':
1943
656
elif line_endings == 'native':
1944
657
end = os.linesep
1946
raise errors.BzrError(
1947
'Invalid line ending request %r' % line_endings)
1948
content = "contents of %s%s" % (name.encode('utf-8'), end)
1949
transport.put_bytes_non_atomic(urlutils.escape(name), content)
659
raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
660
content = "contents of %s%s" % (name, end)
661
transport.put(urlescape(name), StringIO(content))
1951
663
def build_tree_contents(self, shape):
1952
664
build_tree_contents(shape)
666
def failUnlessExists(self, path):
667
"""Fail unless path, which may be abs or relative, exists."""
668
self.failUnless(osutils.lexists(path))
670
def failIfExists(self, path):
671
"""Fail if path, which may be abs or relative, exists."""
672
self.failIf(osutils.lexists(path))
1954
674
def assertFileEqual(self, content, path):
1955
675
"""Fail if path does not contain 'content'."""
1956
self.failUnlessExists(path)
1957
f = file(path, 'rb')
1962
self.assertEqualDiff(content, s)
1964
def failUnlessExists(self, path):
1965
"""Fail unless path or paths, which may be abs or relative, exist."""
1966
if not isinstance(path, basestring):
1968
self.failUnlessExists(p)
1970
self.failUnless(osutils.lexists(path),path+" does not exist")
1972
def failIfExists(self, path):
1973
"""Fail if path or paths, which may be abs or relative, exist."""
1974
if not isinstance(path, basestring):
1976
self.failIfExists(p)
1978
self.failIf(osutils.lexists(path),path+" exists")
1980
def assertInWorkingTree(self,path,root_path='.',tree=None):
1981
"""Assert whether path or paths are in the WorkingTree"""
1983
tree = workingtree.WorkingTree.open(root_path)
1984
if not isinstance(path, basestring):
1986
self.assertInWorkingTree(p,tree=tree)
1988
self.assertIsNot(tree.path2id(path), None,
1989
path+' not in working tree.')
1991
def assertNotInWorkingTree(self,path,root_path='.',tree=None):
1992
"""Assert whether path or paths are not in the WorkingTree"""
1994
tree = workingtree.WorkingTree.open(root_path)
1995
if not isinstance(path, basestring):
1997
self.assertNotInWorkingTree(p,tree=tree)
1999
self.assertIs(tree.path2id(path), None, path+' in working tree.')
676
self.failUnless(osutils.lexists(path))
677
self.assertEqualDiff(content, open(path, 'r').read())
2002
680
class TestCaseWithTransport(TestCaseInTempDir):
2013
691
readwrite one must both define get_url() as resolving to os.getcwd().
2016
def get_vfs_only_server(self):
2017
"""See TestCaseWithMemoryTransport.
694
def __init__(self, methodName='testMethod'):
695
super(TestCaseWithTransport, self).__init__(methodName)
696
self.__readonly_server = None
698
self.transport_server = default_transport
699
self.transport_readonly_server = None
701
def get_readonly_url(self, relpath=None):
702
"""Get a URL for the readonly transport.
704
This will either be backed by '.' or a decorator to the transport
705
used by self.get_url()
706
relpath provides for clients to get a path relative to the base url.
707
These should only be downwards relative, not upwards.
709
base = self.get_readonly_server().get_url()
710
if relpath is not None:
711
if not base.endswith('/'):
713
base = base + relpath
716
def get_readonly_server(self):
717
"""Get the server instance for the readonly transport
719
This is useful for some tests with specific servers to do diagnostics.
721
if self.__readonly_server is None:
722
if self.transport_readonly_server is None:
723
# readonly decorator requested
724
# bring up the server
726
self.__readonly_server = ReadonlyServer()
727
self.__readonly_server.setUp(self.__server)
729
self.__readonly_server = self.transport_readonly_server()
730
self.__readonly_server.setUp()
731
self.addCleanup(self.__readonly_server.tearDown)
732
return self.__readonly_server
734
def get_server(self):
735
"""Get the read/write server instance.
2019
737
This is useful for some tests with specific servers that need
2022
if self.__vfs_server is None:
2023
self.__vfs_server = self.vfs_transport_factory()
2024
self.__vfs_server.setUp()
2025
self.addCleanup(self.__vfs_server.tearDown)
2026
return self.__vfs_server
2028
def make_branch_and_tree(self, relpath, format=None):
740
if self.__server is None:
741
self.__server = self.transport_server()
742
self.__server.setUp()
743
self.addCleanup(self.__server.tearDown)
746
def get_url(self, relpath=None):
747
"""Get a URL for the readwrite transport.
749
This will either be backed by '.' or to an equivalent non-file based
751
relpath provides for clients to get a path relative to the base url.
752
These should only be downwards relative, not upwards.
754
base = self.get_server().get_url()
755
if relpath is not None and relpath != '.':
756
if not base.endswith('/'):
758
base = base + relpath
761
def get_transport(self):
762
"""Return a writeable transport for the test scratch space"""
763
t = get_transport(self.get_url())
764
self.assertFalse(t.is_readonly())
767
def get_readonly_transport(self):
768
"""Return a readonly transport for the test scratch space
770
This can be used to test that operations which should only need
771
readonly access in fact do not try to write.
773
t = get_transport(self.get_readonly_url())
774
self.assertTrue(t.is_readonly())
777
def make_branch(self, relpath):
778
"""Create a branch on the transport at relpath."""
779
repo = self.make_repository(relpath)
780
return repo.bzrdir.create_branch()
782
def make_bzrdir(self, relpath):
784
url = self.get_url(relpath)
785
segments = relpath.split('/')
786
if segments and segments[-1] not in ('', '.'):
787
parent = self.get_url('/'.join(segments[:-1]))
788
t = get_transport(parent)
790
t.mkdir(segments[-1])
791
except errors.FileExists:
793
return bzrlib.bzrdir.BzrDir.create(url)
794
except errors.UninitializableFormat:
795
raise TestSkipped("Format %s is not initializable.")
797
def make_repository(self, relpath, shared=False):
798
"""Create a repository on our default transport at relpath."""
799
made_control = self.make_bzrdir(relpath)
800
return made_control.create_repository(shared=shared)
802
def make_branch_and_tree(self, relpath):
2029
803
"""Create a branch on the transport and a tree locally.
2031
If the transport is not a LocalTransport, the Tree can't be created on
2032
the transport. In that case if the vfs_transport_factory is
2033
LocalURLServer the working tree is created in the local
2034
directory backing the transport, and the returned tree's branch and
2035
repository will also be accessed locally. Otherwise a lightweight
2036
checkout is created and returned.
2038
:param format: The BzrDirFormat.
2039
:returns: the WorkingTree.
2041
807
# TODO: always use the local disk path for the working tree,
2042
808
# this obviously requires a format that supports branch references
2043
809
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2045
b = self.make_branch(relpath, format=format)
811
b = self.make_branch(relpath)
2047
813
return b.bzrdir.create_workingtree()
2048
814
except errors.NotLocalUrl:
2049
# We can only make working trees locally at the moment. If the
2050
# transport can't support them, then we keep the non-disk-backed
2051
# branch and create a local checkout.
2052
if self.vfs_transport_factory is LocalURLServer:
2053
# the branch is colocated on disk, we cannot create a checkout.
2054
# hopefully callers will expect this.
2055
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2056
return local_controldir.create_workingtree()
2058
return b.create_checkout(relpath, lightweight=True)
815
# new formats - catch No tree error and create
816
# a branch reference and a checkout.
817
# old formats at that point - raise TestSkipped.
819
return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
2060
821
def assertIsDirectory(self, relpath, transport):
2061
822
"""Assert that relpath within transport is a directory.
2101
849
def setUp(self):
2102
850
super(ChrootedTestCase, self).setUp()
2103
if not self.vfs_transport_factory == MemoryServer:
2104
self.transport_readonly_server = HttpServer
2107
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2108
random_order=False):
2109
"""Create a test suite by filtering another one.
2111
:param suite: the source suite
2112
:param pattern: pattern that names must match
2113
:param exclude_pattern: pattern that names must not match, if any
2114
:param random_order: if True, tests in the new suite will be put in
2116
:returns: the newly created suite
2118
return sort_suite_by_re(suite, pattern, exclude_pattern,
2119
random_order, False)
2122
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2123
random_order=False, append_rest=True):
2124
"""Create a test suite by sorting another one.
2126
:param suite: the source suite
2127
:param pattern: pattern that names must match in order to go
2128
first in the new suite
2129
:param exclude_pattern: pattern that names must not match, if any
2130
:param random_order: if True, tests in the new suite will be put in
2132
:param append_rest: if False, pattern is a strict filter and not
2133
just an ordering directive
2134
:returns: the newly created suite
851
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
852
self.transport_readonly_server = bzrlib.transport.http.HttpServer
855
def filter_suite_by_re(suite, pattern):
2138
857
filter_re = re.compile(pattern)
2139
if exclude_pattern is not None:
2140
exclude_re = re.compile(exclude_pattern)
2141
858
for test in iter_suite_tests(suite):
2143
if exclude_pattern is None or not exclude_re.search(test_id):
2144
if filter_re.search(test_id):
2149
random.shuffle(first)
2150
random.shuffle(second)
2151
return TestUtil.TestSuite(first + second)
859
if filter_re.search(test.id()):
2154
864
def run_suite(suite, name='test', verbose=False, pattern=".*",
2155
stop_on_failure=False,
2156
transport=None, lsprof_timed=None, bench_history=None,
2157
matching_tests_first=None,
2160
exclude_pattern=None,
2162
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
865
stop_on_failure=False, keep_output=False,
867
TestCaseInTempDir._TEST_NAME = name
2167
872
runner = TextTestRunner(stream=sys.stdout,
2169
verbosity=verbosity,
2170
bench_history=bench_history,
2171
list_only=list_only,
2173
875
runner.stop_on_failure=stop_on_failure
2174
# Initialise the random number generator and display the seed used.
2175
# We convert the seed to a long to make it reuseable across invocations.
2176
random_order = False
2177
if random_seed is not None:
2179
if random_seed == "now":
2180
random_seed = long(time.time())
2182
# Convert the seed to a long if we can
2184
random_seed = long(random_seed)
2187
runner.stream.writeln("Randomizing test order using seed %s\n" %
2189
random.seed(random_seed)
2190
# Customise the list of tests if requested
2191
if pattern != '.*' or exclude_pattern is not None or random_order:
2192
if matching_tests_first:
2193
suite = sort_suite_by_re(suite, pattern, exclude_pattern,
2196
suite = filter_suite_by_re(suite, pattern, exclude_pattern,
877
suite = filter_suite_by_re(suite, pattern)
2198
878
result = runner.run(suite)
879
# This is still a little bogus,
880
# but only a little. Folk not using our testrunner will
881
# have to delete their temp directories themselves.
882
test_root = TestCaseInTempDir.TEST_ROOT
883
if result.wasSuccessful() or not keep_output:
884
if test_root is not None:
885
print 'Deleting test root %s...' % test_root
887
shutil.rmtree(test_root)
891
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
2199
892
return result.wasSuccessful()
2202
895
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2204
test_suite_factory=None,
2207
matching_tests_first=None,
2210
exclude_pattern=None):
2211
898
"""Run the whole test suite under the enhanced runner"""
2212
# XXX: Very ugly way to do this...
2213
# Disable warning about old formats because we don't want it to disturb
2214
# any blackbox tests.
2215
from bzrlib import repository
2216
repository._deprecation_warning_done = True
2218
899
global default_transport
2219
900
if transport is None:
2220
901
transport = default_transport
2221
902
old_transport = default_transport
2222
903
default_transport = transport
2224
if test_suite_factory is None:
2225
suite = test_suite()
2227
suite = test_suite_factory()
2228
906
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2229
stop_on_failure=stop_on_failure,
2230
transport=transport,
2231
lsprof_timed=lsprof_timed,
2232
bench_history=bench_history,
2233
matching_tests_first=matching_tests_first,
2234
list_only=list_only,
2235
random_seed=random_seed,
2236
exclude_pattern=exclude_pattern)
907
stop_on_failure=stop_on_failure, keep_output=keep_output,
2238
910
default_transport = old_transport
2241
914
def test_suite():
2242
"""Build and return TestSuite for the whole of bzrlib.
2244
This function can be replaced if you need to change the default test
2245
suite on a global basis, but it is not encouraged.
915
"""Build and return TestSuite for the whole program."""
916
from doctest import DocTestSuite
918
global MODULES_TO_DOCTEST
2248
921
'bzrlib.tests.test_ancestry',
2249
922
'bzrlib.tests.test_annotate',
2250
923
'bzrlib.tests.test_api',
2251
'bzrlib.tests.test_atomicfile',
2252
924
'bzrlib.tests.test_bad_files',
925
'bzrlib.tests.test_basis_inventory',
2253
926
'bzrlib.tests.test_branch',
2254
'bzrlib.tests.test_branchbuilder',
2255
'bzrlib.tests.test_bugtracker',
2256
'bzrlib.tests.test_bundle',
2257
927
'bzrlib.tests.test_bzrdir',
2258
'bzrlib.tests.test_cache_utf8',
2259
'bzrlib.tests.test_commands',
928
'bzrlib.tests.test_command',
2260
929
'bzrlib.tests.test_commit',
2261
930
'bzrlib.tests.test_commit_merge',
2262
931
'bzrlib.tests.test_config',
2263
932
'bzrlib.tests.test_conflicts',
2264
'bzrlib.tests.test_pack',
2265
'bzrlib.tests.test_counted_lock',
2266
933
'bzrlib.tests.test_decorators',
2267
'bzrlib.tests.test_delta',
2268
'bzrlib.tests.test_deprecated_graph',
2269
934
'bzrlib.tests.test_diff',
2270
'bzrlib.tests.test_dirstate',
935
'bzrlib.tests.test_doc_generate',
2271
936
'bzrlib.tests.test_errors',
2272
'bzrlib.tests.test_escaped_store',
2273
'bzrlib.tests.test_extract',
2274
937
'bzrlib.tests.test_fetch',
2275
'bzrlib.tests.test_ftp_transport',
2276
'bzrlib.tests.test_generate_docs',
2277
'bzrlib.tests.test_generate_ids',
2278
'bzrlib.tests.test_globbing',
2279
938
'bzrlib.tests.test_gpg',
2280
939
'bzrlib.tests.test_graph',
2281
940
'bzrlib.tests.test_hashcache',
2282
'bzrlib.tests.test_help',
2283
'bzrlib.tests.test_hooks',
2284
941
'bzrlib.tests.test_http',
2285
'bzrlib.tests.test_http_response',
2286
'bzrlib.tests.test_https_ca_bundle',
2287
942
'bzrlib.tests.test_identitymap',
2288
'bzrlib.tests.test_ignores',
2289
'bzrlib.tests.test_info',
2290
943
'bzrlib.tests.test_inv',
2291
944
'bzrlib.tests.test_knit',
2292
'bzrlib.tests.test_lazy_import',
2293
'bzrlib.tests.test_lazy_regex',
2294
945
'bzrlib.tests.test_lockdir',
2295
946
'bzrlib.tests.test_lockable_files',
2296
947
'bzrlib.tests.test_log',
2297
'bzrlib.tests.test_lsprof',
2298
'bzrlib.tests.test_memorytree',
2299
948
'bzrlib.tests.test_merge',
2300
949
'bzrlib.tests.test_merge3',
2301
950
'bzrlib.tests.test_merge_core',
2302
'bzrlib.tests.test_merge_directive',
2303
951
'bzrlib.tests.test_missing',
2304
952
'bzrlib.tests.test_msgeditor',
2305
953
'bzrlib.tests.test_nonascii',
2306
954
'bzrlib.tests.test_options',
2307
955
'bzrlib.tests.test_osutils',
2308
'bzrlib.tests.test_osutils_encodings',
2309
'bzrlib.tests.test_patch',
2310
'bzrlib.tests.test_patches',
2311
956
'bzrlib.tests.test_permissions',
2312
957
'bzrlib.tests.test_plugins',
2313
958
'bzrlib.tests.test_progress',
2314
959
'bzrlib.tests.test_reconcile',
2315
'bzrlib.tests.test_registry',
2316
'bzrlib.tests.test_remote',
2317
960
'bzrlib.tests.test_repository',
2318
'bzrlib.tests.test_revert',
2319
961
'bzrlib.tests.test_revision',
2320
962
'bzrlib.tests.test_revisionnamespaces',
2321
'bzrlib.tests.test_revisiontree',
963
'bzrlib.tests.test_revprops',
2322
964
'bzrlib.tests.test_rio',
2323
965
'bzrlib.tests.test_sampler',
2324
966
'bzrlib.tests.test_selftest',
2325
967
'bzrlib.tests.test_setup',
2326
968
'bzrlib.tests.test_sftp_transport',
2327
'bzrlib.tests.test_smart',
2328
969
'bzrlib.tests.test_smart_add',
2329
'bzrlib.tests.test_smart_transport',
2330
'bzrlib.tests.test_smtp_connection',
2331
970
'bzrlib.tests.test_source',
2332
'bzrlib.tests.test_ssh_transport',
2333
'bzrlib.tests.test_status',
2334
971
'bzrlib.tests.test_store',
2335
'bzrlib.tests.test_strace',
2336
'bzrlib.tests.test_subsume',
2337
972
'bzrlib.tests.test_symbol_versioning',
2338
'bzrlib.tests.test_tag',
2339
973
'bzrlib.tests.test_testament',
2340
'bzrlib.tests.test_textfile',
2341
'bzrlib.tests.test_textmerge',
2342
'bzrlib.tests.test_timestamp',
2343
974
'bzrlib.tests.test_trace',
2344
975
'bzrlib.tests.test_transactions',
2345
976
'bzrlib.tests.test_transform',
2346
977
'bzrlib.tests.test_transport',
2347
'bzrlib.tests.test_tree',
2348
'bzrlib.tests.test_treebuilder',
2349
978
'bzrlib.tests.test_tsort',
2350
'bzrlib.tests.test_tuned_gzip',
2351
979
'bzrlib.tests.test_ui',
980
'bzrlib.tests.test_uncommit',
2352
981
'bzrlib.tests.test_upgrade',
2353
'bzrlib.tests.test_urlutils',
2354
982
'bzrlib.tests.test_versionedfile',
2355
'bzrlib.tests.test_version',
2356
'bzrlib.tests.test_version_info',
2357
983
'bzrlib.tests.test_weave',
2358
984
'bzrlib.tests.test_whitebox',
2359
985
'bzrlib.tests.test_workingtree',
2360
'bzrlib.tests.test_workingtree_4',
2361
'bzrlib.tests.test_wsgi',
2362
986
'bzrlib.tests.test_xml',
2364
988
test_transport_implementations = [
2365
'bzrlib.tests.test_transport_implementations',
2366
'bzrlib.tests.test_read_bundle',
2368
suite = TestUtil.TestSuite()
2369
loader = TestUtil.TestLoader()
2370
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2371
from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
989
'bzrlib.tests.test_transport_implementations']
991
TestCase.BZRPATH = osutils.pathjoin(
992
osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
993
print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
994
print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
997
# python2.4's TestLoader.loadTestsFromNames gives very poor
998
# errors if it fails to load a named module - no indication of what's
999
# actually wrong, just "no such module". We should probably override that
1000
# class, but for the moment just load them ourselves. (mbp 20051202)
1001
loader = TestLoader()
1002
from bzrlib.transport import TransportTestProviderAdapter
2372
1003
adapter = TransportTestProviderAdapter()
2373
1004
adapt_modules(test_transport_implementations, adapter, loader, suite)
1005
for mod_name in testmod_names:
1006
mod = _load_module_by_name(mod_name)
1007
suite.addTest(loader.loadTestsFromModule(mod))
2374
1008
for package in packages_to_test():
2375
1009
suite.addTest(package.test_suite())
2376
1010
for m in MODULES_TO_TEST:
2377
1011
suite.addTest(loader.loadTestsFromModule(m))
2378
for m in MODULES_TO_DOCTEST:
2380
suite.addTest(doctest.DocTestSuite(m))
2381
except ValueError, e:
2382
print '**failed to get doctest for: %s\n%s' %(m,e)
1012
for m in (MODULES_TO_DOCTEST):
1013
suite.addTest(DocTestSuite(m))
2384
1014
for name, plugin in bzrlib.plugin.all_plugins().items():
2385
1015
if getattr(plugin, 'test_suite', None) is not None:
2386
default_encoding = sys.getdefaultencoding()
2388
plugin_suite = plugin.test_suite()
2389
except ImportError, e:
2390
bzrlib.trace.warning(
2391
'Unable to test plugin "%s": %s', name, e)
2393
suite.addTest(plugin_suite)
2394
if default_encoding != sys.getdefaultencoding():
2395
bzrlib.trace.warning(
2396
'Plugin "%s" tried to reset default encoding to: %s', name,
2397
sys.getdefaultencoding())
2399
sys.setdefaultencoding(default_encoding)
1016
suite.addTest(plugin.test_suite())
2403
1020
def adapt_modules(mods_list, adapter, loader, suite):
    """Load the named modules, adapt every test found and add them to suite.

    :param mods_list: Module names passed to loader.loadTestsFromModuleNames.
    :param adapter: Object whose adapt(test) returns the tests to add in
        place of one loaded test.
    :param loader: Test loader providing loadTestsFromModuleNames.
    :param suite: Suite that receives the adapted tests through addTests.
    """
    loaded = loader.loadTestsFromModuleNames(mods_list)
    # adapt() is applied to each item yielded by iter_suite_tests, one at a
    # time, and whatever it returns is appended to the caller's suite.
    for single_test in iter_suite_tests(loaded):
        adapted = adapter.adapt(single_test)
        suite.addTests(adapted)
2409
def _rmtree_temp_dir(dirname):
2410
# If LANG=C we probably have created some bogus paths
2411
# which rmtree(unicode) will fail to delete
2412
# so make sure we are using rmtree(str) to delete everything
2413
# except on win32, where rmtree(str) will fail
2414
# since it doesn't have the property of byte-stream paths
2415
# (they are either ascii or mbcs)
2416
if sys.platform == 'win32':
2417
# make sure we are using the unicode win32 api
2418
dirname = unicode(dirname)
2420
dirname = dirname.encode(sys.getfilesystemencoding())
2422
osutils.rmtree(dirname)
2424
if sys.platform == 'win32' and e.errno == errno.EACCES:
2425
print >>sys.stderr, ('Permission denied: '
2426
'unable to remove testing dir '
2427
'%s' % os.path.basename(dirname))
2432
class Feature(object):
    """An operating system Feature.

    Concrete features subclass this and implement _probe(); available()
    runs the probe and remembers its answer.
    """

    def __init__(self):
        # None means "not probed yet"; any other value is the cached answer.
        self._available = None

    def available(self):
        """Is the feature available?

        The result of _probe() is cached, so the probe is not repeated once
        it has returned something other than None.

        :return: True if the feature is available.
        """
        if self._available is None:
            self._available = self._probe()
        return self._available

    def _probe(self):
        """Implement this method in concrete features.

        :return: True if the feature is available.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def __str__(self):
        """Return feature_name() if the instance has one, else the class name."""
        if getattr(self, 'feature_name', None):
            return self.feature_name()
        return self.__class__.__name__
2460
class TestScenarioApplier(object):
    """A tool to apply scenarios to tests.

    The scenarios are read from self.scenarios, which this class does not
    set itself; it must be assigned before adapt() is called.
    """

    def adapt(self, test):
        """Return a TestSuite containing a copy of test for each scenario.

        :param test: The test to copy once per entry in self.scenarios.
        :return: A unittest.TestSuite holding the adapted copies, in
            scenario order.
        """
        result = unittest.TestSuite()
        for scenario in self.scenarios:
            result.addTest(self.adapt_test_to_scenario(test, scenario))
        return result

    def adapt_test_to_scenario(self, test, scenario):
        """Copy test and apply scenario to it.

        :param test: A test to adapt.
        :param scenario: A tuple describing the scenario.
            The first element of the tuple is the new test id.
            The second element is a dict containing attributes to set on the
            test.
        :return: The adapted test.
        """
        from copy import deepcopy
        # Work on a deep copy so the original test is left untouched.
        new_test = deepcopy(test)
        for name, value in scenario[1].items():
            setattr(new_test, name, value)
        # Compute the id once, then shadow id() on the copy so it reports
        # the scenario name after the original id.
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
        new_test.id = lambda: new_id
        return new_test
1022
for mod_name in mods_list:
1023
mod = _load_module_by_name(mod_name)
1024
for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
1025
suite.addTests(adapter.adapt(test))
1028
def _load_module_by_name(mod_name):
1029
parts = mod_name.split('.')
1030
module = __import__(mod_name)
1032
# for historical reasons python returns the top-level module even though
1033
# it loads the submodule; we need to walk down to get the one we want.
1035
module = getattr(module, parts.pop(0))