1
# Copyright (C) 2005-2011 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
# NOTE: Some classes in here use camelCaseNaming() rather than
20
# underscore_naming(). That's for consistency with unittest; it's not the
21
# general style of bzrlib. Please continue that consistency when adding e.g.
22
# new assertFoo() methods.
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27
18
from cStringIO import StringIO
50
# nb: check this before importing anything else from within it
51
_testtools_version = getattr(testtools, '__version__', ())
52
if _testtools_version < (0, 9, 5):
53
raise ImportError("need at least testtools 0.9.5: %s is %r"
54
% (testtools.__file__, _testtools_version))
55
from testtools import content
62
commands as _mod_commands,
71
plugin as _mod_plugin,
78
transport as _mod_transport,
84
# lsprof not available
86
from bzrlib.smart import client, request
87
from bzrlib.transport import (
91
from bzrlib.symbol_versioning import (
95
from bzrlib.tests import (
100
from bzrlib.ui import NullProgressView
101
from bzrlib.ui.text import TextUIFactory
103
# Mark this python module as being part of the implementation
104
# of unittest: this gives us better tracebacks where the last
105
# shown frame is the test code, not our assertXYZ.
108
default_transport = test_server.LocalURLServer
111
_unitialized_attr = object()
112
"""A sentinel needed to act as a default value in a method signature."""
115
# Subunit result codes, defined here to prevent a hard dependency on subunit.
119
# These are intentionally brought into this namespace. That way plugins, etc
120
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
121
TestSuite = TestUtil.TestSuite
122
TestLoader = TestUtil.TestLoader
124
# Tests should run in a clean and clearly defined environment. The goal is to
125
# keep them isolated from the running environment as mush as possible. The test
126
# framework ensures the variables defined below are set (or deleted if the
127
# value is None) before a test is run and reset to their original value after
128
# the test is run. Generally if some code depends on an environment variable,
129
# the tests should start without this variable in the environment. There are a
130
# few exceptions but you shouldn't violate this rule lightly.
134
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
135
# tests do check our impls match APPDATA
136
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
140
'BZREMAIL': None, # may still be present in the environment
141
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
142
'BZR_PROGRESS_BAR': None,
143
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
144
# as a base class instead of TestCaseInTempDir. Tests inheriting from
145
# TestCase should not use disk resources, BZR_LOG is one.
146
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
147
'BZR_PLUGIN_PATH': None,
148
'BZR_DISABLE_PLUGINS': None,
149
'BZR_PLUGINS_AT': None,
150
'BZR_CONCURRENCY': None,
151
# Make sure that any text ui tests are consistent regardless of
152
# the environment the test case is run in; you may want tests that
153
# test other combinations. 'dumb' is a reasonable guess for tests
154
# going to a pipe or a StringIO.
160
'SSH_AUTH_SOCK': None,
170
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
171
# least. If you do (care), please update this comment
175
'BZR_REMOTE_PATH': None,
176
# Generally speaking, we don't want apport reporting on crashes in
177
# the test envirnoment unless we're specifically testing apport,
178
# so that it doesn't leak into the real system environment. We
179
# use an env var so it propagates to subprocesses.
180
'APPORT_DISABLE': '1',
184
def override_os_environ(test, env=None):
185
"""Modify os.environ keeping a copy.
187
:param test: A test instance
189
:param env: A dict containing variable definitions to be installed
192
env = isolated_environ
193
test._original_os_environ = dict([(var, value)
194
for var, value in os.environ.iteritems()])
195
for var, value in env.iteritems():
196
osutils.set_or_unset_env(var, value)
197
if var not in test._original_os_environ:
198
# The var is new, add it with a value of None, so
199
# restore_os_environ will delete it
200
test._original_os_environ[var] = None
203
def restore_os_environ(test):
204
"""Restore os.environ to its original state.
206
:param test: A test instance previously passed to override_os_environ.
208
for var, value in test._original_os_environ.iteritems():
209
# Restore the original value (or delete it if the value has been set to
210
# None in override_os_environ).
211
osutils.set_or_unset_env(var, value)
214
class ExtendedTestResult(testtools.TextTestResult):
215
"""Accepts, reports and accumulates the results of running tests.
217
Compared to the unittest version this class adds support for
218
profiling, benchmarking, stopping as soon as a test fails, and
219
skipping tests. There are further-specialized subclasses for
220
different types of display.
222
When a test finishes, in whatever way, it calls one of the addSuccess,
223
addFailure or addError classes. These in turn may redirect to a more
224
specific case for the special test results supported by our extended
227
Note that just one of these objects is fed the results from many tests.
232
def __init__(self, stream, descriptions, verbosity,
236
"""Construct new TestResult.
238
:param bench_history: Optionally, a writable file object to accumulate
241
testtools.TextTestResult.__init__(self, stream)
242
if bench_history is not None:
243
from bzrlib.version import _get_bzr_source_tree
244
src_tree = _get_bzr_source_tree()
247
revision_id = src_tree.get_parent_ids()[0]
249
# XXX: if this is a brand new tree, do the same as if there
253
# XXX: If there's no branch, what should we do?
255
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
256
self._bench_history = bench_history
257
self.ui = ui.ui_factory
260
self.failure_count = 0
261
self.known_failure_count = 0
263
self.not_applicable_count = 0
264
self.unsupported = {}
266
self._overall_start_time = time.time()
267
self._strict = strict
268
self._first_thread_leaker_id = None
269
self._tests_leaking_threads_count = 0
270
self._traceback_from_test = None
272
def stopTestRun(self):
275
stopTime = time.time()
276
timeTaken = stopTime - self.startTime
277
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
278
# the parent class method is similar have to duplicate
279
self._show_list('ERROR', self.errors)
280
self._show_list('FAIL', self.failures)
281
self.stream.write(self.sep2)
282
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
283
run, run != 1 and "s" or "", timeTaken))
284
if not self.wasSuccessful():
285
self.stream.write("FAILED (")
286
failed, errored = map(len, (self.failures, self.errors))
288
self.stream.write("failures=%d" % failed)
290
if failed: self.stream.write(", ")
291
self.stream.write("errors=%d" % errored)
292
if self.known_failure_count:
293
if failed or errored: self.stream.write(", ")
294
self.stream.write("known_failure_count=%d" %
295
self.known_failure_count)
296
self.stream.write(")\n")
298
if self.known_failure_count:
299
self.stream.write("OK (known_failures=%d)\n" %
300
self.known_failure_count)
302
self.stream.write("OK\n")
303
if self.skip_count > 0:
304
skipped = self.skip_count
305
self.stream.write('%d test%s skipped\n' %
306
(skipped, skipped != 1 and "s" or ""))
308
for feature, count in sorted(self.unsupported.items()):
309
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
312
ok = self.wasStrictlySuccessful()
314
ok = self.wasSuccessful()
315
if self._first_thread_leaker_id:
317
'%s is leaking threads among %d leaking tests.\n' % (
318
self._first_thread_leaker_id,
319
self._tests_leaking_threads_count))
320
# We don't report the main thread as an active one.
322
'%d non-main threads were left active in the end.\n'
323
% (len(self._active_threads) - 1))
325
def getDescription(self, test):
328
def _extractBenchmarkTime(self, testCase, details=None):
329
"""Add a benchmark time for the current test case."""
330
if details and 'benchtime' in details:
331
return float(''.join(details['benchtime'].iter_bytes()))
332
return getattr(testCase, "_benchtime", None)
334
def _elapsedTestTimeString(self):
335
"""Return a time string for the overall time the current test has taken."""
336
return self._formatTime(self._delta_to_float(
337
self._now() - self._start_datetime))
339
def _testTimeString(self, testCase):
340
benchmark_time = self._extractBenchmarkTime(testCase)
341
if benchmark_time is not None:
342
return self._formatTime(benchmark_time) + "*"
344
return self._elapsedTestTimeString()
346
def _formatTime(self, seconds):
347
"""Format seconds as milliseconds with leading spaces."""
348
# some benchmarks can take thousands of seconds to run, so we need 8
350
return "%8dms" % (1000 * seconds)
352
def _shortened_test_description(self, test):
354
what = re.sub(r'^bzrlib\.tests\.', '', what)
357
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
358
# multiple times in a row, because the handler is added for
359
# each test but the container list is shared between cases.
360
# See lp:498869 lp:625574 and lp:637725 for background.
361
def _record_traceback_from_test(self, exc_info):
362
"""Store the traceback from passed exc_info tuple till"""
363
self._traceback_from_test = exc_info[2]
30
import bzrlib.commands
33
import bzrlib.osutils as osutils
34
from bzrlib.selftest import TestUtil
35
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
39
MODULES_TO_DOCTEST = []
41
from logging import debug, warning, error
45
class EarlyStoppingTestResultAdapter(object):
46
"""An adapter for TestResult to stop at the first first failure or error"""
48
def __init__(self, result):
51
def addError(self, test, err):
52
self._result.addError(test, err)
55
def addFailure(self, test, err):
56
self._result.addFailure(test, err)
59
def __getattr__(self, name):
60
return getattr(self._result, name)
62
def __setattr__(self, name, value):
64
object.__setattr__(self, name, value)
65
return setattr(self._result, name, value)
68
class _MyResult(unittest._TextTestResult):
72
No special behaviour for now.
75
def _elapsedTime(self):
76
return "(Took %.3fs)" % (time.time() - self._start_time)
365
78
def startTest(self, test):
366
super(ExtendedTestResult, self).startTest(test)
370
self.report_test_start(test)
371
test.number = self.count
372
self._recordTestStartTime()
373
# Make testtools cases give us the real traceback on failure
374
addOnException = getattr(test, "addOnException", None)
375
if addOnException is not None:
376
addOnException(self._record_traceback_from_test)
377
# Only check for thread leaks on bzrlib derived test cases
378
if isinstance(test, TestCase):
379
test.addCleanup(self._check_leaked_threads, test)
381
def stopTest(self, test):
382
super(ExtendedTestResult, self).stopTest(test)
383
# Manually break cycles, means touching various private things but hey
384
getDetails = getattr(test, "getDetails", None)
385
if getDetails is not None:
387
# Clear _type_equality_funcs to try to stop TestCase instances
388
# from wasting memory. 'clear' is not available in all Python
389
# versions (bug 809048)
390
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
391
if type_equality_funcs is not None:
392
tef_clear = getattr(type_equality_funcs, "clear", None)
393
if tef_clear is None:
394
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
395
if tef_instance_dict is not None:
396
tef_clear = tef_instance_dict.clear
397
if tef_clear is not None:
399
self._traceback_from_test = None
401
def startTests(self):
402
self.report_tests_starting()
403
self._active_threads = threading.enumerate()
405
def _check_leaked_threads(self, test):
406
"""See if any threads have leaked since last call
408
A sample of live threads is stored in the _active_threads attribute,
409
when this method runs it compares the current live threads and any not
410
in the previous sample are treated as having leaked.
412
now_active_threads = set(threading.enumerate())
413
threads_leaked = now_active_threads.difference(self._active_threads)
415
self._report_thread_leak(test, threads_leaked, now_active_threads)
416
self._tests_leaking_threads_count += 1
417
if self._first_thread_leaker_id is None:
418
self._first_thread_leaker_id = test.id()
419
self._active_threads = now_active_threads
421
def _recordTestStartTime(self):
422
"""Record that a test has started."""
423
self._start_datetime = self._now()
79
unittest.TestResult.startTest(self, test)
80
# TODO: Maybe show test.shortDescription somewhere?
81
what = test.shortDescription() or test.id()
83
self.stream.write('%-70.70s' % what)
85
self._start_time = time.time()
425
87
def addError(self, test, err):
426
"""Tell result that test finished with an error.
428
Called from the TestCase run() method when the test
429
fails with an unexpected error.
431
self._post_mortem(self._traceback_from_test)
432
super(ExtendedTestResult, self).addError(test, err)
433
self.error_count += 1
434
self.report_error(test, err)
88
unittest.TestResult.addError(self, test, err)
90
self.stream.writeln("ERROR %s" % self._elapsedTime())
92
self.stream.write('E')
438
95
def addFailure(self, test, err):
439
"""Tell result that test failed.
441
Called from the TestCase run() method when the test
442
fails because e.g. an assert() method failed.
444
self._post_mortem(self._traceback_from_test)
445
super(ExtendedTestResult, self).addFailure(test, err)
446
self.failure_count += 1
447
self.report_failure(test, err)
451
def addSuccess(self, test, details=None):
452
"""Tell result that test completed successfully.
454
Called from the TestCase run()
456
if self._bench_history is not None:
457
benchmark_time = self._extractBenchmarkTime(test, details)
458
if benchmark_time is not None:
459
self._bench_history.write("%s %s\n" % (
460
self._formatTime(benchmark_time),
462
self.report_success(test)
463
super(ExtendedTestResult, self).addSuccess(test)
464
test._log_contents = ''
466
def addExpectedFailure(self, test, err):
467
self.known_failure_count += 1
468
self.report_known_failure(test, err)
470
def addUnexpectedSuccess(self, test, details=None):
471
"""Tell result the test unexpectedly passed, counting as a failure
473
When the minimum version of testtools required becomes 0.9.8 this
474
can be updated to use the new handling there.
476
super(ExtendedTestResult, self).addFailure(test, details=details)
477
self.failure_count += 1
478
self.report_unexpected_success(test,
479
"".join(details["reason"].iter_text()))
483
def addNotSupported(self, test, feature):
484
"""The test will not be run because of a missing feature.
486
# this can be called in two different ways: it may be that the
487
# test started running, and then raised (through requireFeature)
488
# UnavailableFeature. Alternatively this method can be called
489
# while probing for features before running the test code proper; in
490
# that case we will see startTest and stopTest, but the test will
491
# never actually run.
492
self.unsupported.setdefault(str(feature), 0)
493
self.unsupported[str(feature)] += 1
494
self.report_unsupported(test, feature)
496
def addSkip(self, test, reason):
497
"""A test has not run for 'reason'."""
499
self.report_skip(test, reason)
501
def addNotApplicable(self, test, reason):
502
self.not_applicable_count += 1
503
self.report_not_applicable(test, reason)
505
def _post_mortem(self, tb=None):
506
"""Start a PDB post mortem session."""
507
if os.environ.get('BZR_TEST_PDB', None):
511
def progress(self, offset, whence):
512
"""The test is adjusting the count of tests to run."""
513
if whence == SUBUNIT_SEEK_SET:
514
self.num_tests = offset
515
elif whence == SUBUNIT_SEEK_CUR:
516
self.num_tests += offset
518
raise errors.BzrError("Unknown whence %r" % whence)
520
def report_tests_starting(self):
521
"""Display information before the test run begins"""
522
if getattr(sys, 'frozen', None) is None:
523
bzr_path = osutils.realpath(sys.argv[0])
525
bzr_path = sys.executable
527
'bzr selftest: %s\n' % (bzr_path,))
530
bzrlib.__path__[0],))
532
' bzr-%s python-%s %s\n' % (
533
bzrlib.version_string,
534
bzrlib._format_version_tuple(sys.version_info),
535
platform.platform(aliased=1),
537
self.stream.write('\n')
539
def report_test_start(self, test):
540
"""Display information on the test just about to be run"""
542
def _report_thread_leak(self, test, leaked_threads, active_threads):
543
"""Display information on a test that leaked one or more threads"""
544
# GZ 2010-09-09: A leak summary reported separately from the general
545
# thread debugging would be nice. Tests under subunit
546
# need something not using stream, perhaps adding a
547
# testtools details object would be fitting.
548
if 'threads' in selftest_debug_flags:
549
self.stream.write('%s is leaking, active is now %d\n' %
550
(test.id(), len(active_threads)))
552
def startTestRun(self):
553
self.startTime = time.time()
555
def report_success(self, test):
558
def wasStrictlySuccessful(self):
559
if self.unsupported or self.known_failure_count:
561
return self.wasSuccessful()
564
class TextTestResult(ExtendedTestResult):
565
"""Displays progress and results of tests in text form"""
567
def __init__(self, stream, descriptions, verbosity,
572
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
573
bench_history, strict)
574
# We no longer pass them around, but just rely on the UIFactory stack
577
warnings.warn("Passing pb to TextTestResult is deprecated")
578
self.pb = self.ui.nested_progress_bar()
579
self.pb.show_pct = False
580
self.pb.show_spinner = False
581
self.pb.show_eta = False,
582
self.pb.show_count = False
583
self.pb.show_bar = False
584
self.pb.update_latency = 0
585
self.pb.show_transport_activity = False
587
def stopTestRun(self):
588
# called when the tests that are going to run have run
591
super(TextTestResult, self).stopTestRun()
593
def report_tests_starting(self):
594
super(TextTestResult, self).report_tests_starting()
595
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
597
def _progress_prefix_text(self):
598
# the longer this text, the less space we have to show the test
600
a = '[%d' % self.count # total that have been run
601
# tests skipped as known not to be relevant are not important enough
603
## if self.skip_count:
604
## a += ', %d skip' % self.skip_count
605
## if self.known_failure_count:
606
## a += '+%dX' % self.known_failure_count
608
a +='/%d' % self.num_tests
610
runtime = time.time() - self._overall_start_time
612
a += '%dm%ds' % (runtime / 60, runtime % 60)
615
total_fail_count = self.error_count + self.failure_count
617
a += ', %d failed' % total_fail_count
618
# if self.unsupported:
619
# a += ', %d missing' % len(self.unsupported)
623
def report_test_start(self, test):
625
self._progress_prefix_text()
627
+ self._shortened_test_description(test))
629
def _test_description(self, test):
630
return self._shortened_test_description(test)
632
def report_error(self, test, err):
633
self.stream.write('ERROR: %s\n %s\n' % (
634
self._test_description(test),
638
def report_failure(self, test, err):
639
self.stream.write('FAIL: %s\n %s\n' % (
640
self._test_description(test),
644
def report_known_failure(self, test, err):
647
def report_unexpected_success(self, test, reason):
648
self.stream.write('FAIL: %s\n %s: %s\n' % (
649
self._test_description(test),
650
"Unexpected success. Should have failed",
654
def report_skip(self, test, reason):
657
def report_not_applicable(self, test, reason):
660
def report_unsupported(self, test, feature):
661
"""test cannot be run because feature is missing."""
664
class VerboseTestResult(ExtendedTestResult):
665
"""Produce long output, with one line per test run plus times"""
667
def _ellipsize_to_right(self, a_string, final_width):
668
"""Truncate and pad a string, keeping the right hand side"""
669
if len(a_string) > final_width:
670
result = '...' + a_string[3-final_width:]
673
return result.ljust(final_width)
675
def report_tests_starting(self):
676
self.stream.write('running %d tests...\n' % self.num_tests)
677
super(VerboseTestResult, self).report_tests_starting()
679
def report_test_start(self, test):
680
name = self._shortened_test_description(test)
681
width = osutils.terminal_width()
682
if width is not None:
683
# width needs space for 6 char status, plus 1 for slash, plus an
684
# 11-char time string, plus a trailing blank
685
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
687
self.stream.write(self._ellipsize_to_right(name, width-18))
689
self.stream.write(name)
692
def _error_summary(self, err):
694
return '%s%s' % (indent, err[1])
696
def report_error(self, test, err):
697
self.stream.write('ERROR %s\n%s\n'
698
% (self._testTimeString(test),
699
self._error_summary(err)))
701
def report_failure(self, test, err):
702
self.stream.write(' FAIL %s\n%s\n'
703
% (self._testTimeString(test),
704
self._error_summary(err)))
706
def report_known_failure(self, test, err):
707
self.stream.write('XFAIL %s\n%s\n'
708
% (self._testTimeString(test),
709
self._error_summary(err)))
711
def report_unexpected_success(self, test, reason):
712
self.stream.write(' FAIL %s\n%s: %s\n'
713
% (self._testTimeString(test),
714
"Unexpected success. Should have failed",
717
def report_success(self, test):
718
self.stream.write(' OK %s\n' % self._testTimeString(test))
719
for bench_called, stats in getattr(test, '_benchcalls', []):
720
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
721
stats.pprint(file=self.stream)
722
# flush the stream so that we get smooth output. This verbose mode is
723
# used to show the output in PQM.
726
def report_skip(self, test, reason):
727
self.stream.write(' SKIP %s\n%s\n'
728
% (self._testTimeString(test), reason))
730
def report_not_applicable(self, test, reason):
731
self.stream.write(' N/A %s\n %s\n'
732
% (self._testTimeString(test), reason))
734
def report_unsupported(self, test, feature):
735
"""test cannot be run because feature is missing."""
736
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
737
%(self._testTimeString(test), feature))
740
class TextTestRunner(object):
741
stop_on_failure = False
749
result_decorators=None,
751
"""Create a TextTestRunner.
753
:param result_decorators: An optional list of decorators to apply
754
to the result object being used by the runner. Decorators are
755
applied left to right - the first element in the list is the
758
# stream may know claim to know to write unicode strings, but in older
759
# pythons this goes sufficiently wrong that it is a bad idea. (
760
# specifically a built in file with encoding 'UTF-8' will still try
761
# to encode using ascii.
762
new_encoding = osutils.get_terminal_encoding()
763
codec = codecs.lookup(new_encoding)
764
if type(codec) is tuple:
768
encode = codec.encode
769
# GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
770
# so should swap to the plain codecs.StreamWriter
771
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
773
stream.encoding = new_encoding
775
self.descriptions = descriptions
776
self.verbosity = verbosity
777
self._bench_history = bench_history
778
self._strict = strict
779
self._result_decorators = result_decorators or []
782
"Run the given test case or test suite."
783
if self.verbosity == 1:
784
result_class = TextTestResult
785
elif self.verbosity >= 2:
786
result_class = VerboseTestResult
787
original_result = result_class(self.stream,
790
bench_history=self._bench_history,
793
# Signal to result objects that look at stop early policy to stop,
794
original_result.stop_early = self.stop_on_failure
795
result = original_result
796
for decorator in self._result_decorators:
797
result = decorator(result)
798
result.stop_early = self.stop_on_failure
799
result.startTestRun()
804
# higher level code uses our extended protocol to determine
805
# what exit code to give.
806
return original_result
96
unittest.TestResult.addFailure(self, test, err)
98
self.stream.writeln("FAIL %s" % self._elapsedTime())
100
self.stream.write('F')
103
def addSuccess(self, test):
105
self.stream.writeln('OK %s' % self._elapsedTime())
107
self.stream.write('~')
109
unittest.TestResult.addSuccess(self, test)
111
def printErrorList(self, flavour, errors):
112
for test, err in errors:
113
self.stream.writeln(self.separator1)
114
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
115
if hasattr(test, '_get_log'):
116
self.stream.writeln()
117
self.stream.writeln('log from this test:')
118
print >>self.stream, test._get_log()
119
self.stream.writeln(self.separator2)
120
self.stream.writeln("%s" % err)
123
class TextTestRunner(unittest.TextTestRunner):
125
def _makeResult(self):
126
result = _MyResult(self.stream, self.descriptions, self.verbosity)
127
return EarlyStoppingTestResultAdapter(result)
809
130
def iter_suite_tests(suite):
810
131
"""Return all tests in a suite, recursing through nested suites"""
811
if isinstance(suite, unittest.TestCase):
813
elif isinstance(suite, unittest.TestSuite):
132
for item in suite._tests:
133
if isinstance(item, unittest.TestCase):
135
elif isinstance(item, unittest.TestSuite):
815
136
for r in iter_suite_tests(item):
818
raise Exception('unknown type %r for object %r'
819
% (type(suite), suite))
822
TestSkipped = testtools.testcase.TestSkipped
825
class TestNotApplicable(TestSkipped):
826
"""A test is not applicable to the situation where it was run.
828
This is only normally raised by parameterized tests, if they find that
829
the instance they're constructed upon does not support one aspect
834
# traceback._some_str fails to format exceptions that have the default
835
# __str__ which does an implicit ascii conversion. However, repr() on those
836
# objects works, for all that its not quite what the doctor may have ordered.
837
def _clever_some_str(value):
842
return repr(value).replace('\\n', '\n')
844
return '<unprintable %s object>' % type(value).__name__
846
traceback._some_str = _clever_some_str
849
# deprecated - use self.knownFailure(), or self.expectFailure.
850
KnownFailure = testtools.testcase._ExpectedFailure
853
class UnavailableFeature(Exception):
854
"""A feature required for this test was not available.
856
This can be considered a specialised form of SkippedTest.
858
The feature should be used to construct the exception.
862
class StringIOWrapper(object):
863
"""A wrapper around cStringIO which just adds an encoding attribute.
865
Internally we can check sys.stdout to see what the output encoding
866
should be. However, cStringIO has no encoding attribute that we can
867
set. So we wrap it instead.
872
def __init__(self, s=None):
874
self.__dict__['_cstring'] = StringIO(s)
876
self.__dict__['_cstring'] = StringIO()
878
def __getattr__(self, name, getattr=getattr):
879
return getattr(self.__dict__['_cstring'], name)
881
def __setattr__(self, name, val):
882
if name == 'encoding':
883
self.__dict__['encoding'] = val
885
return setattr(self._cstring, name, val)
888
class TestUIFactory(TextUIFactory):
889
"""A UI Factory for testing.
891
Hide the progress bar but emit note()s.
893
Allows get_password to be tested without real tty attached.
895
See also CannedInputUIFactory which lets you provide programmatic input in
898
# TODO: Capture progress events at the model level and allow them to be
899
# observed by tests that care.
901
# XXX: Should probably unify more with CannedInputUIFactory or a
902
# particular configuration of TextUIFactory, or otherwise have a clearer
903
# idea of how they're supposed to be different.
904
# See https://bugs.launchpad.net/bzr/+bug/408213
906
def __init__(self, stdout=None, stderr=None, stdin=None):
907
if stdin is not None:
908
# We use a StringIOWrapper to be able to test various
909
# encodings, but the user is still responsible to
910
# encode the string and to set the encoding attribute
911
# of StringIOWrapper.
912
stdin = StringIOWrapper(stdin)
913
super(TestUIFactory, self).__init__(stdin, stdout, stderr)
915
def get_non_echoed_password(self):
916
"""Get password from stdin without trying to handle the echo mode"""
917
password = self.stdin.readline()
920
if password[-1] == '\n':
921
password = password[:-1]
924
def make_progress_view(self):
925
return NullProgressView()
928
def isolated_doctest_setUp(test):
929
override_os_environ(test)
932
def isolated_doctest_tearDown(test):
933
restore_os_environ(test)
936
def IsolatedDocTestSuite(*args, **kwargs):
937
"""Overrides doctest.DocTestSuite to handle isolation.
939
The method is really a factory and users are expected to use it as such.
942
kwargs['setUp'] = isolated_doctest_setUp
943
kwargs['tearDown'] = isolated_doctest_tearDown
944
return doctest.DocTestSuite(*args, **kwargs)
947
class TestCase(testtools.TestCase):
139
raise Exception('unknown object %r inside test suite %r'
143
class TestSkipped(Exception):
144
"""Indicates that a test was intentionally skipped, rather than failing."""
148
class CommandFailed(Exception):
151
class TestCase(unittest.TestCase):
948
152
"""Base class for bzr unit tests.
950
Tests that need access to disk resources should subclass
154
Tests that need access to disk resources should subclass
951
155
TestCaseInTempDir not TestCase.
953
157
Error and debug log messages are redirected from their usual
954
158
location into a temporary file, the contents of which can be
955
retrieved by _get_log(). We use a real OS file, not an in-memory object,
956
so that it can also capture file IO. When the test completes this file
957
is read into memory and removed from disk.
159
retrieved by _get_log().
959
161
There are also convenience functions to invoke bzr's command-line
960
routine, and to build and check bzr trees.
962
In addition to the usual method of overriding tearDown(), this class also
963
allows subclasses to register cleanup functions via addCleanup, which are
964
run in order as the object is torn down. It's less likely this will be
965
accidentally overlooked.
969
# record lsprof data when performing benchmark calls.
970
_gather_lsprof_in_benchmarks = False
972
def __init__(self, methodName='testMethod'):
973
super(TestCase, self).__init__(methodName)
974
self._directory_isolation = True
975
self.exception_handlers.insert(0,
976
(UnavailableFeature, self._do_unsupported_or_skip))
977
self.exception_handlers.insert(0,
978
(TestNotApplicable, self._do_not_applicable))
162
routine, and to build and check bzr trees."""
165
_log_file_name = None
981
super(TestCase, self).setUp()
982
for feature in getattr(self, '_test_needs_features', []):
983
self.requireFeature(feature)
984
self._cleanEnvironment()
987
self._benchcalls = []
988
self._benchtime = None
990
self._track_transports()
992
self._clear_debug_flags()
993
# Isolate global verbosity level, to make sure it's reproducible
994
# between tests. We should get rid of this altogether: bug 656694. --
996
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
997
# Isolate config option expansion until its default value for bzrlib is
998
# settled on or a the FIXME associated with _get_expand_default_value
999
# is addressed -- vila 20110219
1000
self.overrideAttr(config, '_expand_default_value', None)
1001
self._log_files = set()
1002
# Each key in the ``_counters`` dict holds a value for a different
1003
# counter. When the test ends, addDetail() should be used to output the
1004
# counter values. This happens in install_counter_hook().
1006
if 'config_stats' in selftest_debug_flags:
1007
self._install_config_stats_hooks()
1012
pdb.Pdb().set_trace(sys._getframe().f_back)
1014
def discardDetail(self, name):
1015
"""Extend the addDetail, getDetails api so we can remove a detail.
1017
eg. bzr always adds the 'log' detail at startup, but we don't want to
1018
include it for skipped, xfail, etc tests.
1020
It is safe to call this for a detail that doesn't exist, in case this
1021
gets called multiple times.
1023
# We cheat. details is stored in __details which means we shouldn't
1024
# touch it. but getDetails() returns the dict directly, so we can
1026
details = self.getDetails()
1030
def install_counter_hook(self, hooks, name, counter_name=None):
1031
"""Install a counting hook.
1033
Any hook can be counted as long as it doesn't need to return a value.
1035
:param hooks: Where the hook should be installed.
1037
:param name: The hook name that will be counted.
1039
:param counter_name: The counter identifier in ``_counters``, defaults
1042
_counters = self._counters # Avoid closing over self
1043
if counter_name is None:
1045
if _counters.has_key(counter_name):
1046
raise AssertionError('%s is already used as a counter name'
1048
_counters[counter_name] = 0
1049
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1050
lambda: ['%d' % (_counters[counter_name],)]))
1051
def increment_counter(*args, **kwargs):
1052
_counters[counter_name] += 1
1053
label = 'count %s calls' % (counter_name,)
1054
hooks.install_named_hook(name, increment_counter, label)
1055
self.addCleanup(hooks.uninstall_named_hook, name, label)
1057
def _install_config_stats_hooks(self):
1058
"""Install config hooks to count hook calls.
1061
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1062
self.install_counter_hook(config.ConfigHooks, hook_name,
1063
'config.%s' % (hook_name,))
1065
# The OldConfigHooks are private and need special handling to protect
1066
# against recursive tests (tests that run other tests), so we just do
1067
# manually what registering them into _builtin_known_hooks will provide
1069
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1070
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1071
self.install_counter_hook(config.OldConfigHooks, hook_name,
1072
'old_config.%s' % (hook_name,))
1074
def _clear_debug_flags(self):
1075
"""Prevent externally set debug flags affecting tests.
1077
Tests that want to use debug flags can just set them in the
1078
debug_flags set during setup/teardown.
1080
# Start with a copy of the current debug flags we can safely modify.
1081
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
1082
if 'allow_debug' not in selftest_debug_flags:
1083
debug.debug_flags.clear()
1084
if 'disable_lock_checks' not in selftest_debug_flags:
1085
debug.debug_flags.add('strict_locks')
1087
def _clear_hooks(self):
1088
# prevent hooks affecting tests
1089
known_hooks = hooks.known_hooks
1090
self._preserved_hooks = {}
1091
for key, (parent, name) in known_hooks.iter_parent_objects():
1092
current_hooks = getattr(parent, name)
1093
self._preserved_hooks[parent] = (name, current_hooks)
1094
self._preserved_lazy_hooks = hooks._lazy_hooks
1095
hooks._lazy_hooks = {}
1096
self.addCleanup(self._restoreHooks)
1097
for key, (parent, name) in known_hooks.iter_parent_objects():
1098
factory = known_hooks.get(key)
1099
setattr(parent, name, factory())
1100
# this hook should always be installed
1101
request._install_hook()
1103
def disable_directory_isolation(self):
1104
"""Turn off directory isolation checks."""
1105
self._directory_isolation = False
1107
def enable_directory_isolation(self):
1108
"""Enable directory isolation checks."""
1109
self._directory_isolation = True
1111
def _silenceUI(self):
1112
"""Turn off UI for duration of test"""
1113
# by default the UI is off; tests can turn it on if they want it.
1114
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
1116
def _check_locks(self):
1117
"""Check that all lock take/release actions have been paired."""
1118
# We always check for mismatched locks. If a mismatch is found, we
1119
# fail unless -Edisable_lock_checks is supplied to selftest, in which
1120
# case we just print a warning.
1122
acquired_locks = [lock for action, lock in self._lock_actions
1123
if action == 'acquired']
1124
released_locks = [lock for action, lock in self._lock_actions
1125
if action == 'released']
1126
broken_locks = [lock for action, lock in self._lock_actions
1127
if action == 'broken']
1128
# trivially, given the tests for lock acquistion and release, if we
1129
# have as many in each list, it should be ok. Some lock tests also
1130
# break some locks on purpose and should be taken into account by
1131
# considering that breaking a lock is just a dirty way of releasing it.
1132
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1134
'Different number of acquired and '
1135
'released or broken locks.\n'
1139
(acquired_locks, released_locks, broken_locks))
1140
if not self._lock_check_thorough:
1141
# Rather than fail, just warn
1142
print "Broken test %s: %s" % (self, message)
1146
def _track_locks(self):
1147
"""Track lock activity during tests."""
1148
self._lock_actions = []
1149
if 'disable_lock_checks' in selftest_debug_flags:
1150
self._lock_check_thorough = False
1152
self._lock_check_thorough = True
1154
self.addCleanup(self._check_locks)
1155
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
1156
self._lock_acquired, None)
1157
_mod_lock.Lock.hooks.install_named_hook('lock_released',
1158
self._lock_released, None)
1159
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
1160
self._lock_broken, None)
1162
def _lock_acquired(self, result):
1163
self._lock_actions.append(('acquired', result))
1165
def _lock_released(self, result):
1166
self._lock_actions.append(('released', result))
1168
def _lock_broken(self, result):
1169
self._lock_actions.append(('broken', result))
1171
def permit_dir(self, name):
1172
"""Permit a directory to be used by this test. See permit_url."""
1173
name_transport = _mod_transport.get_transport_from_path(name)
1174
self.permit_url(name)
1175
self.permit_url(name_transport.base)
1177
def permit_url(self, url):
1178
"""Declare that url is an ok url to use in this test.
1180
Do this for memory transports, temporary test directory etc.
1182
Do not do this for the current working directory, /tmp, or any other
1183
preexisting non isolated url.
1185
if not url.endswith('/'):
1187
self._bzr_selftest_roots.append(url)
1189
def permit_source_tree_branch_repo(self):
1190
"""Permit the source tree bzr is running from to be opened.
1192
Some code such as bzrlib.version attempts to read from the bzr branch
1193
that bzr is executing from (if any). This method permits that directory
1194
to be used in the test suite.
1196
path = self.get_source_path()
1197
self.record_directory_isolation()
1200
workingtree.WorkingTree.open(path)
1201
except (errors.NotBranchError, errors.NoWorkingTree):
1202
raise TestSkipped('Needs a working tree of bzr sources')
1204
self.enable_directory_isolation()
1206
def _preopen_isolate_transport(self, transport):
1207
"""Check that all transport openings are done in the test work area."""
1208
while isinstance(transport, pathfilter.PathFilteringTransport):
1209
# Unwrap pathfiltered transports
1210
transport = transport.server.backing_transport.clone(
1211
transport._filter('.'))
1212
url = transport.base
1213
# ReadonlySmartTCPServer_for_testing decorates the backing transport
1214
# urls it is given by prepending readonly+. This is appropriate as the
1215
# client shouldn't know that the server is readonly (or not readonly).
1216
# We could register all servers twice, with readonly+ prepending, but
1217
# that makes for a long list; this is about the same but easier to
1219
if url.startswith('readonly+'):
1220
url = url[len('readonly+'):]
1221
self._preopen_isolate_url(url)
1223
def _preopen_isolate_url(self, url):
1224
if not self._directory_isolation:
1226
if self._directory_isolation == 'record':
1227
self._bzr_selftest_roots.append(url)
1229
# This prevents all transports, including e.g. sftp ones backed on disk
1230
# from working unless they are explicitly granted permission. We then
1231
# depend on the code that sets up test transports to check that they are
1232
# appropriately isolated and enable their use by calling
1233
# self.permit_transport()
1234
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1235
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1236
% (url, self._bzr_selftest_roots))
1238
def record_directory_isolation(self):
1239
"""Gather accessed directories to permit later access.
1241
This is used for tests that access the branch bzr is running from.
1243
self._directory_isolation = "record"
1245
def start_server(self, transport_server, backing_server=None):
1246
"""Start transport_server for this test.
1248
This starts the server, registers a cleanup for it and permits the
1249
server's urls to be used.
1251
if backing_server is None:
1252
transport_server.start_server()
1254
transport_server.start_server(backing_server)
1255
self.addCleanup(transport_server.stop_server)
1256
# Obtain a real transport because if the server supplies a password, it
1257
# will be hidden from the base on the client side.
1258
t = _mod_transport.get_transport_from_url(transport_server.get_url())
1259
# Some transport servers effectively chroot the backing transport;
1260
# others like SFTPServer don't - users of the transport can walk up the
1261
# transport to read the entire backing transport. This wouldn't matter
1262
# except that the workdir tests are given - and that they expect the
1263
# server's url to point at - is one directory under the safety net. So
1264
# Branch operations into the transport will attempt to walk up one
1265
# directory. Chrooting all servers would avoid this but also mean that
1266
# we wouldn't be testing directly against non-root urls. Alternatively
1267
# getting the test framework to start the server with a backing server
1268
# at the actual safety net directory would work too, but this then
1269
# means that the self.get_url/self.get_transport methods would need
1270
# to transform all their results. On balance its cleaner to handle it
1271
# here, and permit a higher url when we have one of these transports.
1272
if t.base.endswith('/work/'):
1273
# we have safety net/test root/work
1274
t = t.clone('../..')
1275
elif isinstance(transport_server,
1276
test_server.SmartTCPServer_for_testing):
1277
# The smart server adds a path similar to work, which is traversed
1278
# up from by the client. But the server is chrooted - the actual
1279
# backing transport is not escaped from, and VFS requests to the
1280
# root will error (because they try to escape the chroot).
1282
while t2.base != t.base:
1285
self.permit_url(t.base)
1287
def _track_transports(self):
1288
"""Install checks for transport usage."""
1289
# TestCase has no safe place it can write to.
1290
self._bzr_selftest_roots = []
1291
# Currently the easiest way to be sure that nothing is going on is to
1292
# hook into bzr dir opening. This leaves a small window of error for
1293
# transport tests, but they are well known, and we can improve on this
1295
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1296
self._preopen_isolate_transport, "Check bzr directories are safe.")
168
unittest.TestCase.setUp(self)
169
self.oldenv = os.environ.get('HOME', None)
170
os.environ['HOME'] = os.getcwd()
171
self.bzr_email = os.environ.get('BZREMAIL')
172
if self.bzr_email is not None:
173
del os.environ['BZREMAIL']
174
self.email = os.environ.get('EMAIL')
175
if self.email is not None:
176
del os.environ['EMAIL']
177
bzrlib.trace.disable_default_logging()
178
self._enable_file_logging()
1298
180
def _ndiff_strings(self, a, b):
1299
"""Return ndiff between two strings containing lines.
1301
A trailing newline is added if missing to make the strings
1303
if b and b[-1] != '\n':
1305
if a and a[-1] != '\n':
181
"""Return ndiff between two strings containing lines."""
1307
182
difflines = difflib.ndiff(a.splitlines(True),
1308
183
b.splitlines(True),
1309
184
linejunk=lambda x: False,
1310
185
charjunk=lambda x: False)
1311
186
return ''.join(difflines)
1313
def assertEqual(self, a, b, message=''):
1317
except UnicodeError, e:
1318
# If we can't compare without getting a UnicodeError, then
1319
# obviously they are different
1320
trace.mutter('UnicodeError: %s', e)
1323
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1325
pprint.pformat(a), pprint.pformat(b)))
1327
assertEquals = assertEqual
1329
def assertEqualDiff(self, a, b, message=None):
188
def assertEqualDiff(self, a, b):
1330
189
"""Assert two texts are equal, if not raise an exception.
1332
This is intended for use with multi-line strings where it can
191
This is intended for use with multi-line strings where it can
1333
192
be hard to find the differences by eye.
1335
194
# TODO: perhaps override assertEquals to call this for strings?
1339
message = "texts not equal:\n"
1341
message = 'first string is missing a final newline.\n'
1343
message = 'second string is missing a final newline.\n'
1344
raise AssertionError(message +
1345
self._ndiff_strings(a, b))
1347
def assertEqualMode(self, mode, mode_test):
1348
self.assertEqual(mode, mode_test,
1349
'mode mismatch %o != %o' % (mode, mode_test))
1351
def assertEqualStat(self, expected, actual):
1352
"""assert that expected and actual are the same stat result.
1354
:param expected: A stat result.
1355
:param actual: A stat result.
1356
:raises AssertionError: If the expected and actual stat values differ
1357
other than by atime.
1359
self.assertEqual(expected.st_size, actual.st_size,
1360
'st_size did not match')
1361
self.assertEqual(expected.st_mtime, actual.st_mtime,
1362
'st_mtime did not match')
1363
self.assertEqual(expected.st_ctime, actual.st_ctime,
1364
'st_ctime did not match')
1365
if sys.platform == 'win32':
1366
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1367
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1368
# odd. We just force it to always be 0 to avoid any problems.
1369
self.assertEqual(0, expected.st_dev)
1370
self.assertEqual(0, actual.st_dev)
1371
self.assertEqual(0, expected.st_ino)
1372
self.assertEqual(0, actual.st_ino)
1374
self.assertEqual(expected.st_dev, actual.st_dev,
1375
'st_dev did not match')
1376
self.assertEqual(expected.st_ino, actual.st_ino,
1377
'st_ino did not match')
1378
self.assertEqual(expected.st_mode, actual.st_mode,
1379
'st_mode did not match')
1381
def assertLength(self, length, obj_with_len):
1382
"""Assert that obj_with_len is of length length."""
1383
if len(obj_with_len) != length:
1384
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1385
length, len(obj_with_len), obj_with_len))
1387
def assertLogsError(self, exception_class, func, *args, **kwargs):
1388
"""Assert that `func(*args, **kwargs)` quietly logs a specific error.
1391
orig_log_exception_quietly = trace.log_exception_quietly
1394
orig_log_exception_quietly()
1395
captured.append(sys.exc_info()[1])
1396
trace.log_exception_quietly = capture
1397
func(*args, **kwargs)
1399
trace.log_exception_quietly = orig_log_exception_quietly
1400
self.assertLength(1, captured)
1402
self.assertIsInstance(err, exception_class)
1405
def assertPositive(self, val):
1406
"""Assert that val is greater than 0."""
1407
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1409
def assertNegative(self, val):
1410
"""Assert that val is less than 0."""
1411
self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
1413
def assertStartsWith(self, s, prefix):
1414
if not s.startswith(prefix):
1415
raise AssertionError('string %r does not start with %r' % (s, prefix))
1417
def assertEndsWith(self, s, suffix):
1418
"""Asserts that s ends with suffix."""
1419
if not s.endswith(suffix):
1420
raise AssertionError('string %r does not end with %r' % (s, suffix))
1422
def assertContainsRe(self, haystack, needle_re, flags=0):
1423
"""Assert that a contains something matching a regular expression."""
1424
if not re.search(needle_re, haystack, flags):
1425
if '\n' in haystack or len(haystack) > 60:
1426
# a long string, format it in a more readable way
1427
raise AssertionError(
1428
'pattern "%s" not found in\n"""\\\n%s"""\n'
1429
% (needle_re, haystack))
1431
raise AssertionError('pattern "%s" not found in "%s"'
1432
% (needle_re, haystack))
1434
def assertNotContainsRe(self, haystack, needle_re, flags=0):
1435
"""Assert that a does not match a regular expression"""
1436
if re.search(needle_re, haystack, flags):
1437
raise AssertionError('pattern "%s" found in "%s"'
1438
% (needle_re, haystack))
1440
def assertContainsString(self, haystack, needle):
1441
if haystack.find(needle) == -1:
1442
self.fail("string %r not found in '''%s'''" % (needle, haystack))
1444
def assertNotContainsString(self, haystack, needle):
1445
if haystack.find(needle) != -1:
1446
self.fail("string %r found in '''%s'''" % (needle, haystack))
1448
def assertSubset(self, sublist, superlist):
1449
"""Assert that every entry in sublist is present in superlist."""
1450
missing = set(sublist) - set(superlist)
1451
if len(missing) > 0:
1452
raise AssertionError("value(s) %r not present in container %r" %
1453
(missing, superlist))
1455
def assertListRaises(self, excClass, func, *args, **kwargs):
1456
"""Fail unless excClass is raised when the iterator from func is used.
1458
Many functions can return generators this makes sure
1459
to wrap them in a list() call to make sure the whole generator
1460
is run, and that the proper exception is raised.
1463
list(func(*args, **kwargs))
1467
if getattr(excClass,'__name__', None) is not None:
1468
excName = excClass.__name__
1470
excName = str(excClass)
1471
raise self.failureException, "%s not raised" % excName
1473
def assertRaises(self, excClass, callableObj, *args, **kwargs):
1474
"""Assert that a callable raises a particular exception.
1476
:param excClass: As for the except statement, this may be either an
1477
exception class, or a tuple of classes.
1478
:param callableObj: A callable, will be passed ``*args`` and
1481
Returns the exception so that you can examine it.
1484
callableObj(*args, **kwargs)
1488
if getattr(excClass,'__name__', None) is not None:
1489
excName = excClass.__name__
1492
excName = str(excClass)
1493
raise self.failureException, "%s not raised" % excName
1495
def assertIs(self, left, right, message=None):
1496
if not (left is right):
1497
if message is not None:
1498
raise AssertionError(message)
1500
raise AssertionError("%r is not %r." % (left, right))
1502
def assertIsNot(self, left, right, message=None):
1504
if message is not None:
1505
raise AssertionError(message)
1507
raise AssertionError("%r is %r." % (left, right))
1509
def assertTransportMode(self, transport, path, mode):
1510
"""Fail if a path does not have mode "mode".
1512
If modes are not supported on this transport, the assertion is ignored.
1514
if not transport._can_roundtrip_unix_modebits():
1516
path_stat = transport.stat(path)
1517
actual_mode = stat.S_IMODE(path_stat.st_mode)
1518
self.assertEqual(mode, actual_mode,
1519
'mode of %r incorrect (%s != %s)'
1520
% (path, oct(mode), oct(actual_mode)))
1522
def assertIsSameRealPath(self, path1, path2):
1523
"""Fail if path1 and path2 points to different files"""
1524
self.assertEqual(osutils.realpath(path1),
1525
osutils.realpath(path2),
1526
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1528
def assertIsInstance(self, obj, kls, msg=None):
1529
"""Fail if obj is not an instance of kls
1531
:param msg: Supplementary message to show if the assertion fails.
1533
if not isinstance(obj, kls):
1534
m = "%r is an instance of %s rather than %s" % (
1535
obj, obj.__class__, kls)
1540
def assertFileEqual(self, content, path):
1541
"""Fail if path does not contain 'content'."""
1542
self.assertPathExists(path)
1543
f = file(path, 'rb')
1548
self.assertEqualDiff(content, s)
1550
def assertDocstring(self, expected_docstring, obj):
1551
"""Fail if obj does not have expected_docstring"""
1553
# With -OO the docstring should be None instead
1554
self.assertIs(obj.__doc__, None)
1556
self.assertEqual(expected_docstring, obj.__doc__)
1558
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1559
def failUnlessExists(self, path):
1560
return self.assertPathExists(path)
1562
def assertPathExists(self, path):
1563
"""Fail unless path or paths, which may be abs or relative, exist."""
1564
if not isinstance(path, basestring):
1566
self.assertPathExists(p)
1568
self.assertTrue(osutils.lexists(path),
1569
path + " does not exist")
1571
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1572
def failIfExists(self, path):
1573
return self.assertPathDoesNotExist(path)
1575
def assertPathDoesNotExist(self, path):
1576
"""Fail if path or paths, which may be abs or relative, exist."""
1577
if not isinstance(path, basestring):
1579
self.assertPathDoesNotExist(p)
1581
self.assertFalse(osutils.lexists(path),
1584
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1585
"""A helper for callDeprecated and applyDeprecated.
1587
:param a_callable: A callable to call.
1588
:param args: The positional arguments for the callable
1589
:param kwargs: The keyword arguments for the callable
1590
:return: A tuple (warnings, result). result is the result of calling
1591
a_callable(``*args``, ``**kwargs``).
1594
def capture_warnings(msg, cls=None, stacklevel=None):
1595
# we've hooked into a deprecation specific callpath,
1596
# only deprecations should getting sent via it.
1597
self.assertEqual(cls, DeprecationWarning)
1598
local_warnings.append(msg)
1599
original_warning_method = symbol_versioning.warn
1600
symbol_versioning.set_warning_method(capture_warnings)
1602
result = a_callable(*args, **kwargs)
1604
symbol_versioning.set_warning_method(original_warning_method)
1605
return (local_warnings, result)
1607
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1608
"""Call a deprecated callable without warning the user.
1610
Note that this only captures warnings raised by symbol_versioning.warn,
1611
not other callers that go direct to the warning module.
1613
To test that a deprecated method raises an error, do something like
1614
this (remember that both assertRaises and applyDeprecated delays *args
1615
and **kwargs passing)::
1617
self.assertRaises(errors.ReservedId,
1618
self.applyDeprecated,
1619
deprecated_in((1, 5, 0)),
1623
:param deprecation_format: The deprecation format that the callable
1624
should have been deprecated with. This is the same type as the
1625
parameter to deprecated_method/deprecated_function. If the
1626
callable is not deprecated with this format, an assertion error
1628
:param a_callable: A callable to call. This may be a bound method or
1629
a regular function. It will be called with ``*args`` and
1631
:param args: The positional arguments for the callable
1632
:param kwargs: The keyword arguments for the callable
1633
:return: The result of a_callable(``*args``, ``**kwargs``)
1635
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1637
expected_first_warning = symbol_versioning.deprecation_string(
1638
a_callable, deprecation_format)
1639
if len(call_warnings) == 0:
1640
self.fail("No deprecation warning generated by call to %s" %
1642
self.assertEqual(expected_first_warning, call_warnings[0])
1645
def callCatchWarnings(self, fn, *args, **kw):
1646
"""Call a callable that raises python warnings.
1648
The caller's responsible for examining the returned warnings.
1650
If the callable raises an exception, the exception is not
1651
caught and propagates up to the caller. In that case, the list
1652
of warnings is not available.
1654
:returns: ([warning_object, ...], fn_result)
1656
# XXX: This is not perfect, because it completely overrides the
1657
# warnings filters, and some code may depend on suppressing particular
1658
# warnings. It's the easiest way to insulate ourselves from -Werror,
1659
# though. -- Andrew, 20071062
1661
def _catcher(message, category, filename, lineno, file=None, line=None):
1662
# despite the name, 'message' is normally(?) a Warning subclass
1664
wlist.append(message)
1665
saved_showwarning = warnings.showwarning
1666
saved_filters = warnings.filters
1668
warnings.showwarning = _catcher
1669
warnings.filters = []
1670
result = fn(*args, **kw)
1672
warnings.showwarning = saved_showwarning
1673
warnings.filters = saved_filters
1674
return wlist, result
1676
def callDeprecated(self, expected, callable, *args, **kwargs):
1677
"""Assert that a callable is deprecated in a particular way.
1679
This is a very precise test for unusual requirements. The
1680
applyDeprecated helper function is probably more suited for most tests
1681
as it allows you to simply specify the deprecation format being used
1682
and will ensure that that is issued for the function being called.
1684
Note that this only captures warnings raised by symbol_versioning.warn,
1685
not other callers that go direct to the warning module. To catch
1686
general warnings, use callCatchWarnings.
1688
:param expected: a list of the deprecation warnings expected, in order
1689
:param callable: The callable to call
1690
:param args: The positional arguments for the callable
1691
:param kwargs: The keyword arguments for the callable
1693
call_warnings, result = self._capture_deprecation_warnings(callable,
1695
self.assertEqual(expected, call_warnings)
1698
def _startLogFile(self):
1699
"""Send bzr and test log messages to a temporary file.
1701
The file is removed as the test is torn down.
1703
pseudo_log_file = StringIO()
1704
def _get_log_contents_for_weird_testtools_api():
1705
return [pseudo_log_file.getvalue().decode(
1706
"utf-8", "replace").encode("utf-8")]
1707
self.addDetail("log", content.Content(content.ContentType("text",
1708
"plain", {"charset": "utf8"}),
1709
_get_log_contents_for_weird_testtools_api))
1710
self._log_file = pseudo_log_file
1711
self._log_memento = trace.push_log_file(self._log_file)
1712
self.addCleanup(self._finishLogFile)
1714
def _finishLogFile(self):
1715
"""Finished with the log file.
1717
Close the file and delete it.
1719
if trace._trace_file:
1720
# flush the log file, to get all content
1721
trace._trace_file.flush()
1722
trace.pop_log_file(self._log_memento)
1724
def thisFailsStrictLockCheck(self):
1725
"""It is known that this test would fail with -Dstrict_locks.
1727
By default, all tests are run with strict lock checking unless
1728
-Edisable_lock_checks is supplied. However there are some tests which
1729
we know fail strict locks at this point that have not been fixed.
1730
They should call this function to disable the strict checking.
1732
This should be used sparingly, it is much better to fix the locking
1733
issues rather than papering over the problem by calling this function.
1735
debug.debug_flags.discard('strict_locks')
1737
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1738
"""Overrides an object attribute restoring it after the test.
1740
:note: This should be used with discretion; you should think about
1741
whether it's better to make the code testable without monkey-patching.
1743
:param obj: The object that will be mutated.
1745
:param attr_name: The attribute name we want to preserve/override in
1748
:param new: The optional value we want to set the attribute to.
1750
:returns: The actual attr value.
1752
value = getattr(obj, attr_name)
1753
# The actual value is captured by the call below
1754
self.addCleanup(setattr, obj, attr_name, value)
1755
if new is not _unitialized_attr:
1756
setattr(obj, attr_name, new)
1759
def overrideEnv(self, name, new):
1760
"""Set an environment variable, and reset it after the test.
1762
:param name: The environment variable name.
1764
:param new: The value to set the variable to. If None, the
1765
variable is deleted from the environment.
1767
:returns: The actual variable value.
1769
value = osutils.set_or_unset_env(name, new)
1770
self.addCleanup(osutils.set_or_unset_env, name, value)
1773
def recordCalls(self, obj, attr_name):
1774
"""Monkeypatch in a wrapper that will record calls.
1776
The monkeypatch is automatically removed when the test concludes.
1778
:param obj: The namespace holding the reference to be replaced;
1779
typically a module, class, or object.
1780
:param attr_name: A string for the name of the attribute to
1782
:returns: A list that will be extended with one item every time the
1783
function is called, with a tuple of (args, kwargs).
1787
def decorator(*args, **kwargs):
1788
calls.append((args, kwargs))
1789
return orig(*args, **kwargs)
1790
orig = self.overrideAttr(obj, attr_name, decorator)
1793
def _cleanEnvironment(self):
1794
for name, value in isolated_environ.iteritems():
1795
self.overrideEnv(name, value)
1797
def _restoreHooks(self):
1798
for klass, (name, hooks) in self._preserved_hooks.items():
1799
setattr(klass, name, hooks)
1800
self._preserved_hooks.clear()
1801
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1802
self._preserved_lazy_hooks.clear()
1804
def knownFailure(self, reason):
1805
"""Declare that this test fails for a known reason
1807
Tests that are known to fail should generally be using expectedFailure
1808
with an appropriate reverse assertion if a change could cause the test
1809
to start passing. Conversely if the test has no immediate prospect of
1810
succeeding then using skip is more suitable.
1812
When this method is called while an exception is being handled, that
1813
traceback will be used, otherwise a new exception will be thrown to
1814
provide one but won't be reported.
1816
self._add_reason(reason)
1818
exc_info = sys.exc_info()
1819
if exc_info != (None, None, None):
1820
self._report_traceback(exc_info)
1823
raise self.failureException(reason)
1824
except self.failureException:
1825
exc_info = sys.exc_info()
1826
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1827
raise testtools.testcase._ExpectedFailure(exc_info)
1831
def _suppress_log(self):
1832
"""Remove the log info from details."""
1833
self.discardDetail('log')
1835
def _do_skip(self, result, reason):
1836
self._suppress_log()
1837
addSkip = getattr(result, 'addSkip', None)
1838
if not callable(addSkip):
1839
result.addSuccess(result)
1841
addSkip(self, reason)
1844
def _do_known_failure(self, result, e):
1845
self._suppress_log()
1846
err = sys.exc_info()
1847
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1848
if addExpectedFailure is not None:
1849
addExpectedFailure(self, err)
1851
result.addSuccess(self)
1854
def _do_not_applicable(self, result, e):
1856
reason = 'No reason given'
1859
self._suppress_log ()
1860
addNotApplicable = getattr(result, 'addNotApplicable', None)
1861
if addNotApplicable is not None:
1862
result.addNotApplicable(self, reason)
1864
self._do_skip(result, reason)
1867
def _report_skip(self, result, err):
1868
"""Override the default _report_skip.
1870
We want to strip the 'log' detail. If we waint until _do_skip, it has
1871
already been formatted into the 'reason' string, and we can't pull it
1874
self._suppress_log()
1875
super(TestCase, self)._report_skip(self, result, err)
1878
def _report_expected_failure(self, result, err):
1881
See _report_skip for motivation.
1883
self._suppress_log()
1884
super(TestCase, self)._report_expected_failure(self, result, err)
1887
def _do_unsupported_or_skip(self, result, e):
1889
self._suppress_log()
1890
addNotSupported = getattr(result, 'addNotSupported', None)
1891
if addNotSupported is not None:
1892
result.addNotSupported(self, reason)
1894
self._do_skip(result, reason)
1896
def time(self, callable, *args, **kwargs):
1897
"""Run callable and accrue the time it takes to the benchmark time.
1899
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1900
this will cause lsprofile statistics to be gathered and stored in
1903
if self._benchtime is None:
1904
self.addDetail('benchtime', content.Content(content.ContentType(
1905
"text", "plain"), lambda:[str(self._benchtime)]))
1909
if not self._gather_lsprof_in_benchmarks:
1910
return callable(*args, **kwargs)
1912
# record this benchmark
1913
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1915
self._benchcalls.append(((callable, args, kwargs), stats))
1918
self._benchtime += time.time() - start
197
raise AssertionError("texts not equal:\n" +
198
self._ndiff_strings(a, b))
200
def _enable_file_logging(self):
201
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
203
self._log_file = os.fdopen(fileno, 'w+')
205
hdlr = logging.StreamHandler(self._log_file)
206
hdlr.setLevel(logging.DEBUG)
207
hdlr.setFormatter(logging.Formatter('%(levelname)8s %(message)s'))
208
logging.getLogger('').addHandler(hdlr)
209
logging.getLogger('').setLevel(logging.DEBUG)
210
self._log_hdlr = hdlr
211
debug('opened log file %s', name)
213
self._log_file_name = name
216
os.environ['HOME'] = self.oldenv
217
if os.environ.get('BZREMAIL') is not None:
218
del os.environ['BZREMAIL']
219
if self.bzr_email is not None:
220
os.environ['BZREMAIL'] = self.bzr_email
221
if os.environ.get('EMAIL') is not None:
222
del os.environ['EMAIL']
223
if self.email is not None:
224
os.environ['EMAIL'] = self.email
225
logging.getLogger('').removeHandler(self._log_hdlr)
226
bzrlib.trace.enable_default_logging()
227
logging.debug('%s teardown', self.id())
228
self._log_file.close()
229
unittest.TestCase.tearDown(self)
1920
231
def log(self, *args):
1924
"""Get a unicode string containing the log from bzrlib.trace.
1926
Undecodable characters are replaced.
1928
return u"".join(self.getDetails()['log'].iter_text())
1930
def requireFeature(self, feature):
1931
"""This test requires a specific feature is available.
1933
:raises UnavailableFeature: When feature is not available.
1935
if not feature.available():
1936
raise UnavailableFeature(feature)
1938
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1940
"""Run bazaar command line, splitting up a string command line."""
1941
if isinstance(args, basestring):
1942
# shlex don't understand unicode strings,
1943
# so args should be plain string (bialix 20070906)
1944
args = list(shlex.split(str(args)))
1945
return self._run_bzr_core(args, retcode=retcode,
1946
encoding=encoding, stdin=stdin, working_dir=working_dir,
1949
def _run_bzr_core(self, args, retcode, encoding, stdin,
1951
# Clear chk_map page cache, because the contents are likely to mask
1953
chk_map.clear_cache()
1954
if encoding is None:
1955
encoding = osutils.get_user_encoding()
1956
stdout = StringIOWrapper()
1957
stderr = StringIOWrapper()
1958
stdout.encoding = encoding
1959
stderr.encoding = encoding
1961
self.log('run bzr: %r', args)
1962
# FIXME: don't call into logging here
235
"""Return as a string the log for this test"""
236
if self._log_file_name:
237
return open(self._log_file_name).read()
241
def capture(self, cmd):
242
"""Shortcut that splits cmd into words, runs, and returns stdout"""
243
return self.run_bzr_captured(cmd.split())[0]
245
def run_bzr_captured(self, argv, retcode=0):
246
"""Invoke bzr and return (result, stdout, stderr).
248
Useful for code that wants to check the contents of the
249
output, the way error messages are presented, etc.
251
This should be the main method for tests that want to exercise the
252
overall behavior of the bzr application (rather than a unit test
253
or a functional test of the library.)
255
Much of the old code runs bzr by forking a new copy of Python, but
256
that is slower, harder to debug, and generally not necessary.
258
This runs bzr through the interface that catches and reports
259
errors, and with logging set to something approximating the
260
default, so that error reporting can be checked.
262
argv -- arguments to invoke bzr
263
retcode -- expected return code, or None for don't-care.
267
self.log('run bzr: %s', ' '.join(argv))
1963
268
handler = logging.StreamHandler(stderr)
269
handler.setFormatter(bzrlib.trace.QuietFormatter())
1964
270
handler.setLevel(logging.INFO)
1965
271
logger = logging.getLogger('')
1966
272
logger.addHandler(handler)
1967
old_ui_factory = ui.ui_factory
1968
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1971
if working_dir is not None:
1972
cwd = osutils.getcwd()
1973
os.chdir(working_dir)
1977
result = self.apply_redirected(
1978
ui.ui_factory.stdin,
1980
_mod_commands.run_bzr_catch_user_errors,
1982
except KeyboardInterrupt:
1983
# Reraise KeyboardInterrupt with contents of redirected stdout
1984
# and stderr as arguments, for tests which are interested in
1985
# stdout and stderr and are expecting the exception.
1986
out = stdout.getvalue()
1987
err = stderr.getvalue()
1989
self.log('output:\n%r', out)
1991
self.log('errors:\n%r', err)
1992
raise KeyboardInterrupt(out, err)
274
result = self.apply_redirected(None, stdout, stderr,
275
bzrlib.commands.run_bzr_catch_errors,
1994
278
logger.removeHandler(handler)
1995
ui.ui_factory = old_ui_factory
1999
279
out = stdout.getvalue()
2000
280
err = stderr.getvalue()
2002
self.log('output:\n%r', out)
282
self.log('output:\n%s', out)
2004
self.log('errors:\n%r', err)
284
self.log('errors:\n%s', err)
2005
285
if retcode is not None:
2006
self.assertEquals(retcode, result,
2007
message='Unexpected return code')
2008
return result, out, err
286
self.assertEquals(result, retcode)
2010
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
2011
working_dir=None, error_regexes=[], output_encoding=None):
289
def run_bzr(self, *args, **kwargs):
2012
290
"""Invoke bzr, as if it were run from the command line.
2014
The argument list should not include the bzr program name - the
2015
first argument is normally the bzr command. Arguments may be
2016
passed in three ways:
2018
1- A list of strings, eg ["commit", "a"]. This is recommended
2019
when the command contains whitespace or metacharacters, or
2020
is built up at run time.
2022
2- A single string, eg "add a". This is the most convenient
2023
for hardcoded commands.
2025
This runs bzr through the interface that catches and reports
2026
errors, and with logging set to something approximating the
2027
default, so that error reporting can be checked.
2029
292
This should be the main method for tests that want to exercise the
2030
293
overall behavior of the bzr application (rather than a unit test
2031
294
or a functional test of the library.)
2033
296
This sends the stdout/stderr results into the test's log,
2034
297
where it may be useful for debugging. See also run_captured.
2036
:keyword stdin: A string to be used as stdin for the command.
2037
:keyword retcode: The status code the command should return;
2039
:keyword working_dir: The directory to run the command in
2040
:keyword error_regexes: A list of expected error messages. If
2041
specified they must be seen in the error output of the command.
2043
retcode, out, err = self._run_bzr_autosplit(
2048
working_dir=working_dir,
2050
self.assertIsInstance(error_regexes, (list, tuple))
2051
for regex in error_regexes:
2052
self.assertContainsRe(err, regex)
2055
def run_bzr_error(self, error_regexes, *args, **kwargs):
2056
"""Run bzr, and check that stderr contains the supplied regexes
2058
:param error_regexes: Sequence of regular expressions which
2059
must each be found in the error output. The relative ordering
2061
:param args: command-line arguments for bzr
2062
:param kwargs: Keyword arguments which are interpreted by run_bzr
2063
This function changes the default value of retcode to be 3,
2064
since in most cases this is run when you expect bzr to fail.
2066
:return: (out, err) The actual output of running the command (in case
2067
you want to do more inspection)
2071
# Make sure that commit is failing because there is nothing to do
2072
self.run_bzr_error(['no changes to commit'],
2073
['commit', '-m', 'my commit comment'])
2074
# Make sure --strict is handling an unknown file, rather than
2075
# giving us the 'nothing to do' error
2076
self.build_tree(['unknown'])
2077
self.run_bzr_error(['Commit refused because there are unknown files'],
2078
['commit', --strict', '-m', 'my commit comment'])
2080
kwargs.setdefault('retcode', 3)
2081
kwargs['error_regexes'] = error_regexes
2082
out, err = self.run_bzr(*args, **kwargs)
2085
def run_bzr_subprocess(self, *args, **kwargs):
2086
"""Run bzr in a subprocess for testing.
2088
This starts a new Python interpreter and runs bzr in there.
2089
This should only be used for tests that have a justifiable need for
2090
this isolation: e.g. they are testing startup time, or signal
2091
handling, or early startup code, etc. Subprocess code can't be
2092
profiled or debugged so easily.
2094
:keyword retcode: The status code that is expected. Defaults to 0. If
2095
None is supplied, the status code is not checked.
2096
:keyword env_changes: A dictionary which lists changes to environment
2097
variables. A value of None will unset the env variable.
2098
The values must be strings. The change will only occur in the
2099
child, so you don't need to fix the environment after running.
2100
:keyword universal_newlines: Convert CRLF => LF
2101
:keyword allow_plugins: By default the subprocess is run with
2102
--no-plugins to ensure test reproducibility. Also, it is possible
2103
for system-wide plugins to create unexpected output on stderr,
2104
which can cause unnecessary test failures.
2106
env_changes = kwargs.get('env_changes', {})
2107
working_dir = kwargs.get('working_dir', None)
2108
allow_plugins = kwargs.get('allow_plugins', False)
2110
if isinstance(args[0], list):
2112
elif isinstance(args[0], basestring):
2113
args = list(shlex.split(args[0]))
2115
raise ValueError("passing varargs to run_bzr_subprocess")
2116
process = self.start_bzr_subprocess(args, env_changes=env_changes,
2117
working_dir=working_dir,
2118
allow_plugins=allow_plugins)
2119
# We distinguish between retcode=None and retcode not passed.
2120
supplied_retcode = kwargs.get('retcode', 0)
2121
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
2122
universal_newlines=kwargs.get('universal_newlines', False),
2125
def start_bzr_subprocess(self, process_args, env_changes=None,
2126
skip_if_plan_to_signal=False,
2128
allow_plugins=False, stderr=subprocess.PIPE):
2129
"""Start bzr in a subprocess for testing.
2131
This starts a new Python interpreter and runs bzr in there.
2132
This should only be used for tests that have a justifiable need for
2133
this isolation: e.g. they are testing startup time, or signal
2134
handling, or early startup code, etc. Subprocess code can't be
2135
profiled or debugged so easily.
2137
:param process_args: a list of arguments to pass to the bzr executable,
2138
for example ``['--version']``.
2139
:param env_changes: A dictionary which lists changes to environment
2140
variables. A value of None will unset the env variable.
2141
The values must be strings. The change will only occur in the
2142
child, so you don't need to fix the environment after running.
2143
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2144
doesn't support signalling subprocesses.
2145
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2146
:param stderr: file to use for the subprocess's stderr. Valid values
2147
are those valid for the stderr argument of `subprocess.Popen`.
2148
Default value is ``subprocess.PIPE``.
2150
:returns: Popen object for the started process.
2152
if skip_if_plan_to_signal:
2153
if os.name != "posix":
2154
raise TestSkipped("Sending signals not supported")
2156
if env_changes is None:
2160
def cleanup_environment():
2161
for env_var, value in env_changes.iteritems():
2162
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
2164
def restore_environment():
2165
for env_var, value in old_env.iteritems():
2166
osutils.set_or_unset_env(env_var, value)
2168
bzr_path = self.get_bzr_path()
2171
if working_dir is not None:
2172
cwd = osutils.getcwd()
2173
os.chdir(working_dir)
2176
# win32 subprocess doesn't support preexec_fn
2177
# so we will avoid using it on all platforms, just to
2178
# make sure the code path is used, and we don't break on win32
2179
cleanup_environment()
2180
# Include the subprocess's log file in the test details, in case
2181
# the test fails due to an error in the subprocess.
2182
self._add_subprocess_log(trace._get_bzr_log_filename())
2183
command = [sys.executable]
2184
# frozen executables don't need the path to bzr
2185
if getattr(sys, "frozen", None) is None:
2186
command.append(bzr_path)
2187
if not allow_plugins:
2188
command.append('--no-plugins')
2189
command.extend(process_args)
2190
process = self._popen(command, stdin=subprocess.PIPE,
2191
stdout=subprocess.PIPE,
2194
restore_environment()
2200
def _add_subprocess_log(self, log_file_path):
2201
if len(self._log_files) == 0:
2202
# Register an addCleanup func. We do this on the first call to
2203
# _add_subprocess_log rather than in TestCase.setUp so that this
2204
# addCleanup is registered after any cleanups for tempdirs that
2205
# subclasses might create, which will probably remove the log file
2207
self.addCleanup(self._subprocess_log_cleanup)
2208
# self._log_files is a set, so if a log file is reused we won't grab it
2210
self._log_files.add(log_file_path)
2212
def _subprocess_log_cleanup(self):
2213
for count, log_file_path in enumerate(self._log_files):
2214
# We use buffer_now=True to avoid holding the file open beyond
2215
# the life of this function, which might interfere with e.g.
2216
# cleaning tempdirs on Windows.
2217
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2218
#detail_content = content.content_from_file(
2219
# log_file_path, buffer_now=True)
2220
with open(log_file_path, 'rb') as log_file:
2221
log_file_bytes = log_file.read()
2222
detail_content = content.Content(content.ContentType("text",
2223
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2224
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2227
def _popen(self, *args, **kwargs):
2228
"""Place a call to Popen.
2230
Allows tests to override this method to intercept the calls made to
2231
Popen for introspection.
2233
return subprocess.Popen(*args, **kwargs)
2235
def get_source_path(self):
2236
"""Return the path of the directory containing bzrlib."""
2237
return os.path.dirname(os.path.dirname(bzrlib.__file__))
2239
def get_bzr_path(self):
2240
"""Return the path of the 'bzr' executable for this test suite."""
2241
bzr_path = os.path.join(self.get_source_path(), "bzr")
2242
if not os.path.isfile(bzr_path):
2243
# We are probably installed. Assume sys.argv is the right file
2244
bzr_path = sys.argv[0]
2247
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
2248
universal_newlines=False, process_args=None):
2249
"""Finish the execution of process.
2251
:param process: the Popen object returned from start_bzr_subprocess.
2252
:param retcode: The status code that is expected. Defaults to 0. If
2253
None is supplied, the status code is not checked.
2254
:param send_signal: an optional signal to send to the process.
2255
:param universal_newlines: Convert CRLF => LF
2256
:returns: (stdout, stderr)
2258
if send_signal is not None:
2259
os.kill(process.pid, send_signal)
2260
out, err = process.communicate()
2262
if universal_newlines:
2263
out = out.replace('\r\n', '\n')
2264
err = err.replace('\r\n', '\n')
2266
if retcode is not None and retcode != process.returncode:
2267
if process_args is None:
2268
process_args = "(unknown args)"
2269
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2270
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2271
self.fail('Command bzr %s failed with retcode %s != %s'
2272
% (process_args, retcode, process.returncode))
2275
def check_tree_shape(self, tree, shape):
2276
"""Compare a tree to a list of expected names.
299
retcode = kwargs.pop('retcode', 0)
300
return self.run_bzr_captured(args, retcode)
302
def check_inventory_shape(self, inv, shape):
303
"""Compare an inventory to a list of expected names.
2278
305
Fail if they are not precisely equal.
2281
308
shape = list(shape) # copy
2282
for path, ie in tree.iter_entries_by_dir():
309
for path, ie in inv.entries():
2283
310
name = path.replace('\\', '/')
2284
if ie.kind == 'directory':
2285
312
name = name + '/'
2287
pass # ignore root entry
2289
314
shape.remove(name)
2291
316
extras.append(name)
2718
365
All test cases create their own directory within that. If the
2719
366
tests complete successfully, the directory is removed.
2721
:ivar test_base_dir: The path of the top-level directory for this
2722
test, which contains a home directory and a work directory.
2724
:ivar test_home_dir: An initially empty directory under test_base_dir
2725
which is used as $HOME for this test.
2727
:ivar test_dir: A directory under test_base_dir used as the current
2728
directory when the test proper is run.
368
InTempDir is an old alias for FunctionalTestCase.
2731
373
OVERRIDE_PYTHON = 'python'
2734
super(TestCaseInTempDir, self).setUp()
2735
# Remove the protection set in isolated_environ, we have a proper
2736
# access to disk resources now.
2737
self.overrideEnv('BZR_LOG', None)
2739
375
def check_file_contents(self, filename, expect):
2740
376
self.log("check contents of file %s" % filename)
377
contents = file(filename, 'r').read()
2746
378
if contents != expect:
2747
379
self.log("expected: %r" % expect)
2748
380
self.log("actually: %r" % contents)
2749
381
self.fail("contents of %s not as expected" % filename)
2751
def _getTestDirPrefix(self):
2752
# create a directory within the top level test directory
2753
if sys.platform in ('win32', 'cygwin'):
2754
name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
2755
# windows is likely to have path-length limits so use a short name
2756
name_prefix = name_prefix[-30:]
2758
name_prefix = re.sub('[/]', '_', self.id())
2761
def makeAndChdirToTestDir(self):
2762
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2764
For TestCaseInTempDir we create a temporary directory based on the test
2765
name and then create two subdirs - test and home under it.
2767
name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2768
self._getTestDirPrefix())
2770
for i in range(100):
2771
if os.path.exists(name):
2772
name = name_prefix + '_' + str(i)
2774
# now create test and home directories within this dir
2775
self.test_base_dir = name
2776
self.addCleanup(self.deleteTestDir)
2777
os.mkdir(self.test_base_dir)
2779
self.permit_dir(self.test_base_dir)
2780
# 'sprouting' and 'init' of a branch both walk up the tree to find
2781
# stacking policy to honour; create a bzr dir with an unshared
2782
# repository (but not a branch - our code would be trying to escape
2783
# then!) to stop them, and permit it to be read.
2784
# control = bzrdir.BzrDir.create(self.test_base_dir)
2785
# control.create_repository()
2786
self.test_home_dir = self.test_base_dir + '/home'
2787
os.mkdir(self.test_home_dir)
2788
self.test_dir = self.test_base_dir + '/work'
383
def _make_test_root(self):
384
if TestCaseInTempDir.TEST_ROOT is not None:
388
root = 'test%04d.tmp' % i
392
if e.errno == errno.EEXIST:
397
# successfully created
398
TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
400
# make a fake bzr directory there to prevent any tests propagating
401
# up onto the source directory's real branch
402
os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
405
self._make_test_root()
406
self._currentdir = os.getcwdu()
407
short_id = self.id().replace('bzrlib.selftest.', '') \
408
.replace('__main__.', '')
409
self.test_dir = os.path.join(self.TEST_ROOT, short_id)
2789
410
os.mkdir(self.test_dir)
2790
411
os.chdir(self.test_dir)
2791
# put name of test inside
2792
f = file(self.test_base_dir + '/name', 'w')
2798
def deleteTestDir(self):
2799
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2800
_rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2802
def build_tree(self, shape, line_endings='binary', transport=None):
412
super(TestCaseInTempDir, self).setUp()
415
os.chdir(self._currentdir)
416
super(TestCaseInTempDir, self).tearDown()
418
def build_tree(self, shape):
2803
419
"""Build a test tree according to a pattern.
2805
421
shape is a sequence of file specifications. If the final
2806
422
character is '/', a directory is created.
2808
This assumes that all the elements in the tree being built are new.
2810
424
This doesn't add anything to a branch.
2812
:type shape: list or tuple.
2813
:param line_endings: Either 'binary' or 'native'
2814
in binary mode, exact contents are written in native mode, the
2815
line endings match the default platform endings.
2816
:param transport: A transport to write to, for building trees on VFS's.
2817
If the transport is readonly or None, "." is opened automatically.
2820
if type(shape) not in (list, tuple):
2821
raise AssertionError("Parameter 'shape' should be "
2822
"a list or a tuple. Got %r instead" % (shape,))
2823
# It's OK to just create them using forward slashes on windows.
2824
if transport is None or transport.is_readonly():
2825
transport = _mod_transport.get_transport(".")
426
# XXX: It's OK to just create them using forward slashes on windows?
2826
427
for name in shape:
2827
self.assertIsInstance(name, basestring)
428
assert isinstance(name, basestring)
2828
429
if name[-1] == '/':
2829
transport.mkdir(urlutils.escape(name[:-1]))
2831
if line_endings == 'binary':
2833
elif line_endings == 'native':
2836
raise errors.BzrError(
2837
'Invalid line ending request %r' % line_endings)
2838
content = "contents of %s%s" % (name.encode('utf-8'), end)
2839
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2841
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2843
def assertInWorkingTree(self, path, root_path='.', tree=None):
2844
"""Assert whether path or paths are in the WorkingTree"""
2846
tree = workingtree.WorkingTree.open(root_path)
2847
if not isinstance(path, basestring):
2849
self.assertInWorkingTree(p, tree=tree)
2851
self.assertIsNot(tree.path2id(path), None,
2852
path+' not in working tree.')
2854
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2855
"""Assert whether path or paths are not in the WorkingTree"""
2857
tree = workingtree.WorkingTree.open(root_path)
2858
if not isinstance(path, basestring):
2860
self.assertNotInWorkingTree(p,tree=tree)
2862
self.assertIs(tree.path2id(path), None, path+' in working tree.')
2865
class TestCaseWithTransport(TestCaseInTempDir):
2866
"""A test case that provides get_url and get_readonly_url facilities.
2868
These back onto two transport servers, one for readonly access and one for
2871
If no explicit class is provided for readonly access, a
2872
ReadonlyTransportDecorator is used instead which allows the use of non disk
2873
based read write transports.
2875
If an explicit class is provided for readonly access, that server and the
2876
readwrite one must both define get_url() as resolving to os.getcwd().
2879
def get_vfs_only_server(self):
2880
"""See TestCaseWithMemoryTransport.
2882
This is useful for some tests with specific servers that need
2885
if self.__vfs_server is None:
2886
self.__vfs_server = self.vfs_transport_factory()
2887
self.start_server(self.__vfs_server)
2888
return self.__vfs_server
2890
def make_branch_and_tree(self, relpath, format=None):
2891
"""Create a branch on the transport and a tree locally.
2893
If the transport is not a LocalTransport, the Tree can't be created on
2894
the transport. In that case if the vfs_transport_factory is
2895
LocalURLServer the working tree is created in the local
2896
directory backing the transport, and the returned tree's branch and
2897
repository will also be accessed locally. Otherwise a lightweight
2898
checkout is created and returned.
2900
We do this because we can't physically create a tree in the local
2901
path, with a branch reference to the transport_factory url, and
2902
a branch + repository in the vfs_transport, unless the vfs_transport
2903
namespace is distinct from the local disk - the two branch objects
2904
would collide. While we could construct a tree with its branch object
2905
pointing at the transport_factory transport in memory, reopening it
2906
would behaving unexpectedly, and has in the past caused testing bugs
2907
when we tried to do it that way.
2909
:param format: The BzrDirFormat.
2910
:returns: the WorkingTree.
2912
# TODO: always use the local disk path for the working tree,
2913
# this obviously requires a format that supports branch references
2914
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2916
b = self.make_branch(relpath, format=format)
2918
return b.bzrdir.create_workingtree()
2919
except errors.NotLocalUrl:
2920
# We can only make working trees locally at the moment. If the
2921
# transport can't support them, then we keep the non-disk-backed
2922
# branch and create a local checkout.
2923
if self.vfs_transport_factory is test_server.LocalURLServer:
2924
# the branch is colocated on disk, we cannot create a checkout.
2925
# hopefully callers will expect this.
2926
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2927
wt = local_controldir.create_workingtree()
2928
if wt.branch._format != b._format:
2930
# Make sure that assigning to wt._branch fixes wt.branch,
2931
# in case the implementation details of workingtree objects
2933
self.assertIs(b, wt.branch)
2936
return b.create_checkout(relpath, lightweight=True)
2938
def assertIsDirectory(self, relpath, transport):
2939
"""Assert that relpath within transport is a directory.
2941
This may not be possible on all transports; in that case it propagates
2942
a TransportNotPossible.
2945
mode = transport.stat(relpath).st_mode
2946
except errors.NoSuchFile:
2947
self.fail("path %s is not a directory; no such file"
2949
if not stat.S_ISDIR(mode):
2950
self.fail("path %s is not a directory; has mode %#o"
2953
def assertTreesEqual(self, left, right):
2954
"""Check that left and right have the same content and properties."""
2955
# we use a tree delta to check for equality of the content, and we
2956
# manually check for equality of other things such as the parents list.
2957
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2958
differences = left.changes_from(right)
2959
self.assertFalse(differences.has_changed(),
2960
"Trees %r and %r are different: %r" % (left, right, differences))
2963
super(TestCaseWithTransport, self).setUp()
2964
self.__vfs_server = None
2966
def disable_missing_extensions_warning(self):
2967
"""Some tests expect a precise stderr content.
2969
There is no point in forcing them to duplicate the extension related
2972
config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
2975
class ChrootedTestCase(TestCaseWithTransport):
2976
"""A support class that provides readonly urls outside the local namespace.
2978
This is done by checking if self.transport_server is a MemoryServer. if it
2979
is then we are chrooted already, if it is not then an HttpServer is used
2982
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2983
be used without needed to redo it when a different
2984
subclass is in use ?
2988
from bzrlib.tests import http_server
2989
super(ChrootedTestCase, self).setUp()
2990
if not self.vfs_transport_factory == memory.MemoryServer:
2991
self.transport_readonly_server = http_server.HttpServer
2994
def condition_id_re(pattern):
2995
"""Create a condition filter which performs a re check on a test's id.
2997
:param pattern: A regular expression string.
2998
:return: A callable that returns True if the re matches.
3000
filter_re = re.compile(pattern, 0)
3001
def condition(test):
3003
return filter_re.search(test_id)
3007
def condition_isinstance(klass_or_klass_list):
3008
"""Create a condition filter which returns isinstance(param, klass).
3010
:return: A callable which when called with one parameter obj return the
3011
result of isinstance(obj, klass_or_klass_list).
3014
return isinstance(obj, klass_or_klass_list)
3018
def condition_id_in_list(id_list):
3019
"""Create a condition filter which verify that test's id in a list.
3021
:param id_list: A TestIdList object.
3022
:return: A callable that returns True if the test's id appears in the list.
3024
def condition(test):
3025
return id_list.includes(test.id())
3029
def condition_id_startswith(starts):
3030
"""Create a condition filter verifying that test's id starts with a string.
3032
:param starts: A list of string.
3033
:return: A callable that returns True if the test's id starts with one of
3036
def condition(test):
3037
for start in starts:
3038
if test.id().startswith(start):
3044
def exclude_tests_by_condition(suite, condition):
3045
"""Create a test suite which excludes some tests from suite.
3047
:param suite: The suite to get tests from.
3048
:param condition: A callable whose result evaluates True when called with a
3049
test case which should be excluded from the result.
3050
:return: A suite which contains the tests found in suite that fail
3054
for test in iter_suite_tests(suite):
3055
if not condition(test):
3057
return TestUtil.TestSuite(result)
3060
def filter_suite_by_condition(suite, condition):
3061
"""Create a test suite by filtering another one.
3063
:param suite: The source suite.
3064
:param condition: A callable whose result evaluates True when called with a
3065
test case which should be included in the result.
3066
:return: A suite which contains the tests found in suite that pass
3070
for test in iter_suite_tests(suite):
3073
return TestUtil.TestSuite(result)
433
print >>f, "contents of", name
436
def failUnlessExists(self, path):
437
"""Fail unless path, which may be abs or relative, exists."""
438
self.failUnless(osutils.lexists(path))
441
class MetaTestLog(TestCase):
442
def test_logging(self):
443
"""Test logs are captured when a test fails."""
444
logging.info('an info message')
445
warning('something looks dodgy...')
446
logging.debug('hello, test is running')
3076
450
def filter_suite_by_re(suite, pattern):
3077
"""Create a test suite by filtering another one.
3079
:param suite: the source suite
3080
:param pattern: pattern that names must match
3081
:returns: the newly created suite
3083
condition = condition_id_re(pattern)
3084
result_suite = filter_suite_by_condition(suite, condition)
3088
def filter_suite_by_id_list(suite, test_id_list):
3089
"""Create a test suite by filtering another one.
3091
:param suite: The source suite.
3092
:param test_id_list: A list of the test ids to keep as strings.
3093
:returns: the newly created suite
3095
condition = condition_id_in_list(test_id_list)
3096
result_suite = filter_suite_by_condition(suite, condition)
3100
def filter_suite_by_id_startswith(suite, start):
3101
"""Create a test suite by filtering another one.
3103
:param suite: The source suite.
3104
:param start: A list of string the test id must start with one of.
3105
:returns: the newly created suite
3107
condition = condition_id_startswith(start)
3108
result_suite = filter_suite_by_condition(suite, condition)
3112
def exclude_tests_by_re(suite, pattern):
3113
"""Create a test suite which excludes some tests from suite.
3115
:param suite: The suite to get tests from.
3116
:param pattern: A regular expression string. Test ids that match this
3117
pattern will be excluded from the result.
3118
:return: A TestSuite that contains all the tests from suite without the
3119
tests that matched pattern. The order of tests is the same as it was in
3122
return exclude_tests_by_condition(suite, condition_id_re(pattern))
3125
def preserve_input(something):
3126
"""A helper for performing test suite transformation chains.
3128
:param something: Anything you want to preserve.
3134
def randomize_suite(suite):
3135
"""Return a new TestSuite with suite's tests in random order.
3137
The tests in the input suite are flattened into a single suite in order to
3138
accomplish this. Any nested TestSuites are removed to provide global
3141
tests = list(iter_suite_tests(suite))
3142
random.shuffle(tests)
3143
return TestUtil.TestSuite(tests)
3146
def split_suite_by_condition(suite, condition):
3147
"""Split a test suite into two by a condition.
3149
:param suite: The suite to split.
3150
:param condition: The condition to match on. Tests that match this
3151
condition are returned in the first test suite, ones that do not match
3152
are in the second suite.
3153
:return: A tuple of two test suites, where the first contains tests from
3154
suite matching the condition, and the second contains the remainder
3155
from suite. The order within each output suite is the same as it was in
451
result = TestUtil.TestSuite()
452
filter_re = re.compile(pattern)
3160
453
for test in iter_suite_tests(suite):
3162
matched.append(test)
3164
did_not_match.append(test)
3165
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
3168
def split_suite_by_re(suite, pattern):
3169
"""Split a test suite into two by a regular expression.
3171
:param suite: The suite to split.
3172
:param pattern: A regular expression string. Test ids that match this
3173
pattern will be in the first test suite returned, and the others in the
3174
second test suite returned.
3175
:return: A tuple of two test suites, where the first contains tests from
3176
suite matching pattern, and the second contains the remainder from
3177
suite. The order within each output suite is the same as it was in
3180
return split_suite_by_condition(suite, condition_id_re(pattern))
3183
def run_suite(suite, name='test', verbose=False, pattern=".*",
3184
stop_on_failure=False,
3185
transport=None, lsprof_timed=None, bench_history=None,
3186
matching_tests_first=None,
3189
exclude_pattern=None,
3192
suite_decorators=None,
3194
result_decorators=None,
3196
"""Run a test suite for bzr selftest.
3198
:param runner_class: The class of runner to use. Must support the
3199
constructor arguments passed by run_suite which are more than standard
3201
:return: A boolean indicating success.
3203
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
454
if filter_re.search(test.id()):
459
def run_suite(suite, name='test', verbose=False, pattern=".*"):
460
TestCaseInTempDir._TEST_NAME = name
3208
if runner_class is None:
3209
runner_class = TextTestRunner
3212
runner = runner_class(stream=stream,
465
runner = TextTestRunner(stream=sys.stdout,
3214
verbosity=verbosity,
3215
bench_history=bench_history,
3217
result_decorators=result_decorators,
3219
runner.stop_on_failure=stop_on_failure
3220
# built in decorator factories:
3222
random_order(random_seed, runner),
3223
exclude_tests(exclude_pattern),
3225
if matching_tests_first:
3226
decorators.append(tests_first(pattern))
3228
decorators.append(filter_tests(pattern))
3229
if suite_decorators:
3230
decorators.extend(suite_decorators)
3231
# tell the result object how many tests will be running: (except if
3232
# --parallel=fork is being used. Robert said he will provide a better
3233
# progress design later -- vila 20090817)
3234
if fork_decorator not in decorators:
3235
decorators.append(CountingDecorator)
3236
for decorator in decorators:
3237
suite = decorator(suite)
3239
# Done after test suite decoration to allow randomisation etc
3240
# to take effect, though that is of marginal benefit.
3242
stream.write("Listing tests only ...\n")
3243
for t in iter_suite_tests(suite):
3244
stream.write("%s\n" % (t.id()))
469
suite = filter_suite_by_re(suite, pattern)
3246
470
result = runner.run(suite)
3248
return result.wasStrictlySuccessful()
471
# This is still a little bogus,
472
# but only a little. Folk not using our testrunner will
473
# have to delete their temp directories themselves.
474
if result.wasSuccessful():
475
if TestCaseInTempDir.TEST_ROOT is not None:
476
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
3250
return result.wasSuccessful()
3253
# A registry where get() returns a suite decorator.
3254
parallel_registry = registry.Registry()
3257
def fork_decorator(suite):
3258
if getattr(os, "fork", None) is None:
3259
raise errors.BzrCommandError("platform does not support fork,"
3260
" try --parallel=subprocess instead.")
3261
concurrency = osutils.local_concurrency()
3262
if concurrency == 1:
3264
from testtools import ConcurrentTestSuite
3265
return ConcurrentTestSuite(suite, fork_for_tests)
3266
parallel_registry.register('fork', fork_decorator)
3269
def subprocess_decorator(suite):
3270
concurrency = osutils.local_concurrency()
3271
if concurrency == 1:
3273
from testtools import ConcurrentTestSuite
3274
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3275
parallel_registry.register('subprocess', subprocess_decorator)
3278
def exclude_tests(exclude_pattern):
3279
"""Return a test suite decorator that excludes tests."""
3280
if exclude_pattern is None:
3281
return identity_decorator
3282
def decorator(suite):
3283
return ExcludeDecorator(suite, exclude_pattern)
3287
def filter_tests(pattern):
3289
return identity_decorator
3290
def decorator(suite):
3291
return FilterTestsDecorator(suite, pattern)
3295
def random_order(random_seed, runner):
3296
"""Return a test suite decorator factory for randomising tests order.
3298
:param random_seed: now, a string which casts to a long, or a long.
3299
:param runner: A test runner with a stream attribute to report on.
3301
if random_seed is None:
3302
return identity_decorator
3303
def decorator(suite):
3304
return RandomDecorator(suite, random_seed, runner.stream)
3308
def tests_first(pattern):
3310
return identity_decorator
3311
def decorator(suite):
3312
return TestFirstDecorator(suite, pattern)
3316
def identity_decorator(suite):
3321
class TestDecorator(TestUtil.TestSuite):
3322
"""A decorator for TestCase/TestSuite objects.
3324
Usually, subclasses should override __iter__(used when flattening test
3325
suites), which we do to filter, reorder, parallelise and so on, run() and
3329
def __init__(self, suite):
3330
TestUtil.TestSuite.__init__(self)
3333
def countTestCases(self):
3336
cases += test.countTestCases()
3343
def run(self, result):
3344
# Use iteration on self, not self._tests, to allow subclasses to hook
3347
if result.shouldStop:
3353
class CountingDecorator(TestDecorator):
3354
"""A decorator which calls result.progress(self.countTestCases)."""
3356
def run(self, result):
3357
progress_method = getattr(result, 'progress', None)
3358
if callable(progress_method):
3359
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3360
return super(CountingDecorator, self).run(result)
3363
class ExcludeDecorator(TestDecorator):
3364
"""A decorator which excludes test matching an exclude pattern."""
3366
def __init__(self, suite, exclude_pattern):
3367
TestDecorator.__init__(self, suite)
3368
self.exclude_pattern = exclude_pattern
3369
self.excluded = False
3373
return iter(self._tests)
3374
self.excluded = True
3375
suite = exclude_tests_by_re(self, self.exclude_pattern)
3377
self.addTests(suite)
3378
return iter(self._tests)
3381
class FilterTestsDecorator(TestDecorator):
3382
"""A decorator which filters tests to those matching a pattern."""
3384
def __init__(self, suite, pattern):
3385
TestDecorator.__init__(self, suite)
3386
self.pattern = pattern
3387
self.filtered = False
3391
return iter(self._tests)
3392
self.filtered = True
3393
suite = filter_suite_by_re(self, self.pattern)
3395
self.addTests(suite)
3396
return iter(self._tests)
3399
class RandomDecorator(TestDecorator):
3400
"""A decorator which randomises the order of its tests."""
3402
def __init__(self, suite, random_seed, stream):
3403
TestDecorator.__init__(self, suite)
3404
self.random_seed = random_seed
3405
self.randomised = False
3406
self.stream = stream
3410
return iter(self._tests)
3411
self.randomised = True
3412
self.stream.write("Randomizing test order using seed %s\n\n" %
3413
(self.actual_seed()))
3414
# Initialise the random number generator.
3415
random.seed(self.actual_seed())
3416
suite = randomize_suite(self)
3418
self.addTests(suite)
3419
return iter(self._tests)
3421
def actual_seed(self):
3422
if self.random_seed == "now":
3423
# We convert the seed to a long to make it reuseable across
3424
# invocations (because the user can reenter it).
3425
self.random_seed = long(time.time())
3427
# Convert the seed to a long if we can
3429
self.random_seed = long(self.random_seed)
3432
return self.random_seed
3435
class TestFirstDecorator(TestDecorator):
3436
"""A decorator which moves named tests to the front."""
3438
def __init__(self, suite, pattern):
3439
TestDecorator.__init__(self, suite)
3440
self.pattern = pattern
3441
self.filtered = False
3445
return iter(self._tests)
3446
self.filtered = True
3447
suites = split_suite_by_re(self, self.pattern)
3449
self.addTests(suites)
3450
return iter(self._tests)
3453
def partition_tests(suite, count):
3454
"""Partition suite into count lists of tests."""
3455
# This just assigns tests in a round-robin fashion. On one hand this
3456
# splits up blocks of related tests that might run faster if they shared
3457
# resources, but on the other it avoids assigning blocks of slow tests to
3458
# just one partition. So the slowest partition shouldn't be much slower
3460
partitions = [list() for i in range(count)]
3461
tests = iter_suite_tests(suite)
3462
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3463
partition.append(test)
3467
def workaround_zealous_crypto_random():
3468
"""Crypto.Random want to help us being secure, but we don't care here.
3470
This workaround some test failure related to the sftp server. Once paramiko
3471
stop using the controversial API in Crypto.Random, we may get rid of it.
3474
from Crypto.Random import atfork
3480
def fork_for_tests(suite):
3481
"""Take suite and start up one runner per CPU by forking()
3483
:return: An iterable of TestCase-like objects which can each have
3484
run(result) called on them to feed tests to result.
3486
concurrency = osutils.local_concurrency()
3488
from subunit import TestProtocolClient, ProtocolTestCase
3489
from subunit.test_results import AutoTimingTestResultDecorator
3490
class TestInOtherProcess(ProtocolTestCase):
3491
# Should be in subunit, I think. RBC.
3492
def __init__(self, stream, pid):
3493
ProtocolTestCase.__init__(self, stream)
3496
def run(self, result):
3498
ProtocolTestCase.run(self, result)
3500
os.waitpid(self.pid, 0)
3502
test_blocks = partition_tests(suite, concurrency)
3503
for process_tests in test_blocks:
3504
process_suite = TestUtil.TestSuite()
3505
process_suite.addTests(process_tests)
3506
c2pread, c2pwrite = os.pipe()
3509
workaround_zealous_crypto_random()
3512
# Leave stderr and stdout open so we can see test noise
3513
# Close stdin so that the child goes away if it decides to
3514
# read from stdin (otherwise its a roulette to see what
3515
# child actually gets keystrokes for pdb etc).
3518
stream = os.fdopen(c2pwrite, 'wb', 1)
3519
subunit_result = AutoTimingTestResultDecorator(
3520
TestProtocolClient(stream))
3521
process_suite.run(subunit_result)
3526
stream = os.fdopen(c2pread, 'rb', 1)
3527
test = TestInOtherProcess(stream, pid)
3532
def reinvoke_for_tests(suite):
3533
"""Take suite and start up one runner per CPU using subprocess().
3535
:return: An iterable of TestCase-like objects which can each have
3536
run(result) called on them to feed tests to result.
3538
concurrency = osutils.local_concurrency()
3540
from subunit import ProtocolTestCase
3541
class TestInSubprocess(ProtocolTestCase):
3542
def __init__(self, process, name):
3543
ProtocolTestCase.__init__(self, process.stdout)
3544
self.process = process
3545
self.process.stdin.close()
3548
def run(self, result):
3550
ProtocolTestCase.run(self, result)
3553
os.unlink(self.name)
3554
# print "pid %d finished" % finished_process
3555
test_blocks = partition_tests(suite, concurrency)
3556
for process_tests in test_blocks:
3557
# ugly; currently reimplement rather than reuses TestCase methods.
3558
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3559
if not os.path.isfile(bzr_path):
3560
# We are probably installed. Assume sys.argv is the right file
3561
bzr_path = sys.argv[0]
3562
bzr_path = [bzr_path]
3563
if sys.platform == "win32":
3564
# if we're on windows, we can't execute the bzr script directly
3565
bzr_path = [sys.executable] + bzr_path
3566
fd, test_list_file_name = tempfile.mkstemp()
3567
test_list_file = os.fdopen(fd, 'wb', 1)
3568
for test in process_tests:
3569
test_list_file.write(test.id() + '\n')
3570
test_list_file.close()
3572
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3574
if '--no-plugins' in sys.argv:
3575
argv.append('--no-plugins')
3576
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3577
# noise on stderr it can interrupt the subunit protocol.
3578
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3579
stdout=subprocess.PIPE,
3580
stderr=subprocess.PIPE,
3582
test = TestInSubprocess(process, test_list_file_name)
3585
os.unlink(test_list_file_name)
3590
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3591
"""Generate profiling data for all activity between start and success.
3593
The profile data is appended to the test's _benchcalls attribute and can
3594
be accessed by the forwarded-to TestResult.
3596
While it might be cleaner do accumulate this in stopTest, addSuccess is
3597
where our existing output support for lsprof is, and this class aims to
3598
fit in with that: while it could be moved it's not necessary to accomplish
3599
test profiling, nor would it be dramatically cleaner.
3602
def startTest(self, test):
3603
self.profiler = bzrlib.lsprof.BzrProfiler()
3604
# Prevent deadlocks in tests that use lsprof: those tests will
3606
bzrlib.lsprof.BzrProfiler.profiler_block = 0
3607
self.profiler.start()
3608
testtools.ExtendedToOriginalDecorator.startTest(self, test)
3610
def addSuccess(self, test):
3611
stats = self.profiler.stop()
3613
calls = test._benchcalls
3614
except AttributeError:
3615
test._benchcalls = []
3616
calls = test._benchcalls
3617
calls.append(((test.id(), "", ""), stats))
3618
testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
3620
def stopTest(self, test):
3621
testtools.ExtendedToOriginalDecorator.stopTest(self, test)
3622
self.profiler = None
3625
# Controlled by "bzr selftest -E=..." option
3626
# Currently supported:
3627
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3628
# preserves any flags supplied at the command line.
3629
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3630
# rather than failing tests. And no longer raise
3631
# LockContention when fctnl locks are not being used
3632
# with proper exclusion rules.
3633
# -Ethreads Will display thread ident at creation/join time to
3634
# help track thread leaks
3636
# -Econfig_stats Will collect statistics using addDetail
3637
selftest_debug_flags = set()
3640
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3642
test_suite_factory=None,
3645
matching_tests_first=None,
3648
exclude_pattern=None,
3654
suite_decorators=None,
478
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
479
return result.wasSuccessful()
482
def selftest(verbose=False, pattern=".*"):
3658
483
"""Run the whole test suite under the enhanced runner"""
3659
# XXX: Very ugly way to do this...
3660
# Disable warning about old formats because we don't want it to disturb
3661
# any blackbox tests.
3662
from bzrlib import repository
3663
repository._deprecation_warning_done = True
3665
global default_transport
3666
if transport is None:
3667
transport = default_transport
3668
old_transport = default_transport
3669
default_transport = transport
3670
global selftest_debug_flags
3671
old_debug_flags = selftest_debug_flags
3672
if debug_flags is not None:
3673
selftest_debug_flags = set(debug_flags)
3675
if load_list is None:
3678
keep_only = load_test_id_list(load_list)
3680
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3681
for start in starting_with]
3682
if test_suite_factory is None:
3683
# Reduce loading time by loading modules based on the starting_with
3685
suite = test_suite(keep_only, starting_with)
3687
suite = test_suite_factory()
3689
# But always filter as requested.
3690
suite = filter_suite_by_id_startswith(suite, starting_with)
3691
result_decorators = []
3693
result_decorators.append(ProfileResult)
3694
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3695
stop_on_failure=stop_on_failure,
3696
transport=transport,
3697
lsprof_timed=lsprof_timed,
3698
bench_history=bench_history,
3699
matching_tests_first=matching_tests_first,
3700
list_only=list_only,
3701
random_seed=random_seed,
3702
exclude_pattern=exclude_pattern,
3704
runner_class=runner_class,
3705
suite_decorators=suite_decorators,
3707
result_decorators=result_decorators,
3710
default_transport = old_transport
3711
selftest_debug_flags = old_debug_flags
3714
def load_test_id_list(file_name):
3715
"""Load a test id list from a text file.
3717
The format is one test id by line. No special care is taken to impose
3718
strict rules, these test ids are used to filter the test suite so a test id
3719
that do not match an existing test will do no harm. This allows user to add
3720
comments, leave blank lines, etc.
3724
ftest = open(file_name, 'rt')
3726
if e.errno != errno.ENOENT:
3729
raise errors.NoSuchFile(file_name)
3731
for test_name in ftest.readlines():
3732
test_list.append(test_name.strip())
3737
def suite_matches_id_list(test_suite, id_list):
3738
"""Warns about tests not appearing or appearing more than once.
3740
:param test_suite: A TestSuite object.
3741
:param test_id_list: The list of test ids that should be found in
3744
:return: (absents, duplicates) absents is a list containing the test found
3745
in id_list but not in test_suite, duplicates is a list containing the
3746
test found multiple times in test_suite.
3748
When using a prefined test id list, it may occurs that some tests do not
3749
exist anymore or that some tests use the same id. This function warns the
3750
tester about potential problems in his workflow (test lists are volatile)
3751
or in the test suite itself (using the same id for several tests does not
3752
help to localize defects).
3754
# Build a dict counting id occurrences
3756
for test in iter_suite_tests(test_suite):
3758
tests[id] = tests.get(id, 0) + 1
3763
occurs = tests.get(id, 0)
3765
not_found.append(id)
3767
duplicates.append(id)
3769
return not_found, duplicates
3772
class TestIdList(object):
3773
"""Test id list to filter a test suite.
3775
Relying on the assumption that test ids are built as:
3776
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3777
notation, this class offers methods to :
3778
- avoid building a test suite for modules not refered to in the test list,
3779
- keep only the tests listed from the module test suite.
3782
def __init__(self, test_id_list):
3783
# When a test suite needs to be filtered against us we compare test ids
3784
# for equality, so a simple dict offers a quick and simple solution.
3785
self.tests = dict().fromkeys(test_id_list, True)
3787
# While unittest.TestCase have ids like:
3788
# <module>.<class>.<method>[(<param+)],
3789
# doctest.DocTestCase can have ids like:
3792
# <module>.<function>
3793
# <module>.<class>.<method>
3795
# Since we can't predict a test class from its name only, we settle on
3796
# a simple constraint: a test id always begins with its module name.
3799
for test_id in test_id_list:
3800
parts = test_id.split('.')
3801
mod_name = parts.pop(0)
3802
modules[mod_name] = True
3804
mod_name += '.' + part
3805
modules[mod_name] = True
3806
self.modules = modules
3808
def refers_to(self, module_name):
3809
"""Is there tests for the module or one of its sub modules."""
3810
return self.modules.has_key(module_name)
3812
def includes(self, test_id):
3813
return self.tests.has_key(test_id)
3816
class TestPrefixAliasRegistry(registry.Registry):
3817
"""A registry for test prefix aliases.
3819
This helps implement shorcuts for the --starting-with selftest
3820
option. Overriding existing prefixes is not allowed but not fatal (a
3821
warning will be emitted).
3824
def register(self, key, obj, help=None, info=None,
3825
override_existing=False):
3826
"""See Registry.register.
3828
Trying to override an existing alias causes a warning to be emitted,
3829
not a fatal execption.
3832
super(TestPrefixAliasRegistry, self).register(
3833
key, obj, help=help, info=info, override_existing=False)
3835
actual = self.get(key)
3837
'Test prefix alias %s is already used for %s, ignoring %s'
3838
% (key, actual, obj))
3840
def resolve_alias(self, id_start):
3841
"""Replace the alias by the prefix in the given string.
3843
Using an unknown prefix is an error to help catching typos.
3845
parts = id_start.split('.')
3847
parts[0] = self.get(parts[0])
3849
raise errors.BzrCommandError(
3850
'%s is not a known test prefix alias' % parts[0])
3851
return '.'.join(parts)
3854
test_prefix_alias_registry = TestPrefixAliasRegistry()
3855
"""Registry of test prefix aliases."""
3858
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3859
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3860
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3862
# Obvious highest levels prefixes, feel free to add your own via a plugin
3863
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3864
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3865
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3866
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3867
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3870
def _test_suite_testmod_names():
3871
"""Return the standard list of test module names to test."""
3874
'bzrlib.tests.blackbox',
3875
'bzrlib.tests.commands',
3876
'bzrlib.tests.doc_generate',
3877
'bzrlib.tests.per_branch',
3878
'bzrlib.tests.per_bzrdir',
3879
'bzrlib.tests.per_controldir',
3880
'bzrlib.tests.per_controldir_colo',
3881
'bzrlib.tests.per_foreign_vcs',
3882
'bzrlib.tests.per_interrepository',
3883
'bzrlib.tests.per_intertree',
3884
'bzrlib.tests.per_inventory',
3885
'bzrlib.tests.per_interbranch',
3886
'bzrlib.tests.per_lock',
3887
'bzrlib.tests.per_merger',
3888
'bzrlib.tests.per_transport',
3889
'bzrlib.tests.per_tree',
3890
'bzrlib.tests.per_pack_repository',
3891
'bzrlib.tests.per_repository',
3892
'bzrlib.tests.per_repository_chk',
3893
'bzrlib.tests.per_repository_reference',
3894
'bzrlib.tests.per_repository_vf',
3895
'bzrlib.tests.per_uifactory',
3896
'bzrlib.tests.per_versionedfile',
3897
'bzrlib.tests.per_workingtree',
3898
'bzrlib.tests.test__annotator',
3899
'bzrlib.tests.test__bencode',
3900
'bzrlib.tests.test__btree_serializer',
3901
'bzrlib.tests.test__chk_map',
3902
'bzrlib.tests.test__dirstate_helpers',
3903
'bzrlib.tests.test__groupcompress',
3904
'bzrlib.tests.test__known_graph',
3905
'bzrlib.tests.test__rio',
3906
'bzrlib.tests.test__simple_set',
3907
'bzrlib.tests.test__static_tuple',
3908
'bzrlib.tests.test__walkdirs_win32',
3909
'bzrlib.tests.test_ancestry',
3910
'bzrlib.tests.test_annotate',
3911
'bzrlib.tests.test_api',
3912
'bzrlib.tests.test_atomicfile',
3913
'bzrlib.tests.test_bad_files',
3914
'bzrlib.tests.test_bisect_multi',
3915
'bzrlib.tests.test_branch',
3916
'bzrlib.tests.test_branchbuilder',
3917
'bzrlib.tests.test_btree_index',
3918
'bzrlib.tests.test_bugtracker',
3919
'bzrlib.tests.test_bundle',
3920
'bzrlib.tests.test_bzrdir',
3921
'bzrlib.tests.test__chunks_to_lines',
3922
'bzrlib.tests.test_cache_utf8',
3923
'bzrlib.tests.test_chk_map',
3924
'bzrlib.tests.test_chk_serializer',
3925
'bzrlib.tests.test_chunk_writer',
3926
'bzrlib.tests.test_clean_tree',
3927
'bzrlib.tests.test_cleanup',
3928
'bzrlib.tests.test_cmdline',
3929
'bzrlib.tests.test_commands',
3930
'bzrlib.tests.test_commit',
3931
'bzrlib.tests.test_commit_merge',
3932
'bzrlib.tests.test_config',
3933
'bzrlib.tests.test_conflicts',
3934
'bzrlib.tests.test_controldir',
3935
'bzrlib.tests.test_counted_lock',
3936
'bzrlib.tests.test_crash',
3937
'bzrlib.tests.test_decorators',
3938
'bzrlib.tests.test_delta',
3939
'bzrlib.tests.test_debug',
3940
'bzrlib.tests.test_diff',
3941
'bzrlib.tests.test_directory_service',
3942
'bzrlib.tests.test_dirstate',
3943
'bzrlib.tests.test_email_message',
3944
'bzrlib.tests.test_eol_filters',
3945
'bzrlib.tests.test_errors',
3946
'bzrlib.tests.test_export',
3947
'bzrlib.tests.test_export_pot',
3948
'bzrlib.tests.test_extract',
3949
'bzrlib.tests.test_features',
3950
'bzrlib.tests.test_fetch',
3951
'bzrlib.tests.test_fixtures',
3952
'bzrlib.tests.test_fifo_cache',
3953
'bzrlib.tests.test_filters',
3954
'bzrlib.tests.test_filter_tree',
3955
'bzrlib.tests.test_ftp_transport',
3956
'bzrlib.tests.test_foreign',
3957
'bzrlib.tests.test_generate_docs',
3958
'bzrlib.tests.test_generate_ids',
3959
'bzrlib.tests.test_globbing',
3960
'bzrlib.tests.test_gpg',
3961
'bzrlib.tests.test_graph',
3962
'bzrlib.tests.test_groupcompress',
3963
'bzrlib.tests.test_hashcache',
3964
'bzrlib.tests.test_help',
3965
'bzrlib.tests.test_hooks',
3966
'bzrlib.tests.test_http',
3967
'bzrlib.tests.test_http_response',
3968
'bzrlib.tests.test_https_ca_bundle',
3969
'bzrlib.tests.test_i18n',
3970
'bzrlib.tests.test_identitymap',
3971
'bzrlib.tests.test_ignores',
3972
'bzrlib.tests.test_index',
3973
'bzrlib.tests.test_import_tariff',
3974
'bzrlib.tests.test_info',
3975
'bzrlib.tests.test_inv',
3976
'bzrlib.tests.test_inventory_delta',
3977
'bzrlib.tests.test_knit',
3978
'bzrlib.tests.test_lazy_import',
3979
'bzrlib.tests.test_lazy_regex',
3980
'bzrlib.tests.test_library_state',
3981
'bzrlib.tests.test_lock',
3982
'bzrlib.tests.test_lockable_files',
3983
'bzrlib.tests.test_lockdir',
3984
'bzrlib.tests.test_log',
3985
'bzrlib.tests.test_lru_cache',
3986
'bzrlib.tests.test_lsprof',
3987
'bzrlib.tests.test_mail_client',
3988
'bzrlib.tests.test_matchers',
3989
'bzrlib.tests.test_memorytree',
3990
'bzrlib.tests.test_merge',
3991
'bzrlib.tests.test_merge3',
3992
'bzrlib.tests.test_merge_core',
3993
'bzrlib.tests.test_merge_directive',
3994
'bzrlib.tests.test_mergetools',
3995
'bzrlib.tests.test_missing',
3996
'bzrlib.tests.test_msgeditor',
3997
'bzrlib.tests.test_multiparent',
3998
'bzrlib.tests.test_mutabletree',
3999
'bzrlib.tests.test_nonascii',
4000
'bzrlib.tests.test_options',
4001
'bzrlib.tests.test_osutils',
4002
'bzrlib.tests.test_osutils_encodings',
4003
'bzrlib.tests.test_pack',
4004
'bzrlib.tests.test_patch',
4005
'bzrlib.tests.test_patches',
4006
'bzrlib.tests.test_permissions',
4007
'bzrlib.tests.test_plugins',
4008
'bzrlib.tests.test_progress',
4009
'bzrlib.tests.test_pyutils',
4010
'bzrlib.tests.test_read_bundle',
4011
'bzrlib.tests.test_reconcile',
4012
'bzrlib.tests.test_reconfigure',
4013
'bzrlib.tests.test_registry',
4014
'bzrlib.tests.test_remote',
4015
'bzrlib.tests.test_rename_map',
4016
'bzrlib.tests.test_repository',
4017
'bzrlib.tests.test_revert',
4018
'bzrlib.tests.test_revision',
4019
'bzrlib.tests.test_revisionspec',
4020
'bzrlib.tests.test_revisiontree',
4021
'bzrlib.tests.test_rio',
4022
'bzrlib.tests.test_rules',
4023
'bzrlib.tests.test_sampler',
4024
'bzrlib.tests.test_scenarios',
4025
'bzrlib.tests.test_script',
4026
'bzrlib.tests.test_selftest',
4027
'bzrlib.tests.test_serializer',
4028
'bzrlib.tests.test_setup',
4029
'bzrlib.tests.test_sftp_transport',
4030
'bzrlib.tests.test_shelf',
4031
'bzrlib.tests.test_shelf_ui',
4032
'bzrlib.tests.test_smart',
4033
'bzrlib.tests.test_smart_add',
4034
'bzrlib.tests.test_smart_request',
4035
'bzrlib.tests.test_smart_transport',
4036
'bzrlib.tests.test_smtp_connection',
4037
'bzrlib.tests.test_source',
4038
'bzrlib.tests.test_ssh_transport',
4039
'bzrlib.tests.test_status',
4040
'bzrlib.tests.test_store',
4041
'bzrlib.tests.test_strace',
4042
'bzrlib.tests.test_subsume',
4043
'bzrlib.tests.test_switch',
4044
'bzrlib.tests.test_symbol_versioning',
4045
'bzrlib.tests.test_tag',
4046
'bzrlib.tests.test_test_server',
4047
'bzrlib.tests.test_testament',
4048
'bzrlib.tests.test_textfile',
4049
'bzrlib.tests.test_textmerge',
4050
'bzrlib.tests.test_cethread',
4051
'bzrlib.tests.test_timestamp',
4052
'bzrlib.tests.test_trace',
4053
'bzrlib.tests.test_transactions',
4054
'bzrlib.tests.test_transform',
4055
'bzrlib.tests.test_transport',
4056
'bzrlib.tests.test_transport_log',
4057
'bzrlib.tests.test_tree',
4058
'bzrlib.tests.test_treebuilder',
4059
'bzrlib.tests.test_treeshape',
4060
'bzrlib.tests.test_tsort',
4061
'bzrlib.tests.test_tuned_gzip',
4062
'bzrlib.tests.test_ui',
4063
'bzrlib.tests.test_uncommit',
4064
'bzrlib.tests.test_upgrade',
4065
'bzrlib.tests.test_upgrade_stacked',
4066
'bzrlib.tests.test_urlutils',
4067
'bzrlib.tests.test_utextwrap',
4068
'bzrlib.tests.test_version',
4069
'bzrlib.tests.test_version_info',
4070
'bzrlib.tests.test_versionedfile',
4071
'bzrlib.tests.test_weave',
4072
'bzrlib.tests.test_whitebox',
4073
'bzrlib.tests.test_win32utils',
4074
'bzrlib.tests.test_workingtree',
4075
'bzrlib.tests.test_workingtree_4',
4076
'bzrlib.tests.test_wsgi',
4077
'bzrlib.tests.test_xml',
4081
def _test_suite_modules_to_doctest():
4082
"""Return the list of modules to doctest."""
4084
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
4088
'bzrlib.branchbuilder',
4089
'bzrlib.decorators',
4091
'bzrlib.iterablefile',
4096
'bzrlib.symbol_versioning',
4098
'bzrlib.tests.fixtures',
4100
'bzrlib.transport.http',
4101
'bzrlib.version_info_formats.format_custom',
4105
def test_suite(keep_only=None, starting_with=None):
4106
"""Build and return TestSuite for the whole of bzrlib.
4108
:param keep_only: A list of test ids limiting the suite returned.
4110
:param starting_with: An id limiting the suite returned to the tests
4113
This function can be replaced if you need to change the default test
4114
suite on a global basis, but it is not encouraged.
4117
loader = TestUtil.TestLoader()
4119
if keep_only is not None:
4120
id_filter = TestIdList(keep_only)
4122
# We take precedence over keep_only because *at loading time* using
4123
# both options means we will load less tests for the same final result.
4124
def interesting_module(name):
4125
for start in starting_with:
4127
# Either the module name starts with the specified string
4128
name.startswith(start)
4129
# or it may contain tests starting with the specified string
4130
or start.startswith(name)
4134
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
4136
elif keep_only is not None:
4137
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
4138
def interesting_module(name):
4139
return id_filter.refers_to(name)
4142
loader = TestUtil.TestLoader()
4143
def interesting_module(name):
4144
# No filtering, all modules are interesting
4147
suite = loader.suiteClass()
4149
# modules building their suite with loadTestsFromModuleNames
4150
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
4152
for mod in _test_suite_modules_to_doctest():
4153
if not interesting_module(mod):
4154
# No tests to keep here, move along
4157
# note that this really does mean "report only" -- doctest
4158
# still runs the rest of the examples
4159
doc_suite = IsolatedDocTestSuite(
4160
mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
4161
except ValueError, e:
4162
print '**failed to get doctest for: %s\n%s' % (mod, e)
4164
if len(doc_suite._tests) == 0:
4165
raise errors.BzrError("no doctests found in %s" % (mod,))
4166
suite.addTest(doc_suite)
4168
default_encoding = sys.getdefaultencoding()
4169
for name, plugin in _mod_plugin.plugins().items():
4170
if not interesting_module(plugin.module.__name__):
4172
plugin_suite = plugin.test_suite()
4173
# We used to catch ImportError here and turn it into just a warning,
4174
# but really if you don't have --no-plugins this should be a failure.
4175
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
4176
if plugin_suite is None:
4177
plugin_suite = plugin.load_plugin_tests(loader)
4178
if plugin_suite is not None:
4179
suite.addTest(plugin_suite)
4180
if default_encoding != sys.getdefaultencoding():
4182
'Plugin "%s" tried to reset default encoding to: %s', name,
4183
sys.getdefaultencoding())
4185
sys.setdefaultencoding(default_encoding)
4187
if keep_only is not None:
4188
# Now that the referred modules have loaded their tests, keep only the
4190
suite = filter_suite_by_id_list(suite, id_filter)
4191
# Do some sanity checks on the id_list filtering
4192
not_found, duplicates = suite_matches_id_list(suite, keep_only)
4194
# The tester has used both keep_only and starting_with, so he is
4195
# already aware that some tests are excluded from the list, there
4196
# is no need to tell him which.
4199
# Some tests mentioned in the list are not in the test suite. The
4200
# list may be out of date, report to the tester.
4201
for id in not_found:
4202
trace.warning('"%s" not found in the test suite', id)
4203
for id in duplicates:
4204
trace.warning('"%s" is used as an id by several tests', id)
484
return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern)
488
"""Build and return TestSuite for the whole program."""
489
import bzrlib.store, bzrlib.inventory, bzrlib.branch
490
import bzrlib.osutils, bzrlib.merge3, bzrlib.plugin
491
from doctest import DocTestSuite
493
global MODULES_TO_TEST, MODULES_TO_DOCTEST
496
['bzrlib.selftest.MetaTestLog',
497
'bzrlib.selftest.testgpg',
498
'bzrlib.selftest.testidentitymap',
499
'bzrlib.selftest.testinv',
500
'bzrlib.selftest.test_ancestry',
501
'bzrlib.selftest.test_commit',
502
'bzrlib.selftest.test_commit_merge',
503
'bzrlib.selftest.testconfig',
504
'bzrlib.selftest.versioning',
505
'bzrlib.selftest.testmerge3',
506
'bzrlib.selftest.testmerge',
507
'bzrlib.selftest.testhashcache',
508
'bzrlib.selftest.teststatus',
509
'bzrlib.selftest.testlog',
510
'bzrlib.selftest.testrevisionnamespaces',
511
'bzrlib.selftest.testbranch',
512
'bzrlib.selftest.testrevision',
513
'bzrlib.selftest.test_revision_info',
514
'bzrlib.selftest.test_merge_core',
515
'bzrlib.selftest.test_smart_add',
516
'bzrlib.selftest.test_bad_files',
517
'bzrlib.selftest.testdiff',
518
'bzrlib.selftest.test_parent',
519
'bzrlib.selftest.test_xml',
520
'bzrlib.selftest.test_weave',
521
'bzrlib.selftest.testfetch',
522
'bzrlib.selftest.whitebox',
523
'bzrlib.selftest.teststore',
524
'bzrlib.selftest.blackbox',
525
'bzrlib.selftest.testsampler',
526
'bzrlib.selftest.testtransactions',
527
'bzrlib.selftest.testtransport',
528
'bzrlib.selftest.testgraph',
529
'bzrlib.selftest.testworkingtree',
530
'bzrlib.selftest.test_upgrade',
531
'bzrlib.selftest.test_conflicts',
532
'bzrlib.selftest.testtestament',
533
'bzrlib.selftest.testannotate',
534
'bzrlib.selftest.testrevprops',
537
for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
538
bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
539
if m not in MODULES_TO_DOCTEST:
540
MODULES_TO_DOCTEST.append(m)
542
TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
543
print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
546
suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
547
for m in MODULES_TO_TEST:
548
suite.addTest(TestLoader().loadTestsFromModule(m))
549
for m in (MODULES_TO_DOCTEST):
550
suite.addTest(DocTestSuite(m))
551
for p in bzrlib.plugin.all_plugins:
552
if hasattr(p, 'test_suite'):
553
suite.addTest(p.test_suite())
4209
def multiply_scenarios(*scenarios):
4210
"""Multiply two or more iterables of scenarios.
4212
It is safe to pass scenario generators or iterators.
4214
:returns: A list of compound scenarios: the cross-product of all
4215
scenarios, with the names concatenated and the parameters
4218
return reduce(_multiply_two_scenarios, map(list, scenarios))
4221
def _multiply_two_scenarios(scenarios_left, scenarios_right):
4222
"""Multiply two sets of scenarios.
4224
:returns: the cartesian product of the two sets of scenarios, that is
4225
a scenario for every possible combination of a left scenario and a
4229
('%s,%s' % (left_name, right_name),
4230
dict(left_dict.items() + right_dict.items()))
4231
for left_name, left_dict in scenarios_left
4232
for right_name, right_dict in scenarios_right]
4235
def multiply_tests(tests, scenarios, result):
4236
"""Multiply tests_list by scenarios into result.
4238
This is the core workhorse for test parameterisation.
4240
Typically the load_tests() method for a per-implementation test suite will
4241
call multiply_tests and return the result.
4243
:param tests: The tests to parameterise.
4244
:param scenarios: The scenarios to apply: pairs of (scenario_name,
4245
scenario_param_dict).
4246
:param result: A TestSuite to add created tests to.
4248
This returns the passed in result TestSuite with the cross product of all
4249
the tests repeated once for each scenario. Each test is adapted by adding
4250
the scenario name at the end of its id(), and updating the test object's
4251
__dict__ with the scenario_param_dict.
4253
>>> import bzrlib.tests.test_sampler
4254
>>> r = multiply_tests(
4255
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4256
... [('one', dict(param=1)),
4257
... ('two', dict(param=2))],
4258
... TestUtil.TestSuite())
4259
>>> tests = list(iter_suite_tests(r))
4263
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4269
for test in iter_suite_tests(tests):
4270
apply_scenarios(test, scenarios, result)
4274
def apply_scenarios(test, scenarios, result):
4275
"""Apply the scenarios in scenarios to test and add to result.
4277
:param test: The test to apply scenarios to.
4278
:param scenarios: An iterable of scenarios to apply to test.
4280
:seealso: apply_scenario
4282
for scenario in scenarios:
4283
result.addTest(apply_scenario(test, scenario))
4287
def apply_scenario(test, scenario):
4288
"""Copy test and apply scenario to it.
4290
:param test: A test to adapt.
4291
:param scenario: A tuple describing the scenarion.
4292
The first element of the tuple is the new test id.
4293
The second element is a dict containing attributes to set on the
4295
:return: The adapted test.
4297
new_id = "%s(%s)" % (test.id(), scenario[0])
4298
new_test = clone_test(test, new_id)
4299
for name, value in scenario[1].items():
4300
setattr(new_test, name, value)
4304
def clone_test(test, new_id):
4305
"""Clone a test giving it a new id.
4307
:param test: The test to clone.
4308
:param new_id: The id to assign to it.
4309
:return: The new test.
4311
new_test = copy.copy(test)
4312
new_test.id = lambda: new_id
4313
# XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4314
# causes cloned tests to share the 'details' dict. This makes it hard to
4315
# read the test output for parameterized tests, because tracebacks will be
4316
# associated with irrelevant tests.
4318
details = new_test._TestCase__details
4319
except AttributeError:
4320
# must be a different version of testtools than expected. Do nothing.
4323
# Reset the '__details' dict.
4324
new_test._TestCase__details = {}
4328
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4330
"""Helper for permutating tests against an extension module.
4332
This is meant to be used inside a modules 'load_tests()' function. It will
4333
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4334
against both implementations. Setting 'test.module' to the appropriate
4335
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4337
:param standard_tests: A test suite to permute
4338
:param loader: A TestLoader
4339
:param py_module_name: The python path to a python module that can always
4340
be loaded, and will be considered the 'python' implementation. (eg
4341
'bzrlib._chk_map_py')
4342
:param ext_module_name: The python path to an extension module. If the
4343
module cannot be loaded, a single test will be added, which notes that
4344
the module is not available. If it can be loaded, all standard_tests
4345
will be run against that module.
4346
:return: (suite, feature) suite is a test-suite that has all the permuted
4347
tests. feature is the Feature object that can be used to determine if
4348
the module is available.
4351
from bzrlib.tests.features import ModuleAvailableFeature
4352
py_module = pyutils.get_named_object(py_module_name)
4354
('python', {'module': py_module}),
4356
suite = loader.suiteClass()
4357
feature = ModuleAvailableFeature(ext_module_name)
4358
if feature.available():
4359
scenarios.append(('C', {'module': feature.module}))
4361
# the compiled module isn't available, so we add a failing test
4362
class FailWithoutFeature(TestCase):
4363
def test_fail(self):
4364
self.requireFeature(feature)
4365
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4366
result = multiply_tests(standard_tests, scenarios, suite)
4367
return result, feature
4370
def _rmtree_temp_dir(dirname, test_id=None):
4371
# If LANG=C we probably have created some bogus paths
4372
# which rmtree(unicode) will fail to delete
4373
# so make sure we are using rmtree(str) to delete everything
4374
# except on win32, where rmtree(str) will fail
4375
# since it doesn't have the property of byte-stream paths
4376
# (they are either ascii or mbcs)
4377
if sys.platform == 'win32':
4378
# make sure we are using the unicode win32 api
4379
dirname = unicode(dirname)
4381
dirname = dirname.encode(sys.getfilesystemencoding())
4383
osutils.rmtree(dirname)
4385
# We don't want to fail here because some useful display will be lost
4386
# otherwise. Polluting the tmp dir is bad, but not giving all the
4387
# possible info to the test runner is even worse.
4389
ui.ui_factory.clear_term()
4390
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4391
# Ugly, but the last thing we want here is fail, so bear with it.
4392
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4393
).encode('ascii', 'replace')
4394
sys.stderr.write('Unable to remove testing dir %s\n%s'
4395
% (os.path.basename(dirname), printable_e))
4398
def probe_unicode_in_user_encoding():
4399
"""Try to encode several unicode strings to use in unicode-aware tests.
4400
Return first successfull match.
4402
:return: (unicode value, encoded plain string value) or (None, None)
4404
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4405
for uni_val in possible_vals:
4407
str_val = uni_val.encode(osutils.get_user_encoding())
4408
except UnicodeEncodeError:
4409
# Try a different character
4412
return uni_val, str_val
4416
def probe_bad_non_ascii(encoding):
4417
"""Try to find [bad] character with code [128..255]
4418
that cannot be decoded to unicode in some encoding.
4419
Return None if all non-ascii characters is valid
4422
for i in xrange(128, 256):
4425
char.decode(encoding)
4426
except UnicodeDecodeError:
4431
# Only define SubUnitBzrRunner if subunit is available.
4433
from subunit import TestProtocolClient
4434
from subunit.test_results import AutoTimingTestResultDecorator
4435
class SubUnitBzrProtocolClient(TestProtocolClient):
4437
def addSuccess(self, test, details=None):
4438
# The subunit client always includes the details in the subunit
4439
# stream, but we don't want to include it in ours.
4440
if details is not None and 'log' in details:
4442
return super(SubUnitBzrProtocolClient, self).addSuccess(
4445
class SubUnitBzrRunner(TextTestRunner):
4446
def run(self, test):
4447
result = AutoTimingTestResultDecorator(
4448
SubUnitBzrProtocolClient(self.stream))
4455
@deprecated_function(deprecated_in((2, 5, 0)))
4456
def ModuleAvailableFeature(name):
4457
from bzrlib.tests import features
4458
return features.ModuleAvailableFeature(name)