367
276
"""The test will not be run because of a missing feature.
369
278
# this can be called in two different ways: it may be that the
370
# test started running, and then raised (through requireFeature)
279
# test started running, and then raised (through addError)
371
280
# UnavailableFeature. Alternatively this method can be called
372
# while probing for features before running the test code proper; in
373
# that case we will see startTest and stopTest, but the test will
374
# never actually run.
281
# while probing for features before running the tests; in that
282
# case we will see startTest and stopTest, but the test will never
375
284
self.unsupported.setdefault(str(feature), 0)
376
285
self.unsupported[str(feature)] += 1
377
286
self.report_unsupported(test, feature)
379
def addSkip(self, test, reason):
380
"""A test has not run for 'reason'."""
382
self.report_skip(test, reason)
384
def addNotApplicable(self, test, reason):
385
self.not_applicable_count += 1
386
self.report_not_applicable(test, reason)
388
def _post_mortem(self):
389
"""Start a PDB post mortem session."""
390
if os.environ.get('BZR_TEST_PDB', None):
391
import pdb;pdb.post_mortem()
393
def progress(self, offset, whence):
394
"""The test is adjusting the count of tests to run."""
395
if whence == SUBUNIT_SEEK_SET:
396
self.num_tests = offset
397
elif whence == SUBUNIT_SEEK_CUR:
398
self.num_tests += offset
400
raise errors.BzrError("Unknown whence %r" % whence)
288
def _addSkipped(self, test, skip_excinfo):
289
if isinstance(skip_excinfo[1], TestNotApplicable):
290
self.not_applicable_count += 1
291
self.report_not_applicable(test, skip_excinfo)
294
self.report_skip(test, skip_excinfo)
297
except KeyboardInterrupt:
300
self.addError(test, test._exc_info())
302
# seems best to treat this as success from point-of-view of unittest
303
# -- it actually does nothing so it barely matters :)
304
unittest.TestResult.addSuccess(self, test)
305
test._log_contents = ''
307
def printErrorList(self, flavour, errors):
308
for test, err in errors:
309
self.stream.writeln(self.separator1)
310
self.stream.write("%s: " % flavour)
311
self.stream.writeln(self.getDescription(test))
312
if getattr(test, '_get_log', None) is not None:
313
self.stream.write('\n')
315
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
316
self.stream.write('\n')
317
self.stream.write(test._get_log())
318
self.stream.write('\n')
320
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
321
self.stream.write('\n')
322
self.stream.writeln(self.separator2)
323
self.stream.writeln("%s" % err)
402
328
def report_cleaning_up(self):
405
def startTestRun(self):
406
self.startTime = time.time()
408
331
def report_success(self, test):
597
510
bench_history=None,
599
result_decorators=None,
601
"""Create a TextTestRunner.
603
:param result_decorators: An optional list of decorators to apply
604
to the result object being used by the runner. Decorators are
605
applied left to right - the first element in the list is the
608
# stream may know claim to know to write unicode strings, but in older
609
# pythons this goes sufficiently wrong that it is a bad idea. (
610
# specifically a built in file with encoding 'UTF-8' will still try
611
# to encode using ascii.
612
new_encoding = osutils.get_terminal_encoding()
613
codec = codecs.lookup(new_encoding)
614
if type(codec) is tuple:
618
encode = codec.encode
619
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
620
stream.encoding = new_encoding
621
513
self.stream = unittest._WritelnDecorator(stream)
622
514
self.descriptions = descriptions
623
515
self.verbosity = verbosity
624
516
self._bench_history = bench_history
625
self._strict = strict
626
self._result_decorators = result_decorators or []
517
self.list_only = list_only
628
519
def run(self, test):
629
520
"Run the given test case or test suite."
521
startTime = time.time()
630
522
if self.verbosity == 1:
631
523
result_class = TextTestResult
632
524
elif self.verbosity >= 2:
633
525
result_class = VerboseTestResult
634
original_result = result_class(self.stream,
526
result = result_class(self.stream,
635
527
self.descriptions,
637
529
bench_history=self._bench_history,
530
num_tests=test.countTestCases(),
640
# Signal to result objects that look at stop early policy to stop,
641
original_result.stop_early = self.stop_on_failure
642
result = original_result
643
for decorator in self._result_decorators:
644
result = decorator(result)
645
result.stop_early = self.stop_on_failure
646
result.startTestRun()
532
result.stop_early = self.stop_on_failure
533
result.report_starting()
535
if self.verbosity >= 2:
536
self.stream.writeln("Listing tests only ...\n")
538
for t in iter_suite_tests(test):
539
self.stream.writeln("%s" % (t.id()))
541
actionTaken = "Listed"
651
# higher level code uses our extended protocol to determine
652
# what exit code to give.
653
return original_result
544
run = result.testsRun
546
stopTime = time.time()
547
timeTaken = stopTime - startTime
549
self.stream.writeln(result.separator2)
550
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
551
run, run != 1 and "s" or "", timeTaken))
552
self.stream.writeln()
553
if not result.wasSuccessful():
554
self.stream.write("FAILED (")
555
failed, errored = map(len, (result.failures, result.errors))
557
self.stream.write("failures=%d" % failed)
559
if failed: self.stream.write(", ")
560
self.stream.write("errors=%d" % errored)
561
if result.known_failure_count:
562
if failed or errored: self.stream.write(", ")
563
self.stream.write("known_failure_count=%d" %
564
result.known_failure_count)
565
self.stream.writeln(")")
567
if result.known_failure_count:
568
self.stream.writeln("OK (known_failures=%d)" %
569
result.known_failure_count)
571
self.stream.writeln("OK")
572
if result.skip_count > 0:
573
skipped = result.skip_count
574
self.stream.writeln('%d test%s skipped' %
575
(skipped, skipped != 1 and "s" or ""))
576
if result.unsupported:
577
for feature, count in sorted(result.unsupported.items()):
578
self.stream.writeln("Missing feature '%s' skipped %d tests." %
656
584
def iter_suite_tests(suite):
657
585
"""Return all tests in a suite, recursing through nested suites"""
658
if isinstance(suite, unittest.TestCase):
660
elif isinstance(suite, unittest.TestSuite):
586
for item in suite._tests:
587
if isinstance(item, unittest.TestCase):
589
elif isinstance(item, unittest.TestSuite):
662
590
for r in iter_suite_tests(item):
665
raise Exception('unknown type %r for object %r'
666
% (type(suite), suite))
669
TestSkipped = testtools.testcase.TestSkipped
593
raise Exception('unknown object %r inside test suite %r'
597
class TestSkipped(Exception):
598
"""Indicates that a test was intentionally skipped, rather than failing."""
672
601
class TestNotApplicable(TestSkipped):
673
602
"""A test is not applicable to the situation where it was run.
675
This is only normally raised by parameterized tests, if they find that
676
the instance they're constructed upon does not support one aspect
604
This is only normally raised by parameterized tests, if they find that
605
the instance they're constructed upon does not support one aspect
677
606
of its interface.
681
# traceback._some_str fails to format exceptions that have the default
682
# __str__ which does an implicit ascii conversion. However, repr() on those
683
# objects works, for all that its not quite what the doctor may have ordered.
684
def _clever_some_str(value):
689
return repr(value).replace('\\n', '\n')
691
return '<unprintable %s object>' % type(value).__name__
693
traceback._some_str = _clever_some_str
696
# deprecated - use self.knownFailure(), or self.expectFailure.
697
KnownFailure = testtools.testcase._ExpectedFailure
610
class KnownFailure(AssertionError):
611
"""Indicates that a test failed in a precisely expected manner.
613
Such failures dont block the whole test suite from passing because they are
614
indicators of partially completed code or of future work. We have an
615
explicit error for them so that we can ensure that they are always visible:
616
KnownFailures are always shown in the output of bzr selftest.
700
620
class UnavailableFeature(Exception):
701
621
"""A feature required for this test was not available.
703
This can be considered a specialised form of SkippedTest.
705
623
The feature should be used to construct the exception.
627
class CommandFailed(Exception):
709
631
class StringIOWrapper(object):
710
632
"""A wrapper around cStringIO which just adds an encoding attribute.
712
634
Internally we can check sys.stdout to see what the output encoding
713
635
should be. However, cStringIO has no encoding attribute that we can
714
636
set. So we wrap it instead.
797
750
_leaking_threads_tests = 0
798
751
_first_thread_leaker_id = None
799
752
_log_file_name = None
754
_keep_log_file = False
800
755
# record lsprof data when performing benchmark calls.
801
756
_gather_lsprof_in_benchmarks = False
757
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
758
'_log_contents', '_log_file_name', '_benchtime',
759
'_TestCase__testMethodName')
803
761
def __init__(self, methodName='testMethod'):
804
762
super(TestCase, self).__init__(methodName)
805
763
self._cleanups = []
806
self._directory_isolation = True
807
self.exception_handlers.insert(0,
808
(UnavailableFeature, self._do_unsupported_or_skip))
809
self.exception_handlers.insert(0,
810
(TestNotApplicable, self._do_not_applicable))
813
super(TestCase, self).setUp()
814
for feature in getattr(self, '_test_needs_features', []):
815
self.requireFeature(feature)
816
self._log_contents = None
817
self.addDetail("log", content.Content(content.ContentType("text",
818
"plain", {"charset": "utf8"}),
819
lambda:[self._get_log(keep_log_file=True)]))
766
unittest.TestCase.setUp(self)
820
767
self._cleanEnvironment()
821
768
self._silenceUI()
822
769
self._startLogFile()
823
770
self._benchcalls = []
824
771
self._benchtime = None
825
772
self._clear_hooks()
826
self._track_transports()
828
773
self._clear_debug_flags()
829
774
TestCase._active_threads = threading.activeCount()
830
775
self.addCleanup(self._check_leaked_threads)
835
pdb.Pdb().set_trace(sys._getframe().f_back)
837
777
def _check_leaked_threads(self):
838
778
active = threading.activeCount()
839
779
leaked_threads = active - TestCase._active_threads
840
780
TestCase._active_threads = active
841
# If some tests make the number of threads *decrease*, we'll consider
842
# that they are just observing old threads dieing, not agressively kill
843
# random threads. So we don't report these tests as leaking. The risk
844
# is that we have false positives that way (the test see 2 threads
845
# going away but leak one) but it seems less likely than the actual
846
# false positives (the test see threads going away and does not leak).
847
if leaked_threads > 0:
848
782
TestCase._leaking_threads_tests += 1
849
783
if TestCase._first_thread_leaker_id is None:
850
784
TestCase._first_thread_leaker_id = self.id()
785
# we're not specifically told when all tests are finished.
786
# This will do. We use a function to avoid keeping a reference
787
# to a TestCase object.
788
atexit.register(_report_leaked_threads)
852
790
def _clear_debug_flags(self):
853
791
"""Prevent externally set debug flags affecting tests.
855
793
Tests that want to use debug flags can just set them in the
856
794
debug_flags set during setup/teardown.
858
# Start with a copy of the current debug flags we can safely modify.
859
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
860
796
if 'allow_debug' not in selftest_debug_flags:
797
self._preserved_debug_flags = set(debug.debug_flags)
861
798
debug.debug_flags.clear()
862
if 'disable_lock_checks' not in selftest_debug_flags:
863
debug.debug_flags.add('strict_locks')
799
self.addCleanup(self._restore_debug_flags)
865
801
def _clear_hooks(self):
866
802
# prevent hooks affecting tests
867
self._preserved_hooks = {}
868
for key, factory in hooks.known_hooks.items():
869
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
870
current_hooks = hooks.known_hooks_key_to_object(key)
871
self._preserved_hooks[parent] = (name, current_hooks)
804
import bzrlib.smart.server
805
self._preserved_hooks = {
806
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
807
bzrlib.mutabletree.MutableTree: bzrlib.mutabletree.MutableTree.hooks,
808
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
872
810
self.addCleanup(self._restoreHooks)
873
for key, factory in hooks.known_hooks.items():
874
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
875
setattr(parent, name, factory())
876
# this hook should always be installed
877
request._install_hook()
879
def disable_directory_isolation(self):
880
"""Turn off directory isolation checks."""
881
self._directory_isolation = False
883
def enable_directory_isolation(self):
884
"""Enable directory isolation checks."""
885
self._directory_isolation = True
811
# reset all hooks to an empty instance of the appropriate type
812
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
813
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
887
815
def _silenceUI(self):
888
816
"""Turn off UI for duration of test"""
889
817
# by default the UI is off; tests can turn it on if they want it.
890
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
892
def _check_locks(self):
893
"""Check that all lock take/release actions have been paired."""
894
# We always check for mismatched locks. If a mismatch is found, we
895
# fail unless -Edisable_lock_checks is supplied to selftest, in which
896
# case we just print a warning.
898
acquired_locks = [lock for action, lock in self._lock_actions
899
if action == 'acquired']
900
released_locks = [lock for action, lock in self._lock_actions
901
if action == 'released']
902
broken_locks = [lock for action, lock in self._lock_actions
903
if action == 'broken']
904
# trivially, given the tests for lock acquistion and release, if we
905
# have as many in each list, it should be ok. Some lock tests also
906
# break some locks on purpose and should be taken into account by
907
# considering that breaking a lock is just a dirty way of releasing it.
908
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
909
message = ('Different number of acquired and '
910
'released or broken locks. (%s, %s + %s)' %
911
(acquired_locks, released_locks, broken_locks))
912
if not self._lock_check_thorough:
913
# Rather than fail, just warn
914
print "Broken test %s: %s" % (self, message)
918
def _track_locks(self):
919
"""Track lock activity during tests."""
920
self._lock_actions = []
921
if 'disable_lock_checks' in selftest_debug_flags:
922
self._lock_check_thorough = False
924
self._lock_check_thorough = True
926
self.addCleanup(self._check_locks)
927
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
928
self._lock_acquired, None)
929
_mod_lock.Lock.hooks.install_named_hook('lock_released',
930
self._lock_released, None)
931
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
932
self._lock_broken, None)
934
def _lock_acquired(self, result):
935
self._lock_actions.append(('acquired', result))
937
def _lock_released(self, result):
938
self._lock_actions.append(('released', result))
940
def _lock_broken(self, result):
941
self._lock_actions.append(('broken', result))
943
def permit_dir(self, name):
944
"""Permit a directory to be used by this test. See permit_url."""
945
name_transport = get_transport(name)
946
self.permit_url(name)
947
self.permit_url(name_transport.base)
949
def permit_url(self, url):
950
"""Declare that url is an ok url to use in this test.
952
Do this for memory transports, temporary test directory etc.
954
Do not do this for the current working directory, /tmp, or any other
955
preexisting non isolated url.
957
if not url.endswith('/'):
959
self._bzr_selftest_roots.append(url)
961
def permit_source_tree_branch_repo(self):
962
"""Permit the source tree bzr is running from to be opened.
964
Some code such as bzrlib.version attempts to read from the bzr branch
965
that bzr is executing from (if any). This method permits that directory
966
to be used in the test suite.
968
path = self.get_source_path()
969
self.record_directory_isolation()
972
workingtree.WorkingTree.open(path)
973
except (errors.NotBranchError, errors.NoWorkingTree):
976
self.enable_directory_isolation()
978
def _preopen_isolate_transport(self, transport):
979
"""Check that all transport openings are done in the test work area."""
980
while isinstance(transport, pathfilter.PathFilteringTransport):
981
# Unwrap pathfiltered transports
982
transport = transport.server.backing_transport.clone(
983
transport._filter('.'))
985
# ReadonlySmartTCPServer_for_testing decorates the backing transport
986
# urls it is given by prepending readonly+. This is appropriate as the
987
# client shouldn't know that the server is readonly (or not readonly).
988
# We could register all servers twice, with readonly+ prepending, but
989
# that makes for a long list; this is about the same but easier to
991
if url.startswith('readonly+'):
992
url = url[len('readonly+'):]
993
self._preopen_isolate_url(url)
995
def _preopen_isolate_url(self, url):
996
if not self._directory_isolation:
998
if self._directory_isolation == 'record':
999
self._bzr_selftest_roots.append(url)
1001
# This prevents all transports, including e.g. sftp ones backed on disk
1002
# from working unless they are explicitly granted permission. We then
1003
# depend on the code that sets up test transports to check that they are
1004
# appropriately isolated and enable their use by calling
1005
# self.permit_transport()
1006
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1007
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1008
% (url, self._bzr_selftest_roots))
1010
def record_directory_isolation(self):
1011
"""Gather accessed directories to permit later access.
1013
This is used for tests that access the branch bzr is running from.
1015
self._directory_isolation = "record"
1017
def start_server(self, transport_server, backing_server=None):
1018
"""Start transport_server for this test.
1020
This starts the server, registers a cleanup for it and permits the
1021
server's urls to be used.
1023
if backing_server is None:
1024
transport_server.start_server()
1026
transport_server.start_server(backing_server)
1027
self.addCleanup(transport_server.stop_server)
1028
# Obtain a real transport because if the server supplies a password, it
1029
# will be hidden from the base on the client side.
1030
t = get_transport(transport_server.get_url())
1031
# Some transport servers effectively chroot the backing transport;
1032
# others like SFTPServer don't - users of the transport can walk up the
1033
# transport to read the entire backing transport. This wouldn't matter
1034
# except that the workdir tests are given - and that they expect the
1035
# server's url to point at - is one directory under the safety net. So
1036
# Branch operations into the transport will attempt to walk up one
1037
# directory. Chrooting all servers would avoid this but also mean that
1038
# we wouldn't be testing directly against non-root urls. Alternatively
1039
# getting the test framework to start the server with a backing server
1040
# at the actual safety net directory would work too, but this then
1041
# means that the self.get_url/self.get_transport methods would need
1042
# to transform all their results. On balance its cleaner to handle it
1043
# here, and permit a higher url when we have one of these transports.
1044
if t.base.endswith('/work/'):
1045
# we have safety net/test root/work
1046
t = t.clone('../..')
1047
elif isinstance(transport_server,
1048
test_server.SmartTCPServer_for_testing):
1049
# The smart server adds a path similar to work, which is traversed
1050
# up from by the client. But the server is chrooted - the actual
1051
# backing transport is not escaped from, and VFS requests to the
1052
# root will error (because they try to escape the chroot).
1054
while t2.base != t.base:
1057
self.permit_url(t.base)
1059
def _track_transports(self):
1060
"""Install checks for transport usage."""
1061
# TestCase has no safe place it can write to.
1062
self._bzr_selftest_roots = []
1063
# Currently the easiest way to be sure that nothing is going on is to
1064
# hook into bzr dir opening. This leaves a small window of error for
1065
# transport tests, but they are well known, and we can improve on this
1067
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1068
self._preopen_isolate_transport, "Check bzr directories are safe.")
818
saved = ui.ui_factory
820
ui.ui_factory = saved
821
ui.ui_factory = ui.SilentUIFactory()
822
self.addCleanup(_restore)
1070
824
def _ndiff_strings(self, a, b):
1071
825
"""Return ndiff between two strings containing lines.
1073
827
A trailing newline is added if missing to make the strings
1074
828
print properly."""
1075
829
if b and b[-1] != '\n':
1110
864
if message is None:
1111
865
message = "texts not equal:\n"
867
message = 'first string is missing a final newline.\n'
1112
868
if a + '\n' == b:
1113
message = 'first string is missing a final newline.\n'
1115
869
message = 'second string is missing a final newline.\n'
1116
870
raise AssertionError(message +
1117
871
self._ndiff_strings(a, b))
1119
873
def assertEqualMode(self, mode, mode_test):
1120
874
self.assertEqual(mode, mode_test,
1121
875
'mode mismatch %o != %o' % (mode, mode_test))
1123
def assertEqualStat(self, expected, actual):
1124
"""assert that expected and actual are the same stat result.
1126
:param expected: A stat result.
1127
:param actual: A stat result.
1128
:raises AssertionError: If the expected and actual stat values differ
1129
other than by atime.
1131
self.assertEqual(expected.st_size, actual.st_size,
1132
'st_size did not match')
1133
self.assertEqual(expected.st_mtime, actual.st_mtime,
1134
'st_mtime did not match')
1135
self.assertEqual(expected.st_ctime, actual.st_ctime,
1136
'st_ctime did not match')
1137
if sys.platform != 'win32':
1138
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1139
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1140
# odd. Regardless we shouldn't actually try to assert anything
1141
# about their values
1142
self.assertEqual(expected.st_dev, actual.st_dev,
1143
'st_dev did not match')
1144
self.assertEqual(expected.st_ino, actual.st_ino,
1145
'st_ino did not match')
1146
self.assertEqual(expected.st_mode, actual.st_mode,
1147
'st_mode did not match')
1149
def assertLength(self, length, obj_with_len):
1150
"""Assert that obj_with_len is of length length."""
1151
if len(obj_with_len) != length:
1152
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1153
length, len(obj_with_len), obj_with_len))
1155
def assertLogsError(self, exception_class, func, *args, **kwargs):
1156
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1158
from bzrlib import trace
1160
orig_log_exception_quietly = trace.log_exception_quietly
1163
orig_log_exception_quietly()
1164
captured.append(sys.exc_info())
1165
trace.log_exception_quietly = capture
1166
func(*args, **kwargs)
1168
trace.log_exception_quietly = orig_log_exception_quietly
1169
self.assertLength(1, captured)
1170
err = captured[0][1]
1171
self.assertIsInstance(err, exception_class)
1174
877
def assertPositive(self, val):
1175
878
"""Assert that val is greater than 0."""
1176
879
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1458
1185
Close the file and delete it, unless setKeepLogfile was called.
1460
if bzrlib.trace._trace_file:
1461
# flush the log file, to get all content
1462
bzrlib.trace._trace_file.flush()
1187
if self._log_file is None:
1463
1189
bzrlib.trace.pop_log_file(self._log_memento)
1464
# Cache the log result and delete the file on disk
1465
self._get_log(False)
1467
def thisFailsStrictLockCheck(self):
1468
"""It is known that this test would fail with -Dstrict_locks.
1470
By default, all tests are run with strict lock checking unless
1471
-Edisable_lock_checks is supplied. However there are some tests which
1472
we know fail strict locks at this point that have not been fixed.
1473
They should call this function to disable the strict checking.
1475
This should be used sparingly, it is much better to fix the locking
1476
issues rather than papering over the problem by calling this function.
1478
debug.debug_flags.discard('strict_locks')
1190
self._log_file.close()
1191
self._log_file = None
1192
if not self._keep_log_file:
1193
os.remove(self._log_file_name)
1194
self._log_file_name = None
1196
def setKeepLogfile(self):
1197
"""Make the logfile not be deleted when _finishLogFile is called."""
1198
self._keep_log_file = True
1480
1200
def addCleanup(self, callable, *args, **kwargs):
1481
1201
"""Arrange to run a callable when this case is torn down.
1483
Callables are run in the reverse of the order they are registered,
1203
Callables are run in the reverse of the order they are registered,
1484
1204
ie last-in first-out.
1486
1206
self._cleanups.append((callable, args, kwargs))
1488
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1489
"""Overrides an object attribute restoring it after the test.
1491
:param obj: The object that will be mutated.
1493
:param attr_name: The attribute name we want to preserve/override in
1496
:param new: The optional value we want to set the attribute to.
1498
:returns: The actual attr value.
1500
value = getattr(obj, attr_name)
1501
# The actual value is captured by the call below
1502
self.addCleanup(setattr, obj, attr_name, value)
1503
if new is not _unitialized_attr:
1504
setattr(obj, attr_name, new)
1507
1208
def _cleanEnvironment(self):
1509
1210
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1510
1211
'HOME': os.getcwd(),
1511
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1512
# tests do check our impls match APPDATA
1212
'APPDATA': None, # bzr now use Win32 API and don't rely on APPDATA
1513
1213
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1516
1214
'BZR_EMAIL': None,
1517
1215
'BZREMAIL': None, # may still be present in the environment
1519
1217
'BZR_PROGRESS_BAR': None,
1520
1218
'BZR_LOG': None,
1521
'BZR_PLUGIN_PATH': None,
1522
'BZR_DISABLE_PLUGINS': None,
1523
'BZR_PLUGINS_AT': None,
1524
'BZR_CONCURRENCY': None,
1525
# Make sure that any text ui tests are consistent regardless of
1526
# the environment the test case is run in; you may want tests that
1527
# test other combinations. 'dumb' is a reasonable guess for tests
1528
# going to a pipe or a StringIO.
1532
'BZR_COLUMNS': '80',
1534
1220
'SSH_AUTH_SOCK': None,
1541
1227
'NO_PROXY': None,
1542
1228
'all_proxy': None,
1543
1229
'ALL_PROXY': None,
1544
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1230
# Nobody cares about these ones AFAIK. So far at
1545
1231
# least. If you do (care), please update this comment
1547
1233
'ftp_proxy': None,
1548
1234
'FTP_PROXY': None,
1549
1235
'BZR_REMOTE_PATH': None,
1550
# Generally speaking, we don't want apport reporting on crashes in
1551
# the test envirnoment unless we're specifically testing apport,
1552
# so that it doesn't leak into the real system environment. We
1553
# use an env var so it propagates to subprocesses.
1554
'APPORT_DISABLE': '1',
1557
1238
self.addCleanup(self._restoreEnvironment)
1558
1239
for name, value in new_env.iteritems():
1559
1240
self._captureVar(name, value)
1561
1242
def _captureVar(self, name, newvalue):
1562
1243
"""Set an environment variable, and reset it when finished."""
1563
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1244
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1246
def _restore_debug_flags(self):
1247
debug.debug_flags.clear()
1248
debug.debug_flags.update(self._preserved_debug_flags)
1565
1250
def _restoreEnvironment(self):
1566
for name, value in self._old_env.iteritems():
1251
for name, value in self.__old_env.iteritems():
1567
1252
osutils.set_or_unset_env(name, value)
1569
1254
def _restoreHooks(self):
1570
for klass, (name, hooks) in self._preserved_hooks.items():
1571
setattr(klass, name, hooks)
1255
for klass, hooks in self._preserved_hooks.items():
1256
setattr(klass, 'hooks', hooks)
1573
1258
def knownFailure(self, reason):
1574
1259
"""This test has failed for some known reason."""
1575
1260
raise KnownFailure(reason)
1577
def _do_skip(self, result, reason):
1578
addSkip = getattr(result, 'addSkip', None)
1579
if not callable(addSkip):
1580
result.addSuccess(result)
1582
addSkip(self, reason)
1585
def _do_known_failure(self, result, e):
1586
err = sys.exc_info()
1587
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1588
if addExpectedFailure is not None:
1589
addExpectedFailure(self, err)
1591
result.addSuccess(self)
1594
def _do_not_applicable(self, result, e):
1596
reason = 'No reason given'
1599
addNotApplicable = getattr(result, 'addNotApplicable', None)
1600
if addNotApplicable is not None:
1601
result.addNotApplicable(self, reason)
1603
self._do_skip(result, reason)
1606
def _do_unsupported_or_skip(self, result, e):
1608
addNotSupported = getattr(result, 'addNotSupported', None)
1609
if addNotSupported is not None:
1610
result.addNotSupported(self, reason)
1612
self._do_skip(result, reason)
1262
def run(self, result=None):
1263
if result is None: result = self.defaultTestResult()
1264
for feature in getattr(self, '_test_needs_features', []):
1265
if not feature.available():
1266
result.startTest(self)
1267
if getattr(result, 'addNotSupported', None):
1268
result.addNotSupported(self, feature)
1270
result.addSuccess(self)
1271
result.stopTest(self)
1274
return unittest.TestCase.run(self, result)
1277
absent_attr = object()
1278
for attr_name in self.attrs_to_keep:
1279
attr = getattr(self, attr_name, absent_attr)
1280
if attr is not absent_attr:
1281
saved_attrs[attr_name] = attr
1282
self.__dict__ = saved_attrs
1286
unittest.TestCase.tearDown(self)
1614
1288
def time(self, callable, *args, **kwargs):
1615
1289
"""Run callable and accrue the time it takes to the benchmark time.
1617
1291
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1618
1292
this will cause lsprofile statistics to be gathered and stored in
1619
1293
self._benchcalls.
1621
1295
if self._benchtime is None:
1622
self.addDetail('benchtime', content.Content(content.ContentType(
1623
"text", "plain"), lambda:[str(self._benchtime)]))
1624
1296
self._benchtime = 0
1625
1297
start = time.time()
2390
1982
def make_repository(self, relpath, shared=False, format=None):
2391
1983
"""Create a repository on our default transport at relpath.
2393
1985
Note that relpath must be a relative path, not a full url.
2395
1987
# FIXME: If you create a remoterepository this returns the underlying
2396
# real format, which is incorrect. Actually we should make sure that
1988
# real format, which is incorrect. Actually we should make sure that
2397
1989
# RemoteBzrDir returns a RemoteRepository.
2398
1990
# maybe mbp 20070410
2399
1991
made_control = self.make_bzrdir(relpath, format=format)
2400
1992
return made_control.create_repository(shared=shared)
2402
def make_smart_server(self, path):
2403
smart_server = test_server.SmartTCPServer_for_testing()
2404
self.start_server(smart_server, self.get_server())
2405
remote_transport = get_transport(smart_server.get_url()).clone(path)
2406
return remote_transport
2408
1994
def make_branch_and_memory_tree(self, relpath, format=None):
2409
1995
"""Create a branch on the default transport and a MemoryTree for it."""
2410
1996
b = self.make_branch(relpath, format=format)
2411
1997
return memorytree.MemoryTree.create_on_branch(b)
2413
1999
def make_branch_builder(self, relpath, format=None):
2414
branch = self.make_branch(relpath, format=format)
2415
return branchbuilder.BranchBuilder(branch=branch)
2000
url = self.get_url(relpath)
2001
tran = get_transport(url)
2002
return branchbuilder.BranchBuilder(get_transport(url), format=format)
2417
2004
def overrideEnvironmentForTesting(self):
2418
test_home_dir = self.test_home_dir
2419
if isinstance(test_home_dir, unicode):
2420
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2421
os.environ['HOME'] = test_home_dir
2422
os.environ['BZR_HOME'] = test_home_dir
2005
os.environ['HOME'] = self.test_home_dir
2006
os.environ['BZR_HOME'] = self.test_home_dir
2424
2008
def setUp(self):
2425
2009
super(TestCaseWithMemoryTransport, self).setUp()
2426
2010
self._make_test_root()
2427
self.addCleanup(os.chdir, os.getcwdu())
2011
_currentdir = os.getcwdu()
2012
def _leaveDirectory():
2013
os.chdir(_currentdir)
2014
self.addCleanup(_leaveDirectory)
2428
2015
self.makeAndChdirToTestDir()
2429
2016
self.overrideEnvironmentForTesting()
2430
2017
self.__readonly_server = None
2431
2018
self.__server = None
2432
2019
self.reduceLockdirTimeout()
2434
def setup_smart_server_with_call_log(self):
2435
"""Sets up a smart server as the transport server with a call log."""
2436
self.transport_server = test_server.SmartTCPServer_for_testing
2437
self.hpss_calls = []
2439
# Skip the current stack down to the caller of
2440
# setup_smart_server_with_call_log
2441
prefix_length = len(traceback.extract_stack()) - 2
2442
def capture_hpss_call(params):
2443
self.hpss_calls.append(
2444
CapturedCall(params, prefix_length))
2445
client._SmartClient.hooks.install_named_hook(
2446
'call', capture_hpss_call, None)
2448
def reset_smart_call_log(self):
2449
self.hpss_calls = []
2452
2022
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2453
2023
"""Derived class that runs a test within a temporary directory.
2919
2464
list_only=False,
2920
2465
random_seed=None,
2921
2466
exclude_pattern=None,
2924
suite_decorators=None,
2926
result_decorators=None,
2928
"""Run a test suite for bzr selftest.
2930
:param runner_class: The class of runner to use. Must support the
2931
constructor arguments passed by run_suite which are more than standard
2933
:return: A boolean indicating success.
2935
2468
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2940
if runner_class is None:
2941
runner_class = TextTestRunner
2944
runner = runner_class(stream=stream,
2473
runner = TextTestRunner(stream=sys.stdout,
2945
2474
descriptions=0,
2946
2475
verbosity=verbosity,
2947
2476
bench_history=bench_history,
2949
result_decorators=result_decorators,
2477
list_only=list_only,
2951
2479
runner.stop_on_failure=stop_on_failure
2952
# built in decorator factories:
2954
random_order(random_seed, runner),
2955
exclude_tests(exclude_pattern),
2957
if matching_tests_first:
2958
decorators.append(tests_first(pattern))
2480
# Initialise the random number generator and display the seed used.
2481
# We convert the seed to a long to make it reuseable across invocations.
2482
random_order = False
2483
if random_seed is not None:
2485
if random_seed == "now":
2486
random_seed = long(time.time())
2488
# Convert the seed to a long if we can
2490
random_seed = long(random_seed)
2493
runner.stream.writeln("Randomizing test order using seed %s\n" %
2495
random.seed(random_seed)
2496
# Customise the list of tests if requested
2497
if exclude_pattern is not None:
2498
suite = exclude_tests_by_re(suite, exclude_pattern)
2500
order_changer = randomize_suite
2960
decorators.append(filter_tests(pattern))
2961
if suite_decorators:
2962
decorators.extend(suite_decorators)
2963
# tell the result object how many tests will be running: (except if
2964
# --parallel=fork is being used. Robert said he will provide a better
2965
# progress design later -- vila 20090817)
2966
if fork_decorator not in decorators:
2967
decorators.append(CountingDecorator)
2968
for decorator in decorators:
2969
suite = decorator(suite)
2971
# Done after test suite decoration to allow randomisation etc
2972
# to take effect, though that is of marginal benefit.
2974
stream.write("Listing tests only ...\n")
2975
for t in iter_suite_tests(suite):
2976
stream.write("%s\n" % (t.id()))
2502
order_changer = preserve_input
2503
if pattern != '.*' or random_order:
2504
if matching_tests_first:
2505
suites = map(order_changer, split_suite_by_re(suite, pattern))
2506
suite = TestUtil.TestSuite(suites)
2508
suite = order_changer(filter_suite_by_re(suite, pattern))
2978
2510
result = runner.run(suite)
2980
2513
return result.wasStrictlySuccessful()
2982
return result.wasSuccessful()
2985
# A registry where get() returns a suite decorator.
2986
parallel_registry = registry.Registry()
2989
def fork_decorator(suite):
2990
concurrency = osutils.local_concurrency()
2991
if concurrency == 1:
2993
from testtools import ConcurrentTestSuite
2994
return ConcurrentTestSuite(suite, fork_for_tests)
2995
parallel_registry.register('fork', fork_decorator)
2998
def subprocess_decorator(suite):
2999
concurrency = osutils.local_concurrency()
3000
if concurrency == 1:
3002
from testtools import ConcurrentTestSuite
3003
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3004
parallel_registry.register('subprocess', subprocess_decorator)
3007
def exclude_tests(exclude_pattern):
3008
"""Return a test suite decorator that excludes tests."""
3009
if exclude_pattern is None:
3010
return identity_decorator
3011
def decorator(suite):
3012
return ExcludeDecorator(suite, exclude_pattern)
3016
def filter_tests(pattern):
3018
return identity_decorator
3019
def decorator(suite):
3020
return FilterTestsDecorator(suite, pattern)
3024
def random_order(random_seed, runner):
3025
"""Return a test suite decorator factory for randomising tests order.
3027
:param random_seed: now, a string which casts to a long, or a long.
3028
:param runner: A test runner with a stream attribute to report on.
3030
if random_seed is None:
3031
return identity_decorator
3032
def decorator(suite):
3033
return RandomDecorator(suite, random_seed, runner.stream)
3037
def tests_first(pattern):
3039
return identity_decorator
3040
def decorator(suite):
3041
return TestFirstDecorator(suite, pattern)
3045
def identity_decorator(suite):
3050
class TestDecorator(TestSuite):
3051
"""A decorator for TestCase/TestSuite objects.
3053
Usually, subclasses should override __iter__(used when flattening test
3054
suites), which we do to filter, reorder, parallelise and so on, run() and
3058
def __init__(self, suite):
3059
TestSuite.__init__(self)
3062
def countTestCases(self):
3065
cases += test.countTestCases()
3072
def run(self, result):
3073
# Use iteration on self, not self._tests, to allow subclasses to hook
3076
if result.shouldStop:
3082
class CountingDecorator(TestDecorator):
3083
"""A decorator which calls result.progress(self.countTestCases)."""
3085
def run(self, result):
3086
progress_method = getattr(result, 'progress', None)
3087
if callable(progress_method):
3088
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3089
return super(CountingDecorator, self).run(result)
3092
class ExcludeDecorator(TestDecorator):
3093
"""A decorator which excludes test matching an exclude pattern."""
3095
def __init__(self, suite, exclude_pattern):
3096
TestDecorator.__init__(self, suite)
3097
self.exclude_pattern = exclude_pattern
3098
self.excluded = False
3102
return iter(self._tests)
3103
self.excluded = True
3104
suite = exclude_tests_by_re(self, self.exclude_pattern)
3106
self.addTests(suite)
3107
return iter(self._tests)
3110
class FilterTestsDecorator(TestDecorator):
3111
"""A decorator which filters tests to those matching a pattern."""
3113
def __init__(self, suite, pattern):
3114
TestDecorator.__init__(self, suite)
3115
self.pattern = pattern
3116
self.filtered = False
3120
return iter(self._tests)
3121
self.filtered = True
3122
suite = filter_suite_by_re(self, self.pattern)
3124
self.addTests(suite)
3125
return iter(self._tests)
3128
class RandomDecorator(TestDecorator):
3129
"""A decorator which randomises the order of its tests."""
3131
def __init__(self, suite, random_seed, stream):
3132
TestDecorator.__init__(self, suite)
3133
self.random_seed = random_seed
3134
self.randomised = False
3135
self.stream = stream
3139
return iter(self._tests)
3140
self.randomised = True
3141
self.stream.write("Randomizing test order using seed %s\n\n" %
3142
(self.actual_seed()))
3143
# Initialise the random number generator.
3144
random.seed(self.actual_seed())
3145
suite = randomize_suite(self)
3147
self.addTests(suite)
3148
return iter(self._tests)
3150
def actual_seed(self):
3151
if self.random_seed == "now":
3152
# We convert the seed to a long to make it reuseable across
3153
# invocations (because the user can reenter it).
3154
self.random_seed = long(time.time())
3156
# Convert the seed to a long if we can
3158
self.random_seed = long(self.random_seed)
3161
return self.random_seed
3164
class TestFirstDecorator(TestDecorator):
3165
"""A decorator which moves named tests to the front."""
3167
def __init__(self, suite, pattern):
3168
TestDecorator.__init__(self, suite)
3169
self.pattern = pattern
3170
self.filtered = False
3174
return iter(self._tests)
3175
self.filtered = True
3176
suites = split_suite_by_re(self, self.pattern)
3178
self.addTests(suites)
3179
return iter(self._tests)
3182
def partition_tests(suite, count):
3183
"""Partition suite into count lists of tests."""
3185
tests = list(iter_suite_tests(suite))
3186
tests_per_process = int(math.ceil(float(len(tests)) / count))
3187
for block in range(count):
3188
low_test = block * tests_per_process
3189
high_test = low_test + tests_per_process
3190
process_tests = tests[low_test:high_test]
3191
result.append(process_tests)
3195
def fork_for_tests(suite):
3196
"""Take suite and start up one runner per CPU by forking()
3198
:return: An iterable of TestCase-like objects which can each have
3199
run(result) called on them to feed tests to result.
3201
concurrency = osutils.local_concurrency()
3203
from subunit import TestProtocolClient, ProtocolTestCase
3204
from subunit.test_results import AutoTimingTestResultDecorator
3205
class TestInOtherProcess(ProtocolTestCase):
3206
# Should be in subunit, I think. RBC.
3207
def __init__(self, stream, pid):
3208
ProtocolTestCase.__init__(self, stream)
3211
def run(self, result):
3213
ProtocolTestCase.run(self, result)
3215
os.waitpid(self.pid, os.WNOHANG)
3217
test_blocks = partition_tests(suite, concurrency)
3218
for process_tests in test_blocks:
3219
process_suite = TestSuite()
3220
process_suite.addTests(process_tests)
3221
c2pread, c2pwrite = os.pipe()
3226
# Leave stderr and stdout open so we can see test noise
3227
# Close stdin so that the child goes away if it decides to
3228
# read from stdin (otherwise its a roulette to see what
3229
# child actually gets keystrokes for pdb etc).
3232
stream = os.fdopen(c2pwrite, 'wb', 1)
3233
subunit_result = AutoTimingTestResultDecorator(
3234
TestProtocolClient(stream))
3235
process_suite.run(subunit_result)
3240
stream = os.fdopen(c2pread, 'rb', 1)
3241
test = TestInOtherProcess(stream, pid)
3246
def reinvoke_for_tests(suite):
3247
"""Take suite and start up one runner per CPU using subprocess().
3249
:return: An iterable of TestCase-like objects which can each have
3250
run(result) called on them to feed tests to result.
3252
concurrency = osutils.local_concurrency()
3254
from subunit import ProtocolTestCase
3255
class TestInSubprocess(ProtocolTestCase):
3256
def __init__(self, process, name):
3257
ProtocolTestCase.__init__(self, process.stdout)
3258
self.process = process
3259
self.process.stdin.close()
3262
def run(self, result):
3264
ProtocolTestCase.run(self, result)
3267
os.unlink(self.name)
3268
# print "pid %d finished" % finished_process
3269
test_blocks = partition_tests(suite, concurrency)
3270
for process_tests in test_blocks:
3271
# ugly; currently reimplement rather than reuses TestCase methods.
3272
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3273
if not os.path.isfile(bzr_path):
3274
# We are probably installed. Assume sys.argv is the right file
3275
bzr_path = sys.argv[0]
3276
bzr_path = [bzr_path]
3277
if sys.platform == "win32":
3278
# if we're on windows, we can't execute the bzr script directly
3279
bzr_path = [sys.executable] + bzr_path
3280
fd, test_list_file_name = tempfile.mkstemp()
3281
test_list_file = os.fdopen(fd, 'wb', 1)
3282
for test in process_tests:
3283
test_list_file.write(test.id() + '\n')
3284
test_list_file.close()
3286
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3288
if '--no-plugins' in sys.argv:
3289
argv.append('--no-plugins')
3290
# stderr=STDOUT would be ideal, but until we prevent noise on
3291
# stderr it can interrupt the subunit protocol.
3292
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3294
test = TestInSubprocess(process, test_list_file_name)
3297
os.unlink(test_list_file_name)
3302
class ForwardingResult(unittest.TestResult):
3304
def __init__(self, target):
3305
unittest.TestResult.__init__(self)
3306
self.result = target
3308
def startTest(self, test):
3309
self.result.startTest(test)
3311
def stopTest(self, test):
3312
self.result.stopTest(test)
3314
def startTestRun(self):
3315
self.result.startTestRun()
3317
def stopTestRun(self):
3318
self.result.stopTestRun()
3320
def addSkip(self, test, reason):
3321
self.result.addSkip(test, reason)
3323
def addSuccess(self, test):
3324
self.result.addSuccess(test)
3326
def addError(self, test, err):
3327
self.result.addError(test, err)
3329
def addFailure(self, test, err):
3330
self.result.addFailure(test, err)
3331
ForwardingResult = testtools.ExtendedToOriginalDecorator
3334
class ProfileResult(ForwardingResult):
3335
"""Generate profiling data for all activity between start and success.
3337
The profile data is appended to the test's _benchcalls attribute and can
3338
be accessed by the forwarded-to TestResult.
3340
While it might be cleaner do accumulate this in stopTest, addSuccess is
3341
where our existing output support for lsprof is, and this class aims to
3342
fit in with that: while it could be moved it's not necessary to accomplish
3343
test profiling, nor would it be dramatically cleaner.
3346
def startTest(self, test):
3347
self.profiler = bzrlib.lsprof.BzrProfiler()
3348
self.profiler.start()
3349
ForwardingResult.startTest(self, test)
3351
def addSuccess(self, test):
3352
stats = self.profiler.stop()
3354
calls = test._benchcalls
3355
except AttributeError:
3356
test._benchcalls = []
3357
calls = test._benchcalls
3358
calls.append(((test.id(), "", ""), stats))
3359
ForwardingResult.addSuccess(self, test)
3361
def stopTest(self, test):
3362
ForwardingResult.stopTest(self, test)
3363
self.profiler = None
2515
return result.wasSuccessful()
3366
2518
# Controlled by "bzr selftest -E=..." option
3367
# Currently supported:
3368
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3369
# preserves any flags supplied at the command line.
3370
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3371
# rather than failing tests. And no longer raise
3372
# LockContention when fctnl locks are not being used
3373
# with proper exclusion rules.
3374
2519
selftest_debug_flags = set()
3603
2728
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3606
def _test_suite_testmod_names():
3607
"""Return the standard list of test module names to test."""
3610
'bzrlib.tests.blackbox',
3611
'bzrlib.tests.commands',
3612
'bzrlib.tests.per_branch',
3613
'bzrlib.tests.per_bzrdir',
3614
'bzrlib.tests.per_bzrdir_colo',
3615
'bzrlib.tests.per_foreign_vcs',
3616
'bzrlib.tests.per_interrepository',
3617
'bzrlib.tests.per_intertree',
3618
'bzrlib.tests.per_inventory',
3619
'bzrlib.tests.per_interbranch',
3620
'bzrlib.tests.per_lock',
3621
'bzrlib.tests.per_merger',
3622
'bzrlib.tests.per_transport',
3623
'bzrlib.tests.per_tree',
3624
'bzrlib.tests.per_pack_repository',
3625
'bzrlib.tests.per_repository',
3626
'bzrlib.tests.per_repository_chk',
3627
'bzrlib.tests.per_repository_reference',
3628
'bzrlib.tests.per_uifactory',
3629
'bzrlib.tests.per_versionedfile',
3630
'bzrlib.tests.per_workingtree',
3631
'bzrlib.tests.test__annotator',
3632
'bzrlib.tests.test__bencode',
3633
'bzrlib.tests.test__chk_map',
3634
'bzrlib.tests.test__dirstate_helpers',
3635
'bzrlib.tests.test__groupcompress',
3636
'bzrlib.tests.test__known_graph',
3637
'bzrlib.tests.test__rio',
3638
'bzrlib.tests.test__simple_set',
3639
'bzrlib.tests.test__static_tuple',
3640
'bzrlib.tests.test__walkdirs_win32',
3641
'bzrlib.tests.test_ancestry',
3642
'bzrlib.tests.test_annotate',
3643
'bzrlib.tests.test_api',
3644
'bzrlib.tests.test_atomicfile',
3645
'bzrlib.tests.test_bad_files',
3646
'bzrlib.tests.test_bisect_multi',
3647
'bzrlib.tests.test_branch',
3648
'bzrlib.tests.test_branchbuilder',
3649
'bzrlib.tests.test_btree_index',
3650
'bzrlib.tests.test_bugtracker',
3651
'bzrlib.tests.test_bundle',
3652
'bzrlib.tests.test_bzrdir',
3653
'bzrlib.tests.test__chunks_to_lines',
3654
'bzrlib.tests.test_cache_utf8',
3655
'bzrlib.tests.test_chk_map',
3656
'bzrlib.tests.test_chk_serializer',
3657
'bzrlib.tests.test_chunk_writer',
3658
'bzrlib.tests.test_clean_tree',
3659
'bzrlib.tests.test_cleanup',
3660
'bzrlib.tests.test_cmdline',
3661
'bzrlib.tests.test_commands',
3662
'bzrlib.tests.test_commit',
3663
'bzrlib.tests.test_commit_merge',
3664
'bzrlib.tests.test_config',
3665
'bzrlib.tests.test_conflicts',
3666
'bzrlib.tests.test_counted_lock',
3667
'bzrlib.tests.test_crash',
3668
'bzrlib.tests.test_decorators',
3669
'bzrlib.tests.test_delta',
3670
'bzrlib.tests.test_debug',
3671
'bzrlib.tests.test_deprecated_graph',
3672
'bzrlib.tests.test_diff',
3673
'bzrlib.tests.test_directory_service',
3674
'bzrlib.tests.test_dirstate',
3675
'bzrlib.tests.test_email_message',
3676
'bzrlib.tests.test_eol_filters',
3677
'bzrlib.tests.test_errors',
3678
'bzrlib.tests.test_export',
3679
'bzrlib.tests.test_extract',
3680
'bzrlib.tests.test_fetch',
3681
'bzrlib.tests.test_fifo_cache',
3682
'bzrlib.tests.test_filters',
3683
'bzrlib.tests.test_ftp_transport',
3684
'bzrlib.tests.test_foreign',
3685
'bzrlib.tests.test_generate_docs',
3686
'bzrlib.tests.test_generate_ids',
3687
'bzrlib.tests.test_globbing',
3688
'bzrlib.tests.test_gpg',
3689
'bzrlib.tests.test_graph',
3690
'bzrlib.tests.test_groupcompress',
3691
'bzrlib.tests.test_hashcache',
3692
'bzrlib.tests.test_help',
3693
'bzrlib.tests.test_hooks',
3694
'bzrlib.tests.test_http',
3695
'bzrlib.tests.test_http_response',
3696
'bzrlib.tests.test_https_ca_bundle',
3697
'bzrlib.tests.test_identitymap',
3698
'bzrlib.tests.test_ignores',
3699
'bzrlib.tests.test_index',
3700
'bzrlib.tests.test_import_tariff',
3701
'bzrlib.tests.test_info',
3702
'bzrlib.tests.test_inv',
3703
'bzrlib.tests.test_inventory_delta',
3704
'bzrlib.tests.test_knit',
3705
'bzrlib.tests.test_lazy_import',
3706
'bzrlib.tests.test_lazy_regex',
3707
'bzrlib.tests.test_lock',
3708
'bzrlib.tests.test_lockable_files',
3709
'bzrlib.tests.test_lockdir',
3710
'bzrlib.tests.test_log',
3711
'bzrlib.tests.test_lru_cache',
3712
'bzrlib.tests.test_lsprof',
3713
'bzrlib.tests.test_mail_client',
3714
'bzrlib.tests.test_memorytree',
3715
'bzrlib.tests.test_merge',
3716
'bzrlib.tests.test_merge3',
3717
'bzrlib.tests.test_merge_core',
3718
'bzrlib.tests.test_merge_directive',
3719
'bzrlib.tests.test_missing',
3720
'bzrlib.tests.test_msgeditor',
3721
'bzrlib.tests.test_multiparent',
3722
'bzrlib.tests.test_mutabletree',
3723
'bzrlib.tests.test_nonascii',
3724
'bzrlib.tests.test_options',
3725
'bzrlib.tests.test_osutils',
3726
'bzrlib.tests.test_osutils_encodings',
3727
'bzrlib.tests.test_pack',
3728
'bzrlib.tests.test_patch',
3729
'bzrlib.tests.test_patches',
3730
'bzrlib.tests.test_permissions',
3731
'bzrlib.tests.test_plugins',
3732
'bzrlib.tests.test_progress',
3733
'bzrlib.tests.test_read_bundle',
3734
'bzrlib.tests.test_reconcile',
3735
'bzrlib.tests.test_reconfigure',
3736
'bzrlib.tests.test_registry',
3737
'bzrlib.tests.test_remote',
3738
'bzrlib.tests.test_rename_map',
3739
'bzrlib.tests.test_repository',
3740
'bzrlib.tests.test_revert',
3741
'bzrlib.tests.test_revision',
3742
'bzrlib.tests.test_revisionspec',
3743
'bzrlib.tests.test_revisiontree',
3744
'bzrlib.tests.test_rio',
3745
'bzrlib.tests.test_rules',
3746
'bzrlib.tests.test_sampler',
3747
'bzrlib.tests.test_script',
3748
'bzrlib.tests.test_selftest',
3749
'bzrlib.tests.test_serializer',
3750
'bzrlib.tests.test_setup',
3751
'bzrlib.tests.test_sftp_transport',
3752
'bzrlib.tests.test_shelf',
3753
'bzrlib.tests.test_shelf_ui',
3754
'bzrlib.tests.test_smart',
3755
'bzrlib.tests.test_smart_add',
3756
'bzrlib.tests.test_smart_request',
3757
'bzrlib.tests.test_smart_transport',
3758
'bzrlib.tests.test_smtp_connection',
3759
'bzrlib.tests.test_source',
3760
'bzrlib.tests.test_ssh_transport',
3761
'bzrlib.tests.test_status',
3762
'bzrlib.tests.test_store',
3763
'bzrlib.tests.test_strace',
3764
'bzrlib.tests.test_subsume',
3765
'bzrlib.tests.test_switch',
3766
'bzrlib.tests.test_symbol_versioning',
3767
'bzrlib.tests.test_tag',
3768
'bzrlib.tests.test_testament',
3769
'bzrlib.tests.test_textfile',
3770
'bzrlib.tests.test_textmerge',
3771
'bzrlib.tests.test_timestamp',
3772
'bzrlib.tests.test_trace',
3773
'bzrlib.tests.test_transactions',
3774
'bzrlib.tests.test_transform',
3775
'bzrlib.tests.test_transport',
3776
'bzrlib.tests.test_transport_log',
3777
'bzrlib.tests.test_tree',
3778
'bzrlib.tests.test_treebuilder',
3779
'bzrlib.tests.test_tsort',
3780
'bzrlib.tests.test_tuned_gzip',
3781
'bzrlib.tests.test_ui',
3782
'bzrlib.tests.test_uncommit',
3783
'bzrlib.tests.test_upgrade',
3784
'bzrlib.tests.test_upgrade_stacked',
3785
'bzrlib.tests.test_urlutils',
3786
'bzrlib.tests.test_version',
3787
'bzrlib.tests.test_version_info',
3788
'bzrlib.tests.test_weave',
3789
'bzrlib.tests.test_whitebox',
3790
'bzrlib.tests.test_win32utils',
3791
'bzrlib.tests.test_workingtree',
3792
'bzrlib.tests.test_workingtree_4',
3793
'bzrlib.tests.test_wsgi',
3794
'bzrlib.tests.test_xml',
3798
def _test_suite_modules_to_doctest():
3799
"""Return the list of modules to doctest."""
3802
'bzrlib.branchbuilder',
3803
'bzrlib.decorators',
3806
'bzrlib.iterablefile',
3810
'bzrlib.symbol_versioning',
3813
'bzrlib.version_info_formats.format_custom',
3817
2731
def test_suite(keep_only=None, starting_with=None):
3818
2732
"""Build and return TestSuite for the whole of bzrlib.
3825
2739
This function can be replaced if you need to change the default test
3826
2740
suite on a global basis, but it is not encouraged.
2744
'bzrlib.util.tests.test_bencode',
2745
'bzrlib.tests.blackbox',
2746
'bzrlib.tests.branch_implementations',
2747
'bzrlib.tests.bzrdir_implementations',
2748
'bzrlib.tests.commands',
2749
'bzrlib.tests.inventory_implementations',
2750
'bzrlib.tests.interrepository_implementations',
2751
'bzrlib.tests.intertree_implementations',
2752
'bzrlib.tests.per_lock',
2753
'bzrlib.tests.repository_implementations',
2754
'bzrlib.tests.test__dirstate_helpers',
2755
'bzrlib.tests.test_ancestry',
2756
'bzrlib.tests.test_annotate',
2757
'bzrlib.tests.test_api',
2758
'bzrlib.tests.test_atomicfile',
2759
'bzrlib.tests.test_bad_files',
2760
'bzrlib.tests.test_bisect_multi',
2761
'bzrlib.tests.test_branch',
2762
'bzrlib.tests.test_branchbuilder',
2763
'bzrlib.tests.test_btree_index',
2764
'bzrlib.tests.test_bugtracker',
2765
'bzrlib.tests.test_bundle',
2766
'bzrlib.tests.test_bzrdir',
2767
'bzrlib.tests.test_cache_utf8',
2768
'bzrlib.tests.test_chunk_writer',
2769
'bzrlib.tests.test_commands',
2770
'bzrlib.tests.test_commit',
2771
'bzrlib.tests.test_commit_merge',
2772
'bzrlib.tests.test_config',
2773
'bzrlib.tests.test_conflicts',
2774
'bzrlib.tests.test_counted_lock',
2775
'bzrlib.tests.test_decorators',
2776
'bzrlib.tests.test_delta',
2777
'bzrlib.tests.test_deprecated_graph',
2778
'bzrlib.tests.test_diff',
2779
'bzrlib.tests.test_dirstate',
2780
'bzrlib.tests.test_directory_service',
2781
'bzrlib.tests.test_email_message',
2782
'bzrlib.tests.test_errors',
2783
'bzrlib.tests.test_extract',
2784
'bzrlib.tests.test_fetch',
2785
'bzrlib.tests.test_ftp_transport',
2786
'bzrlib.tests.test_generate_docs',
2787
'bzrlib.tests.test_generate_ids',
2788
'bzrlib.tests.test_globbing',
2789
'bzrlib.tests.test_gpg',
2790
'bzrlib.tests.test_graph',
2791
'bzrlib.tests.test_hashcache',
2792
'bzrlib.tests.test_help',
2793
'bzrlib.tests.test_hooks',
2794
'bzrlib.tests.test_http',
2795
'bzrlib.tests.test_http_implementations',
2796
'bzrlib.tests.test_http_response',
2797
'bzrlib.tests.test_https_ca_bundle',
2798
'bzrlib.tests.test_identitymap',
2799
'bzrlib.tests.test_ignores',
2800
'bzrlib.tests.test_index',
2801
'bzrlib.tests.test_info',
2802
'bzrlib.tests.test_inv',
2803
'bzrlib.tests.test_knit',
2804
'bzrlib.tests.test_lazy_import',
2805
'bzrlib.tests.test_lazy_regex',
2806
'bzrlib.tests.test_lockdir',
2807
'bzrlib.tests.test_lockable_files',
2808
'bzrlib.tests.test_log',
2809
'bzrlib.tests.test_lsprof',
2810
'bzrlib.tests.test_lru_cache',
2811
'bzrlib.tests.test_mail_client',
2812
'bzrlib.tests.test_memorytree',
2813
'bzrlib.tests.test_merge',
2814
'bzrlib.tests.test_merge3',
2815
'bzrlib.tests.test_merge_core',
2816
'bzrlib.tests.test_merge_directive',
2817
'bzrlib.tests.test_missing',
2818
'bzrlib.tests.test_msgeditor',
2819
'bzrlib.tests.test_multiparent',
2820
'bzrlib.tests.test_mutabletree',
2821
'bzrlib.tests.test_nonascii',
2822
'bzrlib.tests.test_options',
2823
'bzrlib.tests.test_osutils',
2824
'bzrlib.tests.test_osutils_encodings',
2825
'bzrlib.tests.test_pack',
2826
'bzrlib.tests.test_pack_repository',
2827
'bzrlib.tests.test_patch',
2828
'bzrlib.tests.test_patches',
2829
'bzrlib.tests.test_permissions',
2830
'bzrlib.tests.test_plugins',
2831
'bzrlib.tests.test_progress',
2832
'bzrlib.tests.test_read_bundle',
2833
'bzrlib.tests.test_reconfigure',
2834
'bzrlib.tests.test_reconcile',
2835
'bzrlib.tests.test_registry',
2836
'bzrlib.tests.test_remote',
2837
'bzrlib.tests.test_repository',
2838
'bzrlib.tests.per_repository_reference',
2839
'bzrlib.tests.test_revert',
2840
'bzrlib.tests.test_revision',
2841
'bzrlib.tests.test_revisionspec',
2842
'bzrlib.tests.test_revisiontree',
2843
'bzrlib.tests.test_rio',
2844
'bzrlib.tests.test_rules',
2845
'bzrlib.tests.test_sampler',
2846
'bzrlib.tests.test_selftest',
2847
'bzrlib.tests.test_setup',
2848
'bzrlib.tests.test_sftp_transport',
2849
'bzrlib.tests.test_smart',
2850
'bzrlib.tests.test_smart_add',
2851
'bzrlib.tests.test_smart_transport',
2852
'bzrlib.tests.test_smtp_connection',
2853
'bzrlib.tests.test_source',
2854
'bzrlib.tests.test_ssh_transport',
2855
'bzrlib.tests.test_status',
2856
'bzrlib.tests.test_store',
2857
'bzrlib.tests.test_strace',
2858
'bzrlib.tests.test_subsume',
2859
'bzrlib.tests.test_switch',
2860
'bzrlib.tests.test_symbol_versioning',
2861
'bzrlib.tests.test_tag',
2862
'bzrlib.tests.test_testament',
2863
'bzrlib.tests.test_textfile',
2864
'bzrlib.tests.test_textmerge',
2865
'bzrlib.tests.test_timestamp',
2866
'bzrlib.tests.test_trace',
2867
'bzrlib.tests.test_transactions',
2868
'bzrlib.tests.test_transform',
2869
'bzrlib.tests.test_transport',
2870
'bzrlib.tests.test_transport_implementations',
2871
'bzrlib.tests.test_transport_log',
2872
'bzrlib.tests.test_tree',
2873
'bzrlib.tests.test_treebuilder',
2874
'bzrlib.tests.test_tsort',
2875
'bzrlib.tests.test_tuned_gzip',
2876
'bzrlib.tests.test_ui',
2877
'bzrlib.tests.test_uncommit',
2878
'bzrlib.tests.test_upgrade',
2879
'bzrlib.tests.test_upgrade_stacked',
2880
'bzrlib.tests.test_urlutils',
2881
'bzrlib.tests.test_versionedfile',
2882
'bzrlib.tests.test_version',
2883
'bzrlib.tests.test_version_info',
2884
'bzrlib.tests.test__walkdirs_win32',
2885
'bzrlib.tests.test_weave',
2886
'bzrlib.tests.test_whitebox',
2887
'bzrlib.tests.test_win32utils',
2888
'bzrlib.tests.test_workingtree',
2889
'bzrlib.tests.test_workingtree_4',
2890
'bzrlib.tests.test_wsgi',
2891
'bzrlib.tests.test_xml',
2892
'bzrlib.tests.tree_implementations',
2893
'bzrlib.tests.workingtree_implementations',
3829
2896
loader = TestUtil.TestLoader()
3831
if keep_only is not None:
3832
id_filter = TestIdList(keep_only)
3833
2898
if starting_with:
2899
starting_with = [test_prefix_alias_registry.resolve_alias(start)
2900
for start in starting_with]
3834
2901
# We take precedence over keep_only because *at loading time* using
3835
2902
# both options means we will load less tests for the same final result.
3836
2903
def interesting_module(name):
3932
3062
for right_name, right_dict in scenarios_right]
3935
def multiply_tests(tests, scenarios, result):
3936
"""Multiply tests_list by scenarios into result.
3938
This is the core workhorse for test parameterisation.
3940
Typically the load_tests() method for a per-implementation test suite will
3941
call multiply_tests and return the result.
3943
:param tests: The tests to parameterise.
3944
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3945
scenario_param_dict).
3946
:param result: A TestSuite to add created tests to.
3948
This returns the passed in result TestSuite with the cross product of all
3949
the tests repeated once for each scenario. Each test is adapted by adding
3950
the scenario name at the end of its id(), and updating the test object's
3951
__dict__ with the scenario_param_dict.
3953
>>> import bzrlib.tests.test_sampler
3954
>>> r = multiply_tests(
3955
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3956
... [('one', dict(param=1)),
3957
... ('two', dict(param=2))],
3959
>>> tests = list(iter_suite_tests(r))
3963
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3969
for test in iter_suite_tests(tests):
3970
apply_scenarios(test, scenarios, result)
3974
def apply_scenarios(test, scenarios, result):
3975
"""Apply the scenarios in scenarios to test and add to result.
3977
:param test: The test to apply scenarios to.
3978
:param scenarios: An iterable of scenarios to apply to test.
3980
:seealso: apply_scenario
3982
for scenario in scenarios:
3983
result.addTest(apply_scenario(test, scenario))
3987
def apply_scenario(test, scenario):
3988
"""Copy test and apply scenario to it.
3990
:param test: A test to adapt.
3991
:param scenario: A tuple describing the scenarion.
3992
The first element of the tuple is the new test id.
3993
The second element is a dict containing attributes to set on the
3995
:return: The adapted test.
3997
new_id = "%s(%s)" % (test.id(), scenario[0])
3998
new_test = clone_test(test, new_id)
3999
for name, value in scenario[1].items():
4000
setattr(new_test, name, value)
4004
def clone_test(test, new_id):
4005
"""Clone a test giving it a new id.
4007
:param test: The test to clone.
4008
:param new_id: The id to assign to it.
4009
:return: The new test.
4011
new_test = copy(test)
4012
new_test.id = lambda: new_id
4016
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4018
"""Helper for permutating tests against an extension module.
4020
This is meant to be used inside a modules 'load_tests()' function. It will
4021
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4022
against both implementations. Setting 'test.module' to the appropriate
4023
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4025
:param standard_tests: A test suite to permute
4026
:param loader: A TestLoader
4027
:param py_module_name: The python path to a python module that can always
4028
be loaded, and will be considered the 'python' implementation. (eg
4029
'bzrlib._chk_map_py')
4030
:param ext_module_name: The python path to an extension module. If the
4031
module cannot be loaded, a single test will be added, which notes that
4032
the module is not available. If it can be loaded, all standard_tests
4033
will be run against that module.
4034
:return: (suite, feature) suite is a test-suite that has all the permuted
4035
tests. feature is the Feature object that can be used to determine if
4036
the module is available.
4039
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
4041
('python', {'module': py_module}),
4043
suite = loader.suiteClass()
4044
feature = ModuleAvailableFeature(ext_module_name)
4045
if feature.available():
4046
scenarios.append(('C', {'module': feature.module}))
4048
# the compiled module isn't available, so we add a failing test
4049
class FailWithoutFeature(TestCase):
4050
def test_fail(self):
4051
self.requireFeature(feature)
4052
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4053
result = multiply_tests(standard_tests, scenarios, suite)
4054
return result, feature
4057
def _rmtree_temp_dir(dirname, test_id=None):
3066
def adapt_modules(mods_list, adapter, loader, suite):
3067
"""Adapt the modules in mods_list using adapter and add to suite."""
3068
tests = loader.loadTestsFromModuleNames(mods_list)
3069
adapt_tests(tests, adapter, suite)
3072
def adapt_tests(tests_list, adapter, suite):
3073
"""Adapt the tests in tests_list using adapter and add to suite."""
3074
for test in iter_suite_tests(tests_list):
3075
suite.addTests(adapter.adapt(test))
3078
def _rmtree_temp_dir(dirname):
4058
3079
# If LANG=C we probably have created some bogus paths
4059
3080
# which rmtree(unicode) will fail to delete
4060
3081
# so make sure we are using rmtree(str) to delete everything
4164
3183
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4167
class _CompatabilityThunkFeature(Feature):
4168
"""This feature is just a thunk to another feature.
4170
It issues a deprecation warning if it is accessed, to let you know that you
4171
should really use a different feature.
4174
def __init__(self, dep_version, module, name,
4175
replacement_name, replacement_module=None):
4176
super(_CompatabilityThunkFeature, self).__init__()
4177
self._module = module
4178
if replacement_module is None:
4179
replacement_module = module
4180
self._replacement_module = replacement_module
4182
self._replacement_name = replacement_name
4183
self._dep_version = dep_version
4184
self._feature = None
4187
if self._feature is None:
4188
depr_msg = self._dep_version % ('%s.%s'
4189
% (self._module, self._name))
4190
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4191
self._replacement_name)
4192
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4193
# Import the new feature and use it as a replacement for the
4195
mod = __import__(self._replacement_module, {}, {},
4196
[self._replacement_name])
4197
self._feature = getattr(mod, self._replacement_name)
4201
return self._feature._probe()
4204
class ModuleAvailableFeature(Feature):
4205
"""This is a feature than describes a module we want to be available.
4207
Declare the name of the module in __init__(), and then after probing, the
4208
module will be available as 'self.module'.
4210
:ivar module: The module if it is available, else None.
4213
def __init__(self, module_name):
4214
super(ModuleAvailableFeature, self).__init__()
4215
self.module_name = module_name
4219
self._module = __import__(self.module_name, {}, {}, [''])
4226
if self.available(): # Make sure the probe has been done
4230
def feature_name(self):
4231
return self.module_name
4234
# This is kept here for compatibility, it is recommended to use
4235
# 'bzrlib.tests.feature.paramiko' instead
4236
ParamikoFeature = _CompatabilityThunkFeature(
4237
deprecated_in((2,1,0)),
4238
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
3186
class TestScenarioApplier(object):
3187
"""A tool to apply scenarios to tests."""
3189
def adapt(self, test):
3190
"""Return a TestSuite containing a copy of test for each scenario."""
3191
result = unittest.TestSuite()
3192
for scenario in self.scenarios:
3193
result.addTest(self.adapt_test_to_scenario(test, scenario))
3196
def adapt_test_to_scenario(self, test, scenario):
3197
"""Copy test and apply scenario to it.
3199
:param test: A test to adapt.
3200
:param scenario: A tuple describing the scenarion.
3201
The first element of the tuple is the new test id.
3202
The second element is a dict containing attributes to set on the
3204
:return: The adapted test.
3206
from copy import deepcopy
3207
new_test = deepcopy(test)
3208
for name, value in scenario[1].items():
3209
setattr(new_test, name, value)
3210
new_id = "%s(%s)" % (new_test.id(), scenario[0])
3211
new_test.id = lambda: new_id
4241
3215
def probe_unicode_in_user_encoding():