128
107
import bzrlib.tests.blackbox
129
108
import bzrlib.tests.branch_implementations
130
109
import bzrlib.tests.bzrdir_implementations
131
import bzrlib.tests.commands
132
110
import bzrlib.tests.interrepository_implementations
133
111
import bzrlib.tests.interversionedfile_implementations
134
import bzrlib.tests.intertree_implementations
135
import bzrlib.tests.per_lock
136
112
import bzrlib.tests.repository_implementations
137
113
import bzrlib.tests.revisionstore_implementations
138
import bzrlib.tests.tree_implementations
139
114
import bzrlib.tests.workingtree_implementations
142
117
bzrlib.tests.blackbox,
143
118
bzrlib.tests.branch_implementations,
144
119
bzrlib.tests.bzrdir_implementations,
145
bzrlib.tests.commands,
146
120
bzrlib.tests.interrepository_implementations,
147
121
bzrlib.tests.interversionedfile_implementations,
148
bzrlib.tests.intertree_implementations,
149
bzrlib.tests.per_lock,
150
122
bzrlib.tests.repository_implementations,
151
123
bzrlib.tests.revisionstore_implementations,
152
bzrlib.tests.tree_implementations,
153
124
bzrlib.tests.workingtree_implementations,
157
class ExtendedTestResult(unittest._TextTestResult):
158
"""Accepts, reports and accumulates the results of running tests.
128
class _MyResult(unittest._TextTestResult):
129
"""Custom TestResult.
160
Compared to the unittest version, this class adds support for profiling,
161
benchmarking, stopping as soon as a test fails, and skipping tests.
162
There are further-specialized subclasses for different types of display.
131
Shows output in a different format, including displaying runtime for tests.
165
133
stop_early = False
167
def __init__(self, stream, descriptions, verbosity,
171
"""Construct new TestResult.
173
:param bench_history: Optionally, a writable file object to accumulate
135
def __init__(self, stream, descriptions, verbosity, pb=None):
176
136
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
177
if bench_history is not None:
178
from bzrlib.version import _get_bzr_source_tree
179
src_tree = _get_bzr_source_tree()
182
revision_id = src_tree.get_parent_ids()[0]
184
# XXX: if this is a brand new tree, do the same as if there
188
# XXX: If there's no branch, what should we do?
190
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
191
self._bench_history = bench_history
192
self.ui = ui.ui_factory
193
self.num_tests = num_tests
195
self.failure_count = 0
196
self.known_failure_count = 0
198
self.unsupported = {}
200
self._overall_start_time = time.time()
202
139
def extractBenchmarkTime(self, testCase):
203
140
"""Add a benchmark time for the current test case."""
213
150
self._formatTime(self._benchmarkTime),
214
151
self._elapsedTestTimeString())
216
return " %s" % self._elapsedTestTimeString()
153
return " %s" % self._elapsedTestTimeString()
218
155
def _formatTime(self, seconds):
219
156
"""Format seconds as milliseconds with leading spaces."""
220
# some benchmarks can take thousands of seconds to run, so we need 8
222
return "%8dms" % (1000 * seconds)
157
return "%5dms" % (1000 * seconds)
224
def _shortened_test_description(self, test):
226
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
159
def _ellipsise_unimportant_words(self, a_string, final_width,
161
"""Add an ellipsis to overly long strings.
163
:param keep_start: If true preserve the start of a_string rather
167
if len(a_string) > final_width:
168
result = a_string[:final_width-3] + '...'
172
if len(a_string) > final_width:
173
result = '...' + a_string[3-final_width:]
176
return result.ljust(final_width)
229
178
def startTest(self, test):
230
179
unittest.TestResult.startTest(self, test)
231
self.report_test_start(test)
232
test.number = self.count
180
# In a short description, the important words are in
181
# the beginning, but in an id, the important words are
183
SHOW_DESCRIPTIONS = False
185
if not self.showAll and self.dots and self.pb is not None:
188
final_width = osutils.terminal_width()
189
final_width = final_width - 15 - 8
191
if SHOW_DESCRIPTIONS:
192
what = test.shortDescription()
194
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
197
if what.startswith('bzrlib.tests.'):
199
what = self._ellipsise_unimportant_words(what, final_width)
201
self.stream.write(what)
202
elif self.dots and self.pb is not None:
203
self.pb.update(what, self.testsRun - 1, None)
233
205
self._recordTestStartTime()
235
207
def _recordTestStartTime(self):
236
208
"""Record that a test has started."""
237
209
self._start_time = time.time()
239
def _cleanupLogFile(self, test):
240
# We can only do this if we have one of our TestCases, not if
242
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
243
if setKeepLogfile is not None:
246
211
def addError(self, test, err):
247
self.extractBenchmarkTime(test)
248
self._cleanupLogFile(test)
249
212
if isinstance(err[1], TestSkipped):
250
return self.addSkipped(test, err)
251
elif isinstance(err[1], UnavailableFeature):
252
return self.addNotSupported(test, err[1].args[0])
213
return self.addSkipped(test, err)
253
214
unittest.TestResult.addError(self, test, err)
254
self.error_count += 1
255
self.report_error(test, err)
215
self.extractBenchmarkTime(test)
217
self.stream.writeln("ERROR %s" % self._testTimeString())
218
elif self.dots and self.pb is None:
219
self.stream.write('E')
221
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
256
223
if self.stop_early:
259
226
def addFailure(self, test, err):
260
self._cleanupLogFile(test)
261
self.extractBenchmarkTime(test)
262
if isinstance(err[1], KnownFailure):
263
return self.addKnownFailure(test, err)
264
227
unittest.TestResult.addFailure(self, test, err)
265
self.failure_count += 1
266
self.report_failure(test, err)
228
self.extractBenchmarkTime(test)
230
self.stream.writeln(" FAIL %s" % self._testTimeString())
231
elif self.dots and self.pb is None:
232
self.stream.write('F')
234
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
267
236
if self.stop_early:
270
def addKnownFailure(self, test, err):
271
self.known_failure_count += 1
272
self.report_known_failure(test, err)
274
def addNotSupported(self, test, feature):
    """Count a test that could not run because ``feature`` is missing.

    The tally is kept in ``self.unsupported``, keyed by ``str(feature)``,
    and the event is then passed on to ``self.report_unsupported``.
    """
    feature_name = str(feature)
    previous = self.unsupported.get(feature_name, 0)
    self.unsupported[feature_name] = previous + 1
    self.report_unsupported(test, feature)
279
239
def addSuccess(self, test):
280
240
self.extractBenchmarkTime(test)
281
if self._bench_history is not None:
282
if self._benchmarkTime is not None:
283
self._bench_history.write("%s %s\n" % (
284
self._formatTime(self._benchmarkTime),
286
self.report_success(test)
242
self.stream.writeln(' OK %s' % self._testTimeString())
243
for bench_called, stats in getattr(test, '_benchcalls', []):
244
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
245
stats.pprint(file=self.stream)
246
elif self.dots and self.pb is None:
247
self.stream.write('~')
249
self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
287
251
unittest.TestResult.addSuccess(self, test)
289
253
def addSkipped(self, test, skip_excinfo):
290
self.report_skip(test, skip_excinfo)
254
self.extractBenchmarkTime(test)
256
print >>self.stream, ' SKIP %s' % self._testTimeString()
257
print >>self.stream, ' %s' % skip_excinfo[1]
258
elif self.dots and self.pb is None:
259
self.stream.write('S')
261
self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
291
263
# seems best to treat this as success from point-of-view of unittest
292
264
# -- it actually does nothing so it barely matters :)
314
285
self.stream.writeln(self.separator2)
315
286
self.stream.writeln("%s" % err)
320
def report_cleaning_up(self):
323
def report_success(self, test):
327
class TextTestResult(ExtendedTestResult):
328
"""Displays progress and results of tests in text form"""
330
def __init__(self, stream, descriptions, verbosity,
335
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
336
bench_history, num_tests)
338
self.pb = self.ui.nested_progress_bar()
339
self._supplied_pb = False
342
self._supplied_pb = True
343
# Switch off each optional element of the progress bar display.
self.pb.show_pct = False
self.pb.show_spinner = False
# This previously read ``False,`` -- the trailing comma made it the
# one-element tuple ``(False,)``, which is truthy, so any truth test on
# show_eta would have seen it as enabled.
self.pb.show_eta = False
self.pb.show_count = False
self.pb.show_bar = False
349
def report_starting(self):
350
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
352
def _progress_prefix_text(self):
353
a = '[%d' % self.count
354
if self.num_tests is not None:
355
a +='/%d' % self.num_tests
356
a += ' in %ds' % (time.time() - self._overall_start_time)
358
a += ', %d errors' % self.error_count
359
if self.failure_count:
360
a += ', %d failed' % self.failure_count
361
if self.known_failure_count:
362
a += ', %d known failures' % self.known_failure_count
364
a += ', %d skipped' % self.skip_count
366
a += ', %d missing features' % len(self.unsupported)
370
def report_test_start(self, test):
373
self._progress_prefix_text()
375
+ self._shortened_test_description(test))
377
def _test_description(self, test):
378
return self._shortened_test_description(test)
380
def report_error(self, test, err):
381
self.pb.note('ERROR: %s\n %s\n',
382
self._test_description(test),
386
def report_failure(self, test, err):
387
self.pb.note('FAIL: %s\n %s\n',
388
self._test_description(test),
392
def report_known_failure(self, test, err):
393
self.pb.note('XFAIL: %s\n%s\n',
394
self._test_description(test), err[1])
396
def report_skip(self, test, skip_excinfo):
399
# at the moment these are mostly not things we can fix
400
# and so they just produce stipple; use the verbose reporter
403
# show test and reason for skip
404
self.pb.note('SKIP: %s\n %s\n',
405
self._shortened_test_description(test),
408
# since the class name was left behind in the still-visible
410
self.pb.note('SKIP: %s', skip_excinfo[1])
412
def report_unsupported(self, test, feature):
413
"""test cannot be run because feature is missing."""
415
def report_cleaning_up(self):
416
self.pb.update('cleaning up...')
419
if not self._supplied_pb:
423
class VerboseTestResult(ExtendedTestResult):
424
"""Produce long output, with one line per test run plus times"""
426
def _ellipsize_to_right(self, a_string, final_width):
427
"""Truncate and pad a string, keeping the right hand side"""
428
if len(a_string) > final_width:
429
result = '...' + a_string[3-final_width:]
432
return result.ljust(final_width)
434
def report_starting(self):
435
self.stream.write('running %d tests...\n' % self.num_tests)
437
def report_test_start(self, test):
439
name = self._shortened_test_description(test)
440
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
441
# numbers, plus a trailing blank
442
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
443
self.stream.write(self._ellipsize_to_right(name,
444
osutils.terminal_width()-30))
447
def _error_summary(self, err):
449
return '%s%s' % (indent, err[1])
451
def report_error(self, test, err):
452
self.stream.writeln('ERROR %s\n%s'
453
% (self._testTimeString(),
454
self._error_summary(err)))
456
def report_failure(self, test, err):
457
self.stream.writeln(' FAIL %s\n%s'
458
% (self._testTimeString(),
459
self._error_summary(err)))
461
def report_known_failure(self, test, err):
462
self.stream.writeln('XFAIL %s\n%s'
463
% (self._testTimeString(),
464
self._error_summary(err)))
466
def report_success(self, test):
467
self.stream.writeln(' OK %s' % self._testTimeString())
468
for bench_called, stats in getattr(test, '_benchcalls', []):
469
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
470
stats.pprint(file=self.stream)
471
# flush the stream so that we get smooth output. This verbose mode is
472
# used to show the output in PQM.
475
def report_skip(self, test, skip_excinfo):
477
self.stream.writeln(' SKIP %s\n%s'
478
% (self._testTimeString(),
479
self._error_summary(skip_excinfo)))
481
def report_unsupported(self, test, feature):
482
"""test cannot be run because feature is missing."""
483
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
484
%(self._testTimeString(), feature))
488
289
class TextTestRunner(object):
489
290
stop_on_failure = False
544
333
if failed: self.stream.write(", ")
545
334
self.stream.write("errors=%d" % errored)
546
if result.known_failure_count:
547
if failed or errored: self.stream.write(", ")
548
self.stream.write("known_failure_count=%d" %
549
result.known_failure_count)
550
335
self.stream.writeln(")")
552
if result.known_failure_count:
553
self.stream.writeln("OK (known_failures=%d)" %
554
result.known_failure_count)
337
self.stream.writeln("OK")
338
if self.pb is not None:
339
self.pb.update('Cleaning up', 0, 1)
340
# This is still a little bogus,
341
# but only a little. Folk not using our testrunner will
342
# have to delete their temp directories themselves.
343
test_root = TestCaseInTempDir.TEST_ROOT
344
if result.wasSuccessful() or not self.keep_output:
345
if test_root is not None:
346
# If LANG=C we probably have created some bogus paths
347
# which rmtree(unicode) will fail to delete
348
# so make sure we are using rmtree(str) to delete everything
349
# except on win32, where rmtree(str) will fail
350
# since it doesn't have the property of byte-stream paths
351
# (they are either ascii or mbcs)
352
if sys.platform == 'win32':
353
# make sure we are using the unicode win32 api
354
test_root = unicode(test_root)
356
test_root = test_root.encode(
357
sys.getfilesystemencoding())
358
osutils.rmtree(test_root)
360
if self.pb is not None:
361
self.pb.note("Failed tests working directories are in '%s'\n",
556
self.stream.writeln("OK")
557
if result.skip_count > 0:
558
skipped = result.skip_count
559
self.stream.writeln('%d test%s skipped' %
560
(skipped, skipped != 1 and "s" or ""))
561
if result.unsupported:
562
for feature, count in sorted(result.unsupported.items()):
563
self.stream.writeln("Missing feature '%s' skipped %d tests." %
365
"Failed tests working directories are in '%s'\n" %
367
TestCaseInTempDir.TEST_ROOT = None
368
if self.pb is not None:
855
520
raise AssertionError("value(s) %r not present in container %r" %
856
521
(missing, superlist))
858
def assertListRaises(self, excClass, func, *args, **kwargs):
859
"""Fail unless excClass is raised when the iterator from func is used.
861
Many functions can return generators; this makes sure
862
to wrap them in a list() call to make sure the whole generator
863
is run, and that the proper exception is raised.
866
list(func(*args, **kwargs))
870
if getattr(excClass,'__name__', None) is not None:
871
excName = excClass.__name__
873
excName = str(excClass)
874
raise self.failureException, "%s not raised" % excName
876
def assertRaises(self, excClass, callableObj, *args, **kwargs):
877
"""Assert that a callable raises a particular exception.
879
:param excClass: As for the except statement, this may be either an
880
exception class, or a tuple of classes.
881
:param callableObj: A callable, will be passed ``*args`` and
884
Returns the exception so that you can examine it.
887
callableObj(*args, **kwargs)
891
if getattr(excClass,'__name__', None) is not None:
892
excName = excClass.__name__
895
excName = str(excClass)
896
raise self.failureException, "%s not raised" % excName
898
def assertIs(self, left, right, message=None):
    """Fail unless ``left`` and ``right`` are the very same object.

    :param left: First object to compare by identity.
    :param right: Second object to compare by identity.
    :param message: Optional text for the AssertionError.  When omitted,
        a default message showing both reprs is used.  Callers that pass
        only ``left`` and ``right`` keep working unchanged.
    :raises AssertionError: If ``left`` is not ``right``.
    """
    if left is right:
        return
    if message is None:
        message = "%r is not %r." % (left, right)
    raise AssertionError(message)
905
def assertIsNot(self, left, right, message=None):
907
if message is not None:
908
raise AssertionError(message)
910
raise AssertionError("%r is %r." % (left, right))
525
raise AssertionError("%r is not %r." % (left, right))
912
527
def assertTransportMode(self, transport, path, mode):
913
528
"""Fail if a path does not have mode mode.
927
542
self.fail("%r is an instance of %s rather than %s" % (
928
543
obj, obj.__class__, kls))
930
def expectFailure(self, reason, assertion, *args, **kwargs):
931
"""Invoke a test, expecting it to fail for the given reason.
933
This is for assertions that ought to succeed, but currently fail.
934
(The failure is *expected* but not *wanted*.) Please be very precise
935
about the failure you're expecting. If a new bug is introduced,
936
AssertionError should be raised, not KnownFailure.
938
Frequently, expectFailure should be followed by an opposite assertion.
941
Intended to be used with a callable that raises AssertionError as the
942
'assertion' parameter. args and kwargs are passed to the 'assertion'.
944
Raises KnownFailure if the test fails. Raises AssertionError if the
949
self.expectFailure('Math is broken', self.assertNotEqual, 54,
951
self.assertEqual(42, dynamic_val)
953
This means that a dynamic_val of 54 will cause the test to raise
954
a KnownFailure. Once math is fixed and the expectFailure is removed,
955
only a dynamic_val of 42 will allow the test to pass. Anything other
956
than 54 or 42 will cause an AssertionError.
959
assertion(*args, **kwargs)
960
except AssertionError:
961
raise KnownFailure(reason)
963
self.fail('Unexpected success. Should have failed: %s' % reason)
965
def _capture_warnings(self, a_callable, *args, **kwargs):
966
"""A helper for callDeprecated and applyDeprecated.
968
:param a_callable: A callable to call.
969
:param args: The positional arguments for the callable
970
:param kwargs: The keyword arguments for the callable
971
:return: A tuple (warnings, result). result is the result of calling
972
a_callable(``*args``, ``**kwargs``).
975
def capture_warnings(msg, cls=None, stacklevel=None):
976
# we've hooked into a deprecation specific callpath,
977
# only deprecations should getting sent via it.
978
self.assertEqual(cls, DeprecationWarning)
979
local_warnings.append(msg)
980
original_warning_method = symbol_versioning.warn
981
symbol_versioning.set_warning_method(capture_warnings)
983
result = a_callable(*args, **kwargs)
985
symbol_versioning.set_warning_method(original_warning_method)
986
return (local_warnings, result)
988
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
989
"""Call a deprecated callable without warning the user.
991
Note that this only captures warnings raised by symbol_versioning.warn,
992
not other callers that go direct to the warning module.
994
:param deprecation_format: The deprecation format that the callable
995
should have been deprecated with. This is the same type as the
996
parameter to deprecated_method/deprecated_function. If the
997
callable is not deprecated with this format, an assertion error
999
:param a_callable: A callable to call. This may be a bound method or
1000
a regular function. It will be called with ``*args`` and
1002
:param args: The positional arguments for the callable
1003
:param kwargs: The keyword arguments for the callable
1004
:return: The result of a_callable(``*args``, ``**kwargs``)
1006
call_warnings, result = self._capture_warnings(a_callable,
1008
expected_first_warning = symbol_versioning.deprecation_string(
1009
a_callable, deprecation_format)
1010
if len(call_warnings) == 0:
1011
self.fail("No deprecation warning generated by call to %s" %
1013
self.assertEqual(expected_first_warning, call_warnings[0])
1016
def callDeprecated(self, expected, callable, *args, **kwargs):
1017
"""Assert that a callable is deprecated in a particular way.
1019
This is a very precise test for unusual requirements. The
1020
applyDeprecated helper function is probably more suited for most tests
1021
as it allows you to simply specify the deprecation format being used
1022
and will ensure that that is issued for the function being called.
1024
Note that this only captures warnings raised by symbol_versioning.warn,
1025
not other callers that go direct to the warning module.
1027
:param expected: a list of the deprecation warnings expected, in order
1028
:param callable: The callable to call
1029
:param args: The positional arguments for the callable
1030
:param kwargs: The keyword arguments for the callable
1032
call_warnings, result = self._capture_warnings(callable,
1034
self.assertEqual(expected, call_warnings)
1037
545
def _startLogFile(self):
1038
546
"""Send bzr and test log messages to a temporary file.
1040
548
The file is removed as the test is torn down.
1042
550
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1043
self._log_file = os.fdopen(fileno, 'w+')
551
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
552
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
1044
553
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
1045
554
self._log_file_name = name
1046
555
self.addCleanup(self._finishLogFile)
1077
582
def _cleanEnvironment(self):
1079
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1080
584
'HOME': os.getcwd(),
1081
'APPDATA': None, # bzr now uses the Win32 API and doesn't rely on APPDATA
1083
'BZREMAIL': None, # may still be present in the environment
585
'APPDATA': os.getcwd(),
1085
'BZR_PROGRESS_BAR': None,
1089
'https_proxy': None,
1090
'HTTPS_PROXY': None,
1095
# Nobody cares about these ones AFAIK. So far at
1096
# least. If you do (care), please update this comment
1100
'BZR_REMOTE_PATH': None,
1102
589
self.__old_env = {}
1103
590
self.addCleanup(self._restoreEnvironment)
1104
591
for name, value in new_env.iteritems():
1105
592
self._captureVar(name, value)
1107
595
def _captureVar(self, name, newvalue):
1108
"""Set an environment variable, and reset it when finished."""
1109
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
596
"""Set an environment variable, preparing it to be reset when finished."""
597
self.__old_env[name] = os.environ.get(name, None)
599
if name in os.environ:
602
os.environ[name] = newvalue
1111
def _restore_debug_flags(self):
1112
debug.debug_flags.clear()
1113
debug.debug_flags.update(self._preserved_debug_flags)
605
def _restoreVar(name, value):
607
if name in os.environ:
610
os.environ[name] = value
1115
612
def _restoreEnvironment(self):
1116
613
for name, value in self.__old_env.iteritems():
1117
osutils.set_or_unset_env(name, value)
1119
def _restoreHooks(self):
1120
for klass, hooks in self._preserved_hooks.items():
1121
setattr(klass, 'hooks', hooks)
1123
def knownFailure(self, reason):
1124
"""This test has failed for some known reason."""
1125
raise KnownFailure(reason)
1127
def run(self, result=None):
1128
if result is None: result = self.defaultTestResult()
1129
for feature in getattr(self, '_test_needs_features', []):
1130
if not feature.available():
1131
result.startTest(self)
1132
if getattr(result, 'addNotSupported', None):
1133
result.addNotSupported(self, feature)
1135
result.addSuccess(self)
1136
result.stopTest(self)
1138
return unittest.TestCase.run(self, result)
614
self._restoreVar(name, value)
1140
616
def tearDown(self):
1141
617
self._runCleanups()
1171
647
# TODO: Perhaps this should keep running cleanups even if
1172
648
# one of them fails?
1174
# Actually pop the cleanups from the list so tearDown running
1175
# twice is safe (this happens for skipped tests).
1176
while self._cleanups:
1177
self._cleanups.pop()()
649
for cleanup_fn in reversed(self._cleanups):
1179
652
def log(self, *args):
1182
def _get_log(self, keep_log_file=False):
1183
"""Return as a string the log for this test. If the file is still
1184
on disk and keep_log_file=False, delete the log file and store the
1185
content in self._log_contents."""
1186
# flush the log file, to get all content
1188
bzrlib.trace._trace_file.flush()
1189
if self._log_contents:
656
"""Return as a string the log for this test"""
657
if self._log_file_name:
658
return open(self._log_file_name).read()
1190
660
return self._log_contents
1191
if self._log_file_name is not None:
1192
logfile = open(self._log_file_name)
1194
log_contents = logfile.read()
1197
if not keep_log_file:
1198
self._log_contents = log_contents
1200
os.remove(self._log_file_name)
1202
if sys.platform == 'win32' and e.errno == errno.EACCES:
1203
print >>sys.stderr, ('Unable to delete log file '
1204
' %r' % self._log_file_name)
1209
return "DELETED log file to reduce memory footprint"
661
# TODO: Delete the log after it's been read in
1211
@deprecated_method(zero_eighteen)
1212
663
def capture(self, cmd, retcode=0):
1213
664
"""Shortcut that splits cmd into words, runs, and returns stdout"""
1214
665
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
1216
def requireFeature(self, feature):
1217
"""This test requires that a specific feature is available.
1219
:raises UnavailableFeature: When feature is not available.
1221
if not feature.available():
1222
raise UnavailableFeature(feature)
1224
@deprecated_method(zero_eighteen)
1225
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
667
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
1227
668
"""Invoke bzr and return (stdout, stderr).
1229
Don't call this method, just use run_bzr() which is equivalent.
1231
:param argv: Arguments to invoke bzr. This may be either a
1232
single string, in which case it is split by shlex into words,
1233
or a list of arguments.
1234
:param retcode: Expected return code, or None for don't-care.
1235
:param encoding: Encoding for sys.stdout and sys.stderr
670
Useful for code that wants to check the contents of the
671
output, the way error messages are presented, etc.
673
This should be the main method for tests that want to exercise the
674
overall behavior of the bzr application (rather than a unit test
675
or a functional test of the library.)
677
Much of the old code runs bzr by forking a new copy of Python, but
678
that is slower, harder to debug, and generally not necessary.
680
This runs bzr through the interface that catches and reports
681
errors, and with logging set to something approximating the
682
default, so that error reporting can be checked.
684
:param argv: arguments to invoke bzr
685
:param retcode: expected return code, or None for don't-care.
686
:param encoding: encoding for sys.stdout and sys.stderr
1236
687
:param stdin: A string to be used as stdin for the command.
1237
:param working_dir: Change to this directory before running
1239
return self._run_bzr_autosplit(argv, retcode=retcode,
1240
encoding=encoding, stdin=stdin, working_dir=working_dir,
1243
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1245
"""Run bazaar command line, splitting up a string command line."""
1246
if isinstance(args, basestring):
1247
args = list(shlex.split(args))
1248
return self._run_bzr_core(args, retcode=retcode,
1249
encoding=encoding, stdin=stdin, working_dir=working_dir,
1252
def _run_bzr_core(self, args, retcode, encoding, stdin,
1254
689
if encoding is None:
1255
690
encoding = bzrlib.user_encoding
691
if stdin is not None:
692
stdin = StringIO(stdin)
1256
693
stdout = StringIOWrapper()
1257
694
stderr = StringIOWrapper()
1258
695
stdout.encoding = encoding
1259
696
stderr.encoding = encoding
1261
self.log('run bzr: %r', args)
698
self.log('run bzr: %r', argv)
1262
699
# FIXME: don't call into logging here
1263
700
handler = logging.StreamHandler(stderr)
1264
701
handler.setLevel(logging.INFO)
1265
702
logger = logging.getLogger('')
1266
703
logger.addHandler(handler)
1267
old_ui_factory = ui.ui_factory
1268
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1271
if working_dir is not None:
1272
cwd = osutils.getcwd()
1273
os.chdir(working_dir)
704
old_ui_factory = bzrlib.ui.ui_factory
705
bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
708
bzrlib.ui.ui_factory.stdin = stdin
1276
result = self.apply_redirected(ui.ui_factory.stdin,
1278
bzrlib.commands.run_bzr_catch_errors,
710
result = self.apply_redirected(stdin, stdout, stderr,
711
bzrlib.commands.run_bzr_catch_errors,
1281
714
logger.removeHandler(handler)
1282
ui.ui_factory = old_ui_factory
715
bzrlib.ui.ui_factory = old_ui_factory
1286
717
out = stdout.getvalue()
1287
718
err = stderr.getvalue()
1397
786
handling, or early startup code, etc. Subprocess code can't be
1398
787
profiled or debugged so easily.
1400
:keyword retcode: The status code that is expected. Defaults to 0. If
1401
None is supplied, the status code is not checked.
1402
:keyword env_changes: A dictionary which lists changes to environment
1403
variables. A value of None will unset the env variable.
1404
The values must be strings. The change will only occur in the
1405
child, so you don't need to fix the environment after running.
1406
:keyword universal_newlines: Convert CRLF => LF
1407
:keyword allow_plugins: By default the subprocess is run with
1408
--no-plugins to ensure test reproducibility. Also, it is possible
1409
for system-wide plugins to create unexpected output on stderr,
1410
which can cause unnecessary test failures.
789
:param retcode: The status code that is expected. Defaults to 0. If
790
None is supplied, the status code is not checked.
1412
env_changes = kwargs.get('env_changes', {})
1413
working_dir = kwargs.get('working_dir', None)
1414
allow_plugins = kwargs.get('allow_plugins', False)
1415
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1416
working_dir=working_dir,
1417
allow_plugins=allow_plugins)
1418
# We distinguish between retcode=None and retcode not passed.
792
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
794
process = Popen([sys.executable, bzr_path]+args, stdout=PIPE,
796
out = process.stdout.read()
797
err = process.stderr.read()
798
retcode = process.wait()
1419
799
supplied_retcode = kwargs.get('retcode', 0)
1420
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1421
universal_newlines=kwargs.get('universal_newlines', False),
1424
def start_bzr_subprocess(self, process_args, env_changes=None,
1425
skip_if_plan_to_signal=False,
1427
allow_plugins=False):
1428
"""Start bzr in a subprocess for testing.
1430
This starts a new Python interpreter and runs bzr in there.
1431
This should only be used for tests that have a justifiable need for
1432
this isolation: e.g. they are testing startup time, or signal
1433
handling, or early startup code, etc. Subprocess code can't be
1434
profiled or debugged so easily.
1436
:param process_args: a list of arguments to pass to the bzr executable,
1437
for example ``['--version']``.
1438
:param env_changes: A dictionary which lists changes to environment
1439
variables. A value of None will unset the env variable.
1440
The values must be strings. The change will only occur in the
1441
child, so you don't need to fix the environment after running.
1442
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1444
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1446
:returns: Popen object for the started process.
1448
if skip_if_plan_to_signal:
1449
if not getattr(os, 'kill', None):
1450
raise TestSkipped("os.kill not available.")
1452
if env_changes is None:
1456
def cleanup_environment():
1457
for env_var, value in env_changes.iteritems():
1458
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1460
def restore_environment():
1461
for env_var, value in old_env.iteritems():
1462
osutils.set_or_unset_env(env_var, value)
1464
bzr_path = self.get_bzr_path()
1467
if working_dir is not None:
1468
cwd = osutils.getcwd()
1469
os.chdir(working_dir)
1472
# win32 subprocess doesn't support preexec_fn
1473
# so we will avoid using it on all platforms, just to
1474
# make sure the code path is used, and we don't break on win32
1475
cleanup_environment()
1476
command = [sys.executable, bzr_path]
1477
if not allow_plugins:
1478
command.append('--no-plugins')
1479
command.extend(process_args)
1480
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1482
restore_environment()
1488
def _popen(self, *args, **kwargs):
1489
"""Place a call to Popen.
1491
Allows tests to override this method to intercept the calls made to
1492
Popen for introspection.
1494
return Popen(*args, **kwargs)
1496
def get_bzr_path(self):
1497
"""Return the path of the 'bzr' executable for this test suite."""
1498
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1499
if not os.path.isfile(bzr_path):
1500
# We are probably installed. Assume sys.argv is the right file
1501
bzr_path = sys.argv[0]
1504
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1505
universal_newlines=False, process_args=None):
1506
"""Finish the execution of process.
1508
:param process: the Popen object returned from start_bzr_subprocess.
1509
:param retcode: The status code that is expected. Defaults to 0. If
1510
None is supplied, the status code is not checked.
1511
:param send_signal: an optional signal to send to the process.
1512
:param universal_newlines: Convert CRLF => LF
1513
:returns: (stdout, stderr)
1515
if send_signal is not None:
1516
os.kill(process.pid, send_signal)
1517
out, err = process.communicate()
1519
if universal_newlines:
1520
out = out.replace('\r\n', '\n')
1521
err = err.replace('\r\n', '\n')
1523
if retcode is not None and retcode != process.returncode:
1524
if process_args is None:
1525
process_args = "(unknown args)"
1526
mutter('Output of bzr %s:\n%s', process_args, out)
1527
mutter('Error for bzr %s:\n%s', process_args, err)
1528
self.fail('Command bzr %s failed with retcode %s != %s'
1529
% (process_args, retcode, process.returncode))
800
if supplied_retcode is not None:
801
assert supplied_retcode == retcode
1530
802
return [out, err]
1532
804
def check_inventory_shape(self, inv, shape):
1581
853
sys.stderr = real_stderr
1582
854
sys.stdin = real_stdin
1584
def reduceLockdirTimeout(self):
1585
"""Reduce the default lock timeout for the duration of the test, so that
1586
if LockContention occurs during a test, it does so quickly.
1588
Tests that expect to provoke LockContention errors should call this.
1590
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1592
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1593
self.addCleanup(resetTimeout)
1594
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1597
class TestCaseWithMemoryTransport(TestCase):
1598
"""Common test class for tests that do not need disk resources.
1600
Tests that need disk resources should derive from TestCaseWithTransport.
1602
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1604
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1605
a directory which does not exist. This serves to help ensure test isolation
1606
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1607
must exist. However, TestCaseWithMemoryTransport does not offer local
1608
file defaults for the transport in tests, nor does it obey the command line
1609
override, so tests that accidentally write to the common directory should
1612
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1613
a .bzr directory that stops us ascending higher into the filesystem.
1619
def __init__(self, methodName='runTest'):
1620
# allow test parameterisation after test construction and before test
1621
# execution. Variables that the parameteriser sets need to be
1622
# ones that are not set by setUp, or setUp will trash them.
1623
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1624
self.vfs_transport_factory = default_transport
1625
self.transport_server = None
1626
self.transport_readonly_server = None
1627
self.__vfs_server = None
1629
def get_transport(self, relpath=None):
1630
"""Return a writeable transport.
1632
This transport is for the test scratch space relative to
1635
:param relpath: a path relative to the base url.
1637
t = get_transport(self.get_url(relpath))
1638
self.assertFalse(t.is_readonly())
1641
def get_readonly_transport(self, relpath=None):
1642
"""Return a readonly transport for the test scratch space
1644
This can be used to test that operations which should only need
1645
readonly access in fact do not try to write.
1647
:param relpath: a path relative to the base url.
1649
t = get_transport(self.get_readonly_url(relpath))
1650
self.assertTrue(t.is_readonly())
1653
def create_transport_readonly_server(self):
1654
"""Create a transport server from class defined at init.
1656
This is mostly a hook for daughter classes.
1658
return self.transport_readonly_server()
1660
def get_readonly_server(self):
1661
"""Get the server instance for the readonly transport
1663
This is useful for some tests with specific servers to do diagnostics.
1665
if self.__readonly_server is None:
1666
if self.transport_readonly_server is None:
1667
# readonly decorator requested
1668
# bring up the server
1669
self.__readonly_server = ReadonlyServer()
1670
self.__readonly_server.setUp(self.get_vfs_only_server())
1672
self.__readonly_server = self.create_transport_readonly_server()
1673
self.__readonly_server.setUp(self.get_vfs_only_server())
1674
self.addCleanup(self.__readonly_server.tearDown)
1675
return self.__readonly_server
1677
def get_readonly_url(self, relpath=None):
1678
"""Get a URL for the readonly transport.
1680
This will either be backed by '.' or a decorator to the transport
1681
used by self.get_url()
1682
relpath provides for clients to get a path relative to the base url.
1683
These should only be downwards relative, not upwards.
1685
base = self.get_readonly_server().get_url()
1686
return self._adjust_url(base, relpath)
1688
def get_vfs_only_server(self):
1689
"""Get the vfs only read/write server instance.
1691
This is useful for some tests with specific servers that need
1694
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1695
is no means to override it.
1697
if self.__vfs_server is None:
1698
self.__vfs_server = MemoryServer()
1699
self.__vfs_server.setUp()
1700
self.addCleanup(self.__vfs_server.tearDown)
1701
return self.__vfs_server
1703
def get_server(self):
1704
"""Get the read/write server instance.
1706
This is useful for some tests with specific servers that need
1709
This is built from the self.transport_server factory. If that is None,
1710
then the self.get_vfs_server is returned.
1712
if self.__server is None:
1713
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1714
return self.get_vfs_only_server()
1716
# bring up a decorated means of access to the vfs only server.
1717
self.__server = self.transport_server()
1719
self.__server.setUp(self.get_vfs_only_server())
1720
except TypeError, e:
1721
# This should never happen; the try:Except here is to assist
1722
# developers having to update code rather than seeing an
1723
# uninformative TypeError.
1724
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1725
self.addCleanup(self.__server.tearDown)
1726
return self.__server
1728
def _adjust_url(self, base, relpath):
1729
"""Get a URL (or maybe a path) for the readwrite transport.
1731
This will either be backed by '.' or to an equivalent non-file based
1733
relpath provides for clients to get a path relative to the base url.
1734
These should only be downwards relative, not upwards.
1736
if relpath is not None and relpath != '.':
1737
if not base.endswith('/'):
1739
# XXX: Really base should be a url; we did after all call
1740
# get_url()! But sometimes it's just a path (from
1741
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1742
# to a non-escaped local path.
1743
if base.startswith('./') or base.startswith('/'):
1746
base += urlutils.escape(relpath)
1749
def get_url(self, relpath=None):
1750
"""Get a URL (or maybe a path) for the readwrite transport.
1752
This will either be backed by '.' or to an equivalent non-file based
1754
relpath provides for clients to get a path relative to the base url.
1755
These should only be downwards relative, not upwards.
1757
base = self.get_server().get_url()
1758
return self._adjust_url(base, relpath)
1760
def get_vfs_only_url(self, relpath=None):
1761
"""Get a URL (or maybe a path for the plain old vfs transport.
1763
This will never be a smart protocol. It always has all the
1764
capabilities of the local filesystem, but it might actually be a
1765
MemoryTransport or some other similar virtual filesystem.
1767
This is the backing transport (if any) of the server returned by
1768
get_url and get_readonly_url.
1770
:param relpath: provides for clients to get a path relative to the base
1771
url. These should only be downwards relative, not upwards.
1774
base = self.get_vfs_only_server().get_url()
1775
return self._adjust_url(base, relpath)
1777
def _make_test_root(self):
1778
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1780
root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
1781
TestCaseWithMemoryTransport.TEST_ROOT = root
1783
# make a fake bzr directory there to prevent any tests propagating
1784
# up onto the source directory's real branch
1785
bzrdir.BzrDir.create_standalone_workingtree(root)
1787
# The same directory is used by all tests, and we're not specifically
1788
# told when all tests are finished. This will do.
1789
atexit.register(_rmtree_temp_dir, root)
1791
def makeAndChdirToTestDir(self):
1792
"""Create a temporary directories for this one test.
1794
This must set self.test_home_dir and self.test_dir and chdir to
1797
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1799
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1800
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1801
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1803
def make_branch(self, relpath, format=None):
1804
"""Create a branch on the transport at relpath."""
1805
repo = self.make_repository(relpath, format=format)
1806
return repo.bzrdir.create_branch()
1808
def make_bzrdir(self, relpath, format=None):
1810
# might be a relative or absolute path
1811
maybe_a_url = self.get_url(relpath)
1812
segments = maybe_a_url.rsplit('/', 1)
1813
t = get_transport(maybe_a_url)
1814
if len(segments) > 1 and segments[-1] not in ('', '.'):
1818
if isinstance(format, basestring):
1819
format = bzrdir.format_registry.make_bzrdir(format)
1820
return format.initialize_on_transport(t)
1821
except errors.UninitializableFormat:
1822
raise TestSkipped("Format %s is not initializable." % format)
1824
def make_repository(self, relpath, shared=False, format=None):
1825
"""Create a repository on our default transport at relpath.
1827
Note that relpath must be a relative path, not a full url.
1829
# FIXME: If you create a remoterepository this returns the underlying
1830
# real format, which is incorrect. Actually we should make sure that
1831
# RemoteBzrDir returns a RemoteRepository.
1832
# maybe mbp 20070410
1833
made_control = self.make_bzrdir(relpath, format=format)
1834
return made_control.create_repository(shared=shared)
1836
def make_branch_and_memory_tree(self, relpath, format=None):
1837
"""Create a branch on the default transport and a MemoryTree for it."""
1838
b = self.make_branch(relpath, format=format)
1839
return memorytree.MemoryTree.create_on_branch(b)
1841
def overrideEnvironmentForTesting(self):
1842
os.environ['HOME'] = self.test_home_dir
1843
os.environ['BZR_HOME'] = self.test_home_dir
1846
super(TestCaseWithMemoryTransport, self).setUp()
1847
self._make_test_root()
1848
_currentdir = os.getcwdu()
1849
def _leaveDirectory():
1850
os.chdir(_currentdir)
1851
self.addCleanup(_leaveDirectory)
1852
self.makeAndChdirToTestDir()
1853
self.overrideEnvironmentForTesting()
1854
self.__readonly_server = None
1855
self.__server = None
1856
self.reduceLockdirTimeout()
856
def merge(self, branch_from, wt_to):
857
"""A helper for tests to do a ui-less merge.
859
This should move to the main library when someone has time to integrate
862
# minimal ui-less merge.
863
wt_to.branch.fetch(branch_from)
864
base_rev = common_ancestor(branch_from.last_revision(),
865
wt_to.branch.last_revision(),
866
wt_to.branch.repository)
867
merge_inner(wt_to.branch, branch_from.basis_tree(),
868
wt_to.branch.repository.revision_tree(base_rev),
870
wt_to.add_pending_merge(branch_from.last_revision())
873
BzrTestBase = TestCase
1859
class TestCaseInTempDir(TestCaseWithMemoryTransport):
876
class TestCaseInTempDir(TestCase):
1860
877
"""Derived class that runs a test within a temporary directory.
1862
879
This is useful for tests that need to create a branch, etc.
1886
898
self.log("actually: %r" % contents)
1887
899
self.fail("contents of %s not as expected" % filename)
1889
def makeAndChdirToTestDir(self):
1890
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
901
def _make_test_root(self):
902
if TestCaseInTempDir.TEST_ROOT is not None:
906
root = u'test%04d.tmp' % i
910
if e.errno == errno.EEXIST:
915
# successfully created
916
TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
918
# make a fake bzr directory there to prevent any tests propagating
919
# up onto the source directory's real branch
920
bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
923
super(TestCaseInTempDir, self).setUp()
924
self._make_test_root()
925
_currentdir = os.getcwdu()
926
# shorten the name, to avoid test failures due to path length
927
short_id = self.id().replace('bzrlib.tests.', '') \
928
.replace('__main__.', '')[-100:]
929
# it's possible the same test class is run several times for
930
# parameterized tests, so make sure the names don't collide.
934
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
936
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
937
if os.path.exists(candidate_dir):
941
self.test_dir = candidate_dir
942
os.mkdir(self.test_dir)
943
os.chdir(self.test_dir)
945
os.environ['HOME'] = self.test_dir
946
os.environ['APPDATA'] = self.test_dir
947
def _leaveDirectory():
948
os.chdir(_currentdir)
949
self.addCleanup(_leaveDirectory)
1892
For TestCaseInTempDir we create a temporary directory based on the test
1893
name and then create two subdirs - test and home under it.
1895
# create a directory within the top level test directory
1896
candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
1897
# now create test and home directories within this dir
1898
self.test_base_dir = candidate_dir
1899
self.test_home_dir = self.test_base_dir + '/home'
1900
os.mkdir(self.test_home_dir)
1901
self.test_dir = self.test_base_dir + '/work'
1902
os.mkdir(self.test_dir)
1903
os.chdir(self.test_dir)
1904
# put name of test inside
1905
f = file(self.test_base_dir + '/name', 'w')
1910
self.addCleanup(self.deleteTestDir)
1912
def deleteTestDir(self):
1913
os.chdir(self.TEST_ROOT)
1914
_rmtree_temp_dir(self.test_base_dir)
1916
def build_tree(self, shape, line_endings='binary', transport=None):
951
def build_tree(self, shape, line_endings='native', transport=None):
1917
952
"""Build a test tree according to a pattern.
1919
954
shape is a sequence of file specifications. If the final
1920
955
character is '/', a directory is created.
1922
This assumes that all the elements in the tree being built are new.
1924
957
This doesn't add anything to a branch.
1926
958
:param line_endings: Either 'binary' or 'native'
1927
in binary mode, exact contents are written in native mode, the
1928
line endings match the default platform endings.
1929
:param transport: A transport to write to, for building trees on VFS's.
1930
If the transport is readonly or None, "." is opened automatically.
959
in binary mode, exact contents are written
960
in native mode, the line endings match the
961
default platform endings.
963
:param transport: A transport to write to, for building trees on
964
VFS's. If the transport is readonly or None,
965
"." is opened automatically.
1933
# It's OK to just create them using forward slashes on windows.
967
# XXX: It's OK to just create them using forward slashes on windows?
1934
968
if transport is None or transport.is_readonly():
1935
969
transport = get_transport(".")
1936
970
for name in shape:
1943
977
elif line_endings == 'native':
1944
978
end = os.linesep
1946
raise errors.BzrError(
1947
'Invalid line ending request %r' % line_endings)
980
raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
1948
981
content = "contents of %s%s" % (name.encode('utf-8'), end)
1949
transport.put_bytes_non_atomic(urlutils.escape(name), content)
982
transport.put(urlutils.escape(name), StringIO(content))
1951
984
def build_tree_contents(self, shape):
1952
985
build_tree_contents(shape)
987
def failUnlessExists(self, path):
988
"""Fail unless path, which may be abs or relative, exists."""
989
self.failUnless(osutils.lexists(path))
991
def failIfExists(self, path):
992
"""Fail if path, which may be abs or relative, exists."""
993
self.failIf(osutils.lexists(path))
1954
995
def assertFileEqual(self, content, path):
1955
996
"""Fail if path does not contain 'content'."""
1956
self.failUnlessExists(path)
1957
f = file(path, 'rb')
1962
self.assertEqualDiff(content, s)
1964
def failUnlessExists(self, path):
1965
"""Fail unless path or paths, which may be abs or relative, exist."""
1966
if not isinstance(path, basestring):
1968
self.failUnlessExists(p)
1970
self.failUnless(osutils.lexists(path),path+" does not exist")
1972
def failIfExists(self, path):
1973
"""Fail if path or paths, which may be abs or relative, exist."""
1974
if not isinstance(path, basestring):
1976
self.failIfExists(p)
1978
self.failIf(osutils.lexists(path),path+" exists")
1980
def assertInWorkingTree(self,path,root_path='.',tree=None):
1981
"""Assert whether path or paths are in the WorkingTree"""
1983
tree = workingtree.WorkingTree.open(root_path)
1984
if not isinstance(path, basestring):
1986
self.assertInWorkingTree(p,tree=tree)
1988
self.assertIsNot(tree.path2id(path), None,
1989
path+' not in working tree.')
1991
def assertNotInWorkingTree(self,path,root_path='.',tree=None):
1992
"""Assert whether path or paths are not in the WorkingTree"""
1994
tree = workingtree.WorkingTree.open(root_path)
1995
if not isinstance(path, basestring):
1997
self.assertNotInWorkingTree(p,tree=tree)
1999
self.assertIs(tree.path2id(path), None, path+' in working tree.')
997
self.failUnless(osutils.lexists(path))
998
# TODO: jam 20060427 Shouldn't this be 'rb'?
999
self.assertEqualDiff(content, open(path, 'r').read())
2002
1002
class TestCaseWithTransport(TestCaseInTempDir):
2013
1013
readwrite one must both define get_url() as resolving to os.getcwd().
2016
def get_vfs_only_server(self):
2017
"""See TestCaseWithMemoryTransport.
1016
def __init__(self, methodName='testMethod'):
1017
super(TestCaseWithTransport, self).__init__(methodName)
1018
self.__readonly_server = None
1019
self.__server = None
1020
self.transport_server = default_transport
1021
self.transport_readonly_server = None
1023
def get_readonly_url(self, relpath=None):
1024
"""Get a URL for the readonly transport.
1026
This will either be backed by '.' or a decorator to the transport
1027
used by self.get_url()
1028
relpath provides for clients to get a path relative to the base url.
1029
These should only be downwards relative, not upwards.
1031
base = self.get_readonly_server().get_url()
1032
if relpath is not None:
1033
if not base.endswith('/'):
1035
base = base + relpath
1038
def get_readonly_server(self):
1039
"""Get the server instance for the readonly transport
1041
This is useful for some tests with specific servers to do diagnostics.
1043
if self.__readonly_server is None:
1044
if self.transport_readonly_server is None:
1045
# readonly decorator requested
1046
# bring up the server
1048
self.__readonly_server = ReadonlyServer()
1049
self.__readonly_server.setUp(self.__server)
1051
self.__readonly_server = self.transport_readonly_server()
1052
self.__readonly_server.setUp()
1053
self.addCleanup(self.__readonly_server.tearDown)
1054
return self.__readonly_server
1056
def get_server(self):
1057
"""Get the read/write server instance.
2019
1059
This is useful for some tests with specific servers that need
2022
if self.__vfs_server is None:
2023
self.__vfs_server = self.vfs_transport_factory()
2024
self.__vfs_server.setUp()
2025
self.addCleanup(self.__vfs_server.tearDown)
2026
return self.__vfs_server
1062
if self.__server is None:
1063
self.__server = self.transport_server()
1064
self.__server.setUp()
1065
self.addCleanup(self.__server.tearDown)
1066
return self.__server
1068
def get_url(self, relpath=None):
1069
"""Get a URL for the readwrite transport.
1071
This will either be backed by '.' or to an equivalent non-file based
1073
relpath provides for clients to get a path relative to the base url.
1074
These should only be downwards relative, not upwards.
1076
base = self.get_server().get_url()
1077
if relpath is not None and relpath != '.':
1078
if not base.endswith('/'):
1080
base = base + urlutils.escape(relpath)
1083
def get_transport(self):
1084
"""Return a writeable transport for the test scratch space"""
1085
t = get_transport(self.get_url())
1086
self.assertFalse(t.is_readonly())
1089
def get_readonly_transport(self):
1090
"""Return a readonly transport for the test scratch space
1092
This can be used to test that operations which should only need
1093
readonly access in fact do not try to write.
1095
t = get_transport(self.get_readonly_url())
1096
self.assertTrue(t.is_readonly())
1099
def make_branch(self, relpath, format=None):
1100
"""Create a branch on the transport at relpath."""
1101
repo = self.make_repository(relpath, format=format)
1102
return repo.bzrdir.create_branch()
1104
def make_bzrdir(self, relpath, format=None):
1106
url = self.get_url(relpath)
1107
mutter('relpath %r => url %r', relpath, url)
1108
segments = url.split('/')
1109
if segments and segments[-1] not in ('', '.'):
1110
parent = '/'.join(segments[:-1])
1111
t = get_transport(parent)
1113
t.mkdir(segments[-1])
1114
except errors.FileExists:
1117
format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
1118
# FIXME: make this use a single transport someday. RBC 20060418
1119
return format.initialize_on_transport(get_transport(relpath))
1120
except errors.UninitializableFormat:
1121
raise TestSkipped("Format %s is not initializable." % format)
1123
def make_repository(self, relpath, shared=False, format=None):
1124
"""Create a repository on our default transport at relpath."""
1125
made_control = self.make_bzrdir(relpath, format=format)
1126
return made_control.create_repository(shared=shared)
2028
1128
def make_branch_and_tree(self, relpath, format=None):
2029
1129
"""Create a branch on the transport and a tree locally.
2031
If the transport is not a LocalTransport, the Tree can't be created on
2032
the transport. In that case if the vfs_transport_factory is
2033
LocalURLServer the working tree is created in the local
2034
directory backing the transport, and the returned tree's branch and
2035
repository will also be accessed locally. Otherwise a lightweight
2036
checkout is created and returned.
2038
:param format: The BzrDirFormat.
2039
:returns: the WorkingTree.
2041
1133
# TODO: always use the local disk path for the working tree,
2042
1134
# this obviously requires a format that supports branch references
2101
1175
def setUp(self):
2102
1176
super(ChrootedTestCase, self).setUp()
2103
if not self.vfs_transport_factory == MemoryServer:
2104
self.transport_readonly_server = HttpServer
2107
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2108
random_order=False):
2109
"""Create a test suite by filtering another one.
2111
:param suite: the source suite
2112
:param pattern: pattern that names must match
2113
:param exclude_pattern: pattern that names must not match, if any
2114
:param random_order: if True, tests in the new suite will be put in
2116
:returns: the newly created suite
2118
return sort_suite_by_re(suite, pattern, exclude_pattern,
2119
random_order, False)
2122
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2123
random_order=False, append_rest=True):
2124
"""Create a test suite by sorting another one.
2126
:param suite: the source suite
2127
:param pattern: pattern that names must match in order to go
2128
first in the new suite
2129
:param exclude_pattern: pattern that names must not match, if any
2130
:param random_order: if True, tests in the new suite will be put in
2132
:param append_rest: if False, pattern is a strict filter and not
2133
just an ordering directive
2134
:returns: the newly created suite
1177
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1178
self.transport_readonly_server = bzrlib.transport.http.HttpServer
1181
def filter_suite_by_re(suite, pattern):
1182
result = TestUtil.TestSuite()
2138
1183
filter_re = re.compile(pattern)
2139
if exclude_pattern is not None:
2140
exclude_re = re.compile(exclude_pattern)
2141
1184
for test in iter_suite_tests(suite):
2143
if exclude_pattern is None or not exclude_re.search(test_id):
2144
if filter_re.search(test_id):
2149
random.shuffle(first)
2150
random.shuffle(second)
2151
return TestUtil.TestSuite(first + second)
1185
if filter_re.search(test.id()):
1186
result.addTest(test)
2154
1190
def run_suite(suite, name='test', verbose=False, pattern=".*",
2155
stop_on_failure=False,
2156
transport=None, lsprof_timed=None, bench_history=None,
2157
matching_tests_first=None,
2160
exclude_pattern=None,
1191
stop_on_failure=False, keep_output=False,
1192
transport=None, lsprof_timed=None):
1193
TestCaseInTempDir._TEST_NAME = name
2162
1194
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1200
pb = progress.ProgressBar()
2167
1201
runner = TextTestRunner(stream=sys.stdout,
2168
1202
descriptions=0,
2169
1203
verbosity=verbosity,
2170
bench_history=bench_history,
2171
list_only=list_only,
1204
keep_output=keep_output,
2173
1206
runner.stop_on_failure=stop_on_failure
2174
# Initialise the random number generator and display the seed used.
2175
# We convert the seed to a long to make it reuseable across invocations.
2176
random_order = False
2177
if random_seed is not None:
2179
if random_seed == "now":
2180
random_seed = long(time.time())
2182
# Convert the seed to a long if we can
2184
random_seed = long(random_seed)
2187
runner.stream.writeln("Randomizing test order using seed %s\n" %
2189
random.seed(random_seed)
2190
# Customise the list of tests if requested
2191
if pattern != '.*' or exclude_pattern is not None or random_order:
2192
if matching_tests_first:
2193
suite = sort_suite_by_re(suite, pattern, exclude_pattern,
2196
suite = filter_suite_by_re(suite, pattern, exclude_pattern,
1208
suite = filter_suite_by_re(suite, pattern)
2198
1209
result = runner.run(suite)
2199
1210
return result.wasSuccessful()
2202
1213
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2203
1215
transport=None,
2204
1216
test_suite_factory=None,
2207
matching_tests_first=None,
2210
exclude_pattern=None):
2211
1218
"""Run the whole test suite under the enhanced runner"""
2212
# XXX: Very ugly way to do this...
2213
# Disable warning about old formats because we don't want it to disturb
2214
# any blackbox tests.
2215
from bzrlib import repository
2216
repository._deprecation_warning_done = True
2218
1219
global default_transport
2219
1220
if transport is None:
2220
1221
transport = default_transport
2247
1243
testmod_names = [
2248
1244
'bzrlib.tests.test_ancestry',
2249
'bzrlib.tests.test_annotate',
2250
1245
'bzrlib.tests.test_api',
2251
'bzrlib.tests.test_atomicfile',
2252
1246
'bzrlib.tests.test_bad_files',
2253
1247
'bzrlib.tests.test_branch',
2254
'bzrlib.tests.test_branchbuilder',
2255
'bzrlib.tests.test_bugtracker',
2256
1248
'bzrlib.tests.test_bundle',
2257
1249
'bzrlib.tests.test_bzrdir',
2258
'bzrlib.tests.test_cache_utf8',
2259
'bzrlib.tests.test_commands',
1250
'bzrlib.tests.test_command',
2260
1251
'bzrlib.tests.test_commit',
2261
1252
'bzrlib.tests.test_commit_merge',
2262
1253
'bzrlib.tests.test_config',
2263
1254
'bzrlib.tests.test_conflicts',
2264
'bzrlib.tests.test_pack',
2265
'bzrlib.tests.test_counted_lock',
2266
1255
'bzrlib.tests.test_decorators',
2267
'bzrlib.tests.test_delta',
2268
'bzrlib.tests.test_deprecated_graph',
2269
1256
'bzrlib.tests.test_diff',
2270
'bzrlib.tests.test_dirstate',
1257
'bzrlib.tests.test_doc_generate',
1258
'bzrlib.tests.test_emptytree',
2271
1259
'bzrlib.tests.test_errors',
2272
1260
'bzrlib.tests.test_escaped_store',
2273
'bzrlib.tests.test_extract',
2274
1261
'bzrlib.tests.test_fetch',
2275
'bzrlib.tests.test_ftp_transport',
2276
'bzrlib.tests.test_generate_docs',
2277
'bzrlib.tests.test_generate_ids',
2278
'bzrlib.tests.test_globbing',
2279
1262
'bzrlib.tests.test_gpg',
2280
1263
'bzrlib.tests.test_graph',
2281
1264
'bzrlib.tests.test_hashcache',
2282
'bzrlib.tests.test_help',
2283
'bzrlib.tests.test_hooks',
2284
1265
'bzrlib.tests.test_http',
2285
'bzrlib.tests.test_http_response',
2286
'bzrlib.tests.test_https_ca_bundle',
2287
1266
'bzrlib.tests.test_identitymap',
2288
'bzrlib.tests.test_ignores',
2289
'bzrlib.tests.test_info',
2290
1267
'bzrlib.tests.test_inv',
2291
1268
'bzrlib.tests.test_knit',
2292
'bzrlib.tests.test_lazy_import',
2293
'bzrlib.tests.test_lazy_regex',
2294
1269
'bzrlib.tests.test_lockdir',
2295
1270
'bzrlib.tests.test_lockable_files',
2296
1271
'bzrlib.tests.test_log',
2297
'bzrlib.tests.test_lsprof',
2298
'bzrlib.tests.test_memorytree',
2299
1272
'bzrlib.tests.test_merge',
2300
1273
'bzrlib.tests.test_merge3',
2301
1274
'bzrlib.tests.test_merge_core',
2302
'bzrlib.tests.test_merge_directive',
2303
1275
'bzrlib.tests.test_missing',
2304
1276
'bzrlib.tests.test_msgeditor',
2305
1277
'bzrlib.tests.test_nonascii',
2306
1278
'bzrlib.tests.test_options',
2307
1279
'bzrlib.tests.test_osutils',
2308
'bzrlib.tests.test_osutils_encodings',
2309
1280
'bzrlib.tests.test_patch',
2310
1281
'bzrlib.tests.test_patches',
2311
1282
'bzrlib.tests.test_permissions',
2312
1283
'bzrlib.tests.test_plugins',
2313
1284
'bzrlib.tests.test_progress',
2314
1285
'bzrlib.tests.test_reconcile',
2315
'bzrlib.tests.test_registry',
2316
'bzrlib.tests.test_remote',
2317
1286
'bzrlib.tests.test_repository',
2318
'bzrlib.tests.test_revert',
2319
1287
'bzrlib.tests.test_revision',
2320
1288
'bzrlib.tests.test_revisionnamespaces',
1289
'bzrlib.tests.test_revprops',
2321
1290
'bzrlib.tests.test_revisiontree',
2322
1291
'bzrlib.tests.test_rio',
2323
1292
'bzrlib.tests.test_sampler',
2324
1293
'bzrlib.tests.test_selftest',
2325
1294
'bzrlib.tests.test_setup',
2326
1295
'bzrlib.tests.test_sftp_transport',
2327
'bzrlib.tests.test_smart',
2328
1296
'bzrlib.tests.test_smart_add',
2329
'bzrlib.tests.test_smart_transport',
2330
'bzrlib.tests.test_smtp_connection',
2331
1297
'bzrlib.tests.test_source',
2332
'bzrlib.tests.test_ssh_transport',
2333
1298
'bzrlib.tests.test_status',
2334
1299
'bzrlib.tests.test_store',
2335
'bzrlib.tests.test_strace',
2336
'bzrlib.tests.test_subsume',
2337
1300
'bzrlib.tests.test_symbol_versioning',
2338
'bzrlib.tests.test_tag',
2339
1301
'bzrlib.tests.test_testament',
2340
1302
'bzrlib.tests.test_textfile',
2341
1303
'bzrlib.tests.test_textmerge',
2342
'bzrlib.tests.test_timestamp',
2343
1304
'bzrlib.tests.test_trace',
2344
1305
'bzrlib.tests.test_transactions',
2345
1306
'bzrlib.tests.test_transform',
2346
1307
'bzrlib.tests.test_transport',
2347
'bzrlib.tests.test_tree',
2348
'bzrlib.tests.test_treebuilder',
2349
1308
'bzrlib.tests.test_tsort',
2350
1309
'bzrlib.tests.test_tuned_gzip',
2351
1310
'bzrlib.tests.test_ui',
2352
1311
'bzrlib.tests.test_upgrade',
2353
1312
'bzrlib.tests.test_urlutils',
2354
1313
'bzrlib.tests.test_versionedfile',
2355
'bzrlib.tests.test_version',
2356
'bzrlib.tests.test_version_info',
2357
1314
'bzrlib.tests.test_weave',
2358
1315
'bzrlib.tests.test_whitebox',
2359
1316
'bzrlib.tests.test_workingtree',
2360
'bzrlib.tests.test_workingtree_4',
2361
'bzrlib.tests.test_wsgi',
2362
1317
'bzrlib.tests.test_xml',
2364
1319
test_transport_implementations = [
2404
1342
"""Adapt the modules in mods_list using adapter and add to suite."""
2405
1343
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
2406
1344
suite.addTests(adapter.adapt(test))
2409
def _rmtree_temp_dir(dirname):
2410
# If LANG=C we probably have created some bogus paths
2411
# which rmtree(unicode) will fail to delete
2412
# so make sure we are using rmtree(str) to delete everything
2413
# except on win32, where rmtree(str) will fail
2414
# since it doesn't have the property of byte-stream paths
2415
# (they are either ascii or mbcs)
2416
if sys.platform == 'win32':
2417
# make sure we are using the unicode win32 api
2418
dirname = unicode(dirname)
2420
dirname = dirname.encode(sys.getfilesystemencoding())
2422
osutils.rmtree(dirname)
2424
if sys.platform == 'win32' and e.errno == errno.EACCES:
2425
print >>sys.stderr, ('Permission denied: '
2426
'unable to remove testing dir '
2427
'%s' % os.path.basename(dirname))
2432
class Feature(object):
2433
"""An operating system Feature."""
2436
self._available = None
2438
def available(self):
2439
"""Is the feature available?
2441
:return: True if the feature is available.
2443
if self._available is None:
2444
self._available = self._probe()
2445
return self._available
2448
"""Implement this method in concrete features.
2450
:return: True if the feature is available.
2452
raise NotImplementedError
2455
if getattr(self, 'feature_name', None):
2456
return self.feature_name()
2457
return self.__class__.__name__
2460
class TestScenarioApplier(object):
2461
"""A tool to apply scenarios to tests."""
2463
def adapt(self, test):
2464
"""Return a TestSuite containing a copy of test for each scenario."""
2465
result = unittest.TestSuite()
2466
for scenario in self.scenarios:
2467
result.addTest(self.adapt_test_to_scenario(test, scenario))
2470
def adapt_test_to_scenario(self, test, scenario):
2471
"""Copy test and apply scenario to it.
2473
:param test: A test to adapt.
2474
:param scenario: A tuple describing the scenarion.
2475
The first element of the tuple is the new test id.
2476
The second element is a dict containing attributes to set on the
2478
:return: The adapted test.
2480
from copy import deepcopy
2481
new_test = deepcopy(test)
2482
for name, value in scenario[1].items():
2483
setattr(new_test, name, value)
2484
new_id = "%s(%s)" % (new_test.id(), scenario[0])
2485
new_test.id = lambda: new_id