208
216
def startTest(self, test):
    """Begin running `test`, tagging it with its sequence number.

    Delegates to unittest.TestResult.startTest, announces the test through
    self.report_test_start, copies self.count onto the test as
    `test.number`, and finally records the start time.

    :param test: The test case that is about to run.
    """
    unittest.TestResult.startTest(self, test)
    self.report_test_start(test)
    # printErrorList prints test.number, so it has to be set on every test.
    test.number = self.count
    self._recordTestStartTime()
213
222
def _recordTestStartTime(self):
    """Record that a test has started.

    Stores the current time.time() value in self._start_time.
    """
    self._start_time = time.time()
226
def _cleanupLogFile(self, test):
227
# We can only do this if we have one of our TestCases, not if
229
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
230
if setKeepLogfile is not None:
217
233
def addError(self, test, err):
234
self.extractBenchmarkTime(test)
235
self._cleanupLogFile(test)
218
236
if isinstance(err[1], TestSkipped):
219
return self.addSkipped(test, err)
237
return self.addSkipped(test, err)
238
elif isinstance(err[1], UnavailableFeature):
239
return self.addNotSupported(test, err[1].args[0])
220
240
unittest.TestResult.addError(self, test, err)
221
# We can only do this if we have one of our TestCases, not if
223
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
224
if setKeepLogfile is not None:
226
self.extractBenchmarkTime(test)
241
self.error_count += 1
227
242
self.report_error(test, err)
228
243
if self.stop_early:
231
246
def addFailure(self, test, err):
247
self._cleanupLogFile(test)
248
self.extractBenchmarkTime(test)
249
if isinstance(err[1], KnownFailure):
250
return self.addKnownFailure(test, err)
232
251
unittest.TestResult.addFailure(self, test, err)
233
# We can only do this if we have one of our TestCases, not if
235
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
236
if setKeepLogfile is not None:
238
self.extractBenchmarkTime(test)
252
self.failure_count += 1
239
253
self.report_failure(test, err)
240
254
if self.stop_early:
257
def addKnownFailure(self, test, err):
    """Count a known failure for `test` and report it.

    :param test: The test that failed in an expected manner.
    :param err: The error information; handed unchanged to
        self.report_known_failure.
    """
    self.known_failure_count += 1
    self.report_known_failure(test, err)
261
def addNotSupported(self, test, feature):
    """Record that `test` was not run because `feature` is unavailable.

    Increments the counter kept in self.unsupported under the key
    str(feature), then reports the event via self.report_unsupported.

    :param test: The test that could not be run.
    :param feature: The missing feature; its str() form is the counter key.
    """
    # Convert once so the lookup and the store are guaranteed to use the
    # same key.
    feature_key = str(feature)
    self.unsupported[feature_key] = self.unsupported.get(feature_key, 0) + 1
    self.report_unsupported(test, feature)
243
266
def addSuccess(self, test):
244
267
self.extractBenchmarkTime(test)
245
268
if self._bench_history is not None:
267
289
def printErrorList(self, flavour, errors):
268
290
for test, err in errors:
269
291
self.stream.writeln(self.separator1)
270
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
292
self.stream.write("%s: " % flavour)
294
self.stream.write('#%d ' % test.number)
295
self.stream.writeln(self.getDescription(test))
271
296
if getattr(test, '_get_log', None) is not None:
272
297
print >>self.stream
273
298
print >>self.stream, \
291
316
class TextTestResult(ExtendedTestResult):
292
317
"""Displays progress and results of tests in text form"""
294
def __init__(self, *args, **kw):
295
ExtendedTestResult.__init__(self, *args, **kw)
296
self.pb = self.ui.nested_progress_bar()
319
def __init__(self, stream, descriptions, verbosity,
324
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
325
bench_history, num_tests)
327
self.pb = self.ui.nested_progress_bar()
328
self._supplied_pb = False
331
self._supplied_pb = True
297
332
self.pb.show_pct = False
298
333
self.pb.show_spinner = False
299
self.pb.show_eta = False,
334
self.pb.show_eta = False,
300
335
self.pb.show_count = False
301
336
self.pb.show_bar = False
325
364
+ self._shortened_test_description(test))
366
def _test_description(self, test):
368
return '#%d %s' % (self.count,
369
self._shortened_test_description(test))
371
return self._shortened_test_description(test)
327
373
def report_error(self, test, err):
328
self.error_count += 1
329
374
self.pb.note('ERROR: %s\n %s\n',
330
self._shortened_test_description(test),
375
self._test_description(test),
334
379
def report_failure(self, test, err):
335
self.failure_count += 1
336
380
self.pb.note('FAIL: %s\n %s\n',
337
self._shortened_test_description(test),
381
self._test_description(test),
385
def report_known_failure(self, test, err):
    """Send an XFAIL note for `test` to the progress bar.

    :param test: The test that failed in an expected manner.
    :param err: The error information; err[1] is shown under the test
        description.
    """
    self.pb.note('XFAIL: %s\n%s\n',
                 self._test_description(test), err[1])
341
389
def report_skip(self, test, skip_excinfo):
342
390
self.skip_count += 1
380
432
name = self._shortened_test_description(test)
381
433
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
382
434
# numbers, plus a trailing blank
383
self.stream.write(self._ellipsize_to_right(name,
384
osutils.terminal_width()-30))
435
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
437
self.stream.write('%5d ' % self.count)
438
self.stream.write(self._ellipsize_to_right(name,
439
osutils.terminal_width()-36))
441
self.stream.write(self._ellipsize_to_right(name,
442
osutils.terminal_width()-30))
385
443
self.stream.flush()
445
def _error_summary(self, err):
449
return '%s%s' % (indent, err[1])
387
451
def report_error(self, test, err):
    """Write the ERROR status line for `test` followed by a summary of `err`.

    error_count is deliberately not touched here: addError increments it
    before calling report_error, so counting again would double it.

    :param test: The test that raised an error.
    :param err: The error information, summarised by self._error_summary.
    """
    self.stream.writeln('ERROR %s\n%s'
                        % (self._testTimeString(),
                           self._error_summary(err)))
392
456
def report_failure(self, test, err):
    """Write the FAIL status line for `test` followed by a summary of `err`.

    failure_count is deliberately not touched here: addFailure increments it
    before calling report_failure, so counting again would double it.

    :param test: The test that failed.
    :param err: The error information, summarised by self._error_summary.
    """
    self.stream.writeln(' FAIL %s\n%s'
                        % (self._testTimeString(),
                           self._error_summary(err)))
461
def report_known_failure(self, test, err):
    """Write the XFAIL status line for `test` followed by a summary of `err`.

    :param test: The test that failed in an expected manner.
    :param err: The error information, summarised by self._error_summary.
    """
    self.stream.writeln('XFAIL %s\n%s'
                        % (self._testTimeString(),
                           self._error_summary(err)))
397
466
def report_success(self, test):
    """Write the OK status line for `test`, plus any benchmark profiles.

    If the test carries a `_benchcalls` attribute, each entry is a
    (bench_called, stats) pair: bench_called is formatted into a three-slot
    header line and stats.pprint() writes the profile to the stream.

    :param test: The test that passed.
    """
    self.stream.writeln(' OK %s' % self._testTimeString())
    for bench_called, stats in getattr(test, '_benchcalls', []):
        self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
        stats.pprint(file=self.stream)
    # flush the stream so that we get smooth output. This verbose mode is
    # used to show the output in PQM.
    self.stream.flush()
404
475
def report_skip(self, test, skip_excinfo):
405
print >>self.stream, ' SKIP %s' % self._testTimeString()
406
print >>self.stream, ' %s' % skip_excinfo[1]
477
self.stream.writeln(' SKIP %s\n%s'
478
% (self._testTimeString(),
479
self._error_summary(skip_excinfo)))
481
def report_unsupported(self, test, feature):
    """Write the NODEP status line: `test` cannot run, `feature` is missing.

    :param test: The test that was not run.
    :param feature: The unavailable feature; interpolated into the message.
    """
    self.stream.writeln("NODEP %s\n The feature '%s' is not available."
                        % (self._testTimeString(), feature))
409
488
class TextTestRunner(object):
454
533
if failed: self.stream.write(", ")
455
534
self.stream.write("errors=%d" % errored)
535
if result.known_failure_count:
536
if failed or errored: self.stream.write(", ")
537
self.stream.write("known_failure_count=%d" %
538
result.known_failure_count)
456
539
self.stream.writeln(")")
458
self.stream.writeln("OK")
541
if result.known_failure_count:
542
self.stream.writeln("OK (known_failures=%d)" %
543
result.known_failure_count)
545
self.stream.writeln("OK")
546
if result.skip_count > 0:
547
skipped = result.skip_count
548
self.stream.writeln('%d test%s skipped' %
549
(skipped, skipped != 1 and "s" or ""))
550
if result.unsupported:
551
for feature, count in sorted(result.unsupported.items()):
552
self.stream.writeln("Missing feature '%s' skipped %d tests." %
459
554
result.report_cleaning_up()
460
555
# This is still a little bogus,
461
556
# but only a little. Folk not using our testrunner will
476
571
test_root = test_root.encode(
477
572
sys.getfilesystemencoding())
478
osutils.rmtree(test_root)
574
osutils.rmtree(test_root)
576
if sys.platform == 'win32' and e.errno == errno.EACCES:
577
print >>sys.stderr, ('Permission denied: '
578
'unable to remove testing dir '
579
'%s' % os.path.basename(test_root))
480
583
note("Failed tests working directories are in '%s'\n", test_root)
481
584
TestCaseWithMemoryTransport.TEST_ROOT = None
500
603
"""Indicates that a test was intentionally skipped, rather than failing."""
606
class KnownFailure(AssertionError):
    """Indicates that a test failed in a precisely expected manner.

    Such failures don't block the whole test suite from passing because they
    are indicators of partially completed code or of future work. We have an
    explicit error for them so that we can ensure that they are always
    visible: KnownFailures are always shown in the output of bzr selftest.
    """
616
class UnavailableFeature(Exception):
    """A feature required for this test was not available.

    The feature should be used to construct the exception.
    """
503
623
class CommandFailed(Exception):
530
650
return setattr(self._cstring, name, val)
653
class TestUIFactory(ui.CLIUIFactory):
654
"""A UI Factory for testing.
656
Hide the progress bar but emit note()s.
658
Allows get_password to be tested without real tty attached.
665
super(TestUIFactory, self).__init__()
666
if stdin is not None:
667
# We use a StringIOWrapper to be able to test various
668
# encodings, but the user is still responsible to
669
# encode the string and to set the encoding attribute
670
# of StringIOWrapper.
671
self.stdin = StringIOWrapper(stdin)
673
self.stdout = sys.stdout
677
self.stderr = sys.stderr
682
"""See progress.ProgressBar.clear()."""
684
def clear_term(self):
685
"""See progress.ProgressBar.clear_term()."""
687
def clear_term(self):
688
"""See progress.ProgressBar.clear_term()."""
691
"""See progress.ProgressBar.finished()."""
693
def note(self, fmt_string, *args, **kwargs):
    """See progress.ProgressBar.note().

    Writes `fmt_string % args` plus a trailing newline to self.stdout.
    Keyword arguments are accepted but not used.
    """
    self.stdout.write((fmt_string + "\n") % args)
697
def progress_bar(self):
700
def nested_progress_bar(self):
703
def update(self, message, count=None, total=None):
704
"""See progress.ProgressBar.update()."""
706
def get_non_echoed_password(self, prompt):
707
"""Get password from stdin without trying to handle the echo mode"""
709
self.stdout.write(prompt)
710
password = self.stdin.readline()
713
if password[-1] == '\n':
714
password = password[:-1]
533
718
class TestCase(unittest.TestCase):
534
719
"""Base class for bzr unit tests.
600
785
charjunk=lambda x: False)
601
786
return ''.join(difflines)
788
def assertEqual(self, a, b, message=''):
792
except UnicodeError, e:
793
# If we can't compare without getting a UnicodeError, then
794
# obviously they are different
795
mutter('UnicodeError: %s', e)
798
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
800
pformat(a, indent=4), pformat(b, indent=4)))
802
assertEquals = assertEqual
603
804
def assertEqualDiff(self, a, b, message=None):
604
805
"""Assert two texts are equal, if not raise an exception.
844
1045
def _restoreHooks(self):
    """Put the saved hooks back on bzrlib.branch.Branch.

    Assigns self._preserved_hooks to bzrlib.branch.Branch.hooks.
    """
    bzrlib.branch.Branch.hooks = self._preserved_hooks
1048
def knownFailure(self, reason):
    """This test has failed for some known reason.

    :param reason: Explanation passed to the KnownFailure exception.
    :raises KnownFailure: Always.
    """
    raise KnownFailure(reason)
1052
def run(self, result=None):
1053
if result is None: result = self.defaultTestResult()
1054
for feature in getattr(self, '_test_needs_features', []):
1055
if not feature.available():
1056
result.startTest(self)
1057
if getattr(result, 'addNotSupported', None):
1058
result.addNotSupported(self, feature)
1060
result.addSuccess(self)
1061
result.stopTest(self)
1063
return unittest.TestCase.run(self, result)
847
1065
def tearDown(self):
848
1066
self._runCleanups()
849
1067
unittest.TestCase.tearDown(self)
909
1137
"""Shortcut that splits cmd into words, runs, and returns stdout"""
910
1138
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
1140
def requireFeature(self, feature):
    """This test requires a specific feature is available.

    :param feature: Object whose available() method says whether the
        feature can be used.
    :raises UnavailableFeature: When feature is not available.
    """
    if not feature.available():
        raise UnavailableFeature(feature)
912
1148
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
913
1149
working_dir=None):
914
1150
"""Invoke bzr and return (stdout, stderr).
948
1182
handler.setLevel(logging.INFO)
949
1183
logger = logging.getLogger('')
950
1184
logger.addHandler(handler)
951
old_ui_factory = bzrlib.ui.ui_factory
952
bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
955
bzrlib.ui.ui_factory.stdin = stdin
1185
old_ui_factory = ui.ui_factory
1186
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
958
1189
if working_dir is not None:
963
1194
saved_debug_flags = frozenset(debug.debug_flags)
964
1195
debug.debug_flags.clear()
966
result = self.apply_redirected(stdin, stdout, stderr,
1197
result = self.apply_redirected(ui.ui_factory.stdin,
967
1199
bzrlib.commands.run_bzr_catch_errors,
970
1202
debug.debug_flags.update(saved_debug_flags)
972
1204
logger.removeHandler(handler)
973
bzrlib.ui.ui_factory = old_ui_factory
1205
ui.ui_factory = old_ui_factory
974
1206
if cwd is not None:
1488
1722
For TestCaseInTempDir we create a temporary directory based on the test
1489
1723
name and then create two subdirs - test and home under it.
1725
if NUMBERED_DIRS: # strongly recommended on Windows
1726
# due to the path length limitation (260 chars)
1727
candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
1728
int(self.number/1000),
1730
os.makedirs(candidate_dir)
1731
self.test_home_dir = candidate_dir + '/home'
1732
os.mkdir(self.test_home_dir)
1733
self.test_dir = candidate_dir + '/work'
1734
os.mkdir(self.test_dir)
1735
os.chdir(self.test_dir)
1736
# put name of test inside
1737
f = file(candidate_dir + '/name', 'w')
1491
1742
# shorten the name, to avoid test failures due to path length
1492
1743
short_id = self.id().replace('bzrlib.tests.', '') \
1493
1744
.replace('__main__.', '')[-100:]
1554
1805
"""Fail if path does not contain 'content'."""
1555
1806
self.failUnlessExists(path)
1556
1807
# TODO: jam 20060427 Shouldn't this be 'rb'?
1557
self.assertEqualDiff(content, open(path, 'r').read())
1813
self.assertEqualDiff(content, s)
1559
1815
def failUnlessExists(self, path):
1560
1816
"""Fail unless path, which may be abs or relative, exists."""
1645
1901
self.fail("path %s is not a directory; has mode %#o"
1646
1902
% (relpath, mode))
1904
def assertTreesEqual(self, left, right):
    """Check that left and right have the same content and properties.

    :param left: A tree providing get_parent_ids() and changes_from().
    :param right: The tree that `left` is compared against.
    :raises AssertionError: Via assertEqual when the parent id lists differ,
        or via assertFalse when left.changes_from(right) reports changes.
    """
    # we use a tree delta to check for equality of the content, and we
    # manually check for equality of other things such as the parents list.
    self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
    differences = left.changes_from(right)
    self.assertFalse(differences.has_changed(),
        "Trees %r and %r are different: %r" % (left, right, differences))
1648
1913
def setUp(self):
1649
1914
super(TestCaseWithTransport, self).setUp()
1650
1915
self.__server = None
1692
1957
def run_suite(suite, name='test', verbose=False, pattern=".*",
1693
1958
stop_on_failure=False, keep_output=False,
1694
1959
transport=None, lsprof_timed=None, bench_history=None,
1695
matching_tests_first=None):
1960
matching_tests_first=None,
1961
numbered_dirs=None):
1962
global NUMBERED_DIRS
1963
if numbered_dirs is not None:
1964
NUMBERED_DIRS = bool(numbered_dirs)
1696
1966
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1771
2043
'bzrlib.tests.test_decorators',
1772
2044
'bzrlib.tests.test_delta',
1773
2045
'bzrlib.tests.test_diff',
2046
'bzrlib.tests.test_dirstate',
1774
2047
'bzrlib.tests.test_doc_generate',
1775
2048
'bzrlib.tests.test_errors',
1776
2049
'bzrlib.tests.test_escaped_store',
2050
'bzrlib.tests.test_extract',
1777
2051
'bzrlib.tests.test_fetch',
1778
2052
'bzrlib.tests.test_ftp_transport',
1779
2053
'bzrlib.tests.test_generate_docs',
1797
2072
'bzrlib.tests.test_merge',
1798
2073
'bzrlib.tests.test_merge3',
1799
2074
'bzrlib.tests.test_merge_core',
2075
'bzrlib.tests.test_merge_directive',
1800
2076
'bzrlib.tests.test_missing',
1801
2077
'bzrlib.tests.test_msgeditor',
1802
2078
'bzrlib.tests.test_nonascii',
1823
2099
'bzrlib.tests.test_smart_add',
1824
2100
'bzrlib.tests.test_smart_transport',
1825
2101
'bzrlib.tests.test_source',
2102
'bzrlib.tests.test_ssh_transport',
1826
2103
'bzrlib.tests.test_status',
1827
2104
'bzrlib.tests.test_store',
2105
'bzrlib.tests.test_strace',
2106
'bzrlib.tests.test_subsume',
1828
2107
'bzrlib.tests.test_symbol_versioning',
2108
'bzrlib.tests.test_tag',
1829
2109
'bzrlib.tests.test_testament',
1830
2110
'bzrlib.tests.test_textfile',
1831
2111
'bzrlib.tests.test_textmerge',
2112
'bzrlib.tests.test_timestamp',
1832
2113
'bzrlib.tests.test_trace',
1833
2114
'bzrlib.tests.test_transactions',
1834
2115
'bzrlib.tests.test_transform',
1872
2154
for name, plugin in bzrlib.plugin.all_plugins().items():
1873
2155
if getattr(plugin, 'test_suite', None) is not None:
1874
suite.addTest(plugin.test_suite())
2156
default_encoding = sys.getdefaultencoding()
2158
plugin_suite = plugin.test_suite()
2159
except ImportError, e:
2160
bzrlib.trace.warning(
2161
'Unable to test plugin "%s": %s', name, e)
2163
suite.addTest(plugin_suite)
2164
if default_encoding != sys.getdefaultencoding():
2165
bzrlib.trace.warning(
2166
'Plugin "%s" tried to reset default encoding to: %s', name,
2167
sys.getdefaultencoding())
2169
sys.setdefaultencoding(default_encoding)
1900
2195
print 'delete directory:', i
1901
2196
shutil.rmtree(i)
2199
class Feature(object):
2200
"""An operating system Feature."""
2203
self._available = None
2205
def available(self):
2206
"""Is the feature available?
2208
:return: True if the feature is available.
2210
if self._available is None:
2211
self._available = self._probe()
2212
return self._available
2215
"""Implement this method in concrete features.
2217
:return: True if the feature is available.
2219
raise NotImplementedError
2222
if getattr(self, 'feature_name', None):
2223
return self.feature_name()
2224
return self.__class__.__name__