229
232
'%d non-main threads were left active in the end.\n'
230
233
% (TestCase._active_threads - 1))
232
def _extractBenchmarkTime(self, testCase):
235
def _extractBenchmarkTime(self, testCase, details=None):
233
236
"""Add a benchmark time for the current test case."""
237
if details and 'benchtime' in details:
238
return float(''.join(details['benchtime'].iter_bytes()))
234
239
return getattr(testCase, "_benchtime", None)
236
241
def _elapsedTestTimeString(self):
322
327
self._cleanupLogFile(test)
324
def addSuccess(self, test):
329
def addSuccess(self, test, details=None):
325
330
"""Tell result that test completed successfully.
327
332
Called from the TestCase run()
329
334
if self._bench_history is not None:
330
benchmark_time = self._extractBenchmarkTime(test)
335
benchmark_time = self._extractBenchmarkTime(test, details)
331
336
if benchmark_time is not None:
332
337
self._bench_history.write("%s %s\n" % (
333
338
self._formatTime(benchmark_time),
363
368
self.not_applicable_count += 1
364
369
self.report_not_applicable(test, reason)
366
def printErrorList(self, flavour, errors):
367
for test, err in errors:
368
self.stream.writeln(self.separator1)
369
self.stream.write("%s: " % flavour)
370
self.stream.writeln(self.getDescription(test))
371
if getattr(test, '_get_log', None) is not None:
372
log_contents = test._get_log()
374
self.stream.write('\n')
376
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
377
self.stream.write('\n')
378
self.stream.write(log_contents)
379
self.stream.write('\n')
381
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
382
self.stream.write('\n')
383
self.stream.writeln(self.separator2)
384
self.stream.writeln("%s" % err)
386
371
def _post_mortem(self):
387
372
"""Start a PDB post mortem session."""
388
373
if os.environ.get('BZR_TEST_PDB', None):
604
588
applied left to right - the first element in the list is the
605
589
innermost decorator.
591
# stream may now claim to know how to write unicode strings, but in older
592
# pythons this goes sufficiently wrong that it is a bad idea. (
593
# specifically a built in file with encoding 'UTF-8' will still try
594
# to encode using ascii.)
595
new_encoding = osutils.get_terminal_encoding()
596
codec = codecs.lookup(new_encoding)
597
if type(codec) is tuple:
601
encode = codec.encode
602
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
603
stream.encoding = new_encoding
607
604
self.stream = unittest._WritelnDecorator(stream)
608
605
self.descriptions = descriptions
609
606
self.verbosity = verbosity
676
class KnownFailure(AssertionError):
677
"""Indicates that a test failed in a precisely expected manner.
679
Such failures dont block the whole test suite from passing because they are
680
indicators of partially completed code or of future work. We have an
681
explicit error for them so that we can ensure that they are always visible:
682
KnownFailures are always shown in the output of bzr selftest.
664
# traceback._some_str fails to format exceptions that have the default
665
# __str__ which does an implicit ascii conversion. However, repr() on those
666
# objects works, for all that it's not quite what the doctor may have ordered.
667
def _clever_some_str(value):
672
return repr(value).replace('\\n', '\n')
674
return '<unprintable %s object>' % type(value).__name__
676
traceback._some_str = _clever_some_str
679
# deprecated - use self.knownFailure(), or self.expectFailure.
680
KnownFailure = testtools.testcase._ExpectedFailure
686
683
class UnavailableFeature(Exception):
787
784
_leaking_threads_tests = 0
788
785
_first_thread_leaker_id = None
789
786
_log_file_name = None
791
_keep_log_file = False
792
787
# record lsprof data when performing benchmark calls.
793
788
_gather_lsprof_in_benchmarks = False
794
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
795
'_log_contents', '_log_file_name', '_benchtime',
796
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
798
790
def __init__(self, methodName='testMethod'):
799
791
super(TestCase, self).__init__(methodName)
800
792
self._cleanups = []
801
self._bzr_test_setUp_run = False
802
self._bzr_test_tearDown_run = False
803
793
self._directory_isolation = True
794
self.exception_handlers.insert(0,
795
(UnavailableFeature, self._do_unsupported_or_skip))
796
self.exception_handlers.insert(0,
797
(TestNotApplicable, self._do_not_applicable))
806
unittest.TestCase.setUp(self)
807
self._bzr_test_setUp_run = True
800
super(TestCase, self).setUp()
801
for feature in getattr(self, '_test_needs_features', []):
802
self.requireFeature(feature)
803
self._log_contents = None
804
self.addDetail("log", content.Content(content.ContentType("text",
805
"plain", {"charset": "utf8"}),
806
lambda:[self._get_log(keep_log_file=True)]))
808
807
self._cleanEnvironment()
809
808
self._silenceUI()
810
809
self._startLogFile()
1289
1288
m += ": " + msg
1292
def expectFailure(self, reason, assertion, *args, **kwargs):
1293
"""Invoke a test, expecting it to fail for the given reason.
1295
This is for assertions that ought to succeed, but currently fail.
1296
(The failure is *expected* but not *wanted*.) Please be very precise
1297
about the failure you're expecting. If a new bug is introduced,
1298
AssertionError should be raised, not KnownFailure.
1300
Frequently, expectFailure should be followed by an opposite assertion.
1303
Intended to be used with a callable that raises AssertionError as the
1304
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1306
Raises KnownFailure if the test fails. Raises AssertionError if the
1311
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1313
self.assertEqual(42, dynamic_val)
1315
This means that a dynamic_val of 54 will cause the test to raise
1316
a KnownFailure. Once math is fixed and the expectFailure is removed,
1317
only a dynamic_val of 42 will allow the test to pass. Anything other
1318
than 54 or 42 will cause an AssertionError.
1321
assertion(*args, **kwargs)
1322
except AssertionError:
1323
raise KnownFailure(reason)
1325
self.fail('Unexpected success. Should have failed: %s' % reason)
1327
1291
def assertFileEqual(self, content, path):
1328
1292
"""Fail if path does not contain 'content'."""
1329
1293
self.failUnlessExists(path)
1480
1444
Close the file and delete it, unless setKeepLogfile was called.
1482
if self._log_file is None:
1446
if bzrlib.trace._trace_file:
1447
# flush the log file, to get all content
1448
bzrlib.trace._trace_file.flush()
1484
1449
bzrlib.trace.pop_log_file(self._log_memento)
1485
self._log_file.close()
1486
self._log_file = None
1487
if not self._keep_log_file:
1488
os.remove(self._log_file_name)
1489
self._log_file_name = None
1491
def setKeepLogfile(self):
1492
"""Make the logfile not be deleted when _finishLogFile is called."""
1493
self._keep_log_file = True
1450
# Cache the log result and delete the file on disk
1451
self._get_log(False)
1495
1453
def thisFailsStrictLockCheck(self):
1496
1454
"""It is known that this test would fail with -Dstrict_locks.
1607
1567
self._do_skip(result, reason)
1609
def _do_unsupported_or_skip(self, result, reason):
1570
def _do_unsupported_or_skip(self, result, e):
1610
1572
addNotSupported = getattr(result, 'addNotSupported', None)
1611
1573
if addNotSupported is not None:
1612
1574
result.addNotSupported(self, reason)
1614
1576
self._do_skip(result, reason)
1616
def run(self, result=None):
1617
if result is None: result = self.defaultTestResult()
1618
result.startTest(self)
1623
result.stopTest(self)
1625
def _run(self, result):
1626
for feature in getattr(self, '_test_needs_features', []):
1627
if not feature.available():
1628
return self._do_unsupported_or_skip(result, feature)
1630
absent_attr = object()
1632
method_name = getattr(self, '_testMethodName', absent_attr)
1633
if method_name is absent_attr:
1635
method_name = getattr(self, '_TestCase__testMethodName')
1636
testMethod = getattr(self, method_name)
1640
if not self._bzr_test_setUp_run:
1642
"test setUp did not invoke "
1643
"bzrlib.tests.TestCase's setUp")
1644
except KeyboardInterrupt:
1647
except KnownFailure:
1648
self._do_known_failure(result)
1651
except TestNotApplicable, e:
1652
self._do_not_applicable(result, e)
1655
except TestSkipped, e:
1656
self._do_skip(result, e.args[0])
1659
except UnavailableFeature, e:
1660
self._do_unsupported_or_skip(result, e.args[0])
1664
result.addError(self, sys.exc_info())
1672
except KnownFailure:
1673
self._do_known_failure(result)
1674
except self.failureException:
1675
result.addFailure(self, sys.exc_info())
1676
except TestNotApplicable, e:
1677
self._do_not_applicable(result, e)
1678
except TestSkipped, e:
1680
reason = "No reason given."
1683
self._do_skip(result, reason)
1684
except UnavailableFeature, e:
1685
self._do_unsupported_or_skip(result, e.args[0])
1686
except KeyboardInterrupt:
1690
result.addError(self, sys.exc_info())
1694
if not self._bzr_test_tearDown_run:
1696
"test tearDown did not invoke "
1697
"bzrlib.tests.TestCase's tearDown")
1698
except KeyboardInterrupt:
1702
result.addError(self, sys.exc_info())
1705
if ok: result.addSuccess(self)
1707
except KeyboardInterrupt:
1712
for attr_name in self.attrs_to_keep:
1713
if attr_name in self.__dict__:
1714
saved_attrs[attr_name] = self.__dict__[attr_name]
1715
self.__dict__ = saved_attrs
1719
self._log_contents = ''
1720
self._bzr_test_tearDown_run = True
1721
unittest.TestCase.tearDown(self)
1723
1578
def time(self, callable, *args, **kwargs):
1724
1579
"""Run callable and accrue the time it takes to the benchmark time.
1743
1600
self._benchtime += time.time() - start
1745
def _runCleanups(self):
1746
"""Run registered cleanup functions.
1748
This should only be called from TestCase.tearDown.
1750
# TODO: Perhaps this should keep running cleanups even if
1751
# one of them fails?
1753
# Actually pop the cleanups from the list so tearDown running
1754
# twice is safe (this happens for skipped tests).
1755
while self._cleanups:
1756
cleanup, args, kwargs = self._cleanups.pop()
1757
cleanup(*args, **kwargs)
1759
1602
def log(self, *args):
1762
1605
def _get_log(self, keep_log_file=False):
1763
"""Get the log from bzrlib.trace calls from this test.
1606
"""Internal helper to get the log from bzrlib.trace for this test.
1608
Please use self.getDetails, or self.get_log to access this in test case
1765
1611
:param keep_log_file: When True, if the log is still a file on disk
1766
1612
leave it as a file on disk. When False, if the log is still a file
1768
1614
self._log_contents.
1769
1615
:return: A string containing the log.
1771
# flush the log file, to get all content
1617
if self._log_contents is not None:
1619
self._log_contents.decode('utf8')
1620
except UnicodeDecodeError:
1621
unicodestr = self._log_contents.decode('utf8', 'replace')
1622
self._log_contents = unicodestr.encode('utf8')
1623
return self._log_contents
1772
1624
import bzrlib.trace
1773
1625
if bzrlib.trace._trace_file:
1626
# flush the log file, to get all content
1774
1627
bzrlib.trace._trace_file.flush()
1775
if self._log_contents:
1776
# XXX: this can hardly contain the content flushed above --vila
1778
return self._log_contents
1779
1628
if self._log_file_name is not None:
1780
1629
logfile = open(self._log_file_name)
1782
1631
log_contents = logfile.read()
1784
1633
logfile.close()
1635
log_contents.decode('utf8')
1636
except UnicodeDecodeError:
1637
unicodestr = log_contents.decode('utf8', 'replace')
1638
log_contents = unicodestr.encode('utf8')
1785
1639
if not keep_log_file:
1640
self._log_file.close()
1641
self._log_file = None
1642
# Permit multiple calls to get_log until we clean it up in
1786
1644
self._log_contents = log_contents
1788
1646
os.remove(self._log_file_name)
3409
3276
def addFailure(self, test, err):
3410
3277
self.result.addFailure(test, err)
3413
class BZRTransformingResult(ForwardingResult):
3415
def addError(self, test, err):
3416
feature = self._error_looks_like('UnavailableFeature: ', err)
3417
if feature is not None:
3418
self.result.addNotSupported(test, feature)
3420
self.result.addError(test, err)
3422
def addFailure(self, test, err):
3423
known = self._error_looks_like('KnownFailure: ', err)
3424
if known is not None:
3425
self.result.addExpectedFailure(test,
3426
[KnownFailure, KnownFailure(known), None])
3428
self.result.addFailure(test, err)
3430
def _error_looks_like(self, prefix, err):
3431
"""Deserialize exception and returns the stringify value."""
3435
if isinstance(exc, subunit.RemoteException):
3436
# stringify the exception gives access to the remote traceback
3437
# We search the last line for 'prefix'
3438
lines = str(exc).split('\n')
3439
while lines and not lines[-1]:
3442
if lines[-1].startswith(prefix):
3443
value = lines[-1][len(prefix):]
3448
from subunit.test_results import AutoTimingTestResultDecorator
3449
# Expected failure should be seen as a success, not a failure. Once subunit
3450
# provides native support for that, BZRTransformingResult and this class
3451
# will become useless.
3452
class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
3454
def addExpectedFailure(self, test, err):
3455
self._before_event()
3456
return self._call_maybe("addExpectedFailure", self._degrade_skip,
3459
# Let's just define a no-op decorator
3460
BzrAutoTimingTestResultDecorator = lambda x:x
3278
ForwardingResult = testtools.ExtendedToOriginalDecorator
3463
3281
class ProfileResult(ForwardingResult):