29
31
from cStringIO import StringIO
37
from pprint import pformat
42
from subprocess import Popen, PIPE
54
# nb: check this before importing anything else from within it
55
_testtools_version = getattr(testtools, '__version__', ())
56
if _testtools_version < (0, 9, 5):
57
raise ImportError("need at least testtools 0.9.5: %s is %r"
58
% (testtools.__file__, _testtools_version))
59
from testtools import content
62
51
from bzrlib import (
66
commands as _mod_commands,
76
plugin as _mod_plugin,
83
transport as _mod_transport,
63
import bzrlib.commands
64
import bzrlib.timestamp
66
import bzrlib.inventory
67
import bzrlib.iterablefile
87
70
import bzrlib.lsprof
88
71
except ImportError:
89
72
# lsprof not available
91
from bzrlib.smart import client, request
92
from bzrlib.transport import (
74
from bzrlib.merge import merge_inner
78
from bzrlib import symbol_versioning
96
79
from bzrlib.symbol_versioning import (
97
81
deprecated_function,
100
from bzrlib.tests import (
106
from bzrlib.ui import NullProgressView
107
from bzrlib.ui.text import TextUIFactory
108
from bzrlib.tests.features import _CompatabilityThunkFeature
86
from bzrlib.transport import get_transport
87
import bzrlib.transport
88
from bzrlib.transport.local import LocalURLServer
89
from bzrlib.transport.memory import MemoryServer
90
from bzrlib.transport.readonly import ReadonlyServer
91
from bzrlib.trace import mutter, note
92
from bzrlib.tests import TestUtil
93
from bzrlib.tests.http_server import HttpServer
94
from bzrlib.tests.TestUtil import (
98
from bzrlib.tests.treeshape import build_tree_contents
99
import bzrlib.version_info_formats.format_custom
100
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
110
102
# Mark this python module as being part of the implementation
111
103
# of unittest: this gives us better tracebacks where the last
112
104
# shown frame is the test code, not our assertXYZ.
115
default_transport = test_server.LocalURLServer
118
_unitialized_attr = object()
119
"""A sentinel needed to act as a default value in a method signature."""
122
# Subunit result codes, defined here to prevent a hard dependency on subunit.
126
# These are intentionally brought into this namespace. That way plugins, etc
127
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
128
TestSuite = TestUtil.TestSuite
129
TestLoader = TestUtil.TestLoader
131
# Tests should run in a clean and clearly defined environment. The goal is to
132
# keep them isolated from the running environment as much as possible. The test
133
# framework ensures the variables defined below are set (or deleted if the
134
# value is None) before a test is run and reset to their original value after
135
# the test is run. Generally if some code depends on an environment variable,
136
# the tests should start without this variable in the environment. There are a
137
# few exceptions but you shouldn't violate this rule lightly.
141
'XDG_CONFIG_HOME': None,
142
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
143
# tests do check our impls match APPDATA
144
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
148
'BZREMAIL': None, # may still be present in the environment
149
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
150
'BZR_PROGRESS_BAR': None,
151
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
152
# as a base class instead of TestCaseInTempDir. Tests inheriting from
153
# TestCase should not use disk resources, BZR_LOG is one.
154
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
155
'BZR_PLUGIN_PATH': None,
156
'BZR_DISABLE_PLUGINS': None,
157
'BZR_PLUGINS_AT': None,
158
'BZR_CONCURRENCY': None,
159
# Make sure that any text ui tests are consistent regardless of
160
# the environment the test case is run in; you may want tests that
161
# test other combinations. 'dumb' is a reasonable guess for tests
162
# going to a pipe or a StringIO.
168
'SSH_AUTH_SOCK': None,
178
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
179
# least. If you do (care), please update this comment
183
'BZR_REMOTE_PATH': None,
184
# Generally speaking, we don't want apport reporting on crashes in
185
# the test environment unless we're specifically testing apport,
186
# so that it doesn't leak into the real system environment. We
187
# use an env var so it propagates to subprocesses.
188
'APPORT_DISABLE': '1',
192
def override_os_environ(test, env=None):
193
"""Modify os.environ keeping a copy.
195
:param test: A test instance
197
:param env: A dict containing variable definitions to be installed
200
env = isolated_environ
201
test._original_os_environ = dict([(var, value)
202
for var, value in os.environ.iteritems()])
203
for var, value in env.iteritems():
204
osutils.set_or_unset_env(var, value)
205
if var not in test._original_os_environ:
206
# The var is new, add it with a value of None, so
207
# restore_os_environ will delete it
208
test._original_os_environ[var] = None
211
def restore_os_environ(test):
212
"""Restore os.environ to its original state.
214
:param test: A test instance previously passed to override_os_environ.
216
for var, value in test._original_os_environ.iteritems():
217
# Restore the original value (or delete it if the value has been set to
218
# None in override_os_environ).
219
osutils.set_or_unset_env(var, value)
222
def _clear__type_equality_funcs(test):
223
"""Cleanup bound methods stored on TestCase instances
225
Clear the dict breaking a few (mostly) harmless cycles in the affected
226
unittests released with Python 2.6 and initial Python 2.7 versions.
228
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
229
shipped in Oneiric, an object with no clear method was used, hence the
230
extra complications, see bug 809048 for details.
232
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
233
if type_equality_funcs is not None:
234
tef_clear = getattr(type_equality_funcs, "clear", None)
235
if tef_clear is None:
236
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
237
if tef_instance_dict is not None:
238
tef_clear = tef_instance_dict.clear
239
if tef_clear is not None:
243
class ExtendedTestResult(testtools.TextTestResult):
107
default_transport = LocalURLServer
110
class ExtendedTestResult(unittest._TextTestResult):
244
111
"""Accepts, reports and accumulates the results of running tests.
246
113
Compared to the unittest version this class adds support for
293
160
self.unsupported = {}
295
162
self._overall_start_time = time.time()
296
self._strict = strict
297
self._first_thread_leaker_id = None
298
self._tests_leaking_threads_count = 0
299
self._traceback_from_test = None
301
def stopTestRun(self):
304
stopTime = time.time()
305
timeTaken = stopTime - self.startTime
306
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
307
# the parent class method is similar have to duplicate
308
self._show_list('ERROR', self.errors)
309
self._show_list('FAIL', self.failures)
310
self.stream.write(self.sep2)
311
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
312
run, run != 1 and "s" or "", timeTaken))
313
if not self.wasSuccessful():
314
self.stream.write("FAILED (")
315
failed, errored = map(len, (self.failures, self.errors))
317
self.stream.write("failures=%d" % failed)
319
if failed: self.stream.write(", ")
320
self.stream.write("errors=%d" % errored)
321
if self.known_failure_count:
322
if failed or errored: self.stream.write(", ")
323
self.stream.write("known_failure_count=%d" %
324
self.known_failure_count)
325
self.stream.write(")\n")
327
if self.known_failure_count:
328
self.stream.write("OK (known_failures=%d)\n" %
329
self.known_failure_count)
331
self.stream.write("OK\n")
332
if self.skip_count > 0:
333
skipped = self.skip_count
334
self.stream.write('%d test%s skipped\n' %
335
(skipped, skipped != 1 and "s" or ""))
337
for feature, count in sorted(self.unsupported.items()):
338
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
341
ok = self.wasStrictlySuccessful()
343
ok = self.wasSuccessful()
344
if self._first_thread_leaker_id:
346
'%s is leaking threads among %d leaking tests.\n' % (
347
self._first_thread_leaker_id,
348
self._tests_leaking_threads_count))
349
# We don't report the main thread as an active one.
351
'%d non-main threads were left active in the end.\n'
352
% (len(self._active_threads) - 1))
354
def getDescription(self, test):
357
def _extractBenchmarkTime(self, testCase, details=None):
164
def _extractBenchmarkTime(self, testCase):
358
165
"""Add a benchmark time for the current test case."""
359
if details and 'benchtime' in details:
360
return float(''.join(details['benchtime'].iter_bytes()))
361
166
return getattr(testCase, "_benchtime", None)
363
def _delta_to_float(self, a_timedelta, precision):
364
# This calls ceiling to ensure that the most pessimistic view of time
365
# taken is shown (rather than leaving it to the Python %f operator
366
# to decide whether to round/floor/ceiling. This was added when we
367
# had pyp3 test failures that suggest a floor was happening.
368
shift = 10 ** precision
369
return math.ceil((a_timedelta.days * 86400.0 + a_timedelta.seconds +
370
a_timedelta.microseconds / 1000000.0) * shift) / shift
372
168
def _elapsedTestTimeString(self):
373
169
"""Return a time string for the overall time the current test has taken."""
374
return self._formatTime(self._delta_to_float(
375
self._now() - self._start_datetime, 3))
170
return self._formatTime(time.time() - self._start_time)
377
172
def _testTimeString(self, testCase):
378
173
benchmark_time = self._extractBenchmarkTime(testCase)
379
174
if benchmark_time is not None:
380
return self._formatTime(benchmark_time) + "*"
176
self._formatTime(benchmark_time),
177
self._elapsedTestTimeString())
382
return self._elapsedTestTimeString()
179
return " %s" % self._elapsedTestTimeString()
384
181
def _formatTime(self, seconds):
385
182
"""Format seconds as milliseconds with leading spaces."""
390
187
def _shortened_test_description(self, test):
392
what = re.sub(r'^bzrlib\.tests\.', '', what)
189
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
395
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
396
# multiple times in a row, because the handler is added for
397
# each test but the container list is shared between cases.
398
# See lp:498869 lp:625574 and lp:637725 for background.
399
def _record_traceback_from_test(self, exc_info):
400
"""Store the traceback from passed exc_info tuple till"""
401
self._traceback_from_test = exc_info[2]
403
192
def startTest(self, test):
404
super(ExtendedTestResult, self).startTest(test)
193
unittest.TestResult.startTest(self, test)
408
194
self.report_test_start(test)
409
195
test.number = self.count
410
196
self._recordTestStartTime()
411
# Make testtools cases give us the real traceback on failure
412
addOnException = getattr(test, "addOnException", None)
413
if addOnException is not None:
414
addOnException(self._record_traceback_from_test)
415
# Only check for thread leaks on bzrlib derived test cases
416
if isinstance(test, TestCase):
417
test.addCleanup(self._check_leaked_threads, test)
419
def stopTest(self, test):
420
super(ExtendedTestResult, self).stopTest(test)
421
# Manually break cycles, means touching various private things but hey
422
getDetails = getattr(test, "getDetails", None)
423
if getDetails is not None:
425
_clear__type_equality_funcs(test)
426
self._traceback_from_test = None
428
def startTests(self):
429
self.report_tests_starting()
430
self._active_threads = threading.enumerate()
432
def _check_leaked_threads(self, test):
433
"""See if any threads have leaked since last call
435
A sample of live threads is stored in the _active_threads attribute,
436
when this method runs it compares the current live threads and any not
437
in the previous sample are treated as having leaked.
439
now_active_threads = set(threading.enumerate())
440
threads_leaked = now_active_threads.difference(self._active_threads)
442
self._report_thread_leak(test, threads_leaked, now_active_threads)
443
self._tests_leaking_threads_count += 1
444
if self._first_thread_leaker_id is None:
445
self._first_thread_leaker_id = test.id()
446
self._active_threads = now_active_threads
448
198
def _recordTestStartTime(self):
449
199
"""Record that a test has started."""
450
self._start_datetime = self._now()
200
self._start_time = time.time()
202
def _cleanupLogFile(self, test):
203
# We can only do this if we have one of our TestCases, not if
205
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
206
if setKeepLogfile is not None:
452
209
def addError(self, test, err):
453
210
"""Tell result that test finished with an error.
468
231
Called from the TestCase run() method when the test
469
232
fails because e.g. an assert() method failed.
471
self._post_mortem(self._traceback_from_test)
472
super(ExtendedTestResult, self).addFailure(test, err)
473
self.failure_count += 1
474
self.report_failure(test, err)
234
self._testConcluded(test)
235
if isinstance(err[1], KnownFailure):
236
return self._addKnownFailure(test, err)
238
self._cleanupLogFile(test)
239
unittest.TestResult.addFailure(self, test, err)
240
self.failure_count += 1
241
self.report_failure(test, err)
478
def addSuccess(self, test, details=None):
245
def addSuccess(self, test):
479
246
"""Tell result that test completed successfully.
481
248
Called from the TestCase run()
250
self._testConcluded(test)
483
251
if self._bench_history is not None:
484
benchmark_time = self._extractBenchmarkTime(test, details)
252
benchmark_time = self._extractBenchmarkTime(test)
485
253
if benchmark_time is not None:
486
254
self._bench_history.write("%s %s\n" % (
487
255
self._formatTime(benchmark_time),
489
257
self.report_success(test)
490
super(ExtendedTestResult, self).addSuccess(test)
258
self._cleanupLogFile(test)
259
unittest.TestResult.addSuccess(self, test)
491
260
test._log_contents = ''
493
def addExpectedFailure(self, test, err):
262
def _testConcluded(self, test):
263
"""Common code when a test has finished.
265
Called regardless of whether it succeded, failed, etc.
269
def _addKnownFailure(self, test, err):
494
270
self.known_failure_count += 1
495
271
self.report_known_failure(test, err)
497
def addUnexpectedSuccess(self, test, details=None):
498
"""Tell result the test unexpectedly passed, counting as a failure
500
When the minimum version of testtools required becomes 0.9.8 this
501
can be updated to use the new handling there.
503
super(ExtendedTestResult, self).addFailure(test, details=details)
504
self.failure_count += 1
505
self.report_unexpected_success(test,
506
"".join(details["reason"].iter_text()))
510
273
def addNotSupported(self, test, feature):
511
274
"""The test will not be run because of a missing feature.
513
276
# this can be called in two different ways: it may be that the
514
# test started running, and then raised (through requireFeature)
277
# test started running, and then raised (through addError)
515
278
# UnavailableFeature. Alternatively this method can be called
516
# while probing for features before running the test code proper; in
517
# that case we will see startTest and stopTest, but the test will
518
# never actually run.
279
# while probing for features before running the tests; in that
280
# case we will see startTest and stopTest, but the test will never
519
282
self.unsupported.setdefault(str(feature), 0)
520
283
self.unsupported[str(feature)] += 1
521
284
self.report_unsupported(test, feature)
523
def addSkip(self, test, reason):
524
"""A test has not run for 'reason'."""
526
self.report_skip(test, reason)
528
def addNotApplicable(self, test, reason):
    """Count a not-applicable test and report it.

    :param test: The test that was not applicable.
    :param reason: Why it was not applicable; handed on unchanged to
        report_not_applicable.
    """
    self.not_applicable_count = self.not_applicable_count + 1
    self.report_not_applicable(test, reason)
532
def _count_stored_tests(self):
533
"""Count of tests instances kept alive due to not succeeding"""
534
return self.error_count + self.failure_count + self.known_failure_count
536
def _post_mortem(self, tb=None):
537
"""Start a PDB post mortem session."""
538
if os.environ.get('BZR_TEST_PDB', None):
542
def progress(self, offset, whence):
543
"""The test is adjusting the count of tests to run."""
544
if whence == SUBUNIT_SEEK_SET:
545
self.num_tests = offset
546
elif whence == SUBUNIT_SEEK_CUR:
547
self.num_tests += offset
549
raise errors.BzrError("Unknown whence %r" % whence)
551
def report_tests_starting(self):
552
"""Display information before the test run begins"""
553
if getattr(sys, 'frozen', None) is None:
554
bzr_path = osutils.realpath(sys.argv[0])
556
bzr_path = sys.executable
558
'bzr selftest: %s\n' % (bzr_path,))
561
bzrlib.__path__[0],))
563
' bzr-%s python-%s %s\n' % (
564
bzrlib.version_string,
565
bzrlib._format_version_tuple(sys.version_info),
566
platform.platform(aliased=1),
568
self.stream.write('\n')
570
def report_test_start(self, test):
571
"""Display information on the test just about to be run"""
573
def _report_thread_leak(self, test, leaked_threads, active_threads):
    """Write a one-line notice about a test that leaked threads.

    Nothing is written unless 'threads' is in selftest_debug_flags.

    :param test: The leaking test; its id() appears in the message.
    :param leaked_threads: The threads that leaked (not used here).
    :param active_threads: The currently live threads; only their number
        is reported.
    """
    # GZ 2010-09-09: A leak summary reported separately from the general
    #                thread debugging would be nice. Tests under subunit
    #                need something not using stream, perhaps adding a
    #                testtools details object would be fitting.
    if 'threads' not in selftest_debug_flags:
        return
    leaker_id = test.id()
    active_count = len(active_threads)
    self.stream.write(
        '%s is leaking, active is now %d\n' % (leaker_id, active_count))
583
def startTestRun(self):
584
self.startTime = time.time()
286
def _addSkipped(self, test, skip_excinfo):
287
if isinstance(skip_excinfo[1], TestNotApplicable):
288
self.not_applicable_count += 1
289
self.report_not_applicable(test, skip_excinfo)
292
self.report_skip(test, skip_excinfo)
295
except KeyboardInterrupt:
298
self.addError(test, test._exc_info())
300
# seems best to treat this as success from point-of-view of unittest
301
# -- it actually does nothing so it barely matters :)
302
unittest.TestResult.addSuccess(self, test)
303
test._log_contents = ''
305
def printErrorList(self, flavour, errors):
306
for test, err in errors:
307
self.stream.writeln(self.separator1)
308
self.stream.write("%s: " % flavour)
309
self.stream.writeln(self.getDescription(test))
310
if getattr(test, '_get_log', None) is not None:
311
self.stream.write('\n')
313
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
314
self.stream.write('\n')
315
self.stream.write(test._get_log())
316
self.stream.write('\n')
318
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
319
self.stream.write('\n')
320
self.stream.writeln(self.separator2)
321
self.stream.writeln("%s" % err)
326
def report_cleaning_up(self):
586
329
def report_success(self, test):
778
508
bench_history=None,
780
result_decorators=None,
782
"""Create a TextTestRunner.
784
:param result_decorators: An optional list of decorators to apply
785
to the result object being used by the runner. Decorators are
786
applied left to right - the first element in the list is the
789
# stream may claim to know how to write unicode strings, but in older
790
# pythons this goes sufficiently wrong that it is a bad idea. (
791
# specifically a built in file with encoding 'UTF-8' will still try
792
# to encode using ascii.)
793
new_encoding = osutils.get_terminal_encoding()
794
codec = codecs.lookup(new_encoding)
795
if type(codec) is tuple:
799
encode = codec.encode
800
# GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
801
# so should swap to the plain codecs.StreamWriter
802
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
804
stream.encoding = new_encoding
511
self.stream = unittest._WritelnDecorator(stream)
806
512
self.descriptions = descriptions
807
513
self.verbosity = verbosity
808
514
self._bench_history = bench_history
809
self._strict = strict
810
self._result_decorators = result_decorators or []
515
self.list_only = list_only
812
517
def run(self, test):
813
518
"Run the given test case or test suite."
519
startTime = time.time()
814
520
if self.verbosity == 1:
815
521
result_class = TextTestResult
816
522
elif self.verbosity >= 2:
817
523
result_class = VerboseTestResult
818
original_result = result_class(self.stream,
524
result = result_class(self.stream,
819
525
self.descriptions,
821
527
bench_history=self._bench_history,
528
num_tests=test.countTestCases(),
824
# Signal to result objects that look at stop early policy to stop,
825
original_result.stop_early = self.stop_on_failure
826
result = original_result
827
for decorator in self._result_decorators:
828
result = decorator(result)
829
result.stop_early = self.stop_on_failure
830
result.startTestRun()
530
result.stop_early = self.stop_on_failure
531
result.report_starting()
533
if self.verbosity >= 2:
534
self.stream.writeln("Listing tests only ...\n")
536
for t in iter_suite_tests(test):
537
self.stream.writeln("%s" % (t.id()))
539
actionTaken = "Listed"
835
# higher level code uses our extended protocol to determine
836
# what exit code to give.
837
return original_result
542
run = result.testsRun
544
stopTime = time.time()
545
timeTaken = stopTime - startTime
547
self.stream.writeln(result.separator2)
548
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
549
run, run != 1 and "s" or "", timeTaken))
550
self.stream.writeln()
551
if not result.wasSuccessful():
552
self.stream.write("FAILED (")
553
failed, errored = map(len, (result.failures, result.errors))
555
self.stream.write("failures=%d" % failed)
557
if failed: self.stream.write(", ")
558
self.stream.write("errors=%d" % errored)
559
if result.known_failure_count:
560
if failed or errored: self.stream.write(", ")
561
self.stream.write("known_failure_count=%d" %
562
result.known_failure_count)
563
self.stream.writeln(")")
565
if result.known_failure_count:
566
self.stream.writeln("OK (known_failures=%d)" %
567
result.known_failure_count)
569
self.stream.writeln("OK")
570
if result.skip_count > 0:
571
skipped = result.skip_count
572
self.stream.writeln('%d test%s skipped' %
573
(skipped, skipped != 1 and "s" or ""))
574
if result.unsupported:
575
for feature, count in sorted(result.unsupported.items()):
576
self.stream.writeln("Missing feature '%s' skipped %d tests." %
840
582
def iter_suite_tests(suite):
841
583
"""Return all tests in a suite, recursing through nested suites"""
842
if isinstance(suite, unittest.TestCase):
844
elif isinstance(suite, unittest.TestSuite):
584
for item in suite._tests:
585
if isinstance(item, unittest.TestCase):
587
elif isinstance(item, unittest.TestSuite):
846
588
for r in iter_suite_tests(item):
849
raise Exception('unknown type %r for object %r'
850
% (type(suite), suite))
853
TestSkipped = testtools.testcase.TestSkipped
591
raise Exception('unknown object %r inside test suite %r'
595
class TestSkipped(Exception):
596
"""Indicates that a test was intentionally skipped, rather than failing."""
856
599
class TestNotApplicable(TestSkipped):
857
600
"""A test is not applicable to the situation where it was run.
859
This is only normally raised by parameterized tests, if they find that
860
the instance they're constructed upon does not support one aspect
602
This is only normally raised by parameterized tests, if they find that
603
the instance they're constructed upon does not support one aspect
861
604
of its interface.
865
# traceback._some_str fails to format exceptions that have the default
866
# __str__ which does an implicit ascii conversion. However, repr() on those
867
# objects works, for all that it's not quite what the doctor may have ordered.
868
def _clever_some_str(value):
873
return repr(value).replace('\\n', '\n')
875
return '<unprintable %s object>' % type(value).__name__
877
traceback._some_str = _clever_some_str
880
# deprecated - use self.knownFailure(), or self.expectFailure.
881
KnownFailure = testtools.testcase._ExpectedFailure
608
class KnownFailure(AssertionError):
609
"""Indicates that a test failed in a precisely expected manner.
611
Such failures dont block the whole test suite from passing because they are
612
indicators of partially completed code or of future work. We have an
613
explicit error for them so that we can ensure that they are always visible:
614
KnownFailures are always shown in the output of bzr selftest.
884
618
class UnavailableFeature(Exception):
885
619
"""A feature required for this test was not available.
887
This can be considered a specialised form of SkippedTest.
889
621
The feature should be used to construct the exception.
625
class CommandFailed(Exception):
893
629
class StringIOWrapper(object):
894
630
"""A wrapper around cStringIO which just adds an encoding attribute.
896
632
Internally we can check sys.stdout to see what the output encoding
897
633
should be. However, cStringIO has no encoding attribute that we can
898
634
set. So we wrap it instead.
986
734
retrieved by _get_log(). We use a real OS file, not an in-memory object,
987
735
so that it can also capture file IO. When the test completes this file
988
736
is read into memory and removed from disk.
990
738
There are also convenience functions to invoke bzr's command-line
991
739
routine, and to build and check bzr trees.
993
741
In addition to the usual method of overriding tearDown(), this class also
994
allows subclasses to register cleanup functions via addCleanup, which are
742
allows subclasses to register functions into the _cleanups list, which is
995
743
run in order as the object is torn down. It's less likely this will be
996
744
accidentally overlooked.
747
_active_threads = None
748
_leaking_threads_tests = 0
749
_first_thread_leaker_id = None
750
_log_file_name = None
752
_keep_log_file = False
1000
753
# record lsprof data when performing benchmark calls.
1001
754
_gather_lsprof_in_benchmarks = False
755
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
756
'_log_contents', '_log_file_name', '_benchtime',
757
'_TestCase__testMethodName')
1003
759
def __init__(self, methodName='testMethod'):
1004
760
super(TestCase, self).__init__(methodName)
1005
self._directory_isolation = True
1006
self.exception_handlers.insert(0,
1007
(UnavailableFeature, self._do_unsupported_or_skip))
1008
self.exception_handlers.insert(0,
1009
(TestNotApplicable, self._do_not_applicable))
1011
763
def setUp(self):
1012
super(TestCase, self).setUp()
1014
# At this point we're still accessing the config files in $BZR_HOME (as
1015
# set by the user running selftest).
1016
timeout = config.GlobalStack().get('selftest.timeout')
1018
timeout_fixture = fixtures.TimeoutFixture(timeout)
1019
timeout_fixture.setUp()
1020
self.addCleanup(timeout_fixture.cleanUp)
1022
for feature in getattr(self, '_test_needs_features', []):
1023
self.requireFeature(feature)
764
unittest.TestCase.setUp(self)
1024
765
self._cleanEnvironment()
1026
if bzrlib.global_state is not None:
1027
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1028
config.CommandLineStore())
1030
766
self._silenceUI()
1031
767
self._startLogFile()
1032
768
self._benchcalls = []
1033
769
self._benchtime = None
1034
770
self._clear_hooks()
1035
self._track_transports()
1037
771
self._clear_debug_flags()
1038
# Isolate global verbosity level, to make sure it's reproducible
1039
# between tests. We should get rid of this altogether: bug 656694. --
1041
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
1042
self._log_files = set()
1043
# Each key in the ``_counters`` dict holds a value for a different
1044
# counter. When the test ends, addDetail() should be used to output the
1045
# counter values. This happens in install_counter_hook().
1047
if 'config_stats' in selftest_debug_flags:
1048
self._install_config_stats_hooks()
1049
# Do not use i18n for tests (unless the test reverses this)
1055
# The sys preserved stdin/stdout should allow blackbox tests debugging
1056
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1057
).set_trace(sys._getframe().f_back)
1059
def discardDetail(self, name):
1060
"""Extend the addDetail, getDetails api so we can remove a detail.
1062
eg. bzr always adds the 'log' detail at startup, but we don't want to
1063
include it for skipped, xfail, etc tests.
1065
It is safe to call this for a detail that doesn't exist, in case this
1066
gets called multiple times.
1068
# We cheat. details is stored in __details which means we shouldn't
1069
# touch it. but getDetails() returns the dict directly, so we can
1071
details = self.getDetails()
1075
def install_counter_hook(self, hooks, name, counter_name=None):
1076
"""Install a counting hook.
1078
Any hook can be counted as long as it doesn't need to return a value.
1080
:param hooks: Where the hook should be installed.
1082
:param name: The hook name that will be counted.
1084
:param counter_name: The counter identifier in ``_counters``, defaults
1087
_counters = self._counters # Avoid closing over self
1088
if counter_name is None:
1090
if _counters.has_key(counter_name):
1091
raise AssertionError('%s is already used as a counter name'
1093
_counters[counter_name] = 0
1094
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1095
lambda: ['%d' % (_counters[counter_name],)]))
1096
def increment_counter(*args, **kwargs):
1097
_counters[counter_name] += 1
1098
label = 'count %s calls' % (counter_name,)
1099
hooks.install_named_hook(name, increment_counter, label)
1100
self.addCleanup(hooks.uninstall_named_hook, name, label)
1102
def _install_config_stats_hooks(self):
1103
"""Install config hooks to count hook calls.
1106
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1107
self.install_counter_hook(config.ConfigHooks, hook_name,
1108
'config.%s' % (hook_name,))
1110
# The OldConfigHooks are private and need special handling to protect
1111
# against recursive tests (tests that run other tests), so we just do
1112
# manually what registering them into _builtin_known_hooks will provide
1114
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1115
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1116
self.install_counter_hook(config.OldConfigHooks, hook_name,
1117
'old_config.%s' % (hook_name,))
772
TestCase._active_threads = threading.activeCount()
773
self.addCleanup(self._check_leaked_threads)
775
def _check_leaked_threads(self):
776
active = threading.activeCount()
777
leaked_threads = active - TestCase._active_threads
778
TestCase._active_threads = active
780
TestCase._leaking_threads_tests += 1
781
if TestCase._first_thread_leaker_id is None:
782
TestCase._first_thread_leaker_id = self.id()
783
# we're not specifically told when all tests are finished.
784
# This will do. We use a function to avoid keeping a reference
785
# to a TestCase object.
786
atexit.register(_report_leaked_threads)
1119
788
def _clear_debug_flags(self):
1120
789
"""Prevent externally set debug flags affecting tests.
1122
791
Tests that want to use debug flags can just set them in the
1123
792
debug_flags set during setup/teardown.
1125
# Start with a copy of the current debug flags we can safely modify.
1126
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
1127
794
if 'allow_debug' not in selftest_debug_flags:
795
self._preserved_debug_flags = set(debug.debug_flags)
1128
796
debug.debug_flags.clear()
1129
if 'disable_lock_checks' not in selftest_debug_flags:
1130
debug.debug_flags.add('strict_locks')
797
self.addCleanup(self._restore_debug_flags)
1132
799
def _clear_hooks(self):
1133
800
# prevent hooks affecting tests
1134
known_hooks = hooks.known_hooks
1135
self._preserved_hooks = {}
1136
for key, (parent, name) in known_hooks.iter_parent_objects():
1137
current_hooks = getattr(parent, name)
1138
self._preserved_hooks[parent] = (name, current_hooks)
1139
self._preserved_lazy_hooks = hooks._lazy_hooks
1140
hooks._lazy_hooks = {}
802
import bzrlib.smart.server
803
self._preserved_hooks = {
804
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
805
bzrlib.mutabletree.MutableTree: bzrlib.mutabletree.MutableTree.hooks,
806
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
1141
808
self.addCleanup(self._restoreHooks)
1142
for key, (parent, name) in known_hooks.iter_parent_objects():
1143
factory = known_hooks.get(key)
1144
setattr(parent, name, factory())
1145
# this hook should always be installed
1146
request._install_hook()
1148
def disable_directory_isolation(self):
    """Turn off directory isolation checks.

    Sets ``self._directory_isolation`` to False; that flag is read by
    ``_preopen_isolate_url`` before it validates a url.
    """
    self._directory_isolation = False
1152
def enable_directory_isolation(self):
    """Enable directory isolation checks.

    Sets ``self._directory_isolation`` to True; that flag is read by
    ``_preopen_isolate_url`` before it validates a url.
    """
    self._directory_isolation = True
809
# reset all hooks to an empty instance of the appropriate type
810
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
811
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
1156
813
def _silenceUI(self):
    """Turn off UI for duration of test"""
    # by default the UI is off; tests can turn it on if they want it.
    # overrideAttr is used so the replacement is undone via the test's
    # cleanups rather than by a hand-written restore function.
    self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
1161
def _check_locks(self):
1162
"""Check that all lock take/release actions have been paired."""
1163
# We always check for mismatched locks. If a mismatch is found, we
1164
# fail unless -Edisable_lock_checks is supplied to selftest, in which
1165
# case we just print a warning.
1167
acquired_locks = [lock for action, lock in self._lock_actions
1168
if action == 'acquired']
1169
released_locks = [lock for action, lock in self._lock_actions
1170
if action == 'released']
1171
broken_locks = [lock for action, lock in self._lock_actions
1172
if action == 'broken']
1173
# trivially, given the tests for lock acquistion and release, if we
1174
# have as many in each list, it should be ok. Some lock tests also
1175
# break some locks on purpose and should be taken into account by
1176
# considering that breaking a lock is just a dirty way of releasing it.
1177
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1179
'Different number of acquired and '
1180
'released or broken locks.\n'
1184
(acquired_locks, released_locks, broken_locks))
1185
if not self._lock_check_thorough:
1186
# Rather than fail, just warn
1187
print "Broken test %s: %s" % (self, message)
1191
def _track_locks(self):
1192
"""Track lock activity during tests."""
1193
self._lock_actions = []
1194
if 'disable_lock_checks' in selftest_debug_flags:
1195
self._lock_check_thorough = False
1197
self._lock_check_thorough = True
1199
self.addCleanup(self._check_locks)
1200
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
1201
self._lock_acquired, None)
1202
_mod_lock.Lock.hooks.install_named_hook('lock_released',
1203
self._lock_released, None)
1204
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
1205
self._lock_broken, None)
1207
def _lock_acquired(self, result):
1208
self._lock_actions.append(('acquired', result))
1210
def _lock_released(self, result):
1211
self._lock_actions.append(('released', result))
1213
def _lock_broken(self, result):
1214
self._lock_actions.append(('broken', result))
1216
def permit_dir(self, name):
    """Permit a directory to be used by this test. See permit_url.

    Both the name as given and the base url of a transport built from it
    are permitted, so either spelling is accepted later.

    :param name: A filesystem path.
    """
    name_transport = _mod_transport.get_transport_from_path(name)
    self.permit_url(name)
    self.permit_url(name_transport.base)
1222
def permit_url(self, url):
1223
"""Declare that url is an ok url to use in this test.
1225
Do this for memory transports, temporary test directory etc.
1227
Do not do this for the current working directory, /tmp, or any other
1228
preexisting non isolated url.
1230
if not url.endswith('/'):
1232
self._bzr_selftest_roots.append(url)
1234
def permit_source_tree_branch_repo(self):
1235
"""Permit the source tree bzr is running from to be opened.
1237
Some code such as bzrlib.version attempts to read from the bzr branch
1238
that bzr is executing from (if any). This method permits that directory
1239
to be used in the test suite.
1241
path = self.get_source_path()
1242
self.record_directory_isolation()
1245
workingtree.WorkingTree.open(path)
1246
except (errors.NotBranchError, errors.NoWorkingTree):
1247
raise TestSkipped('Needs a working tree of bzr sources')
1249
self.enable_directory_isolation()
1251
def _preopen_isolate_transport(self, transport):
    """Check that all transport openings are done in the test work area.

    :param transport: The transport about to be opened. Path-filtering
        wrappers are unwrapped to their backing transport, and a leading
        'readonly+' is stripped from the url, before the url is handed to
        ``_preopen_isolate_url``.
    """
    while isinstance(transport, pathfilter.PathFilteringTransport):
        # Unwrap pathfiltered transports
        transport = transport.server.backing_transport.clone(
            transport._filter('.'))
    url = transport.base
    # ReadonlySmartTCPServer_for_testing decorates the backing transport
    # urls it is given by prepending readonly+. This is appropriate as the
    # client shouldn't know that the server is readonly (or not readonly).
    # We could register all servers twice, with readonly+ prepending, but
    # that makes for a long list; stripping the prefix here amounts to
    # about the same thing.
    if url.startswith('readonly+'):
        url = url[len('readonly+'):]
    self._preopen_isolate_url(url)
1268
def _preopen_isolate_url(self, url):
1269
if not self._directory_isolation:
1271
if self._directory_isolation == 'record':
1272
self._bzr_selftest_roots.append(url)
1274
# This prevents all transports, including e.g. sftp ones backed on disk
1275
# from working unless they are explicitly granted permission. We then
1276
# depend on the code that sets up test transports to check that they are
1277
# appropriately isolated and enable their use by calling
1278
# self.permit_transport()
1279
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1280
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1281
% (url, self._bzr_selftest_roots))
1283
def record_directory_isolation(self):
    """Gather accessed directories to permit later access.

    This is used for tests that access the branch bzr is running from.
    It sets ``self._directory_isolation`` to "record", the mode in which
    ``_preopen_isolate_url`` appends urls to ``_bzr_selftest_roots``.
    """
    self._directory_isolation = "record"
1290
def start_server(self, transport_server, backing_server=None):
1291
"""Start transport_server for this test.
1293
This starts the server, registers a cleanup for it and permits the
1294
server's urls to be used.
1296
if backing_server is None:
1297
transport_server.start_server()
1299
transport_server.start_server(backing_server)
1300
self.addCleanup(transport_server.stop_server)
1301
# Obtain a real transport because if the server supplies a password, it
1302
# will be hidden from the base on the client side.
1303
t = _mod_transport.get_transport_from_url(transport_server.get_url())
1304
# Some transport servers effectively chroot the backing transport;
1305
# others like SFTPServer don't - users of the transport can walk up the
1306
# transport to read the entire backing transport. This wouldn't matter
1307
# except that the workdir tests are given - and that they expect the
1308
# server's url to point at - is one directory under the safety net. So
1309
# Branch operations into the transport will attempt to walk up one
1310
# directory. Chrooting all servers would avoid this but also mean that
1311
# we wouldn't be testing directly against non-root urls. Alternatively
1312
# getting the test framework to start the server with a backing server
1313
# at the actual safety net directory would work too, but this then
1314
# means that the self.get_url/self.get_transport methods would need
1315
# to transform all their results. On balance its cleaner to handle it
1316
# here, and permit a higher url when we have one of these transports.
1317
if t.base.endswith('/work/'):
1318
# we have safety net/test root/work
1319
t = t.clone('../..')
1320
elif isinstance(transport_server,
1321
test_server.SmartTCPServer_for_testing):
1322
# The smart server adds a path similar to work, which is traversed
1323
# up from by the client. But the server is chrooted - the actual
1324
# backing transport is not escaped from, and VFS requests to the
1325
# root will error (because they try to escape the chroot).
1327
while t2.base != t.base:
1330
self.permit_url(t.base)
1332
def _track_transports(self):
    """Install checks for transport usage.

    Resets the list of permitted roots and registers
    ``_preopen_isolate_transport`` on the ControlDir 'pre_open' hook.
    """
    # TestCase has no safe place it can write to.
    self._bzr_selftest_roots = []
    # Currently the easiest way to be sure that nothing is going on is to
    # hook into bzr dir opening. This leaves a small window of error for
    # transport tests, but they are well known, and this can be improved.
    controldir.ControlDir.hooks.install_named_hook("pre_open",
        self._preopen_isolate_transport, "Check bzr directories are safe.")
816
saved = ui.ui_factory
818
ui.ui_factory = saved
819
ui.ui_factory = ui.SilentUIFactory()
820
self.addCleanup(_restore)
1343
822
def _ndiff_strings(self, a, b):
1344
823
"""Return ndiff between two strings containing lines.
1346
825
A trailing newline is added if missing to make the strings
1347
826
print properly."""
1348
827
if b and b[-1] != '\n':
1362
841
except UnicodeError, e:
1363
842
# If we can't compare without getting a UnicodeError, then
1364
843
# obviously they are different
1365
trace.mutter('UnicodeError: %s', e)
844
mutter('UnicodeError: %s', e)
1368
847
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1370
pprint.pformat(a), pprint.pformat(b)))
849
pformat(a), pformat(b)))
1372
# FIXME: This is deprecated in unittest2 but plugins may still use it so we
1373
# need a deprecation period for them -- vila 2016-02-01
1374
851
assertEquals = assertEqual
1376
853
def assertEqualDiff(self, a, b, message=None):
1377
854
"""Assert two texts are equal, if not raise an exception.
1379
This is intended for use with multi-line strings where it can
856
This is intended for use with multi-line strings where it can
1380
857
be hard to find the differences by eye.
1382
# TODO: perhaps override assertEqual to call this for strings?
859
# TODO: perhaps override assertEquals to call this for strings?
1385
862
if message is None:
1386
863
message = "texts not equal:\n"
865
message = 'first string is missing a final newline.\n'
1387
866
if a + '\n' == b:
1388
message = 'first string is missing a final newline.\n'
1390
867
message = 'second string is missing a final newline.\n'
1391
868
raise AssertionError(message +
1392
869
self._ndiff_strings(a, b))
1394
871
def assertEqualMode(self, mode, mode_test):
    """Assert that two file modes are equal.

    :param mode: The first mode, an integer.
    :param mode_test: The second mode, an integer.

    On mismatch the failure message shows both modes in octal.
    """
    self.assertEqual(mode, mode_test,
                     'mode mismatch %o != %o' % (mode, mode_test))
1398
def assertEqualStat(self, expected, actual):
1399
"""assert that expected and actual are the same stat result.
1401
:param expected: A stat result.
1402
:param actual: A stat result.
1403
:raises AssertionError: If the expected and actual stat values differ
1404
other than by atime.
1406
self.assertEqual(expected.st_size, actual.st_size,
1407
'st_size did not match')
1408
self.assertEqual(expected.st_mtime, actual.st_mtime,
1409
'st_mtime did not match')
1410
self.assertEqual(expected.st_ctime, actual.st_ctime,
1411
'st_ctime did not match')
1412
if sys.platform == 'win32':
1413
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1414
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1415
# odd. We just force it to always be 0 to avoid any problems.
1416
self.assertEqual(0, expected.st_dev)
1417
self.assertEqual(0, actual.st_dev)
1418
self.assertEqual(0, expected.st_ino)
1419
self.assertEqual(0, actual.st_ino)
1421
self.assertEqual(expected.st_dev, actual.st_dev,
1422
'st_dev did not match')
1423
self.assertEqual(expected.st_ino, actual.st_ino,
1424
'st_ino did not match')
1425
self.assertEqual(expected.st_mode, actual.st_mode,
1426
'st_mode did not match')
1428
def assertLength(self, length, obj_with_len):
    """Assert that obj_with_len is of length length.

    :param length: The expected length.
    :param obj_with_len: Any object supporting ``len()``.

    Calls ``self.fail`` with the wanted length, the actual length and the
    repr of the object when the lengths differ.
    """
    if len(obj_with_len) != length:
        self.fail("Incorrect length: wanted %d, got %d for %r" % (
            length, len(obj_with_len), obj_with_len))
1434
def assertLogsError(self, exception_class, func, *args, **kwargs):
1435
"""Assert that `func(*args, **kwargs)` quietly logs a specific error.
1438
orig_log_exception_quietly = trace.log_exception_quietly
1441
orig_log_exception_quietly()
1442
captured.append(sys.exc_info()[1])
1443
trace.log_exception_quietly = capture
1444
func(*args, **kwargs)
1446
trace.log_exception_quietly = orig_log_exception_quietly
1447
self.assertLength(1, captured)
1449
self.assertIsInstance(err, exception_class)
1452
875
def assertPositive(self, val):
    """Assert that val is greater than 0.

    :param val: The value to compare against zero.
    """
    # Wrap val in a one-tuple so that a tuple argument cannot be
    # misinterpreted as the argument list of the % operator.
    self.assertTrue(val > 0,
                    'expected a positive value, but got %s' % (val,))
1745
1169
def _startLogFile(self):
1746
"""Setup a in-memory target for bzr and testcase log messages"""
1747
pseudo_log_file = StringIO()
1748
def _get_log_contents_for_weird_testtools_api():
1749
return [pseudo_log_file.getvalue().decode(
1750
"utf-8", "replace").encode("utf-8")]
1751
self.addDetail("log", content.Content(content.ContentType("text",
1752
"plain", {"charset": "utf8"}),
1753
_get_log_contents_for_weird_testtools_api))
1754
self._log_file = pseudo_log_file
1755
self._log_memento = trace.push_log_file(self._log_file)
1170
"""Send bzr and test log messages to a temporary file.
1172
The file is removed as the test is torn down.
1174
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1175
self._log_file = os.fdopen(fileno, 'w+')
1176
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1177
self._log_file_name = name
1756
1178
self.addCleanup(self._finishLogFile)
1758
1180
def _finishLogFile(self):
    """Flush and dereference the in-memory log for this testcase"""
    if trace._trace_file:
        # flush the log file, to get all content
        trace._trace_file.flush()
    trace.pop_log_file(self._log_memento)
    # The logging module now tracks references for cleanup so discard ours
    del self._log_memento
1767
def thisFailsStrictLockCheck(self):
    """It is known that this test would fail with -Dstrict_locks.

    By default, all tests are run with strict lock checking unless
    -Edisable_lock_checks is supplied. However there are some tests which
    we know fail strict locks at this point that have not been fixed.
    They should call this function to disable the strict checking.

    This should be used sparingly, it is much better to fix the locking
    issues rather than papering over the problem by calling this function.
    """
    debug.debug_flags.discard('strict_locks')
1780
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1781
"""Overrides an object attribute restoring it after the test.
1783
:note: This should be used with discretion; you should think about
1784
whether it's better to make the code testable without monkey-patching.
1786
:param obj: The object that will be mutated.
1788
:param attr_name: The attribute name we want to preserve/override in
1791
:param new: The optional value we want to set the attribute to.
1793
:returns: The actual attr value.
1795
# The actual value is captured by the call below
1796
value = getattr(obj, attr_name, _unitialized_attr)
1797
if value is _unitialized_attr:
1798
# When the test completes, the attribute should not exist, but if
1799
# we aren't setting a value, we don't need to do anything.
1800
if new is not _unitialized_attr:
1801
self.addCleanup(delattr, obj, attr_name)
1803
self.addCleanup(setattr, obj, attr_name, value)
1804
if new is not _unitialized_attr:
1805
setattr(obj, attr_name, new)
1808
def overrideEnv(self, name, new):
1809
"""Set an environment variable, and reset it after the test.
1811
:param name: The environment variable name.
1813
:param new: The value to set the variable to. If None, the
1814
variable is deleted from the environment.
1816
:returns: The actual variable value.
1818
value = osutils.set_or_unset_env(name, new)
1819
self.addCleanup(osutils.set_or_unset_env, name, value)
1822
def recordCalls(self, obj, attr_name):
1823
"""Monkeypatch in a wrapper that will record calls.
1825
The monkeypatch is automatically removed when the test concludes.
1827
:param obj: The namespace holding the reference to be replaced;
1828
typically a module, class, or object.
1829
:param attr_name: A string for the name of the attribute to
1831
:returns: A list that will be extended with one item every time the
1832
function is called, with a tuple of (args, kwargs).
1836
def decorator(*args, **kwargs):
1837
calls.append((args, kwargs))
1838
return orig(*args, **kwargs)
1839
orig = self.overrideAttr(obj, attr_name, decorator)
1181
"""Finished with the log file.
1183
Close the file and delete it, unless setKeepLogfile was called.
1185
if self._log_file is None:
1187
bzrlib.trace.pop_log_file(self._log_memento)
1188
self._log_file.close()
1189
self._log_file = None
1190
if not self._keep_log_file:
1191
os.remove(self._log_file_name)
1192
self._log_file_name = None
1194
def setKeepLogfile(self):
    """Make the logfile not be deleted when _finishLogFile is called.

    Sets ``self._keep_log_file`` to True.
    """
    self._keep_log_file = True
1198
def addCleanup(self, callable):
1199
"""Arrange to run a callable when this case is torn down.
1201
Callables are run in the reverse of the order they are registered,
1202
ie last-in first-out.
1204
if callable in self._cleanups:
1205
raise ValueError("cleanup function %r already registered on %s"
1207
self._cleanups.append(callable)
1842
1209
def _cleanEnvironment(self):
1843
for name, value in isolated_environ.iteritems():
1844
self.overrideEnv(name, value)
1211
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1212
'HOME': os.getcwd(),
1213
'APPDATA': None, # bzr now use Win32 API and don't rely on APPDATA
1214
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1216
'BZREMAIL': None, # may still be present in the environment
1218
'BZR_PROGRESS_BAR': None,
1221
'SSH_AUTH_SOCK': None,
1225
'https_proxy': None,
1226
'HTTPS_PROXY': None,
1231
# Nobody cares about these ones AFAIK. So far at
1232
# least. If you do (care), please update this comment
1236
'BZR_REMOTE_PATH': None,
1239
self.addCleanup(self._restoreEnvironment)
1240
for name, value in new_env.iteritems():
1241
self._captureVar(name, value)
1243
def _captureVar(self, name, newvalue):
    """Set an environment variable, and reset it when finished.

    :param name: The environment variable name.
    :param newvalue: The value handed to ``osutils.set_or_unset_env``.

    Whatever ``osutils.set_or_unset_env`` returns is remembered under
    ``name`` in ``self.__old_env``, which ``_restoreEnvironment`` replays.
    """
    self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1247
def _restore_debug_flags(self):
    """Put back the debug flags saved in ``self._preserved_debug_flags``.

    The shared ``debug.debug_flags`` set is emptied and refilled in place
    rather than rebound.
    """
    debug.debug_flags.clear()
    debug.debug_flags.update(self._preserved_debug_flags)
1251
def _restoreEnvironment(self):
    """Replay the values recorded by ``_captureVar`` into the environment.

    Each (name, value) pair stored in ``self.__old_env`` is passed back to
    ``osutils.set_or_unset_env``.
    """
    for name, value in self.__old_env.iteritems():
        osutils.set_or_unset_env(name, value)
1846
1255
def _restoreHooks(self):
1847
for klass, (name, hooks) in self._preserved_hooks.items():
1848
setattr(klass, name, hooks)
1849
self._preserved_hooks.clear()
1850
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1851
self._preserved_lazy_hooks.clear()
1256
for klass, hooks in self._preserved_hooks.items():
1257
setattr(klass, 'hooks', hooks)
1853
1259
def knownFailure(self, reason):
1854
"""Declare that this test fails for a known reason
1856
Tests that are known to fail should generally be using expectedFailure
1857
with an appropriate reverse assertion if a change could cause the test
1858
to start passing. Conversely if the test has no immediate prospect of
1859
succeeding then using skip is more suitable.
1861
When this method is called while an exception is being handled, that
1862
traceback will be used, otherwise a new exception will be thrown to
1863
provide one but won't be reported.
1865
self._add_reason(reason)
1260
"""This test has failed for some known reason."""
1261
raise KnownFailure(reason)
1263
def run(self, result=None):
1264
if result is None: result = self.defaultTestResult()
1265
for feature in getattr(self, '_test_needs_features', []):
1266
if not feature.available():
1267
result.startTest(self)
1268
if getattr(result, 'addNotSupported', None):
1269
result.addNotSupported(self, feature)
1271
result.addSuccess(self)
1272
result.stopTest(self)
1867
exc_info = sys.exc_info()
1868
if exc_info != (None, None, None):
1869
self._report_traceback(exc_info)
1872
raise self.failureException(reason)
1873
except self.failureException:
1874
exc_info = sys.exc_info()
1875
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1876
raise testtools.testcase._ExpectedFailure(exc_info)
1275
return unittest.TestCase.run(self, result)
1880
def _suppress_log(self):
1881
"""Remove the log info from details."""
1882
self.discardDetail('log')
1884
def _do_skip(self, result, reason):
1885
self._suppress_log()
1886
addSkip = getattr(result, 'addSkip', None)
1887
if not callable(addSkip):
1888
result.addSuccess(result)
1890
addSkip(self, reason)
1893
def _do_known_failure(self, result, e):
1894
self._suppress_log()
1895
err = sys.exc_info()
1896
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1897
if addExpectedFailure is not None:
1898
addExpectedFailure(self, err)
1900
result.addSuccess(self)
1903
def _do_not_applicable(self, result, e):
1905
reason = 'No reason given'
1908
self._suppress_log ()
1909
addNotApplicable = getattr(result, 'addNotApplicable', None)
1910
if addNotApplicable is not None:
1911
result.addNotApplicable(self, reason)
1913
self._do_skip(result, reason)
1916
def _report_skip(self, result, err):
1917
"""Override the default _report_skip.
1919
We want to strip the 'log' detail. If we waint until _do_skip, it has
1920
already been formatted into the 'reason' string, and we can't pull it
1923
self._suppress_log()
1924
super(TestCase, self)._report_skip(self, result, err)
1927
def _report_expected_failure(self, result, err):
1930
See _report_skip for motivation.
1932
self._suppress_log()
1933
super(TestCase, self)._report_expected_failure(self, result, err)
1936
def _do_unsupported_or_skip(self, result, e):
1938
self._suppress_log()
1939
addNotSupported = getattr(result, 'addNotSupported', None)
1940
if addNotSupported is not None:
1941
result.addNotSupported(self, reason)
1943
self._do_skip(result, reason)
1278
absent_attr = object()
1279
for attr_name in self.attrs_to_keep:
1280
attr = getattr(self, attr_name, absent_attr)
1281
if attr is not absent_attr:
1282
saved_attrs[attr_name] = attr
1283
self.__dict__ = saved_attrs
1287
unittest.TestCase.tearDown(self)
1945
1289
def time(self, callable, *args, **kwargs):
1946
1290
"""Run callable and accrue the time it takes to the benchmark time.
1948
1292
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1949
1293
this will cause lsprofile statistics to be gathered and stored in
1950
1294
self._benchcalls.
1952
1296
if self._benchtime is None:
1953
self.addDetail('benchtime', content.Content(content.ContentType(
1954
"text", "plain"), lambda:[str(self._benchtime)]))
1955
1297
self._benchtime = 0
1956
1298
start = time.time()
2690
1941
# specifically told when all tests are finished. This will do.
2691
1942
atexit.register(_rmtree_temp_dir, root)
2693
self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2694
1944
self.addCleanup(self._check_safety_net)
2696
1946
def makeAndChdirToTestDir(self):
2697
1947
"""Create a temporary directories for this one test.
2699
1949
This must set self.test_home_dir and self.test_dir and chdir to
2702
1952
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
2704
1954
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2705
1955
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2706
1956
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2707
self.permit_dir(self.test_dir)
2709
def make_branch(self, relpath, format=None, name=None):
1958
def make_branch(self, relpath, format=None):
2710
1959
"""Create a branch on the transport at relpath."""
2711
1960
repo = self.make_repository(relpath, format=format)
2712
return repo.bzrdir.create_branch(append_revisions_only=False,
2715
def get_default_format(self):
2718
def resolve_format(self, format):
2719
"""Resolve an object to a ControlDir format object.
2721
The initial format object can either already be
2722
a ControlDirFormat, None (for the default format),
2723
or a string with the name of the control dir format.
2725
:param format: Object to resolve
2726
:return A ControlDirFormat instance
2729
format = self.get_default_format()
2730
if isinstance(format, basestring):
2731
format = controldir.format_registry.make_bzrdir(format)
1961
return repo.bzrdir.create_branch()
2734
1963
def make_bzrdir(self, relpath, format=None):
2736
1965
# might be a relative or absolute path
2737
1966
maybe_a_url = self.get_url(relpath)
2738
1967
segments = maybe_a_url.rsplit('/', 1)
2739
t = _mod_transport.get_transport(maybe_a_url)
1968
t = get_transport(maybe_a_url)
2740
1969
if len(segments) > 1 and segments[-1] not in ('', '.'):
2741
1970
t.ensure_base()
2742
format = self.resolve_format(format)
1973
if isinstance(format, basestring):
1974
format = bzrdir.format_registry.make_bzrdir(format)
2743
1975
return format.initialize_on_transport(t)
2744
1976
except errors.UninitializableFormat:
2745
1977
raise TestSkipped("Format %s is not initializable." % format)
2747
def make_repository(self, relpath, shared=None, format=None):
1979
def make_repository(self, relpath, shared=False, format=None):
2748
1980
"""Create a repository on our default transport at relpath.
2750
1982
Note that relpath must be a relative path, not a full url.
2752
1984
# FIXME: If you create a remoterepository this returns the underlying
2753
# real format, which is incorrect. Actually we should make sure that
1985
# real format, which is incorrect. Actually we should make sure that
2754
1986
# RemoteBzrDir returns a RemoteRepository.
2755
1987
# maybe mbp 20070410
2756
1988
made_control = self.make_bzrdir(relpath, format=format)
2757
1989
return made_control.create_repository(shared=shared)
2759
def make_smart_server(self, path, backing_server=None):
2760
if backing_server is None:
2761
backing_server = self.get_server()
2762
smart_server = test_server.SmartTCPServer_for_testing()
2763
self.start_server(smart_server, backing_server)
2764
remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2766
return remote_transport
2768
1991
def make_branch_and_memory_tree(self, relpath, format=None):
    """Create a branch on the default transport and a MemoryTree for it.

    :param relpath: Passed through to ``self.make_branch``.
    :param format: Passed through to ``self.make_branch``.
    :return: The result of ``memorytree.MemoryTree.create_on_branch`` for
        the new branch.
    """
    b = self.make_branch(relpath, format=format)
    return memorytree.MemoryTree.create_on_branch(b)
2773
def make_branch_builder(self, relpath, format=None):
    """Create a branch at relpath and wrap it in a BranchBuilder.

    :param relpath: Passed through to ``self.make_branch``.
    :param format: Passed through to ``self.make_branch``.
    :return: A ``branchbuilder.BranchBuilder`` for the new branch.
    """
    branch = self.make_branch(relpath, format=format)
    return branchbuilder.BranchBuilder(branch=branch)
2777
1996
def overrideEnvironmentForTesting(self):
    """Point HOME and BZR_HOME at this test's home directory.

    The values are set through ``self.overrideEnv``. A unicode
    ``self.test_home_dir`` is first encoded with the filesystem encoding.
    """
    test_home_dir = self.test_home_dir
    if isinstance(test_home_dir, unicode):
        test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
    self.overrideEnv('HOME', test_home_dir)
    self.overrideEnv('BZR_HOME', test_home_dir)
2784
def setup_smart_server_with_call_log(self):
    """Sets up a smart server as the transport server with a call log.

    After this call ``self.hpss_calls`` collects a CapturedCall for each
    smart client 'call' hook invocation, and ``self.hpss_connections``
    collects each transport reported by the 'post_connect' hook.
    """
    # Imported locally so this method does not depend on a module-level
    # import of traceback.
    import traceback
    self.transport_server = test_server.SmartTCPServer_for_testing
    self.hpss_connections = []
    self.hpss_calls = []
    # Skip the current stack down to the caller of
    # setup_smart_server_with_call_log
    prefix_length = len(traceback.extract_stack()) - 2
    def capture_hpss_call(params):
        self.hpss_calls.append(
            CapturedCall(params, prefix_length))
    def capture_connect(transport):
        self.hpss_connections.append(transport)
    client._SmartClient.hooks.install_named_hook(
        'call', capture_hpss_call, None)
    _mod_transport.Transport.hooks.install_named_hook(
        'post_connect', capture_connect, None)
2803
def reset_smart_call_log(self):
    """Discard recorded smart-server calls and connections.

    Both ``self.hpss_calls`` and ``self.hpss_connections`` are rebound to
    fresh empty lists.
    """
    self.hpss_calls = []
    self.hpss_connections = []
1997
os.environ['HOME'] = self.test_home_dir
1998
os.environ['BZR_HOME'] = self.test_home_dir
2001
super(TestCaseWithMemoryTransport, self).setUp()
2002
self._make_test_root()
2003
_currentdir = os.getcwdu()
2004
def _leaveDirectory():
2005
os.chdir(_currentdir)
2006
self.addCleanup(_leaveDirectory)
2007
self.makeAndChdirToTestDir()
2008
self.overrideEnvironmentForTesting()
2009
self.__readonly_server = None
2010
self.__server = None
2011
self.reduceLockdirTimeout()
2808
2014
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2809
2015
"""Derived class that runs a test within a temporary directory.
3289
2437
list_only=False,
3290
2438
random_seed=None,
3291
2439
exclude_pattern=None,
3294
suite_decorators=None,
3296
result_decorators=None,
3298
"""Run a test suite for bzr selftest.
3300
:param runner_class: The class of runner to use. Must support the
3301
constructor arguments passed by run_suite which are more than standard
3303
:return: A boolean indicating success.
3305
2441
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
3310
if runner_class is None:
3311
runner_class = TextTestRunner
3314
runner = runner_class(stream=stream,
2446
runner = TextTestRunner(stream=sys.stdout,
3315
2447
descriptions=0,
3316
2448
verbosity=verbosity,
3317
2449
bench_history=bench_history,
3319
result_decorators=result_decorators,
2450
list_only=list_only,
3321
2452
runner.stop_on_failure=stop_on_failure
3322
if isinstance(suite, unittest.TestSuite):
3323
# Empty out _tests list of passed suite and populate new TestSuite
3324
suite._tests[:], suite = [], TestSuite(suite)
3325
# built in decorator factories:
3327
random_order(random_seed, runner),
3328
exclude_tests(exclude_pattern),
3330
if matching_tests_first:
3331
decorators.append(tests_first(pattern))
2453
# Initialise the random number generator and display the seed used.
2454
# We convert the seed to a long to make it reuseable across invocations.
2455
random_order = False
2456
if random_seed is not None:
2458
if random_seed == "now":
2459
random_seed = long(time.time())
2461
# Convert the seed to a long if we can
2463
random_seed = long(random_seed)
2466
runner.stream.writeln("Randomizing test order using seed %s\n" %
2468
random.seed(random_seed)
2469
# Customise the list of tests if requested
2470
if exclude_pattern is not None:
2471
suite = exclude_tests_by_re(suite, exclude_pattern)
2473
order_changer = randomize_suite
3333
decorators.append(filter_tests(pattern))
3334
if suite_decorators:
3335
decorators.extend(suite_decorators)
3336
# tell the result object how many tests will be running: (except if
3337
# --parallel=fork is being used. Robert said he will provide a better
3338
# progress design later -- vila 20090817)
3339
if fork_decorator not in decorators:
3340
decorators.append(CountingDecorator)
3341
for decorator in decorators:
3342
suite = decorator(suite)
3344
# Done after test suite decoration to allow randomisation etc
3345
# to take effect, though that is of marginal benefit.
3347
stream.write("Listing tests only ...\n")
3348
for t in iter_suite_tests(suite):
3349
stream.write("%s\n" % (t.id()))
2475
order_changer = preserve_input
2476
if pattern != '.*' or random_order:
2477
if matching_tests_first:
2478
suites = map(order_changer, split_suite_by_re(suite, pattern))
2479
suite = TestUtil.TestSuite(suites)
2481
suite = order_changer(filter_suite_by_re(suite, pattern))
3351
2483
result = runner.run(suite)
3353
2486
return result.wasStrictlySuccessful()
3355
return result.wasSuccessful()
3358
# A registry where get() returns a suite decorator.
3359
parallel_registry = registry.Registry()
3362
def fork_decorator(suite):
3363
if getattr(os, "fork", None) is None:
3364
raise errors.BzrCommandError("platform does not support fork,"
3365
" try --parallel=subprocess instead.")
3366
concurrency = osutils.local_concurrency()
3367
if concurrency == 1:
3369
from testtools import ConcurrentTestSuite
3370
return ConcurrentTestSuite(suite, fork_for_tests)
3371
parallel_registry.register('fork', fork_decorator)
3374
def subprocess_decorator(suite):
    """Wrap suite so that its tests are run in re-invoked subprocesses.

    :param suite: The test suite to decorate.
    :return: suite itself when the local concurrency is 1, otherwise a
        testtools ConcurrentTestSuite that uses reinvoke_for_tests to split
        the work.
    """
    concurrency = osutils.local_concurrency()
    if concurrency == 1:
        # Nothing to parallelise: hand the suite back untouched.
        return suite
    # Imported lazily so testtools' concurrency support is only needed when
    # it is actually used.
    from testtools import ConcurrentTestSuite
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
3380
parallel_registry.register('subprocess', subprocess_decorator)
3383
def exclude_tests(exclude_pattern):
    """Return a test suite decorator that excludes tests.

    :param exclude_pattern: The pattern handed to ExcludeDecorator, or None
        to exclude nothing.
    :return: identity_decorator when exclude_pattern is None, otherwise a
        callable taking a suite and returning it wrapped in an
        ExcludeDecorator.
    """
    if exclude_pattern is None:
        return identity_decorator
    def decorator(suite):
        return ExcludeDecorator(suite, exclude_pattern)
    return decorator
3392
def filter_tests(pattern):
3394
return identity_decorator
3395
def decorator(suite):
3396
return FilterTestsDecorator(suite, pattern)
3400
def random_order(random_seed, runner):
    """Return a test suite decorator factory for randomising tests order.

    :param random_seed: now, a string which casts to a long, or a long.
    :param runner: A test runner with a stream attribute to report on.
    :return: identity_decorator when random_seed is None, otherwise a
        callable taking a suite and returning it wrapped in a
        RandomDecorator.
    """
    if random_seed is None:
        return identity_decorator
    def decorator(suite):
        # runner.stream is looked up when the decorator is applied, not when
        # it is created.
        return RandomDecorator(suite, random_seed, runner.stream)
    return decorator
3413
def tests_first(pattern):
3415
return identity_decorator
3416
def decorator(suite):
3417
return TestFirstDecorator(suite, pattern)
3421
def identity_decorator(suite):
3426
class TestDecorator(TestUtil.TestSuite):
3427
"""A decorator for TestCase/TestSuite objects.
3429
Contains rather than flattening suite passed on construction
3432
def __init__(self, suite=None):
3433
super(TestDecorator, self).__init__()
3434
if suite is not None:
3437
# Don't need subclass run method with suite emptying
3438
run = unittest.TestSuite.run
3441
class CountingDecorator(TestDecorator):
    """A decorator which calls result.progress(self.countTestCases)."""

    def run(self, result):
        """Report the number of tests to result, then run them.

        The count is only reported when result exposes a callable
        'progress' attribute.
        """
        report = getattr(result, 'progress', None)
        if callable(report):
            total = self.countTestCases()
            report(total, SUBUNIT_SEEK_SET)
        return super(CountingDecorator, self).run(result)
3451
class ExcludeDecorator(TestDecorator):
    """A decorator which excludes test matching an exclude pattern."""

    def __init__(self, suite, exclude_pattern):
        """Wrap the result of exclude_tests_by_re(suite, exclude_pattern)."""
        remaining = exclude_tests_by_re(suite, exclude_pattern)
        super(ExcludeDecorator, self).__init__(remaining)
3459
class FilterTestsDecorator(TestDecorator):
    """A decorator which filters tests to those matching a pattern."""

    def __init__(self, suite, pattern):
        """Wrap the result of filter_suite_by_re(suite, pattern)."""
        matching = filter_suite_by_re(suite, pattern)
        super(FilterTestsDecorator, self).__init__(matching)
3467
class RandomDecorator(TestDecorator):
3468
"""A decorator which randomises the order of its tests."""
3470
def __init__(self, suite, random_seed, stream):
3471
random_seed = self.actual_seed(random_seed)
3472
stream.write("Randomizing test order using seed %s\n\n" %
3474
# Initialise the random number generator.
3475
random.seed(random_seed)
3476
super(RandomDecorator, self).__init__(randomize_suite(suite))
3479
def actual_seed(seed):
3481
# We convert the seed to a long to make it reuseable across
3482
# invocations (because the user can reenter it).
3483
return long(time.time())
3485
# Convert the seed to a long if we can
3488
except (TypeError, ValueError):
3493
class TestFirstDecorator(TestDecorator):
    """A decorator which moves named tests to the front."""

    def __init__(self, suite, pattern):
        """Start empty, then add what split_suite_by_re(suite, pattern) gives."""
        super(TestFirstDecorator, self).__init__()
        reordered = split_suite_by_re(suite, pattern)
        self.addTests(reordered)
3501
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: A test suite; it is walked with iter_suite_tests.
    :param count: The number of partitions to create.
    :return: A list of count lists, filled in round-robin order.
    """
    # This just assigns tests in a round-robin fashion.  On one hand this
    # splits up blocks of related tests that might run faster if they shared
    # resources, but on the other it avoids assigning blocks of slow tests to
    # just one partition.  So the slowest partition shouldn't be much slower
    # than the others.
    partitions = [list() for i in range(count)]
    tests = iter_suite_tests(suite)
    # cycle() keeps handing out the same list objects, so appending through
    # the loop variable fills the lists held in partitions.
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
        partition.append(test)
    return partitions
3515
def workaround_zealous_crypto_random():
3516
"""Crypto.Random wants to help us be secure, but we don't care here.
3518
This works around some test failures related to the sftp server. Once paramiko
3519
stops using the controversial API in Crypto.Random, we may get rid of it.
3522
from Crypto.Random import atfork
3528
def fork_for_tests(suite):
3529
"""Take suite and start up one runner per CPU by forking()
3531
:return: An iterable of TestCase-like objects which can each have
3532
run(result) called on them to feed tests to result.
3534
concurrency = osutils.local_concurrency()
3536
from subunit import ProtocolTestCase
3537
from subunit.test_results import AutoTimingTestResultDecorator
3538
class TestInOtherProcess(ProtocolTestCase):
3539
# Should be in subunit, I think. RBC.
3540
def __init__(self, stream, pid):
3541
ProtocolTestCase.__init__(self, stream)
3544
def run(self, result):
3546
ProtocolTestCase.run(self, result)
3548
pid, status = os.waitpid(self.pid, 0)
3549
# GZ 2011-10-18: If status is nonzero, should report to the result
3550
# that something went wrong.
3552
test_blocks = partition_tests(suite, concurrency)
3553
# Clear the tests from the original suite so it doesn't keep them alive
3554
suite._tests[:] = []
3555
for process_tests in test_blocks:
3556
process_suite = TestUtil.TestSuite(process_tests)
3557
# Also clear each split list so new suite has only reference
3558
process_tests[:] = []
3559
c2pread, c2pwrite = os.pipe()
3563
stream = os.fdopen(c2pwrite, 'wb', 1)
3564
workaround_zealous_crypto_random()
3566
# Leave stderr and stdout open so we can see test noise
3567
# Close stdin so that the child goes away if it decides to
3568
# read from stdin (otherwise it's a roulette to see what
3569
# child actually gets keystrokes for pdb etc).
3571
subunit_result = AutoTimingTestResultDecorator(
3572
SubUnitBzrProtocolClient(stream))
3573
process_suite.run(subunit_result)
3575
# Try and report traceback on stream, but exit with error even
3576
# if stream couldn't be created or something else goes wrong.
3577
# The traceback is formatted to a string and written in one go
3578
# to avoid interleaving lines from multiple failing children.
3580
stream.write(traceback.format_exc())
3586
stream = os.fdopen(c2pread, 'rb', 1)
3587
test = TestInOtherProcess(stream, pid)
3592
def reinvoke_for_tests(suite):
3593
"""Take suite and start up one runner per CPU using subprocess().
3595
:return: An iterable of TestCase-like objects which can each have
3596
run(result) called on them to feed tests to result.
3598
concurrency = osutils.local_concurrency()
3600
from subunit import ProtocolTestCase
3601
class TestInSubprocess(ProtocolTestCase):
3602
def __init__(self, process, name):
3603
ProtocolTestCase.__init__(self, process.stdout)
3604
self.process = process
3605
self.process.stdin.close()
3608
def run(self, result):
3610
ProtocolTestCase.run(self, result)
3613
os.unlink(self.name)
3614
# print "pid %d finished" % finished_process
3615
test_blocks = partition_tests(suite, concurrency)
3616
for process_tests in test_blocks:
3617
# ugly; currently reimplement rather than reuses TestCase methods.
3618
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3619
if not os.path.isfile(bzr_path):
3620
# We are probably installed. Assume sys.argv is the right file
3621
bzr_path = sys.argv[0]
3622
bzr_path = [bzr_path]
3623
if sys.platform == "win32":
3624
# if we're on windows, we can't execute the bzr script directly
3625
bzr_path = [sys.executable] + bzr_path
3626
fd, test_list_file_name = tempfile.mkstemp()
3627
test_list_file = os.fdopen(fd, 'wb', 1)
3628
for test in process_tests:
3629
test_list_file.write(test.id() + '\n')
3630
test_list_file.close()
3632
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3634
if '--no-plugins' in sys.argv:
3635
argv.append('--no-plugins')
3636
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3637
# noise on stderr it can interrupt the subunit protocol.
3638
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3639
stdout=subprocess.PIPE,
3640
stderr=subprocess.PIPE,
3642
test = TestInSubprocess(process, test_list_file_name)
3645
os.unlink(test_list_file_name)
3650
class ProfileResult(testtools.ExtendedToOriginalDecorator):
    """Generate profiling data for all activity between start and success.

    The profile data is appended to the test's _benchcalls attribute and can
    be accessed by the forwarded-to TestResult.

    While it might be cleaner to accumulate this in stopTest, addSuccess is
    where our existing output support for lsprof is, and this class aims to
    fit in with that: while it could be moved it's not necessary to accomplish
    test profiling, nor would it be dramatically cleaner.
    """

    def startTest(self, test):
        """Start a fresh profiler, then forward startTest."""
        self.profiler = bzrlib.lsprof.BzrProfiler()
        # Prevent deadlocks in tests that use lsprof: reset the class-level
        # profiler_block before starting our own profiler.
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
        self.profiler.start()
        testtools.ExtendedToOriginalDecorator.startTest(self, test)

    def addSuccess(self, test):
        """Stop the profiler, record its stats on test, forward addSuccess."""
        stats = self.profiler.stop()
        try:
            calls = test._benchcalls
        except AttributeError:
            # First profile recorded for this test: create the list.
            test._benchcalls = []
            calls = test._benchcalls
        calls.append(((test.id(), "", ""), stats))
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)

    def stopTest(self, test):
        """Forward stopTest, then drop the reference to the profiler."""
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
        self.profiler = None
2488
return result.wasSuccessful()
3685
2491
# Controlled by "bzr selftest -E=..." option
3686
# Currently supported:
3687
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3688
# preserves any flags supplied at the command line.
3689
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3690
# rather than failing tests. And no longer raise
3691
# LockContention when fcntl locks are not being used
3692
# with proper exclusion rules.
3693
# -Ethreads Will display thread ident at creation/join time to
3694
# help track thread leaks
3695
# -Euncollected_cases Display the identity of any test cases that weren't
3696
# deallocated after being completed.
3697
# -Econfig_stats Will collect statistics using addDetail
3698
2492
selftest_debug_flags = set()
3874
2648
return self.tests.has_key(test_id)
3877
class TestPrefixAliasRegistry(registry.Registry):
3878
"""A registry for test prefix aliases.
3880
This helps implement shortcuts for the --starting-with selftest
3881
option. Overriding existing prefixes is not allowed but not fatal (a
3882
warning will be emitted).
3885
def register(self, key, obj, help=None, info=None,
3886
override_existing=False):
3887
"""See Registry.register.
3889
Trying to override an existing alias causes a warning to be emitted,
3890
not a fatal exception.
3893
super(TestPrefixAliasRegistry, self).register(
3894
key, obj, help=help, info=info, override_existing=False)
3896
actual = self.get(key)
3898
'Test prefix alias %s is already used for %s, ignoring %s'
3899
% (key, actual, obj))
3901
def resolve_alias(self, id_start):
3902
"""Replace the alias by the prefix in the given string.
3904
Using an unknown prefix is an error to help catching typos.
3906
parts = id_start.split('.')
3908
parts[0] = self.get(parts[0])
3910
raise errors.BzrCommandError(
3911
'%s is not a known test prefix alias' % parts[0])
3912
return '.'.join(parts)
3915
test_prefix_alias_registry = TestPrefixAliasRegistry()
3916
"""Registry of test prefix aliases."""
3919
# This alias allows detecting typos ('bzrlin.') by making all valid test ids
3920
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3921
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3923
# Obvious highest-level prefixes; feel free to add your own via a plugin
3924
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3925
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3926
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3927
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3928
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3931
def _test_suite_testmod_names():
3932
"""Return the standard list of test module names to test."""
3935
'bzrlib.tests.blackbox',
3936
'bzrlib.tests.commands',
3937
'bzrlib.tests.per_branch',
3938
'bzrlib.tests.per_bzrdir',
3939
'bzrlib.tests.per_controldir',
3940
'bzrlib.tests.per_controldir_colo',
3941
'bzrlib.tests.per_foreign_vcs',
3942
'bzrlib.tests.per_interrepository',
3943
'bzrlib.tests.per_intertree',
3944
'bzrlib.tests.per_inventory',
3945
'bzrlib.tests.per_interbranch',
3946
'bzrlib.tests.per_lock',
3947
'bzrlib.tests.per_merger',
3948
'bzrlib.tests.per_transport',
3949
'bzrlib.tests.per_tree',
3950
'bzrlib.tests.per_pack_repository',
3951
'bzrlib.tests.per_repository',
3952
'bzrlib.tests.per_repository_chk',
3953
'bzrlib.tests.per_repository_reference',
3954
'bzrlib.tests.per_repository_vf',
3955
'bzrlib.tests.per_uifactory',
3956
'bzrlib.tests.per_versionedfile',
3957
'bzrlib.tests.per_workingtree',
3958
'bzrlib.tests.test__annotator',
3959
'bzrlib.tests.test__bencode',
3960
'bzrlib.tests.test__btree_serializer',
3961
'bzrlib.tests.test__chk_map',
3962
'bzrlib.tests.test__dirstate_helpers',
3963
'bzrlib.tests.test__groupcompress',
3964
'bzrlib.tests.test__known_graph',
3965
'bzrlib.tests.test__rio',
3966
'bzrlib.tests.test__simple_set',
3967
'bzrlib.tests.test__static_tuple',
3968
'bzrlib.tests.test__walkdirs_win32',
3969
'bzrlib.tests.test_ancestry',
3970
'bzrlib.tests.test_annotate',
3971
'bzrlib.tests.test_api',
3972
'bzrlib.tests.test_atomicfile',
3973
'bzrlib.tests.test_bad_files',
3974
'bzrlib.tests.test_bisect_multi',
3975
'bzrlib.tests.test_branch',
3976
'bzrlib.tests.test_branchbuilder',
3977
'bzrlib.tests.test_btree_index',
3978
'bzrlib.tests.test_bugtracker',
3979
'bzrlib.tests.test_bundle',
3980
'bzrlib.tests.test_bzrdir',
3981
'bzrlib.tests.test__chunks_to_lines',
3982
'bzrlib.tests.test_cache_utf8',
3983
'bzrlib.tests.test_chk_map',
3984
'bzrlib.tests.test_chk_serializer',
3985
'bzrlib.tests.test_chunk_writer',
3986
'bzrlib.tests.test_clean_tree',
3987
'bzrlib.tests.test_cleanup',
3988
'bzrlib.tests.test_cmdline',
3989
'bzrlib.tests.test_commands',
3990
'bzrlib.tests.test_commit',
3991
'bzrlib.tests.test_commit_merge',
3992
'bzrlib.tests.test_config',
3993
'bzrlib.tests.test_conflicts',
3994
'bzrlib.tests.test_controldir',
3995
'bzrlib.tests.test_counted_lock',
3996
'bzrlib.tests.test_crash',
3997
'bzrlib.tests.test_decorators',
3998
'bzrlib.tests.test_delta',
3999
'bzrlib.tests.test_debug',
4000
'bzrlib.tests.test_diff',
4001
'bzrlib.tests.test_directory_service',
4002
'bzrlib.tests.test_dirstate',
4003
'bzrlib.tests.test_email_message',
4004
'bzrlib.tests.test_eol_filters',
4005
'bzrlib.tests.test_errors',
4006
'bzrlib.tests.test_estimate_compressed_size',
4007
'bzrlib.tests.test_export',
4008
'bzrlib.tests.test_export_pot',
4009
'bzrlib.tests.test_extract',
4010
'bzrlib.tests.test_features',
4011
'bzrlib.tests.test_fetch',
4012
'bzrlib.tests.test_fixtures',
4013
'bzrlib.tests.test_fifo_cache',
4014
'bzrlib.tests.test_filters',
4015
'bzrlib.tests.test_filter_tree',
4016
'bzrlib.tests.test_ftp_transport',
4017
'bzrlib.tests.test_foreign',
4018
'bzrlib.tests.test_generate_docs',
4019
'bzrlib.tests.test_generate_ids',
4020
'bzrlib.tests.test_globbing',
4021
'bzrlib.tests.test_gpg',
4022
'bzrlib.tests.test_graph',
4023
'bzrlib.tests.test_groupcompress',
4024
'bzrlib.tests.test_hashcache',
4025
'bzrlib.tests.test_help',
4026
'bzrlib.tests.test_hooks',
4027
'bzrlib.tests.test_http',
4028
'bzrlib.tests.test_http_response',
4029
'bzrlib.tests.test_https_ca_bundle',
4030
'bzrlib.tests.test_https_urllib',
4031
'bzrlib.tests.test_i18n',
4032
'bzrlib.tests.test_identitymap',
4033
'bzrlib.tests.test_ignores',
4034
'bzrlib.tests.test_index',
4035
'bzrlib.tests.test_import_tariff',
4036
'bzrlib.tests.test_info',
4037
'bzrlib.tests.test_inv',
4038
'bzrlib.tests.test_inventory_delta',
4039
'bzrlib.tests.test_knit',
4040
'bzrlib.tests.test_lazy_import',
4041
'bzrlib.tests.test_lazy_regex',
4042
'bzrlib.tests.test_library_state',
4043
'bzrlib.tests.test_lock',
4044
'bzrlib.tests.test_lockable_files',
4045
'bzrlib.tests.test_lockdir',
4046
'bzrlib.tests.test_log',
4047
'bzrlib.tests.test_lru_cache',
4048
'bzrlib.tests.test_lsprof',
4049
'bzrlib.tests.test_mail_client',
4050
'bzrlib.tests.test_matchers',
4051
'bzrlib.tests.test_memorytree',
4052
'bzrlib.tests.test_merge',
4053
'bzrlib.tests.test_merge3',
4054
'bzrlib.tests.test_merge_core',
4055
'bzrlib.tests.test_merge_directive',
4056
'bzrlib.tests.test_mergetools',
4057
'bzrlib.tests.test_missing',
4058
'bzrlib.tests.test_msgeditor',
4059
'bzrlib.tests.test_multiparent',
4060
'bzrlib.tests.test_mutabletree',
4061
'bzrlib.tests.test_nonascii',
4062
'bzrlib.tests.test_options',
4063
'bzrlib.tests.test_osutils',
4064
'bzrlib.tests.test_osutils_encodings',
4065
'bzrlib.tests.test_pack',
4066
'bzrlib.tests.test_patch',
4067
'bzrlib.tests.test_patches',
4068
'bzrlib.tests.test_permissions',
4069
'bzrlib.tests.test_plugins',
4070
'bzrlib.tests.test_progress',
4071
'bzrlib.tests.test_pyutils',
4072
'bzrlib.tests.test_read_bundle',
4073
'bzrlib.tests.test_reconcile',
4074
'bzrlib.tests.test_reconfigure',
4075
'bzrlib.tests.test_registry',
4076
'bzrlib.tests.test_remote',
4077
'bzrlib.tests.test_rename_map',
4078
'bzrlib.tests.test_repository',
4079
'bzrlib.tests.test_revert',
4080
'bzrlib.tests.test_revision',
4081
'bzrlib.tests.test_revisionspec',
4082
'bzrlib.tests.test_revisiontree',
4083
'bzrlib.tests.test_rio',
4084
'bzrlib.tests.test_rules',
4085
'bzrlib.tests.test_url_policy_open',
4086
'bzrlib.tests.test_sampler',
4087
'bzrlib.tests.test_scenarios',
4088
'bzrlib.tests.test_script',
4089
'bzrlib.tests.test_selftest',
4090
'bzrlib.tests.test_serializer',
4091
'bzrlib.tests.test_setup',
4092
'bzrlib.tests.test_sftp_transport',
4093
'bzrlib.tests.test_shelf',
4094
'bzrlib.tests.test_shelf_ui',
4095
'bzrlib.tests.test_smart',
4096
'bzrlib.tests.test_smart_add',
4097
'bzrlib.tests.test_smart_request',
4098
'bzrlib.tests.test_smart_signals',
4099
'bzrlib.tests.test_smart_transport',
4100
'bzrlib.tests.test_smtp_connection',
4101
'bzrlib.tests.test_source',
4102
'bzrlib.tests.test_ssh_transport',
4103
'bzrlib.tests.test_status',
4104
'bzrlib.tests.test_store',
4105
'bzrlib.tests.test_strace',
4106
'bzrlib.tests.test_subsume',
4107
'bzrlib.tests.test_switch',
4108
'bzrlib.tests.test_symbol_versioning',
4109
'bzrlib.tests.test_tag',
4110
'bzrlib.tests.test_test_server',
4111
'bzrlib.tests.test_testament',
4112
'bzrlib.tests.test_textfile',
4113
'bzrlib.tests.test_textmerge',
4114
'bzrlib.tests.test_cethread',
4115
'bzrlib.tests.test_timestamp',
4116
'bzrlib.tests.test_trace',
4117
'bzrlib.tests.test_transactions',
4118
'bzrlib.tests.test_transform',
4119
'bzrlib.tests.test_transport',
4120
'bzrlib.tests.test_transport_log',
4121
'bzrlib.tests.test_tree',
4122
'bzrlib.tests.test_treebuilder',
4123
'bzrlib.tests.test_treeshape',
4124
'bzrlib.tests.test_tsort',
4125
'bzrlib.tests.test_tuned_gzip',
4126
'bzrlib.tests.test_ui',
4127
'bzrlib.tests.test_uncommit',
4128
'bzrlib.tests.test_upgrade',
4129
'bzrlib.tests.test_upgrade_stacked',
4130
'bzrlib.tests.test_urlutils',
4131
'bzrlib.tests.test_utextwrap',
4132
'bzrlib.tests.test_version',
4133
'bzrlib.tests.test_version_info',
4134
'bzrlib.tests.test_versionedfile',
4135
'bzrlib.tests.test_vf_search',
4136
'bzrlib.tests.test_weave',
4137
'bzrlib.tests.test_whitebox',
4138
'bzrlib.tests.test_win32utils',
4139
'bzrlib.tests.test_workingtree',
4140
'bzrlib.tests.test_workingtree_4',
4141
'bzrlib.tests.test_wsgi',
4142
'bzrlib.tests.test_xml',
4146
def _test_suite_modules_to_doctest():
4147
"""Return the list of modules to doctest."""
4149
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
4153
'bzrlib.branchbuilder',
4154
'bzrlib.decorators',
4156
'bzrlib.iterablefile',
4161
'bzrlib.symbol_versioning',
4163
'bzrlib.tests.fixtures',
4165
'bzrlib.transport.http',
4166
'bzrlib.version_info_formats.format_custom',
4170
2651
def test_suite(keep_only=None, starting_with=None):
4171
2652
"""Build and return TestSuite for the whole of bzrlib.
4178
2659
This function can be replaced if you need to change the default test
4179
2660
suite on a global basis, but it is not encouraged.
2664
'bzrlib.util.tests.test_bencode',
2665
'bzrlib.tests.blackbox',
2666
'bzrlib.tests.branch_implementations',
2667
'bzrlib.tests.bzrdir_implementations',
2668
'bzrlib.tests.commands',
2669
'bzrlib.tests.inventory_implementations',
2670
'bzrlib.tests.interrepository_implementations',
2671
'bzrlib.tests.intertree_implementations',
2672
'bzrlib.tests.per_lock',
2673
'bzrlib.tests.repository_implementations',
2674
'bzrlib.tests.test__dirstate_helpers',
2675
'bzrlib.tests.test_ancestry',
2676
'bzrlib.tests.test_annotate',
2677
'bzrlib.tests.test_api',
2678
'bzrlib.tests.test_atomicfile',
2679
'bzrlib.tests.test_bad_files',
2680
'bzrlib.tests.test_bisect_multi',
2681
'bzrlib.tests.test_branch',
2682
'bzrlib.tests.test_branchbuilder',
2683
'bzrlib.tests.test_bugtracker',
2684
'bzrlib.tests.test_bundle',
2685
'bzrlib.tests.test_bzrdir',
2686
'bzrlib.tests.test_cache_utf8',
2687
'bzrlib.tests.test_commands',
2688
'bzrlib.tests.test_commit',
2689
'bzrlib.tests.test_commit_merge',
2690
'bzrlib.tests.test_config',
2691
'bzrlib.tests.test_conflicts',
2692
'bzrlib.tests.test_counted_lock',
2693
'bzrlib.tests.test_decorators',
2694
'bzrlib.tests.test_delta',
2695
'bzrlib.tests.test_deprecated_graph',
2696
'bzrlib.tests.test_diff',
2697
'bzrlib.tests.test_dirstate',
2698
'bzrlib.tests.test_directory_service',
2699
'bzrlib.tests.test_email_message',
2700
'bzrlib.tests.test_errors',
2701
'bzrlib.tests.test_extract',
2702
'bzrlib.tests.test_fetch',
2703
'bzrlib.tests.test_ftp_transport',
2704
'bzrlib.tests.test_generate_docs',
2705
'bzrlib.tests.test_generate_ids',
2706
'bzrlib.tests.test_globbing',
2707
'bzrlib.tests.test_gpg',
2708
'bzrlib.tests.test_graph',
2709
'bzrlib.tests.test_hashcache',
2710
'bzrlib.tests.test_help',
2711
'bzrlib.tests.test_hooks',
2712
'bzrlib.tests.test_http',
2713
'bzrlib.tests.test_http_implementations',
2714
'bzrlib.tests.test_http_response',
2715
'bzrlib.tests.test_https_ca_bundle',
2716
'bzrlib.tests.test_identitymap',
2717
'bzrlib.tests.test_ignores',
2718
'bzrlib.tests.test_index',
2719
'bzrlib.tests.test_info',
2720
'bzrlib.tests.test_inv',
2721
'bzrlib.tests.test_knit',
2722
'bzrlib.tests.test_lazy_import',
2723
'bzrlib.tests.test_lazy_regex',
2724
'bzrlib.tests.test_lockdir',
2725
'bzrlib.tests.test_lockable_files',
2726
'bzrlib.tests.test_log',
2727
'bzrlib.tests.test_lsprof',
2728
'bzrlib.tests.test_lru_cache',
2729
'bzrlib.tests.test_mail_client',
2730
'bzrlib.tests.test_memorytree',
2731
'bzrlib.tests.test_merge',
2732
'bzrlib.tests.test_merge3',
2733
'bzrlib.tests.test_merge_core',
2734
'bzrlib.tests.test_merge_directive',
2735
'bzrlib.tests.test_missing',
2736
'bzrlib.tests.test_msgeditor',
2737
'bzrlib.tests.test_multiparent',
2738
'bzrlib.tests.test_mutabletree',
2739
'bzrlib.tests.test_nonascii',
2740
'bzrlib.tests.test_options',
2741
'bzrlib.tests.test_osutils',
2742
'bzrlib.tests.test_osutils_encodings',
2743
'bzrlib.tests.test_pack',
2744
'bzrlib.tests.test_patch',
2745
'bzrlib.tests.test_patches',
2746
'bzrlib.tests.test_permissions',
2747
'bzrlib.tests.test_plugins',
2748
'bzrlib.tests.test_progress',
2749
'bzrlib.tests.test_read_bundle',
2750
'bzrlib.tests.test_reconfigure',
2751
'bzrlib.tests.test_reconcile',
2752
'bzrlib.tests.test_registry',
2753
'bzrlib.tests.test_remote',
2754
'bzrlib.tests.test_repository',
2755
'bzrlib.tests.per_repository_reference',
2756
'bzrlib.tests.test_revert',
2757
'bzrlib.tests.test_revision',
2758
'bzrlib.tests.test_revisionspec',
2759
'bzrlib.tests.test_revisiontree',
2760
'bzrlib.tests.test_rio',
2761
'bzrlib.tests.test_rules',
2762
'bzrlib.tests.test_sampler',
2763
'bzrlib.tests.test_selftest',
2764
'bzrlib.tests.test_setup',
2765
'bzrlib.tests.test_sftp_transport',
2766
'bzrlib.tests.test_smart',
2767
'bzrlib.tests.test_smart_add',
2768
'bzrlib.tests.test_smart_transport',
2769
'bzrlib.tests.test_smtp_connection',
2770
'bzrlib.tests.test_source',
2771
'bzrlib.tests.test_ssh_transport',
2772
'bzrlib.tests.test_status',
2773
'bzrlib.tests.test_store',
2774
'bzrlib.tests.test_strace',
2775
'bzrlib.tests.test_subsume',
2776
'bzrlib.tests.test_switch',
2777
'bzrlib.tests.test_symbol_versioning',
2778
'bzrlib.tests.test_tag',
2779
'bzrlib.tests.test_testament',
2780
'bzrlib.tests.test_textfile',
2781
'bzrlib.tests.test_textmerge',
2782
'bzrlib.tests.test_timestamp',
2783
'bzrlib.tests.test_trace',
2784
'bzrlib.tests.test_transactions',
2785
'bzrlib.tests.test_transform',
2786
'bzrlib.tests.test_transport',
2787
'bzrlib.tests.test_transport_implementations',
2788
'bzrlib.tests.test_tree',
2789
'bzrlib.tests.test_treebuilder',
2790
'bzrlib.tests.test_tsort',
2791
'bzrlib.tests.test_tuned_gzip',
2792
'bzrlib.tests.test_ui',
2793
'bzrlib.tests.test_uncommit',
2794
'bzrlib.tests.test_upgrade',
2795
'bzrlib.tests.test_urlutils',
2796
'bzrlib.tests.test_versionedfile',
2797
'bzrlib.tests.test_version',
2798
'bzrlib.tests.test_version_info',
2799
'bzrlib.tests.test_weave',
2800
'bzrlib.tests.test_whitebox',
2801
'bzrlib.tests.test_win32utils',
2802
'bzrlib.tests.test_workingtree',
2803
'bzrlib.tests.test_workingtree_4',
2804
'bzrlib.tests.test_wsgi',
2805
'bzrlib.tests.test_xml',
2806
'bzrlib.tests.tree_implementations',
2807
'bzrlib.tests.workingtree_implementations',
4182
2810
loader = TestUtil.TestLoader()
4184
if keep_only is not None:
4185
id_filter = TestIdList(keep_only)
2812
if starting_with is not None:
4187
2813
# We take precedence over keep_only because *at loading time* using
4188
2814
# both options means we will load less tests for the same final result.
4189
2815
def interesting_module(name):
4190
for start in starting_with:
4192
# Either the module name starts with the specified string
4193
name.startswith(start)
4194
# or it may contain tests starting with the specified string
4195
or start.startswith(name)
2817
# Either the module name starts with the specified string
2818
name.startswith(starting_with)
2819
# or it may contain tests starting with the specified string
2820
or starting_with.startswith(name)
4199
2822
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
4201
2824
elif keep_only is not None:
2825
id_filter = TestIdList(keep_only)
4202
2826
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
4203
2827
def interesting_module(name):
4204
2828
return id_filter.refers_to(name)
4297
2971
for right_name, right_dict in scenarios_right]
4300
def multiply_tests(tests, scenarios, result):
4301
"""Multiply tests_list by scenarios into result.
4303
This is the core workhorse for test parameterisation.
4305
Typically the load_tests() method for a per-implementation test suite will
4306
call multiply_tests and return the result.
4308
:param tests: The tests to parameterise.
4309
:param scenarios: The scenarios to apply: pairs of (scenario_name,
4310
scenario_param_dict).
4311
:param result: A TestSuite to add created tests to.
4313
This returns the passed in result TestSuite with the cross product of all
4314
the tests repeated once for each scenario. Each test is adapted by adding
4315
the scenario name at the end of its id(), and updating the test object's
4316
__dict__ with the scenario_param_dict.
4318
>>> import bzrlib.tests.test_sampler
4319
>>> r = multiply_tests(
4320
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4321
... [('one', dict(param=1)),
4322
... ('two', dict(param=2))],
4323
... TestUtil.TestSuite())
4324
>>> tests = list(iter_suite_tests(r))
4328
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4334
for test in iter_suite_tests(tests):
4335
apply_scenarios(test, scenarios, result)
4339
def apply_scenarios(test, scenarios, result):
    """Apply the scenarios in scenarios to test and add to result.

    :param test: The test to apply scenarios to.
    :param scenarios: An iterable of scenarios to apply to test.
    :param result: A TestSuite to add created tests to.
    :return: result
    :seealso: apply_scenario
    """
    for scenario in scenarios:
        result.addTest(apply_scenario(test, scenario))
    return result
4352
def apply_scenario(test, scenario):
    """Copy test and apply scenario to it.

    :param test: A test to adapt.
    :param scenario: A tuple describing the scenario.
        The first element of the tuple is the new test id.
        The second element is a dict containing attributes to set on the
        test.
    :return: The adapted test.
    """
    # The scenario name is appended in parentheses to the original id.
    new_id = "%s(%s)" % (test.id(), scenario[0])
    new_test = clone_test(test, new_id)
    for name, value in scenario[1].items():
        setattr(new_test, name, value)
    return new_test
4369
def clone_test(test, new_id):
    """Clone a test giving it a new id.

    :param test: The test to clone.
    :param new_id: The id to assign to it.
    :return: The new test.
    """
    new_test = copy.copy(test)
    # Shadow the id() method on the copy only; the original is untouched.
    new_test.id = lambda: new_id
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
    # causes cloned tests to share the 'details' dict.  This makes it hard to
    # read the test output for parameterized tests, because tracebacks will be
    # associated with irrelevant tests.
    try:
        new_test._TestCase__details
    except AttributeError:
        # must be a different version of testtools than expected.  Do nothing.
        pass
    else:
        # Reset the '__details' dict.
        new_test._TestCase__details = {}
    return new_test
4393
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4395
"""Helper for permutating tests against an extension module.
4397
This is meant to be used inside a modules 'load_tests()' function. It will
4398
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4399
against both implementations. Setting 'test.module' to the appropriate
4400
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4402
:param standard_tests: A test suite to permute
4403
:param loader: A TestLoader
4404
:param py_module_name: The python path to a python module that can always
4405
be loaded, and will be considered the 'python' implementation. (eg
4406
'bzrlib._chk_map_py')
4407
:param ext_module_name: The python path to an extension module. If the
4408
module cannot be loaded, a single test will be added, which notes that
4409
the module is not available. If it can be loaded, all standard_tests
4410
will be run against that module.
4411
:return: (suite, feature) suite is a test-suite that has all the permuted
4412
tests. feature is the Feature object that can be used to determine if
4413
the module is available.
4416
from bzrlib.tests.features import ModuleAvailableFeature
4417
py_module = pyutils.get_named_object(py_module_name)
4419
('python', {'module': py_module}),
4421
suite = loader.suiteClass()
4422
feature = ModuleAvailableFeature(ext_module_name)
4423
if feature.available():
4424
scenarios.append(('C', {'module': feature.module}))
4426
# the compiled module isn't available, so we add a failing test
4427
class FailWithoutFeature(TestCase):
4428
def test_fail(self):
4429
self.requireFeature(feature)
4430
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4431
result = multiply_tests(standard_tests, scenarios, suite)
4432
return result, feature
4435
def _rmtree_temp_dir(dirname, test_id=None):
2975
def adapt_modules(mods_list, adapter, loader, suite):
    """Load the tests named by mods_list, adapt them and add them to suite.

    :param mods_list: Module names passed to loader.loadTestsFromModuleNames.
    :param adapter: The adapter handed on to adapt_tests.
    :param loader: The loader used to load the modules' tests.
    :param suite: The suite receiving the adapted tests.
    """
    loaded = loader.loadTestsFromModuleNames(mods_list)
    adapt_tests(loaded, adapter, suite)
2981
def adapt_tests(tests_list, adapter, suite):
    """Adapt every test found in tests_list and add the results to suite.

    :param tests_list: Tests to walk with iter_suite_tests.
    :param adapter: An object whose adapt(test) result is passed to
        suite.addTests.
    :param suite: The suite receiving the adapted tests.
    """
    for case in iter_suite_tests(tests_list):
        adapted = adapter.adapt(case)
        suite.addTests(adapted)
2987
def _rmtree_temp_dir(dirname):
4436
2988
# If LANG=C we probably have created some bogus paths
4437
2989
# which rmtree(unicode) will fail to delete
4438
2990
# so make sure we are using rmtree(str) to delete everything
4448
3000
osutils.rmtree(dirname)
4449
3001
except OSError, e:
4450
# We don't want to fail here because some useful display will be lost
4451
# otherwise. Polluting the tmp dir is bad, but not giving all the
4452
# possible info to the test runner is even worse.
4454
ui.ui_factory.clear_term()
4455
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4456
# Ugly, but the last thing we want here is to fail, so bear with it.
4457
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4458
).encode('ascii', 'replace')
4459
sys.stderr.write('Unable to remove testing dir %s\n%s'
4460
% (os.path.basename(dirname), printable_e))
3002
if sys.platform == 'win32' and e.errno == errno.EACCES:
3003
sys.stderr.write(('Permission denied: '
3004
'unable to remove testing dir '
3005
'%s\n' % os.path.basename(dirname)))
3010
class Feature(object):
3011
"""An operating system Feature."""
3014
self._available = None
3016
def available(self):
3017
"""Is the feature available?
3019
:return: True if the feature is available.
3021
if self._available is None:
3022
self._available = self._probe()
3023
return self._available
3026
"""Implement this method in concrete features.
3028
:return: True if the feature is available.
3030
raise NotImplementedError
3033
if getattr(self, 'feature_name', None):
3034
return self.feature_name()
3035
return self.__class__.__name__
3038
class _SymlinkFeature(Feature):
3041
return osutils.has_symlinks()
3043
def feature_name(self):
3046
SymlinkFeature = _SymlinkFeature()
3049
class _HardlinkFeature(Feature):
3052
return osutils.has_hardlinks()
3054
def feature_name(self):
3057
HardlinkFeature = _HardlinkFeature()
3060
class _OsFifoFeature(Feature):
3063
return getattr(os, 'mkfifo', None)
3065
def feature_name(self):
3066
return 'filesystem fifos'
3068
OsFifoFeature = _OsFifoFeature()
3071
class _UnicodeFilenameFeature(Feature):
3072
"""Does the filesystem support Unicode filenames?"""
3077
except UnicodeEncodeError:
3079
except (IOError, OSError):
3080
# The filesystem allows the Unicode filename but the file doesn't
3084
# The filesystem allows the Unicode filename and the file exists,
3088
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3091
class TestScenarioApplier(object):
3092
"""A tool to apply scenarios to tests."""
3094
def adapt(self, test):
3095
"""Return a TestSuite containing a copy of test for each scenario."""
3096
result = unittest.TestSuite()
3097
for scenario in self.scenarios:
3098
result.addTest(self.adapt_test_to_scenario(test, scenario))
3101
def adapt_test_to_scenario(self, test, scenario):
3102
"""Copy test and apply scenario to it.
3104
:param test: A test to adapt.
3105
:param scenario: A tuple describing the scenario.
3106
The first element of the tuple is the new test id.
3107
The second element is a dict containing attributes to set on the
3109
:return: The adapted test.
3111
from copy import deepcopy
3112
new_test = deepcopy(test)
3113
for name, value in scenario[1].items():
3114
setattr(new_test, name, value)
3115
new_id = "%s(%s)" % (new_test.id(), scenario[0])
3116
new_test.id = lambda: new_id
4463
3120
def probe_unicode_in_user_encoding():
4496
# Only define SubUnitBzrRunner if subunit is available.
4498
from subunit import TestProtocolClient
4499
from subunit.test_results import AutoTimingTestResultDecorator
4500
class SubUnitBzrProtocolClient(TestProtocolClient):
4502
def stopTest(self, test):
4503
super(SubUnitBzrProtocolClient, self).stopTest(test)
4504
_clear__type_equality_funcs(test)
4506
def addSuccess(self, test, details=None):
4507
# The subunit client always includes the details in the subunit
4508
# stream, but we don't want to include it in ours.
4509
if details is not None and 'log' in details:
4511
return super(SubUnitBzrProtocolClient, self).addSuccess(
4514
class SubUnitBzrRunner(TextTestRunner):
4515
def run(self, test):
4516
result = AutoTimingTestResultDecorator(
4517
SubUnitBzrProtocolClient(self.stream))
4524
# API compatibility for old plugins; see bug 892622.
4527
'HTTPServerFeature',
4528
'ModuleAvailableFeature',
4529
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4530
'OsFifoFeature', 'UnicodeFilenameFeature',
4531
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4532
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4533
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4534
'posix_permissions_feature',
4536
globals()[name] = _CompatabilityThunkFeature(
4537
symbol_versioning.deprecated_in((2, 5, 0)),
4538
'bzrlib.tests', name,
4539
name, 'bzrlib.tests.features')
4542
# Deprecated aliases whose replacement carries a different name: each
# old_name in this module becomes a thunk pointing at new_name in
# bzrlib.tests.features.
for (old_name, new_name) in [
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
    ]:
    # Bind under old_name. The previous code used globals()[name], but this
    # loop never assigns `name`, so it re-used whatever value an earlier
    # statement left behind: the old alias was never created and an unrelated
    # module attribute was overwritten instead.
    globals()[old_name] = _CompatabilityThunkFeature(
        symbol_versioning.deprecated_in((2, 5, 0)),
        'bzrlib.tests', old_name,
        new_name, 'bzrlib.tests.features')
3153
class _FTPServerFeature(Feature):
3154
"""Some tests want an FTP Server, check if one is available.
3156
Right now, the only way this is available is if 'medusa' is installed.
3157
http://www.amk.ca/python/code/medusa.html
3162
import bzrlib.tests.ftp_server
3167
def feature_name(self):
3170
FTPServerFeature = _FTPServerFeature()
3173
class _CaseInsensitiveFilesystemFeature(Feature):
3174
"""Check if underlying filesystem is case-insensitive
3175
(e.g. on Windows, Cygwin, MacOS)
3179
if TestCaseWithMemoryTransport.TEST_ROOT is None:
3180
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3181
TestCaseWithMemoryTransport.TEST_ROOT = root
3183
root = TestCaseWithMemoryTransport.TEST_ROOT
3184
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3186
name_a = osutils.pathjoin(tdir, 'a')
3187
name_A = osutils.pathjoin(tdir, 'A')
3189
result = osutils.isdir(name_A)
3190
_rmtree_temp_dir(tdir)
3193
def feature_name(self):
3194
return 'case-insensitive filesystem'
3196
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()