237
228
'%d non-main threads were left active in the end.\n'
238
229
% (TestCase._active_threads - 1))
240
def _extractBenchmarkTime(self, testCase, details=None):
231
def _extractBenchmarkTime(self, testCase):
241
232
"""Add a benchmark time for the current test case."""
242
if details and 'benchtime' in details:
243
return float(''.join(details['benchtime'].iter_bytes()))
244
233
return getattr(testCase, "_benchtime", None)
246
235
def _elapsedTestTimeString(self):
332
321
self._cleanupLogFile(test)
334
def addSuccess(self, test, details=None):
323
def addSuccess(self, test):
335
324
"""Tell result that test completed successfully.
337
326
Called from the TestCase run()
339
328
if self._bench_history is not None:
340
benchmark_time = self._extractBenchmarkTime(test, details)
329
benchmark_time = self._extractBenchmarkTime(test)
341
330
if benchmark_time is not None:
342
331
self._bench_history.write("%s %s\n" % (
343
332
self._formatTime(benchmark_time),
373
362
self.not_applicable_count += 1
374
363
self.report_not_applicable(test, reason)
365
def printErrorList(self, flavour, errors):
366
for test, err in errors:
367
self.stream.writeln(self.separator1)
368
self.stream.write("%s: " % flavour)
369
self.stream.writeln(self.getDescription(test))
370
if getattr(test, '_get_log', None) is not None:
371
log_contents = test._get_log()
373
self.stream.write('\n')
375
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
376
self.stream.write('\n')
377
self.stream.write(log_contents)
378
self.stream.write('\n')
380
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
381
self.stream.write('\n')
382
self.stream.writeln(self.separator2)
383
self.stream.writeln("%s" % err)
376
385
def _post_mortem(self):
377
386
"""Start a PDB post mortem session."""
378
387
if os.environ.get('BZR_TEST_PDB', None):
522
533
def report_test_start(self, test):
524
535
name = self._shortened_test_description(test)
525
width = osutils.terminal_width()
526
if width is not None:
527
# width needs space for 6 char status, plus 1 for slash, plus an
528
# 11-char time string, plus a trailing blank
529
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
531
self.stream.write(self._ellipsize_to_right(name, width-18))
533
self.stream.write(name)
536
# width needs space for 6 char status, plus 1 for slash, plus an
537
# 11-char time string, plus a trailing blank
538
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
539
self.stream.write(self._ellipsize_to_right(name,
540
osutils.terminal_width()-18))
534
541
self.stream.flush()
536
543
def _error_summary(self, err):
593
600
applied left to right - the first element in the list is the
594
601
innermost decorator.
596
# stream may know claim to know to write unicode strings, but in older
597
# pythons this goes sufficiently wrong that it is a bad idea. (
598
# specifically a built in file with encoding 'UTF-8' will still try
599
# to encode using ascii.
600
new_encoding = osutils.get_terminal_encoding()
601
codec = codecs.lookup(new_encoding)
602
if type(codec) is tuple:
606
encode = codec.encode
607
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
608
stream.encoding = new_encoding
609
603
self.stream = unittest._WritelnDecorator(stream)
610
604
self.descriptions = descriptions
611
605
self.verbosity = verbosity
669
# traceback._some_str fails to format exceptions that have the default
670
# __str__ which does an implicit ascii conversion. However, repr() on those
671
# objects works, for all that its not quite what the doctor may have ordered.
672
def _clever_some_str(value):
677
return repr(value).replace('\\n', '\n')
679
return '<unprintable %s object>' % type(value).__name__
681
traceback._some_str = _clever_some_str
684
# deprecated - use self.knownFailure(), or self.expectFailure.
685
KnownFailure = testtools.testcase._ExpectedFailure
672
class KnownFailure(AssertionError):
673
"""Indicates that a test failed in a precisely expected manner.
675
Such failures dont block the whole test suite from passing because they are
676
indicators of partially completed code or of future work. We have an
677
explicit error for them so that we can ensure that they are always visible:
678
KnownFailures are always shown in the output of bzr selftest.
688
682
class UnavailableFeature(Exception):
789
783
_leaking_threads_tests = 0
790
784
_first_thread_leaker_id = None
791
785
_log_file_name = None
787
_keep_log_file = False
792
788
# record lsprof data when performing benchmark calls.
793
789
_gather_lsprof_in_benchmarks = False
790
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
791
'_log_contents', '_log_file_name', '_benchtime',
792
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
795
794
def __init__(self, methodName='testMethod'):
796
795
super(TestCase, self).__init__(methodName)
797
796
self._cleanups = []
797
self._bzr_test_setUp_run = False
798
self._bzr_test_tearDown_run = False
798
799
self._directory_isolation = True
799
self.exception_handlers.insert(0,
800
(UnavailableFeature, self._do_unsupported_or_skip))
801
self.exception_handlers.insert(0,
802
(TestNotApplicable, self._do_not_applicable))
805
super(TestCase, self).setUp()
806
for feature in getattr(self, '_test_needs_features', []):
807
self.requireFeature(feature)
808
self._log_contents = None
809
self.addDetail("log", content.Content(content.ContentType("text",
810
"plain", {"charset": "utf8"}),
811
lambda:[self._get_log(keep_log_file=True)]))
802
unittest.TestCase.setUp(self)
803
self._bzr_test_setUp_run = True
812
804
self._cleanEnvironment()
813
805
self._silenceUI()
814
806
self._startLogFile()
1017
1009
server's urls to be used.
1019
1011
if backing_server is None:
1020
transport_server.start_server()
1012
transport_server.setUp()
1022
transport_server.start_server(backing_server)
1023
self.addCleanup(transport_server.stop_server)
1014
transport_server.setUp(backing_server)
1015
self.addCleanup(transport_server.tearDown)
1024
1016
# Obtain a real transport because if the server supplies a password, it
1025
1017
# will be hidden from the base on the client side.
1026
1018
t = get_transport(transport_server.get_url())
1123
1115
:raises AssertionError: If the expected and actual stat values differ
1124
1116
other than by atime.
1126
self.assertEqual(expected.st_size, actual.st_size,
1127
'st_size did not match')
1128
self.assertEqual(expected.st_mtime, actual.st_mtime,
1129
'st_mtime did not match')
1130
self.assertEqual(expected.st_ctime, actual.st_ctime,
1131
'st_ctime did not match')
1132
if sys.platform != 'win32':
1133
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1134
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1135
# odd. Regardless we shouldn't actually try to assert anything
1136
# about their values
1137
self.assertEqual(expected.st_dev, actual.st_dev,
1138
'st_dev did not match')
1139
self.assertEqual(expected.st_ino, actual.st_ino,
1140
'st_ino did not match')
1141
self.assertEqual(expected.st_mode, actual.st_mode,
1142
'st_mode did not match')
1118
self.assertEqual(expected.st_size, actual.st_size)
1119
self.assertEqual(expected.st_mtime, actual.st_mtime)
1120
self.assertEqual(expected.st_ctime, actual.st_ctime)
1121
self.assertEqual(expected.st_dev, actual.st_dev)
1122
self.assertEqual(expected.st_ino, actual.st_ino)
1123
self.assertEqual(expected.st_mode, actual.st_mode)
1144
1125
def assertLength(self, length, obj_with_len):
1145
1126
"""Assert that obj_with_len is of length length."""
1293
1274
m += ": " + msg
1277
def expectFailure(self, reason, assertion, *args, **kwargs):
1278
"""Invoke a test, expecting it to fail for the given reason.
1280
This is for assertions that ought to succeed, but currently fail.
1281
(The failure is *expected* but not *wanted*.) Please be very precise
1282
about the failure you're expecting. If a new bug is introduced,
1283
AssertionError should be raised, not KnownFailure.
1285
Frequently, expectFailure should be followed by an opposite assertion.
1288
Intended to be used with a callable that raises AssertionError as the
1289
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1291
Raises KnownFailure if the test fails. Raises AssertionError if the
1296
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1298
self.assertEqual(42, dynamic_val)
1300
This means that a dynamic_val of 54 will cause the test to raise
1301
a KnownFailure. Once math is fixed and the expectFailure is removed,
1302
only a dynamic_val of 42 will allow the test to pass. Anything other
1303
than 54 or 42 will cause an AssertionError.
1306
assertion(*args, **kwargs)
1307
except AssertionError:
1308
raise KnownFailure(reason)
1310
self.fail('Unexpected success. Should have failed: %s' % reason)
1296
1312
def assertFileEqual(self, content, path):
1297
1313
"""Fail if path does not contain 'content'."""
1298
1314
self.failUnlessExists(path)
1449
1465
Close the file and delete it, unless setKeepLogfile was called.
1451
if bzrlib.trace._trace_file:
1452
# flush the log file, to get all content
1453
bzrlib.trace._trace_file.flush()
1467
if self._log_file is None:
1454
1469
bzrlib.trace.pop_log_file(self._log_memento)
1455
# Cache the log result and delete the file on disk
1456
self._get_log(False)
1470
self._log_file.close()
1471
self._log_file = None
1472
if not self._keep_log_file:
1473
os.remove(self._log_file_name)
1474
self._log_file_name = None
1476
def setKeepLogfile(self):
1477
"""Make the logfile not be deleted when _finishLogFile is called."""
1478
self._keep_log_file = True
1458
1480
def thisFailsStrictLockCheck(self):
1459
1481
"""It is known that this test would fail with -Dstrict_locks.
1572
1590
self._do_skip(result, reason)
1575
def _do_unsupported_or_skip(self, result, e):
1592
def _do_unsupported_or_skip(self, result, reason):
1577
1593
addNotSupported = getattr(result, 'addNotSupported', None)
1578
1594
if addNotSupported is not None:
1579
1595
result.addNotSupported(self, reason)
1581
1597
self._do_skip(result, reason)
1599
def run(self, result=None):
1600
if result is None: result = self.defaultTestResult()
1601
result.startTest(self)
1606
result.stopTest(self)
1608
def _run(self, result):
1609
for feature in getattr(self, '_test_needs_features', []):
1610
if not feature.available():
1611
return self._do_unsupported_or_skip(result, feature)
1613
absent_attr = object()
1615
method_name = getattr(self, '_testMethodName', absent_attr)
1616
if method_name is absent_attr:
1618
method_name = getattr(self, '_TestCase__testMethodName')
1619
testMethod = getattr(self, method_name)
1623
if not self._bzr_test_setUp_run:
1625
"test setUp did not invoke "
1626
"bzrlib.tests.TestCase's setUp")
1627
except KeyboardInterrupt:
1630
except KnownFailure:
1631
self._do_known_failure(result)
1634
except TestNotApplicable, e:
1635
self._do_not_applicable(result, e)
1638
except TestSkipped, e:
1639
self._do_skip(result, e.args[0])
1642
except UnavailableFeature, e:
1643
self._do_unsupported_or_skip(result, e.args[0])
1647
result.addError(self, sys.exc_info())
1655
except KnownFailure:
1656
self._do_known_failure(result)
1657
except self.failureException:
1658
result.addFailure(self, sys.exc_info())
1659
except TestNotApplicable, e:
1660
self._do_not_applicable(result, e)
1661
except TestSkipped, e:
1663
reason = "No reason given."
1666
self._do_skip(result, reason)
1667
except UnavailableFeature, e:
1668
self._do_unsupported_or_skip(result, e.args[0])
1669
except KeyboardInterrupt:
1673
result.addError(self, sys.exc_info())
1677
if not self._bzr_test_tearDown_run:
1679
"test tearDown did not invoke "
1680
"bzrlib.tests.TestCase's tearDown")
1681
except KeyboardInterrupt:
1685
result.addError(self, sys.exc_info())
1688
if ok: result.addSuccess(self)
1690
except KeyboardInterrupt:
1695
for attr_name in self.attrs_to_keep:
1696
if attr_name in self.__dict__:
1697
saved_attrs[attr_name] = self.__dict__[attr_name]
1698
self.__dict__ = saved_attrs
1702
self._log_contents = ''
1703
self._bzr_test_tearDown_run = True
1704
unittest.TestCase.tearDown(self)
1583
1706
def time(self, callable, *args, **kwargs):
1584
1707
"""Run callable and accrue the time it takes to the benchmark time.
1605
1726
self._benchtime += time.time() - start
1728
def _runCleanups(self):
1729
"""Run registered cleanup functions.
1731
This should only be called from TestCase.tearDown.
1733
# TODO: Perhaps this should keep running cleanups even if
1734
# one of them fails?
1736
# Actually pop the cleanups from the list so tearDown running
1737
# twice is safe (this happens for skipped tests).
1738
while self._cleanups:
1739
cleanup, args, kwargs = self._cleanups.pop()
1740
cleanup(*args, **kwargs)
1607
1742
def log(self, *args):
1610
1745
def _get_log(self, keep_log_file=False):
1611
"""Internal helper to get the log from bzrlib.trace for this test.
1613
Please use self.getDetails, or self.get_log to access this in test case
1746
"""Get the log from bzrlib.trace calls from this test.
1616
1748
:param keep_log_file: When True, if the log is still a file on disk
1617
1749
leave it as a file on disk. When False, if the log is still a file
1619
1751
self._log_contents.
1620
1752
:return: A string containing the log.
1622
if self._log_contents is not None:
1624
self._log_contents.decode('utf8')
1625
except UnicodeDecodeError:
1626
unicodestr = self._log_contents.decode('utf8', 'replace')
1627
self._log_contents = unicodestr.encode('utf8')
1754
# flush the log file, to get all content
1756
if bzrlib.trace._trace_file:
1757
bzrlib.trace._trace_file.flush()
1758
if self._log_contents:
1759
# XXX: this can hardly contain the content flushed above --vila
1628
1761
return self._log_contents
1630
if bzrlib.trace._trace_file:
1631
# flush the log file, to get all content
1632
bzrlib.trace._trace_file.flush()
1633
1762
if self._log_file_name is not None:
1634
1763
logfile = open(self._log_file_name)
1636
1765
log_contents = logfile.read()
1638
1767
logfile.close()
1640
log_contents.decode('utf8')
1641
except UnicodeDecodeError:
1642
unicodestr = log_contents.decode('utf8', 'replace')
1643
log_contents = unicodestr.encode('utf8')
1644
1768
if not keep_log_file:
1645
self._log_file.close()
1646
self._log_file = None
1647
# Permit multiple calls to get_log until we clean it up in
1649
1769
self._log_contents = log_contents
1651
1771
os.remove(self._log_file_name)
1713
1825
os.chdir(working_dir)
1717
result = self.apply_redirected(ui.ui_factory.stdin,
1719
bzrlib.commands.run_bzr_catch_user_errors,
1721
except KeyboardInterrupt:
1722
# Reraise KeyboardInterrupt with contents of redirected stdout
1723
# and stderr as arguments, for tests which are interested in
1724
# stdout and stderr and are expecting the exception.
1725
out = stdout.getvalue()
1726
err = stderr.getvalue()
1728
self.log('output:\n%r', out)
1730
self.log('errors:\n%r', err)
1731
raise KeyboardInterrupt(out, err)
1828
result = self.apply_redirected(ui.ui_factory.stdin,
1830
bzrlib.commands.run_bzr_catch_user_errors,
1733
1833
logger.removeHandler(handler)
1734
1834
ui.ui_factory = old_ui_factory
2364
2464
return branchbuilder.BranchBuilder(branch=branch)
2366
2466
def overrideEnvironmentForTesting(self):
2367
test_home_dir = self.test_home_dir
2368
if isinstance(test_home_dir, unicode):
2369
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2370
os.environ['HOME'] = test_home_dir
2371
os.environ['BZR_HOME'] = test_home_dir
2467
os.environ['HOME'] = self.test_home_dir
2468
os.environ['BZR_HOME'] = self.test_home_dir
2373
2470
def setUp(self):
2374
2471
super(TestCaseWithMemoryTransport, self).setUp()
3225
3321
if not os.path.isfile(bzr_path):
3226
3322
# We are probably installed. Assume sys.argv is the right file
3227
3323
bzr_path = sys.argv[0]
3228
bzr_path = [bzr_path]
3229
if sys.platform == "win32":
3230
# if we're on windows, we can't execute the bzr script directly
3231
bzr_path = [sys.executable] + bzr_path
3232
3324
fd, test_list_file_name = tempfile.mkstemp()
3233
3325
test_list_file = os.fdopen(fd, 'wb', 1)
3234
3326
for test in process_tests:
3235
3327
test_list_file.write(test.id() + '\n')
3236
3328
test_list_file.close()
3238
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3330
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3240
3332
if '--no-plugins' in sys.argv:
3241
3333
argv.append('--no-plugins')
3281
3373
def addFailure(self, test, err):
3282
3374
self.result.addFailure(test, err)
3283
ForwardingResult = testtools.ExtendedToOriginalDecorator
3377
class BZRTransformingResult(ForwardingResult):
3379
def addError(self, test, err):
3380
feature = self._error_looks_like('UnavailableFeature: ', err)
3381
if feature is not None:
3382
self.result.addNotSupported(test, feature)
3384
self.result.addError(test, err)
3386
def addFailure(self, test, err):
3387
known = self._error_looks_like('KnownFailure: ', err)
3388
if known is not None:
3389
self.result.addExpectedFailure(test,
3390
[KnownFailure, KnownFailure(known), None])
3392
self.result.addFailure(test, err)
3394
def _error_looks_like(self, prefix, err):
3395
"""Deserialize exception and returns the stringify value."""
3399
if isinstance(exc, subunit.RemoteException):
3400
# stringify the exception gives access to the remote traceback
3401
# We search the last line for 'prefix'
3402
lines = str(exc).split('\n')
3403
while lines and not lines[-1]:
3406
if lines[-1].startswith(prefix):
3407
value = lines[-1][len(prefix):]
3412
from subunit.test_results import AutoTimingTestResultDecorator
3413
# Expected failure should be seen as a success not a failure Once subunit
3414
# provide native support for that, BZRTransformingResult and this class
3415
# will become useless.
3416
class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
3418
def addExpectedFailure(self, test, err):
3419
self._before_event()
3420
return self._call_maybe("addExpectedFailure", self._degrade_skip,
3423
# Let's just define a no-op decorator
3424
BzrAutoTimingTestResultDecorator = lambda x:x
3286
3427
class ProfileResult(ForwardingResult):
3961
4101
return new_test
3964
def permute_tests_for_extension(standard_tests, loader, py_module_name,
3966
"""Helper for permutating tests against an extension module.
3968
This is meant to be used inside a modules 'load_tests()' function. It will
3969
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
3970
against both implementations. Setting 'test.module' to the appropriate
3971
module. See bzrlib.tests.test__chk_map.load_tests as an example.
3973
:param standard_tests: A test suite to permute
3974
:param loader: A TestLoader
3975
:param py_module_name: The python path to a python module that can always
3976
be loaded, and will be considered the 'python' implementation. (eg
3977
'bzrlib._chk_map_py')
3978
:param ext_module_name: The python path to an extension module. If the
3979
module cannot be loaded, a single test will be added, which notes that
3980
the module is not available. If it can be loaded, all standard_tests
3981
will be run against that module.
3982
:return: (suite, feature) suite is a test-suite that has all the permuted
3983
tests. feature is the Feature object that can be used to determine if
3984
the module is available.
3987
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
3989
('python', {'module': py_module}),
3991
suite = loader.suiteClass()
3992
feature = ModuleAvailableFeature(ext_module_name)
3993
if feature.available():
3994
scenarios.append(('C', {'module': feature.module}))
3996
# the compiled module isn't available, so we add a failing test
3997
class FailWithoutFeature(TestCase):
3998
def test_fail(self):
3999
self.requireFeature(feature)
4000
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4001
result = multiply_tests(standard_tests, scenarios, suite)
4002
return result, feature
4005
def _rmtree_temp_dir(dirname, test_id=None):
4104
def _rmtree_temp_dir(dirname):
4006
4105
# If LANG=C we probably have created some bogus paths
4007
4106
# which rmtree(unicode) will fail to delete
4008
4107
# so make sure we are using rmtree(str) to delete everything
4112
4208
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4115
class _CompatabilityThunkFeature(Feature):
4116
"""This feature is just a thunk to another feature.
4118
It issues a deprecation warning if it is accessed, to let you know that you
4119
should really use a different feature.
4122
def __init__(self, module, name, this_name, dep_version):
4123
super(_CompatabilityThunkFeature, self).__init__()
4124
self._module = module
4126
self._this_name = this_name
4127
self._dep_version = dep_version
4128
self._feature = None
4131
if self._feature is None:
4132
msg = (self._dep_version % self._this_name) + (
4133
' Use %s.%s instead.' % (self._module, self._name))
4134
symbol_versioning.warn(msg, DeprecationWarning)
4135
mod = __import__(self._module, {}, {}, [self._name])
4136
self._feature = getattr(mod, self._name)
4140
return self._feature._probe()
4143
class ModuleAvailableFeature(Feature):
4144
"""This is a feature than describes a module we want to be available.
4146
Declare the name of the module in __init__(), and then after probing, the
4147
module will be available as 'self.module'.
4149
:ivar module: The module if it is available, else None.
4152
def __init__(self, module_name):
4153
super(ModuleAvailableFeature, self).__init__()
4154
self.module_name = module_name
4158
self._module = __import__(self.module_name, {}, {}, [''])
4165
if self.available(): # Make sure the probe has been done
4169
def feature_name(self):
4170
return self.module_name
4173
# This is kept here for compatibility, it is recommended to use
4174
# 'bzrlib.tests.feature.paramiko' instead
4175
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
4176
'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
4179
4211
def probe_unicode_in_user_encoding():
4180
4212
"""Try to encode several unicode strings to use in unicode-aware tests.
4181
4213
Return first successfull match.
4340
4389
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4343
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4344
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
4345
'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4392
class _SubUnitFeature(Feature):
4393
"""Check if subunit is available."""
4402
def feature_name(self):
4405
SubUnitFeature = _SubUnitFeature()
4346
4406
# Only define SubUnitBzrRunner if subunit is available.
4348
4408
from subunit import TestProtocolClient
4349
from subunit.test_results import AutoTimingTestResultDecorator
4350
4409
class SubUnitBzrRunner(TextTestRunner):
4351
4410
def run(self, test):
4352
result = AutoTimingTestResultDecorator(
4411
result = BzrAutoTimingTestResultDecorator(
4353
4412
TestProtocolClient(self.stream))
4354
4413
test.run(result)