292
298
Called from the TestCase run() method when the test
293
299
fails with an unexpected error.
295
self._testConcluded(test)
296
if isinstance(err[1], TestNotApplicable):
297
return self._addNotApplicable(test, err)
298
elif isinstance(err[1], UnavailableFeature):
299
return self.addNotSupported(test, err[1].args[0])
301
unittest.TestResult.addError(self, test, err)
302
self.error_count += 1
303
self.report_error(test, err)
306
self._cleanupLogFile(test)
302
unittest.TestResult.addError(self, test, err)
303
self.error_count += 1
304
self.report_error(test, err)
307
self._cleanupLogFile(test)
308
309
def addFailure(self, test, err):
309
310
"""Tell result that test failed.
311
312
Called from the TestCase run() method when the test
312
313
fails because e.g. an assert() method failed.
314
self._testConcluded(test)
315
if isinstance(err[1], KnownFailure):
316
return self._addKnownFailure(test, err)
318
unittest.TestResult.addFailure(self, test, err)
319
self.failure_count += 1
320
self.report_failure(test, err)
323
self._cleanupLogFile(test)
316
unittest.TestResult.addFailure(self, test, err)
317
self.failure_count += 1
318
self.report_failure(test, err)
321
self._cleanupLogFile(test)
325
323
def addSuccess(self, test):
326
324
"""Tell result that test completed successfully.
328
326
Called from the TestCase run()
330
self._testConcluded(test)
331
328
if self._bench_history is not None:
332
329
benchmark_time = self._extractBenchmarkTime(test)
333
330
if benchmark_time is not None:
368
358
self.skip_count += 1
369
359
self.report_skip(test, reason)
371
def _addNotApplicable(self, test, skip_excinfo):
372
if isinstance(skip_excinfo[1], TestNotApplicable):
373
self.not_applicable_count += 1
374
self.report_not_applicable(test, skip_excinfo)
377
except KeyboardInterrupt:
380
self.addError(test, test.exc_info())
382
# seems best to treat this as success from point-of-view of unittest
383
# -- it actually does nothing so it barely matters :)
384
unittest.TestResult.addSuccess(self, test)
385
test._log_contents = ''
361
def addNotApplicable(self, test, reason):
362
self.not_applicable_count += 1
363
self.report_not_applicable(test, reason)
387
365
def printErrorList(self, flavour, errors):
388
366
for test, err in errors:
504
487
return self._shortened_test_description(test)
506
489
def report_error(self, test, err):
507
self.pb.note('ERROR: %s\n %s\n',
490
ui.ui_factory.note('ERROR: %s\n %s\n' % (
508
491
self._test_description(test),
512
495
def report_failure(self, test, err):
513
self.pb.note('FAIL: %s\n %s\n',
496
ui.ui_factory.note('FAIL: %s\n %s\n' % (
514
497
self._test_description(test),
518
501
def report_known_failure(self, test, err):
519
self.pb.note('XFAIL: %s\n%s\n',
520
self._test_description(test), err[1])
502
ui.ui_factory.note('XFAIL: %s\n%s\n' % (
503
self._test_description(test), err[1]))
522
505
def report_skip(self, test, reason):
525
def report_not_applicable(self, test, skip_excinfo):
508
def report_not_applicable(self, test, reason):
528
511
def report_unsupported(self, test, feature):
928
928
def _lock_broken(self, result):
929
929
self._lock_actions.append(('broken', result))
931
def permit_dir(self, name):
932
"""Permit a directory to be used by this test. See permit_url."""
933
name_transport = get_transport(name)
934
self.permit_url(name)
935
self.permit_url(name_transport.base)
937
def permit_url(self, url):
938
"""Declare that url is an ok url to use in this test.
940
Do this for memory transports, temporary test directory etc.
942
Do not do this for the current working directory, /tmp, or any other
943
preexisting non isolated url.
945
if not url.endswith('/'):
947
self._bzr_selftest_roots.append(url)
949
def permit_source_tree_branch_repo(self):
950
"""Permit the source tree bzr is running from to be opened.
952
Some code such as bzrlib.version attempts to read from the bzr branch
953
that bzr is executing from (if any). This method permits that directory
954
to be used in the test suite.
956
path = self.get_source_path()
957
self.record_directory_isolation()
960
workingtree.WorkingTree.open(path)
961
except (errors.NotBranchError, errors.NoWorkingTree):
964
self.enable_directory_isolation()
966
def _preopen_isolate_transport(self, transport):
967
"""Check that all transport openings are done in the test work area."""
968
while isinstance(transport, pathfilter.PathFilteringTransport):
969
# Unwrap pathfiltered transports
970
transport = transport.server.backing_transport.clone(
971
transport._filter('.'))
973
# ReadonlySmartTCPServer_for_testing decorates the backing transport
974
# urls it is given by prepending readonly+. This is appropriate as the
975
# client shouldn't know that the server is readonly (or not readonly).
976
# We could register all servers twice, with readonly+ prepending, but
977
# that makes for a long list; this is about the same but easier to
979
if url.startswith('readonly+'):
980
url = url[len('readonly+'):]
981
self._preopen_isolate_url(url)
983
def _preopen_isolate_url(self, url):
984
if not self._directory_isolation:
986
if self._directory_isolation == 'record':
987
self._bzr_selftest_roots.append(url)
989
# This prevents all transports, including e.g. sftp ones backed on disk
990
# from working unless they are explicitly granted permission. We then
991
# depend on the code that sets up test transports to check that they are
992
# appropriately isolated and enable their use by calling
993
# self.permit_transport()
994
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
995
raise errors.BzrError("Attempt to escape test isolation: %r %r"
996
% (url, self._bzr_selftest_roots))
998
def record_directory_isolation(self):
999
"""Gather accessed directories to permit later access.
1001
This is used for tests that access the branch bzr is running from.
1003
self._directory_isolation = "record"
931
1005
def start_server(self, transport_server, backing_server=None):
932
1006
"""Start transport_server for this test.
940
1014
transport_server.setUp(backing_server)
941
1015
self.addCleanup(transport_server.tearDown)
1016
# Obtain a real transport because if the server supplies a password, it
1017
# will be hidden from the base on the client side.
1018
t = get_transport(transport_server.get_url())
1019
# Some transport servers effectively chroot the backing transport;
1020
# others like SFTPServer don't - users of the transport can walk up the
1021
# transport to read the entire backing transport. This wouldn't matter
1022
# except that the workdir tests are given - and that they expect the
1023
# server's url to point at - is one directory under the safety net. So
1024
# Branch operations into the transport will attempt to walk up one
1025
# directory. Chrooting all servers would avoid this but also mean that
1026
# we wouldn't be testing directly against non-root urls. Alternatively
1027
# getting the test framework to start the server with a backing server
1028
# at the actual safety net directory would work too, but this then
1029
# means that the self.get_url/self.get_transport methods would need
1030
# to transform all their results. On balance its cleaner to handle it
1031
# here, and permit a higher url when we have one of these transports.
1032
if t.base.endswith('/work/'):
1033
# we have safety net/test root/work
1034
t = t.clone('../..')
1035
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1036
# The smart server adds a path similar to work, which is traversed
1037
# up from by the client. But the server is chrooted - the actual
1038
# backing transport is not escaped from, and VFS requests to the
1039
# root will error (because they try to escape the chroot).
1041
while t2.base != t.base:
1044
self.permit_url(t.base)
1046
def _track_transports(self):
1047
"""Install checks for transport usage."""
1048
# TestCase has no safe place it can write to.
1049
self._bzr_selftest_roots = []
1050
# Currently the easiest way to be sure that nothing is going on is to
1051
# hook into bzr dir opening. This leaves a small window of error for
1052
# transport tests, but they are well known, and we can improve on this
1054
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1055
self._preopen_isolate_transport, "Check bzr directories are safe.")
943
1057
def _ndiff_strings(self, a, b):
944
1058
"""Return ndiff between two strings containing lines.
1014
1128
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1015
1129
length, len(obj_with_len), obj_with_len))
1131
def assertLogsError(self, exception_class, func, *args, **kwargs):
1132
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1134
from bzrlib import trace
1136
orig_log_exception_quietly = trace.log_exception_quietly
1139
orig_log_exception_quietly()
1140
captured.append(sys.exc_info())
1141
trace.log_exception_quietly = capture
1142
func(*args, **kwargs)
1144
trace.log_exception_quietly = orig_log_exception_quietly
1145
self.assertLength(1, captured)
1146
err = captured[0][1]
1147
self.assertIsInstance(err, exception_class)
1017
1150
def assertPositive(self, val):
1018
1151
"""Assert that val is greater than 0."""
1019
1152
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1433
1566
def _do_skip(self, result, reason):
1434
1567
addSkip = getattr(result, 'addSkip', None)
1435
1568
if not callable(addSkip):
1436
result.addError(self, sys.exc_info())
1569
result.addSuccess(result)
1438
1571
addSkip(self, reason)
1573
def _do_known_failure(self, result):
1574
err = sys.exc_info()
1575
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1576
if addExpectedFailure is not None:
1577
addExpectedFailure(self, err)
1579
result.addSuccess(self)
1581
def _do_not_applicable(self, result, e):
1583
reason = 'No reason given'
1586
addNotApplicable = getattr(result, 'addNotApplicable', None)
1587
if addNotApplicable is not None:
1588
result.addNotApplicable(self, reason)
1590
self._do_skip(result, reason)
1592
def _do_unsupported_or_skip(self, result, reason):
1593
addNotSupported = getattr(result, 'addNotSupported', None)
1594
if addNotSupported is not None:
1595
result.addNotSupported(self, reason)
1597
self._do_skip(result, reason)
1440
1599
def run(self, result=None):
1441
1600
if result is None: result = self.defaultTestResult()
1601
result.startTest(self)
1606
result.stopTest(self)
1608
def _run(self, result):
1442
1609
for feature in getattr(self, '_test_needs_features', []):
1443
1610
if not feature.available():
1444
result.startTest(self)
1445
if getattr(result, 'addNotSupported', None):
1446
result.addNotSupported(self, feature)
1448
result.addSuccess(self)
1449
result.stopTest(self)
1611
return self._do_unsupported_or_skip(result, feature)
1613
absent_attr = object()
1615
method_name = getattr(self, '_testMethodName', absent_attr)
1616
if method_name is absent_attr:
1618
method_name = getattr(self, '_TestCase__testMethodName')
1619
testMethod = getattr(self, method_name)
1453
result.startTest(self)
1454
absent_attr = object()
1456
method_name = getattr(self, '_testMethodName', absent_attr)
1457
if method_name is absent_attr:
1459
method_name = getattr(self, '_TestCase__testMethodName')
1460
testMethod = getattr(self, method_name)
1464
if not self._bzr_test_setUp_run:
1466
"test setUp did not invoke "
1467
"bzrlib.tests.TestCase's setUp")
1468
except KeyboardInterrupt:
1471
except TestSkipped, e:
1472
self._do_skip(result, e.args[0])
1476
result.addError(self, sys.exc_info())
1623
if not self._bzr_test_setUp_run:
1625
"test setUp did not invoke "
1626
"bzrlib.tests.TestCase's setUp")
1627
except KeyboardInterrupt:
1630
except KnownFailure:
1631
self._do_known_failure(result)
1634
except TestNotApplicable, e:
1635
self._do_not_applicable(result, e)
1638
except TestSkipped, e:
1639
self._do_skip(result, e.args[0])
1642
except UnavailableFeature, e:
1643
self._do_unsupported_or_skip(result, e.args[0])
1647
result.addError(self, sys.exc_info())
1655
except KnownFailure:
1656
self._do_known_failure(result)
1657
except self.failureException:
1658
result.addFailure(self, sys.exc_info())
1659
except TestNotApplicable, e:
1660
self._do_not_applicable(result, e)
1661
except TestSkipped, e:
1663
reason = "No reason given."
1666
self._do_skip(result, reason)
1667
except UnavailableFeature, e:
1668
self._do_unsupported_or_skip(result, e.args[0])
1669
except KeyboardInterrupt:
1673
result.addError(self, sys.exc_info())
1677
if not self._bzr_test_tearDown_run:
1679
"test tearDown did not invoke "
1680
"bzrlib.tests.TestCase's tearDown")
1681
except KeyboardInterrupt:
1685
result.addError(self, sys.exc_info())
1484
except self.failureException:
1485
result.addFailure(self, sys.exc_info())
1486
except TestSkipped, e:
1488
reason = "No reason given."
1491
self._do_skip(result, reason)
1492
except KeyboardInterrupt:
1496
result.addError(self, sys.exc_info())
1500
if not self._bzr_test_tearDown_run:
1502
"test tearDown did not invoke "
1503
"bzrlib.tests.TestCase's tearDown")
1504
except KeyboardInterrupt:
1508
result.addError(self, sys.exc_info())
1511
if ok: result.addSuccess(self)
1513
result.stopTest(self)
1688
if ok: result.addSuccess(self)
1515
except TestNotApplicable:
1516
# Not moved from the result [yet].
1519
1690
except KeyboardInterrupt:
1520
1691
self._runCleanups()