1
# Copyright (C) 2005-2011 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
# TODO: Perhaps there should be an API to find out if bzr running under the
20
# test suite -- some plugins might want to avoid making intrusive changes if
21
# this is the case. However, we want behaviour under to test to diverge as
22
# little as possible, so this should be used rarely if it's added at all.
23
# (Suggestion from j-a-meinel, 2005-11-24)
25
# NOTE: Some classes in here use camelCaseNaming() rather than
26
# underscore_naming(). That's for consistency with unittest; it's not the
27
# general style of bzrlib. Please continue that consistency when adding e.g.
28
# new assertFoo() methods.
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
33
18
from cStringIO import StringIO
56
# nb: check this before importing anything else from within it
57
_testtools_version = getattr(testtools, '__version__', ())
58
if _testtools_version < (0, 9, 5):
59
raise ImportError("need at least testtools 0.9.5: %s is %r"
60
% (testtools.__file__, _testtools_version))
61
from testtools import content
68
commands as _mod_commands,
77
plugin as _mod_plugin,
84
transport as _mod_transport,
90
# lsprof not available
92
from bzrlib.smart import client, request
93
from bzrlib.transport import (
97
from bzrlib.tests import (
102
from bzrlib.ui import NullProgressView
103
from bzrlib.ui.text import TextUIFactory
105
# Mark this python module as being part of the implementation
106
# of unittest: this gives us better tracebacks where the last
107
# shown frame is the test code, not our assertXYZ.
110
default_transport = test_server.LocalURLServer
113
_unitialized_attr = object()
114
"""A sentinel needed to act as a default value in a method signature."""
117
# Subunit result codes, defined here to prevent a hard dependency on subunit.
121
# These are intentionally brought into this namespace. That way plugins, etc
122
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
123
TestSuite = TestUtil.TestSuite
124
TestLoader = TestUtil.TestLoader
126
# Tests should run in a clean and clearly defined environment. The goal is to
127
# keep them isolated from the running environment as mush as possible. The test
128
# framework ensures the variables defined below are set (or deleted if the
129
# value is None) before a test is run and reset to their original value after
130
# the test is run. Generally if some code depends on an environment variable,
131
# the tests should start without this variable in the environment. There are a
132
# few exceptions but you shouldn't violate this rule lightly.
136
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
137
# tests do check our impls match APPDATA
138
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
142
'BZREMAIL': None, # may still be present in the environment
143
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
144
'BZR_PROGRESS_BAR': None,
145
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
146
# as a base class instead of TestCaseInTempDir. Tests inheriting from
147
# TestCase should not use disk resources, BZR_LOG is one.
148
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
149
'BZR_PLUGIN_PATH': None,
150
'BZR_DISABLE_PLUGINS': None,
151
'BZR_PLUGINS_AT': None,
152
'BZR_CONCURRENCY': None,
153
# Make sure that any text ui tests are consistent regardless of
154
# the environment the test case is run in; you may want tests that
155
# test other combinations. 'dumb' is a reasonable guess for tests
156
# going to a pipe or a StringIO.
162
'SSH_AUTH_SOCK': None,
172
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
173
# least. If you do (care), please update this comment
177
'BZR_REMOTE_PATH': None,
178
# Generally speaking, we don't want apport reporting on crashes in
179
# the test envirnoment unless we're specifically testing apport,
180
# so that it doesn't leak into the real system environment. We
181
# use an env var so it propagates to subprocesses.
182
'APPORT_DISABLE': '1',
186
def override_os_environ(test, env=None):
187
"""Modify os.environ keeping a copy.
189
:param test: A test instance
191
:param env: A dict containing variable definitions to be installed
194
env = isolated_environ
195
test._original_os_environ = dict([(var, value)
196
for var, value in os.environ.iteritems()])
197
for var, value in env.iteritems():
198
osutils.set_or_unset_env(var, value)
199
if var not in test._original_os_environ:
200
# The var is new, add it with a value of None, so
201
# restore_os_environ will delete it
202
test._original_os_environ[var] = None
205
def restore_os_environ(test):
206
"""Restore os.environ to its original state.
208
:param test: A test instance previously passed to override_os_environ.
210
for var, value in test._original_os_environ.iteritems():
211
# Restore the original value (or delete it if the value has been set to
212
# None in override_os_environ).
213
osutils.set_or_unset_env(var, value)
216
class ExtendedTestResult(testtools.TextTestResult):
217
"""Accepts, reports and accumulates the results of running tests.
219
Compared to the unittest version this class adds support for
220
profiling, benchmarking, stopping as soon as a test fails, and
221
skipping tests. There are further-specialized subclasses for
222
different types of display.
224
When a test finishes, in whatever way, it calls one of the addSuccess,
225
addFailure or addError classes. These in turn may redirect to a more
226
specific case for the special test results supported by our extended
229
Note that just one of these objects is fed the results from many tests.
234
def __init__(self, stream, descriptions, verbosity,
238
"""Construct new TestResult.
240
:param bench_history: Optionally, a writable file object to accumulate
243
testtools.TextTestResult.__init__(self, stream)
244
if bench_history is not None:
245
from bzrlib.version import _get_bzr_source_tree
246
src_tree = _get_bzr_source_tree()
249
revision_id = src_tree.get_parent_ids()[0]
251
# XXX: if this is a brand new tree, do the same as if there
255
# XXX: If there's no branch, what should we do?
257
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
258
self._bench_history = bench_history
259
self.ui = ui.ui_factory
262
self.failure_count = 0
263
self.known_failure_count = 0
265
self.not_applicable_count = 0
266
self.unsupported = {}
268
self._overall_start_time = time.time()
269
self._strict = strict
270
self._first_thread_leaker_id = None
271
self._tests_leaking_threads_count = 0
272
self._traceback_from_test = None
274
def stopTestRun(self):
277
stopTime = time.time()
278
timeTaken = stopTime - self.startTime
279
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
280
# the parent class method is similar have to duplicate
281
self._show_list('ERROR', self.errors)
282
self._show_list('FAIL', self.failures)
283
self.stream.write(self.sep2)
284
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
285
run, run != 1 and "s" or "", timeTaken))
286
if not self.wasSuccessful():
287
self.stream.write("FAILED (")
288
failed, errored = map(len, (self.failures, self.errors))
290
self.stream.write("failures=%d" % failed)
292
if failed: self.stream.write(", ")
293
self.stream.write("errors=%d" % errored)
294
if self.known_failure_count:
295
if failed or errored: self.stream.write(", ")
296
self.stream.write("known_failure_count=%d" %
297
self.known_failure_count)
298
self.stream.write(")\n")
300
if self.known_failure_count:
301
self.stream.write("OK (known_failures=%d)\n" %
302
self.known_failure_count)
304
self.stream.write("OK\n")
305
if self.skip_count > 0:
306
skipped = self.skip_count
307
self.stream.write('%d test%s skipped\n' %
308
(skipped, skipped != 1 and "s" or ""))
310
for feature, count in sorted(self.unsupported.items()):
311
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
314
ok = self.wasStrictlySuccessful()
316
ok = self.wasSuccessful()
317
if self._first_thread_leaker_id:
319
'%s is leaking threads among %d leaking tests.\n' % (
320
self._first_thread_leaker_id,
321
self._tests_leaking_threads_count))
322
# We don't report the main thread as an active one.
324
'%d non-main threads were left active in the end.\n'
325
% (len(self._active_threads) - 1))
327
def getDescription(self, test):
330
def _extractBenchmarkTime(self, testCase, details=None):
331
"""Add a benchmark time for the current test case."""
332
if details and 'benchtime' in details:
333
return float(''.join(details['benchtime'].iter_bytes()))
334
return getattr(testCase, "_benchtime", None)
336
def _elapsedTestTimeString(self):
337
"""Return a time string for the overall time the current test has taken."""
338
return self._formatTime(self._delta_to_float(
339
self._now() - self._start_datetime))
341
def _testTimeString(self, testCase):
342
benchmark_time = self._extractBenchmarkTime(testCase)
343
if benchmark_time is not None:
344
return self._formatTime(benchmark_time) + "*"
346
return self._elapsedTestTimeString()
348
def _formatTime(self, seconds):
349
"""Format seconds as milliseconds with leading spaces."""
350
# some benchmarks can take thousands of seconds to run, so we need 8
352
return "%8dms" % (1000 * seconds)
354
def _shortened_test_description(self, test):
356
what = re.sub(r'^bzrlib\.tests\.', '', what)
359
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
360
# multiple times in a row, because the handler is added for
361
# each test but the container list is shared between cases.
362
# See lp:498869 lp:625574 and lp:637725 for background.
363
def _record_traceback_from_test(self, exc_info):
364
"""Store the traceback from passed exc_info tuple till"""
365
self._traceback_from_test = exc_info[2]
31
import bzrlib.commands
32
from bzrlib.errors import BzrError
33
import bzrlib.inventory
36
import bzrlib.osutils as osutils
40
from bzrlib.trace import mutter
41
from bzrlib.tests.TestUtil import TestLoader, TestSuite
42
from bzrlib.tests.treeshape import build_tree_contents
45
MODULES_TO_DOCTEST = [
54
def packages_to_test():
55
import bzrlib.tests.blackbox
61
class EarlyStoppingTestResultAdapter(object):
62
"""An adapter for TestResult to stop at the first first failure or error"""
64
def __init__(self, result):
67
def addError(self, test, err):
68
self._result.addError(test, err)
71
def addFailure(self, test, err):
72
self._result.addFailure(test, err)
75
def __getattr__(self, name):
76
return getattr(self._result, name)
78
def __setattr__(self, name, value):
80
object.__setattr__(self, name, value)
81
return setattr(self._result, name, value)
84
class _MyResult(unittest._TextTestResult):
87
Shows output in a different format, including displaying runtime for tests.
90
def _elapsedTime(self):
91
return "%5dms" % (1000 * (time.time() - self._start_time))
367
93
def startTest(self, test):
368
super(ExtendedTestResult, self).startTest(test)
372
self.report_test_start(test)
373
test.number = self.count
374
self._recordTestStartTime()
375
# Make testtools cases give us the real traceback on failure
376
addOnException = getattr(test, "addOnException", None)
377
if addOnException is not None:
378
addOnException(self._record_traceback_from_test)
379
# Only check for thread leaks on bzrlib derived test cases
380
if isinstance(test, TestCase):
381
test.addCleanup(self._check_leaked_threads, test)
383
def stopTest(self, test):
384
super(ExtendedTestResult, self).stopTest(test)
385
# Manually break cycles, means touching various private things but hey
386
getDetails = getattr(test, "getDetails", None)
387
if getDetails is not None:
389
# Clear _type_equality_funcs to try to stop TestCase instances
390
# from wasting memory. 'clear' is not available in all Python
391
# versions (bug 809048)
392
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
393
if type_equality_funcs is not None:
394
tef_clear = getattr(type_equality_funcs, "clear", None)
395
if tef_clear is None:
396
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
397
if tef_instance_dict is not None:
398
tef_clear = tef_instance_dict.clear
399
if tef_clear is not None:
401
self._traceback_from_test = None
403
def startTests(self):
404
self.report_tests_starting()
405
self._active_threads = threading.enumerate()
407
def _check_leaked_threads(self, test):
408
"""See if any threads have leaked since last call
410
A sample of live threads is stored in the _active_threads attribute,
411
when this method runs it compares the current live threads and any not
412
in the previous sample are treated as having leaked.
414
now_active_threads = set(threading.enumerate())
415
threads_leaked = now_active_threads.difference(self._active_threads)
417
self._report_thread_leak(test, threads_leaked, now_active_threads)
418
self._tests_leaking_threads_count += 1
419
if self._first_thread_leaker_id is None:
420
self._first_thread_leaker_id = test.id()
421
self._active_threads = now_active_threads
423
def _recordTestStartTime(self):
424
"""Record that a test has started."""
425
self._start_datetime = self._now()
94
unittest.TestResult.startTest(self, test)
95
# In a short description, the important words are in
96
# the beginning, but in an id, the important words are
98
SHOW_DESCRIPTIONS = False
100
width = osutils.terminal_width()
101
name_width = width - 15
103
if SHOW_DESCRIPTIONS:
104
what = test.shortDescription()
106
if len(what) > name_width:
107
what = what[:name_width-3] + '...'
110
if what.startswith('bzrlib.tests.'):
112
if len(what) > name_width:
113
what = '...' + what[3-name_width:]
114
what = what.ljust(name_width)
115
self.stream.write(what)
117
self._start_time = time.time()
427
119
def addError(self, test, err):
428
"""Tell result that test finished with an error.
430
Called from the TestCase run() method when the test
431
fails with an unexpected error.
433
self._post_mortem(self._traceback_from_test)
434
super(ExtendedTestResult, self).addError(test, err)
435
self.error_count += 1
436
self.report_error(test, err)
120
unittest.TestResult.addError(self, test, err)
122
self.stream.writeln("ERROR %s" % self._elapsedTime())
124
self.stream.write('E')
440
127
def addFailure(self, test, err):
441
"""Tell result that test failed.
443
Called from the TestCase run() method when the test
444
fails because e.g. an assert() method failed.
446
self._post_mortem(self._traceback_from_test)
447
super(ExtendedTestResult, self).addFailure(test, err)
448
self.failure_count += 1
449
self.report_failure(test, err)
453
def addSuccess(self, test, details=None):
454
"""Tell result that test completed successfully.
456
Called from the TestCase run()
458
if self._bench_history is not None:
459
benchmark_time = self._extractBenchmarkTime(test, details)
460
if benchmark_time is not None:
461
self._bench_history.write("%s %s\n" % (
462
self._formatTime(benchmark_time),
464
self.report_success(test)
465
super(ExtendedTestResult, self).addSuccess(test)
466
test._log_contents = ''
468
def addExpectedFailure(self, test, err):
469
self.known_failure_count += 1
470
self.report_known_failure(test, err)
472
def addUnexpectedSuccess(self, test, details=None):
473
"""Tell result the test unexpectedly passed, counting as a failure
475
When the minimum version of testtools required becomes 0.9.8 this
476
can be updated to use the new handling there.
478
super(ExtendedTestResult, self).addFailure(test, details=details)
479
self.failure_count += 1
480
self.report_unexpected_success(test,
481
"".join(details["reason"].iter_text()))
485
def addNotSupported(self, test, feature):
486
"""The test will not be run because of a missing feature.
488
# this can be called in two different ways: it may be that the
489
# test started running, and then raised (through requireFeature)
490
# UnavailableFeature. Alternatively this method can be called
491
# while probing for features before running the test code proper; in
492
# that case we will see startTest and stopTest, but the test will
493
# never actually run.
494
self.unsupported.setdefault(str(feature), 0)
495
self.unsupported[str(feature)] += 1
496
self.report_unsupported(test, feature)
498
def addSkip(self, test, reason):
499
"""A test has not run for 'reason'."""
501
self.report_skip(test, reason)
503
def addNotApplicable(self, test, reason):
504
self.not_applicable_count += 1
505
self.report_not_applicable(test, reason)
507
def _post_mortem(self, tb=None):
508
"""Start a PDB post mortem session."""
509
if os.environ.get('BZR_TEST_PDB', None):
513
def progress(self, offset, whence):
514
"""The test is adjusting the count of tests to run."""
515
if whence == SUBUNIT_SEEK_SET:
516
self.num_tests = offset
517
elif whence == SUBUNIT_SEEK_CUR:
518
self.num_tests += offset
520
raise errors.BzrError("Unknown whence %r" % whence)
522
def report_tests_starting(self):
523
"""Display information before the test run begins"""
524
if getattr(sys, 'frozen', None) is None:
525
bzr_path = osutils.realpath(sys.argv[0])
527
bzr_path = sys.executable
529
'bzr selftest: %s\n' % (bzr_path,))
532
bzrlib.__path__[0],))
534
' bzr-%s python-%s %s\n' % (
535
bzrlib.version_string,
536
bzrlib._format_version_tuple(sys.version_info),
537
platform.platform(aliased=1),
539
self.stream.write('\n')
541
def report_test_start(self, test):
542
"""Display information on the test just about to be run"""
544
def _report_thread_leak(self, test, leaked_threads, active_threads):
545
"""Display information on a test that leaked one or more threads"""
546
# GZ 2010-09-09: A leak summary reported separately from the general
547
# thread debugging would be nice. Tests under subunit
548
# need something not using stream, perhaps adding a
549
# testtools details object would be fitting.
550
if 'threads' in selftest_debug_flags:
551
self.stream.write('%s is leaking, active is now %d\n' %
552
(test.id(), len(active_threads)))
554
def startTestRun(self):
555
self.startTime = time.time()
557
def report_success(self, test):
560
def wasStrictlySuccessful(self):
561
if self.unsupported or self.known_failure_count:
563
return self.wasSuccessful()
566
class TextTestResult(ExtendedTestResult):
567
"""Displays progress and results of tests in text form"""
569
def __init__(self, stream, descriptions, verbosity,
574
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
575
bench_history, strict)
576
# We no longer pass them around, but just rely on the UIFactory stack
579
warnings.warn("Passing pb to TextTestResult is deprecated")
580
self.pb = self.ui.nested_progress_bar()
581
self.pb.show_pct = False
582
self.pb.show_spinner = False
583
self.pb.show_eta = False,
584
self.pb.show_count = False
585
self.pb.show_bar = False
586
self.pb.update_latency = 0
587
self.pb.show_transport_activity = False
589
def stopTestRun(self):
590
# called when the tests that are going to run have run
593
super(TextTestResult, self).stopTestRun()
595
def report_tests_starting(self):
596
super(TextTestResult, self).report_tests_starting()
597
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
599
def _progress_prefix_text(self):
600
# the longer this text, the less space we have to show the test
602
a = '[%d' % self.count # total that have been run
603
# tests skipped as known not to be relevant are not important enough
605
## if self.skip_count:
606
## a += ', %d skip' % self.skip_count
607
## if self.known_failure_count:
608
## a += '+%dX' % self.known_failure_count
610
a +='/%d' % self.num_tests
612
runtime = time.time() - self._overall_start_time
614
a += '%dm%ds' % (runtime / 60, runtime % 60)
617
total_fail_count = self.error_count + self.failure_count
619
a += ', %d failed' % total_fail_count
620
# if self.unsupported:
621
# a += ', %d missing' % len(self.unsupported)
625
def report_test_start(self, test):
627
self._progress_prefix_text()
629
+ self._shortened_test_description(test))
631
def _test_description(self, test):
632
return self._shortened_test_description(test)
634
def report_error(self, test, err):
635
self.stream.write('ERROR: %s\n %s\n' % (
636
self._test_description(test),
640
def report_failure(self, test, err):
641
self.stream.write('FAIL: %s\n %s\n' % (
642
self._test_description(test),
646
def report_known_failure(self, test, err):
649
def report_unexpected_success(self, test, reason):
650
self.stream.write('FAIL: %s\n %s: %s\n' % (
651
self._test_description(test),
652
"Unexpected success. Should have failed",
656
def report_skip(self, test, reason):
659
def report_not_applicable(self, test, reason):
662
def report_unsupported(self, test, feature):
663
"""test cannot be run because feature is missing."""
666
class VerboseTestResult(ExtendedTestResult):
667
"""Produce long output, with one line per test run plus times"""
669
def _ellipsize_to_right(self, a_string, final_width):
670
"""Truncate and pad a string, keeping the right hand side"""
671
if len(a_string) > final_width:
672
result = '...' + a_string[3-final_width:]
675
return result.ljust(final_width)
677
def report_tests_starting(self):
678
self.stream.write('running %d tests...\n' % self.num_tests)
679
super(VerboseTestResult, self).report_tests_starting()
681
def report_test_start(self, test):
682
name = self._shortened_test_description(test)
683
width = osutils.terminal_width()
684
if width is not None:
685
# width needs space for 6 char status, plus 1 for slash, plus an
686
# 11-char time string, plus a trailing blank
687
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
689
self.stream.write(self._ellipsize_to_right(name, width-18))
691
self.stream.write(name)
694
def _error_summary(self, err):
696
return '%s%s' % (indent, err[1])
698
def report_error(self, test, err):
699
self.stream.write('ERROR %s\n%s\n'
700
% (self._testTimeString(test),
701
self._error_summary(err)))
703
def report_failure(self, test, err):
704
self.stream.write(' FAIL %s\n%s\n'
705
% (self._testTimeString(test),
706
self._error_summary(err)))
708
def report_known_failure(self, test, err):
709
self.stream.write('XFAIL %s\n%s\n'
710
% (self._testTimeString(test),
711
self._error_summary(err)))
713
def report_unexpected_success(self, test, reason):
714
self.stream.write(' FAIL %s\n%s: %s\n'
715
% (self._testTimeString(test),
716
"Unexpected success. Should have failed",
719
def report_success(self, test):
720
self.stream.write(' OK %s\n' % self._testTimeString(test))
721
for bench_called, stats in getattr(test, '_benchcalls', []):
722
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
723
stats.pprint(file=self.stream)
724
# flush the stream so that we get smooth output. This verbose mode is
725
# used to show the output in PQM.
728
def report_skip(self, test, reason):
729
self.stream.write(' SKIP %s\n%s\n'
730
% (self._testTimeString(test), reason))
732
def report_not_applicable(self, test, reason):
733
self.stream.write(' N/A %s\n %s\n'
734
% (self._testTimeString(test), reason))
736
def report_unsupported(self, test, feature):
737
"""test cannot be run because feature is missing."""
738
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
739
%(self._testTimeString(test), feature))
742
class TextTestRunner(object):
128
unittest.TestResult.addFailure(self, test, err)
130
self.stream.writeln(" FAIL %s" % self._elapsedTime())
132
self.stream.write('F')
135
def addSuccess(self, test):
137
self.stream.writeln(' OK %s' % self._elapsedTime())
139
self.stream.write('~')
141
unittest.TestResult.addSuccess(self, test)
143
def printErrorList(self, flavour, errors):
144
for test, err in errors:
145
self.stream.writeln(self.separator1)
146
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
147
if hasattr(test, '_get_log'):
148
self.stream.writeln()
149
self.stream.writeln('log from this test:')
150
print >>self.stream, test._get_log()
151
self.stream.writeln(self.separator2)
152
self.stream.writeln("%s" % err)
155
class TextTestRunner(unittest.TextTestRunner):
743
156
stop_on_failure = False
751
result_decorators=None,
753
"""Create a TextTestRunner.
755
:param result_decorators: An optional list of decorators to apply
756
to the result object being used by the runner. Decorators are
757
applied left to right - the first element in the list is the
760
# stream may know claim to know to write unicode strings, but in older
761
# pythons this goes sufficiently wrong that it is a bad idea. (
762
# specifically a built in file with encoding 'UTF-8' will still try
763
# to encode using ascii.
764
new_encoding = osutils.get_terminal_encoding()
765
codec = codecs.lookup(new_encoding)
766
if type(codec) is tuple:
770
encode = codec.encode
771
# GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
772
# so should swap to the plain codecs.StreamWriter
773
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
775
stream.encoding = new_encoding
777
self.descriptions = descriptions
778
self.verbosity = verbosity
779
self._bench_history = bench_history
780
self._strict = strict
781
self._result_decorators = result_decorators or []
784
"Run the given test case or test suite."
785
if self.verbosity == 1:
786
result_class = TextTestResult
787
elif self.verbosity >= 2:
788
result_class = VerboseTestResult
789
original_result = result_class(self.stream,
792
bench_history=self._bench_history,
795
# Signal to result objects that look at stop early policy to stop,
796
original_result.stop_early = self.stop_on_failure
797
result = original_result
798
for decorator in self._result_decorators:
799
result = decorator(result)
800
result.stop_early = self.stop_on_failure
801
result.startTestRun()
806
# higher level code uses our extended protocol to determine
807
# what exit code to give.
808
return original_result
158
def _makeResult(self):
159
result = _MyResult(self.stream, self.descriptions, self.verbosity)
160
if self.stop_on_failure:
161
result = EarlyStoppingTestResultAdapter(result)
811
165
def iter_suite_tests(suite):
812
166
"""Return all tests in a suite, recursing through nested suites"""
813
if isinstance(suite, unittest.TestCase):
815
elif isinstance(suite, unittest.TestSuite):
167
for item in suite._tests:
168
if isinstance(item, unittest.TestCase):
170
elif isinstance(item, unittest.TestSuite):
817
171
for r in iter_suite_tests(item):
820
raise Exception('unknown type %r for object %r'
821
% (type(suite), suite))
824
TestSkipped = testtools.testcase.TestSkipped
827
class TestNotApplicable(TestSkipped):
828
"""A test is not applicable to the situation where it was run.
830
This is only normally raised by parameterized tests, if they find that
831
the instance they're constructed upon does not support one aspect
836
# traceback._some_str fails to format exceptions that have the default
837
# __str__ which does an implicit ascii conversion. However, repr() on those
838
# objects works, for all that its not quite what the doctor may have ordered.
839
def _clever_some_str(value):
844
return repr(value).replace('\\n', '\n')
846
return '<unprintable %s object>' % type(value).__name__
848
traceback._some_str = _clever_some_str
851
# deprecated - use self.knownFailure(), or self.expectFailure.
852
KnownFailure = testtools.testcase._ExpectedFailure
855
class UnavailableFeature(Exception):
856
"""A feature required for this test was not available.
858
This can be considered a specialised form of SkippedTest.
860
The feature should be used to construct the exception.
864
class StringIOWrapper(object):
865
"""A wrapper around cStringIO which just adds an encoding attribute.
867
Internally we can check sys.stdout to see what the output encoding
868
should be. However, cStringIO has no encoding attribute that we can
869
set. So we wrap it instead.
874
def __init__(self, s=None):
876
self.__dict__['_cstring'] = StringIO(s)
878
self.__dict__['_cstring'] = StringIO()
880
def __getattr__(self, name, getattr=getattr):
881
return getattr(self.__dict__['_cstring'], name)
883
def __setattr__(self, name, val):
884
if name == 'encoding':
885
self.__dict__['encoding'] = val
887
return setattr(self._cstring, name, val)
890
class TestUIFactory(TextUIFactory):
891
"""A UI Factory for testing.
893
Hide the progress bar but emit note()s.
895
Allows get_password to be tested without real tty attached.
897
See also CannedInputUIFactory which lets you provide programmatic input in
900
# TODO: Capture progress events at the model level and allow them to be
901
# observed by tests that care.
903
# XXX: Should probably unify more with CannedInputUIFactory or a
904
# particular configuration of TextUIFactory, or otherwise have a clearer
905
# idea of how they're supposed to be different.
906
# See https://bugs.launchpad.net/bzr/+bug/408213
908
def __init__(self, stdout=None, stderr=None, stdin=None):
909
if stdin is not None:
910
# We use a StringIOWrapper to be able to test various
911
# encodings, but the user is still responsible to
912
# encode the string and to set the encoding attribute
913
# of StringIOWrapper.
914
stdin = StringIOWrapper(stdin)
915
super(TestUIFactory, self).__init__(stdin, stdout, stderr)
917
def get_non_echoed_password(self):
918
"""Get password from stdin without trying to handle the echo mode"""
919
password = self.stdin.readline()
922
if password[-1] == '\n':
923
password = password[:-1]
926
def make_progress_view(self):
927
return NullProgressView()
930
def isolated_doctest_setUp(test):
931
override_os_environ(test)
934
def isolated_doctest_tearDown(test):
935
restore_os_environ(test)
938
def IsolatedDocTestSuite(*args, **kwargs):
939
"""Overrides doctest.DocTestSuite to handle isolation.
941
The method is really a factory and users are expected to use it as such.
944
kwargs['setUp'] = isolated_doctest_setUp
945
kwargs['tearDown'] = isolated_doctest_tearDown
946
return doctest.DocTestSuite(*args, **kwargs)
949
class TestCase(testtools.TestCase):
174
raise Exception('unknown object %r inside test suite %r'
178
class TestSkipped(Exception):
179
"""Indicates that a test was intentionally skipped, rather than failing."""
183
class CommandFailed(Exception):
186
class TestCase(unittest.TestCase):
950
187
"""Base class for bzr unit tests.
952
Tests that need access to disk resources should subclass
189
Tests that need access to disk resources should subclass
953
190
TestCaseInTempDir not TestCase.
955
192
Error and debug log messages are redirected from their usual
1312
230
charjunk=lambda x: False)
1313
231
return ''.join(difflines)
1315
def assertEqual(self, a, b, message=''):
1319
except UnicodeError, e:
1320
# If we can't compare without getting a UnicodeError, then
1321
# obviously they are different
1322
trace.mutter('UnicodeError: %s', e)
1325
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1327
pprint.pformat(a), pprint.pformat(b)))
1329
assertEquals = assertEqual
1331
def assertEqualDiff(self, a, b, message=None):
233
def assertEqualDiff(self, a, b):
1332
234
"""Assert two texts are equal, if not raise an exception.
1334
This is intended for use with multi-line strings where it can
236
This is intended for use with multi-line strings where it can
1335
237
be hard to find the differences by eye.
1337
239
# TODO: perhaps override assertEquals to call this for strings?
1341
message = "texts not equal:\n"
1343
message = 'first string is missing a final newline.\n'
1345
message = 'second string is missing a final newline.\n'
1346
raise AssertionError(message +
1347
self._ndiff_strings(a, b))
1349
def assertEqualMode(self, mode, mode_test):
1350
self.assertEqual(mode, mode_test,
1351
'mode mismatch %o != %o' % (mode, mode_test))
1353
def assertEqualStat(self, expected, actual):
1354
"""assert that expected and actual are the same stat result.
1356
:param expected: A stat result.
1357
:param actual: A stat result.
1358
:raises AssertionError: If the expected and actual stat values differ
1359
other than by atime.
1361
self.assertEqual(expected.st_size, actual.st_size,
1362
'st_size did not match')
1363
self.assertEqual(expected.st_mtime, actual.st_mtime,
1364
'st_mtime did not match')
1365
self.assertEqual(expected.st_ctime, actual.st_ctime,
1366
'st_ctime did not match')
1367
if sys.platform == 'win32':
1368
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1369
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1370
# odd. We just force it to always be 0 to avoid any problems.
1371
self.assertEqual(0, expected.st_dev)
1372
self.assertEqual(0, actual.st_dev)
1373
self.assertEqual(0, expected.st_ino)
1374
self.assertEqual(0, actual.st_ino)
1376
self.assertEqual(expected.st_dev, actual.st_dev,
1377
'st_dev did not match')
1378
self.assertEqual(expected.st_ino, actual.st_ino,
1379
'st_ino did not match')
1380
self.assertEqual(expected.st_mode, actual.st_mode,
1381
'st_mode did not match')
1383
def assertLength(self, length, obj_with_len):
1384
"""Assert that obj_with_len is of length length."""
1385
if len(obj_with_len) != length:
1386
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1387
length, len(obj_with_len), obj_with_len))
1389
def assertLogsError(self, exception_class, func, *args, **kwargs):
1390
"""Assert that `func(*args, **kwargs)` quietly logs a specific error.
1393
orig_log_exception_quietly = trace.log_exception_quietly
1396
orig_log_exception_quietly()
1397
captured.append(sys.exc_info()[1])
1398
trace.log_exception_quietly = capture
1399
func(*args, **kwargs)
1401
trace.log_exception_quietly = orig_log_exception_quietly
1402
self.assertLength(1, captured)
1404
self.assertIsInstance(err, exception_class)
1407
def assertPositive(self, val):
1408
"""Assert that val is greater than 0."""
1409
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1411
def assertNegative(self, val):
1412
"""Assert that val is less than 0."""
1413
self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
1415
def assertStartsWith(self, s, prefix):
1416
if not s.startswith(prefix):
1417
raise AssertionError('string %r does not start with %r' % (s, prefix))
1419
def assertEndsWith(self, s, suffix):
1420
"""Asserts that s ends with suffix."""
1421
if not s.endswith(suffix):
1422
raise AssertionError('string %r does not end with %r' % (s, suffix))
1424
def assertContainsRe(self, haystack, needle_re, flags=0):
242
raise AssertionError("texts not equal:\n" +
243
self._ndiff_strings(a, b))
245
def assertContainsRe(self, haystack, needle_re):
1425
246
"""Assert that a contains something matching a regular expression."""
1426
if not re.search(needle_re, haystack, flags):
1427
if '\n' in haystack or len(haystack) > 60:
1428
# a long string, format it in a more readable way
1429
raise AssertionError(
1430
'pattern "%s" not found in\n"""\\\n%s"""\n'
1431
% (needle_re, haystack))
1433
raise AssertionError('pattern "%s" not found in "%s"'
1434
% (needle_re, haystack))
1436
def assertNotContainsRe(self, haystack, needle_re, flags=0):
1437
"""Assert that a does not match a regular expression"""
1438
if re.search(needle_re, haystack, flags):
1439
raise AssertionError('pattern "%s" found in "%s"'
247
if not re.search(needle_re, haystack):
248
raise AssertionError('pattern "%s" not found in "%s"'
1440
249
% (needle_re, haystack))
1442
def assertContainsString(self, haystack, needle):
1443
if haystack.find(needle) == -1:
1444
self.fail("string %r not found in '''%s'''" % (needle, haystack))
1446
def assertNotContainsString(self, haystack, needle):
1447
if haystack.find(needle) != -1:
1448
self.fail("string %r found in '''%s'''" % (needle, haystack))
1450
def assertSubset(self, sublist, superlist):
1451
"""Assert that every entry in sublist is present in superlist."""
1452
missing = set(sublist) - set(superlist)
1453
if len(missing) > 0:
1454
raise AssertionError("value(s) %r not present in container %r" %
1455
(missing, superlist))
1457
def assertListRaises(self, excClass, func, *args, **kwargs):
1458
"""Fail unless excClass is raised when the iterator from func is used.
1460
Many functions can return generators this makes sure
1461
to wrap them in a list() call to make sure the whole generator
1462
is run, and that the proper exception is raised.
1465
list(func(*args, **kwargs))
1469
if getattr(excClass,'__name__', None) is not None:
1470
excName = excClass.__name__
1472
excName = str(excClass)
1473
raise self.failureException, "%s not raised" % excName
1475
def assertRaises(self, excClass, callableObj, *args, **kwargs):
1476
"""Assert that a callable raises a particular exception.
1478
:param excClass: As for the except statement, this may be either an
1479
exception class, or a tuple of classes.
1480
:param callableObj: A callable, will be passed ``*args`` and
1483
Returns the exception so that you can examine it.
1486
callableObj(*args, **kwargs)
1490
if getattr(excClass,'__name__', None) is not None:
1491
excName = excClass.__name__
1494
excName = str(excClass)
1495
raise self.failureException, "%s not raised" % excName
1497
def assertIs(self, left, right, message=None):
1498
if not (left is right):
1499
if message is not None:
1500
raise AssertionError(message)
1502
raise AssertionError("%r is not %r." % (left, right))
1504
def assertIsNot(self, left, right, message=None):
1506
if message is not None:
1507
raise AssertionError(message)
1509
raise AssertionError("%r is %r." % (left, right))
1511
def assertTransportMode(self, transport, path, mode):
1512
"""Fail if a path does not have mode "mode".
1514
If modes are not supported on this transport, the assertion is ignored.
1516
if not transport._can_roundtrip_unix_modebits():
1518
path_stat = transport.stat(path)
1519
actual_mode = stat.S_IMODE(path_stat.st_mode)
1520
self.assertEqual(mode, actual_mode,
1521
'mode of %r incorrect (%s != %s)'
1522
% (path, oct(mode), oct(actual_mode)))
1524
def assertIsSameRealPath(self, path1, path2):
1525
"""Fail if path1 and path2 points to different files"""
1526
self.assertEqual(osutils.realpath(path1),
1527
osutils.realpath(path2),
1528
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1530
def assertIsInstance(self, obj, kls, msg=None):
1531
"""Fail if obj is not an instance of kls
1533
:param msg: Supplementary message to show if the assertion fails.
1535
if not isinstance(obj, kls):
1536
m = "%r is an instance of %s rather than %s" % (
1537
obj, obj.__class__, kls)
1542
def assertFileEqual(self, content, path):
1543
"""Fail if path does not contain 'content'."""
1544
self.assertPathExists(path)
1545
f = file(path, 'rb')
1550
self.assertEqualDiff(content, s)
1552
def assertDocstring(self, expected_docstring, obj):
1553
"""Fail if obj does not have expected_docstring"""
1555
# With -OO the docstring should be None instead
1556
self.assertIs(obj.__doc__, None)
1558
self.assertEqual(expected_docstring, obj.__doc__)
1560
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1561
def failUnlessExists(self, path):
1562
return self.assertPathExists(path)
1564
def assertPathExists(self, path):
1565
"""Fail unless path or paths, which may be abs or relative, exist."""
1566
if not isinstance(path, basestring):
1568
self.assertPathExists(p)
1570
self.assertTrue(osutils.lexists(path),
1571
path + " does not exist")
1573
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1574
def failIfExists(self, path):
1575
return self.assertPathDoesNotExist(path)
1577
def assertPathDoesNotExist(self, path):
1578
"""Fail if path or paths, which may be abs or relative, exist."""
1579
if not isinstance(path, basestring):
1581
self.assertPathDoesNotExist(p)
1583
self.assertFalse(osutils.lexists(path),
1586
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1587
"""A helper for callDeprecated and applyDeprecated.
1589
:param a_callable: A callable to call.
1590
:param args: The positional arguments for the callable
1591
:param kwargs: The keyword arguments for the callable
1592
:return: A tuple (warnings, result). result is the result of calling
1593
a_callable(``*args``, ``**kwargs``).
1596
def capture_warnings(msg, cls=None, stacklevel=None):
1597
# we've hooked into a deprecation specific callpath,
1598
# only deprecations should getting sent via it.
1599
self.assertEqual(cls, DeprecationWarning)
1600
local_warnings.append(msg)
1601
original_warning_method = symbol_versioning.warn
1602
symbol_versioning.set_warning_method(capture_warnings)
1604
result = a_callable(*args, **kwargs)
1606
symbol_versioning.set_warning_method(original_warning_method)
1607
return (local_warnings, result)
1609
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1610
"""Call a deprecated callable without warning the user.
1612
Note that this only captures warnings raised by symbol_versioning.warn,
1613
not other callers that go direct to the warning module.
1615
To test that a deprecated method raises an error, do something like
1616
this (remember that both assertRaises and applyDeprecated delays *args
1617
and **kwargs passing)::
1619
self.assertRaises(errors.ReservedId,
1620
self.applyDeprecated,
1621
deprecated_in((1, 5, 0)),
1625
:param deprecation_format: The deprecation format that the callable
1626
should have been deprecated with. This is the same type as the
1627
parameter to deprecated_method/deprecated_function. If the
1628
callable is not deprecated with this format, an assertion error
1630
:param a_callable: A callable to call. This may be a bound method or
1631
a regular function. It will be called with ``*args`` and
1633
:param args: The positional arguments for the callable
1634
:param kwargs: The keyword arguments for the callable
1635
:return: The result of a_callable(``*args``, ``**kwargs``)
1637
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1639
expected_first_warning = symbol_versioning.deprecation_string(
1640
a_callable, deprecation_format)
1641
if len(call_warnings) == 0:
1642
self.fail("No deprecation warning generated by call to %s" %
1644
self.assertEqual(expected_first_warning, call_warnings[0])
1647
def callCatchWarnings(self, fn, *args, **kw):
1648
"""Call a callable that raises python warnings.
1650
The caller's responsible for examining the returned warnings.
1652
If the callable raises an exception, the exception is not
1653
caught and propagates up to the caller. In that case, the list
1654
of warnings is not available.
1656
:returns: ([warning_object, ...], fn_result)
1658
# XXX: This is not perfect, because it completely overrides the
1659
# warnings filters, and some code may depend on suppressing particular
1660
# warnings. It's the easiest way to insulate ourselves from -Werror,
1661
# though. -- Andrew, 20071062
1663
def _catcher(message, category, filename, lineno, file=None, line=None):
1664
# despite the name, 'message' is normally(?) a Warning subclass
1666
wlist.append(message)
1667
saved_showwarning = warnings.showwarning
1668
saved_filters = warnings.filters
1670
warnings.showwarning = _catcher
1671
warnings.filters = []
1672
result = fn(*args, **kw)
1674
warnings.showwarning = saved_showwarning
1675
warnings.filters = saved_filters
1676
return wlist, result
1678
def callDeprecated(self, expected, callable, *args, **kwargs):
1679
"""Assert that a callable is deprecated in a particular way.
1681
This is a very precise test for unusual requirements. The
1682
applyDeprecated helper function is probably more suited for most tests
1683
as it allows you to simply specify the deprecation format being used
1684
and will ensure that that is issued for the function being called.
1686
Note that this only captures warnings raised by symbol_versioning.warn,
1687
not other callers that go direct to the warning module. To catch
1688
general warnings, use callCatchWarnings.
1690
:param expected: a list of the deprecation warnings expected, in order
1691
:param callable: The callable to call
1692
:param args: The positional arguments for the callable
1693
:param kwargs: The keyword arguments for the callable
1695
call_warnings, result = self._capture_deprecation_warnings(callable,
1697
self.assertEqual(expected, call_warnings)
1700
251
def _startLogFile(self):
1701
252
"""Send bzr and test log messages to a temporary file.
1703
254
The file is removed as the test is torn down.
1705
pseudo_log_file = StringIO()
1706
def _get_log_contents_for_weird_testtools_api():
1707
return [pseudo_log_file.getvalue().decode(
1708
"utf-8", "replace").encode("utf-8")]
1709
self.addDetail("log", content.Content(content.ContentType("text",
1710
"plain", {"charset": "utf8"}),
1711
_get_log_contents_for_weird_testtools_api))
1712
self._log_file = pseudo_log_file
1713
self._log_memento = trace.push_log_file(self._log_file)
256
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
257
self._log_file = os.fdopen(fileno, 'w+')
258
bzrlib.trace.enable_test_log(self._log_file)
259
self._log_file_name = name
1714
260
self.addCleanup(self._finishLogFile)
1716
262
def _finishLogFile(self):
1717
263
"""Finished with the log file.
1719
Close the file and delete it.
1721
if trace._trace_file:
1722
# flush the log file, to get all content
1723
trace._trace_file.flush()
1724
trace.pop_log_file(self._log_memento)
1726
def thisFailsStrictLockCheck(self):
1727
"""It is known that this test would fail with -Dstrict_locks.
1729
By default, all tests are run with strict lock checking unless
1730
-Edisable_lock_checks is supplied. However there are some tests which
1731
we know fail strict locks at this point that have not been fixed.
1732
They should call this function to disable the strict checking.
1734
This should be used sparingly, it is much better to fix the locking
1735
issues rather than papering over the problem by calling this function.
1737
debug.debug_flags.discard('strict_locks')
1739
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1740
"""Overrides an object attribute restoring it after the test.
1742
:note: This should be used with discretion; you should think about
1743
whether it's better to make the code testable without monkey-patching.
1745
:param obj: The object that will be mutated.
1747
:param attr_name: The attribute name we want to preserve/override in
1750
:param new: The optional value we want to set the attribute to.
1752
:returns: The actual attr value.
1754
value = getattr(obj, attr_name)
1755
# The actual value is captured by the call below
1756
self.addCleanup(setattr, obj, attr_name, value)
1757
if new is not _unitialized_attr:
1758
setattr(obj, attr_name, new)
1761
def overrideEnv(self, name, new):
1762
"""Set an environment variable, and reset it after the test.
1764
:param name: The environment variable name.
1766
:param new: The value to set the variable to. If None, the
1767
variable is deleted from the environment.
1769
:returns: The actual variable value.
1771
value = osutils.set_or_unset_env(name, new)
1772
self.addCleanup(osutils.set_or_unset_env, name, value)
1775
def recordCalls(self, obj, attr_name):
1776
"""Monkeypatch in a wrapper that will record calls.
1778
The monkeypatch is automatically removed when the test concludes.
1780
:param obj: The namespace holding the reference to be replaced;
1781
typically a module, class, or object.
1782
:param attr_name: A string for the name of the attribute to
1784
:returns: A list that will be extended with one item every time the
1785
function is called, with a tuple of (args, kwargs).
1789
def decorator(*args, **kwargs):
1790
calls.append((args, kwargs))
1791
return orig(*args, **kwargs)
1792
orig = self.overrideAttr(obj, attr_name, decorator)
265
Read contents into memory, close, and delete.
267
bzrlib.trace.disable_test_log()
268
self._log_file.seek(0)
269
self._log_contents = self._log_file.read()
270
self._log_file.close()
271
os.remove(self._log_file_name)
272
self._log_file = self._log_file_name = None
274
def addCleanup(self, callable):
275
"""Arrange to run a callable when this case is torn down.
277
Callables are run in the reverse of the order they are registered,
278
ie last-in first-out.
280
if callable in self._cleanups:
281
raise ValueError("cleanup function %r already registered on %s"
283
self._cleanups.append(callable)
1795
285
def _cleanEnvironment(self):
1796
for name, value in isolated_environ.iteritems():
1797
self.overrideEnv(name, value)
1799
def _restoreHooks(self):
1800
for klass, (name, hooks) in self._preserved_hooks.items():
1801
setattr(klass, name, hooks)
1802
self._preserved_hooks.clear()
1803
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1804
self._preserved_lazy_hooks.clear()
1806
def knownFailure(self, reason):
1807
"""This test has failed for some known reason."""
1808
raise KnownFailure(reason)
1810
def _suppress_log(self):
1811
"""Remove the log info from details."""
1812
self.discardDetail('log')
1814
def _do_skip(self, result, reason):
1815
self._suppress_log()
1816
addSkip = getattr(result, 'addSkip', None)
1817
if not callable(addSkip):
1818
result.addSuccess(result)
1820
addSkip(self, reason)
1823
def _do_known_failure(self, result, e):
1824
self._suppress_log()
1825
err = sys.exc_info()
1826
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1827
if addExpectedFailure is not None:
1828
addExpectedFailure(self, err)
1830
result.addSuccess(self)
1833
def _do_not_applicable(self, result, e):
1835
reason = 'No reason given'
1838
self._suppress_log ()
1839
addNotApplicable = getattr(result, 'addNotApplicable', None)
1840
if addNotApplicable is not None:
1841
result.addNotApplicable(self, reason)
1843
self._do_skip(result, reason)
1846
def _report_skip(self, result, err):
1847
"""Override the default _report_skip.
1849
We want to strip the 'log' detail. If we waint until _do_skip, it has
1850
already been formatted into the 'reason' string, and we can't pull it
1853
self._suppress_log()
1854
super(TestCase, self)._report_skip(self, result, err)
1857
def _report_expected_failure(self, result, err):
1860
See _report_skip for motivation.
1862
self._suppress_log()
1863
super(TestCase, self)._report_expected_failure(self, result, err)
1866
def _do_unsupported_or_skip(self, result, e):
1868
self._suppress_log()
1869
addNotSupported = getattr(result, 'addNotSupported', None)
1870
if addNotSupported is not None:
1871
result.addNotSupported(self, reason)
1873
self._do_skip(result, reason)
1875
def time(self, callable, *args, **kwargs):
1876
"""Run callable and accrue the time it takes to the benchmark time.
1878
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1879
this will cause lsprofile statistics to be gathered and stored in
1882
if self._benchtime is None:
1883
self.addDetail('benchtime', content.Content(content.ContentType(
1884
"text", "plain"), lambda:[str(self._benchtime)]))
1888
if not self._gather_lsprof_in_benchmarks:
1889
return callable(*args, **kwargs)
1891
# record this benchmark
1892
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1894
self._benchcalls.append(((callable, args, kwargs), stats))
1897
self._benchtime += time.time() - start
288
'APPDATA': os.getcwd(),
293
self.addCleanup(self._restoreEnvironment)
294
for name, value in new_env.iteritems():
295
self._captureVar(name, value)
298
def _captureVar(self, name, newvalue):
299
"""Set an environment variable, preparing it to be reset when finished."""
300
self.__old_env[name] = os.environ.get(name, None)
302
if name in os.environ:
305
os.environ[name] = newvalue
308
def _restoreVar(name, value):
310
if name in os.environ:
313
os.environ[name] = value
315
def _restoreEnvironment(self):
316
for name, value in self.__old_env.iteritems():
317
self._restoreVar(name, value)
321
unittest.TestCase.tearDown(self)
323
def _runCleanups(self):
324
"""Run registered cleanup functions.
326
This should only be called from TestCase.tearDown.
328
for callable in reversed(self._cleanups):
1899
331
def log(self, *args):
1903
"""Get a unicode string containing the log from bzrlib.trace.
1905
Undecodable characters are replaced.
1907
return u"".join(self.getDetails()['log'].iter_text())
1909
def requireFeature(self, feature):
1910
"""This test requires a specific feature is available.
1912
:raises UnavailableFeature: When feature is not available.
1914
if not feature.available():
1915
raise UnavailableFeature(feature)
1917
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1919
"""Run bazaar command line, splitting up a string command line."""
1920
if isinstance(args, basestring):
1921
# shlex don't understand unicode strings,
1922
# so args should be plain string (bialix 20070906)
1923
args = list(shlex.split(str(args)))
1924
return self._run_bzr_core(args, retcode=retcode,
1925
encoding=encoding, stdin=stdin, working_dir=working_dir,
1928
def _run_bzr_core(self, args, retcode, encoding, stdin,
1930
# Clear chk_map page cache, because the contents are likely to mask
1932
chk_map.clear_cache()
1933
if encoding is None:
1934
encoding = osutils.get_user_encoding()
1935
stdout = StringIOWrapper()
1936
stderr = StringIOWrapper()
1937
stdout.encoding = encoding
1938
stderr.encoding = encoding
1940
self.log('run bzr: %r', args)
335
"""Return as a string the log for this test"""
336
if self._log_file_name:
337
return open(self._log_file_name).read()
339
return self._log_contents
340
# TODO: Delete the log after it's been read in
342
def capture(self, cmd, retcode=0):
343
"""Shortcut that splits cmd into words, runs, and returns stdout"""
344
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
346
def run_bzr_captured(self, argv, retcode=0):
347
"""Invoke bzr and return (stdout, stderr).
349
Useful for code that wants to check the contents of the
350
output, the way error messages are presented, etc.
352
This should be the main method for tests that want to exercise the
353
overall behavior of the bzr application (rather than a unit test
354
or a functional test of the library.)
356
Much of the old code runs bzr by forking a new copy of Python, but
357
that is slower, harder to debug, and generally not necessary.
359
This runs bzr through the interface that catches and reports
360
errors, and with logging set to something approximating the
361
default, so that error reporting can be checked.
363
argv -- arguments to invoke bzr
364
retcode -- expected return code, or None for don't-care.
368
self.log('run bzr: %s', ' '.join(argv))
1941
369
# FIXME: don't call into logging here
1942
handler = trace.EncodedStreamHandler(stderr, errors="replace",
370
handler = logging.StreamHandler(stderr)
371
handler.setFormatter(bzrlib.trace.QuietFormatter())
372
handler.setLevel(logging.INFO)
1944
373
logger = logging.getLogger('')
1945
374
logger.addHandler(handler)
1946
old_ui_factory = ui.ui_factory
1947
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1950
if working_dir is not None:
1951
cwd = osutils.getcwd()
1952
os.chdir(working_dir)
1956
result = self.apply_redirected(
1957
ui.ui_factory.stdin,
1959
_mod_commands.run_bzr_catch_user_errors,
1961
except KeyboardInterrupt:
1962
# Reraise KeyboardInterrupt with contents of redirected stdout
1963
# and stderr as arguments, for tests which are interested in
1964
# stdout and stderr and are expecting the exception.
1965
out = stdout.getvalue()
1966
err = stderr.getvalue()
1968
self.log('output:\n%r', out)
1970
self.log('errors:\n%r', err)
1971
raise KeyboardInterrupt(out, err)
376
result = self.apply_redirected(None, stdout, stderr,
377
bzrlib.commands.run_bzr_catch_errors,
1973
380
logger.removeHandler(handler)
1974
ui.ui_factory = old_ui_factory
1978
381
out = stdout.getvalue()
1979
382
err = stderr.getvalue()
1981
self.log('output:\n%r', out)
384
self.log('output:\n%s', out)
1983
self.log('errors:\n%r', err)
386
self.log('errors:\n%s', err)
1984
387
if retcode is not None:
1985
self.assertEquals(retcode, result,
1986
message='Unexpected return code')
1987
return result, out, err
388
self.assertEquals(result, retcode)
1989
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1990
working_dir=None, error_regexes=[], output_encoding=None):
391
def run_bzr(self, *args, **kwargs):
1991
392
"""Invoke bzr, as if it were run from the command line.
1993
The argument list should not include the bzr program name - the
1994
first argument is normally the bzr command. Arguments may be
1995
passed in three ways:
1997
1- A list of strings, eg ["commit", "a"]. This is recommended
1998
when the command contains whitespace or metacharacters, or
1999
is built up at run time.
2001
2- A single string, eg "add a". This is the most convenient
2002
for hardcoded commands.
2004
This runs bzr through the interface that catches and reports
2005
errors, and with logging set to something approximating the
2006
default, so that error reporting can be checked.
2008
394
This should be the main method for tests that want to exercise the
2009
395
overall behavior of the bzr application (rather than a unit test
2010
396
or a functional test of the library.)
2012
398
This sends the stdout/stderr results into the test's log,
2013
399
where it may be useful for debugging. See also run_captured.
2015
:keyword stdin: A string to be used as stdin for the command.
2016
:keyword retcode: The status code the command should return;
2018
:keyword working_dir: The directory to run the command in
2019
:keyword error_regexes: A list of expected error messages. If
2020
specified they must be seen in the error output of the command.
2022
retcode, out, err = self._run_bzr_autosplit(
2027
working_dir=working_dir,
2029
self.assertIsInstance(error_regexes, (list, tuple))
2030
for regex in error_regexes:
2031
self.assertContainsRe(err, regex)
2034
def run_bzr_error(self, error_regexes, *args, **kwargs):
2035
"""Run bzr, and check that stderr contains the supplied regexes
2037
:param error_regexes: Sequence of regular expressions which
2038
must each be found in the error output. The relative ordering
2040
:param args: command-line arguments for bzr
2041
:param kwargs: Keyword arguments which are interpreted by run_bzr
2042
This function changes the default value of retcode to be 3,
2043
since in most cases this is run when you expect bzr to fail.
2045
:return: (out, err) The actual output of running the command (in case
2046
you want to do more inspection)
2050
# Make sure that commit is failing because there is nothing to do
2051
self.run_bzr_error(['no changes to commit'],
2052
['commit', '-m', 'my commit comment'])
2053
# Make sure --strict is handling an unknown file, rather than
2054
# giving us the 'nothing to do' error
2055
self.build_tree(['unknown'])
2056
self.run_bzr_error(['Commit refused because there are unknown files'],
2057
['commit', --strict', '-m', 'my commit comment'])
2059
kwargs.setdefault('retcode', 3)
2060
kwargs['error_regexes'] = error_regexes
2061
out, err = self.run_bzr(*args, **kwargs)
2064
def run_bzr_subprocess(self, *args, **kwargs):
2065
"""Run bzr in a subprocess for testing.
2067
This starts a new Python interpreter and runs bzr in there.
2068
This should only be used for tests that have a justifiable need for
2069
this isolation: e.g. they are testing startup time, or signal
2070
handling, or early startup code, etc. Subprocess code can't be
2071
profiled or debugged so easily.
2073
:keyword retcode: The status code that is expected. Defaults to 0. If
2074
None is supplied, the status code is not checked.
2075
:keyword env_changes: A dictionary which lists changes to environment
2076
variables. A value of None will unset the env variable.
2077
The values must be strings. The change will only occur in the
2078
child, so you don't need to fix the environment after running.
2079
:keyword universal_newlines: Convert CRLF => LF
2080
:keyword allow_plugins: By default the subprocess is run with
2081
--no-plugins to ensure test reproducibility. Also, it is possible
2082
for system-wide plugins to create unexpected output on stderr,
2083
which can cause unnecessary test failures.
2085
env_changes = kwargs.get('env_changes', {})
2086
working_dir = kwargs.get('working_dir', None)
2087
allow_plugins = kwargs.get('allow_plugins', False)
2089
if isinstance(args[0], list):
2091
elif isinstance(args[0], basestring):
2092
args = list(shlex.split(args[0]))
2094
raise ValueError("passing varargs to run_bzr_subprocess")
2095
process = self.start_bzr_subprocess(args, env_changes=env_changes,
2096
working_dir=working_dir,
2097
allow_plugins=allow_plugins)
2098
# We distinguish between retcode=None and retcode not passed.
2099
supplied_retcode = kwargs.get('retcode', 0)
2100
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
2101
universal_newlines=kwargs.get('universal_newlines', False),
2104
def start_bzr_subprocess(self, process_args, env_changes=None,
2105
skip_if_plan_to_signal=False,
2107
allow_plugins=False, stderr=subprocess.PIPE):
2108
"""Start bzr in a subprocess for testing.
2110
This starts a new Python interpreter and runs bzr in there.
2111
This should only be used for tests that have a justifiable need for
2112
this isolation: e.g. they are testing startup time, or signal
2113
handling, or early startup code, etc. Subprocess code can't be
2114
profiled or debugged so easily.
2116
:param process_args: a list of arguments to pass to the bzr executable,
2117
for example ``['--version']``.
2118
:param env_changes: A dictionary which lists changes to environment
2119
variables. A value of None will unset the env variable.
2120
The values must be strings. The change will only occur in the
2121
child, so you don't need to fix the environment after running.
2122
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2123
doesn't support signalling subprocesses.
2124
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2125
:param stderr: file to use for the subprocess's stderr. Valid values
2126
are those valid for the stderr argument of `subprocess.Popen`.
2127
Default value is ``subprocess.PIPE``.
2129
:returns: Popen object for the started process.
2131
if skip_if_plan_to_signal:
2132
if os.name != "posix":
2133
raise TestSkipped("Sending signals not supported")
2135
if env_changes is None:
2139
def cleanup_environment():
2140
for env_var, value in env_changes.iteritems():
2141
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
2143
def restore_environment():
2144
for env_var, value in old_env.iteritems():
2145
osutils.set_or_unset_env(env_var, value)
2147
bzr_path = self.get_bzr_path()
2150
if working_dir is not None:
2151
cwd = osutils.getcwd()
2152
os.chdir(working_dir)
2155
# win32 subprocess doesn't support preexec_fn
2156
# so we will avoid using it on all platforms, just to
2157
# make sure the code path is used, and we don't break on win32
2158
cleanup_environment()
2159
# Include the subprocess's log file in the test details, in case
2160
# the test fails due to an error in the subprocess.
2161
self._add_subprocess_log(trace._get_bzr_log_filename())
2162
command = [sys.executable]
2163
# frozen executables don't need the path to bzr
2164
if getattr(sys, "frozen", None) is None:
2165
command.append(bzr_path)
2166
if not allow_plugins:
2167
command.append('--no-plugins')
2168
command.extend(process_args)
2169
process = self._popen(command, stdin=subprocess.PIPE,
2170
stdout=subprocess.PIPE,
2173
restore_environment()
2179
def _add_subprocess_log(self, log_file_path):
2180
if len(self._log_files) == 0:
2181
# Register an addCleanup func. We do this on the first call to
2182
# _add_subprocess_log rather than in TestCase.setUp so that this
2183
# addCleanup is registered after any cleanups for tempdirs that
2184
# subclasses might create, which will probably remove the log file
2186
self.addCleanup(self._subprocess_log_cleanup)
2187
# self._log_files is a set, so if a log file is reused we won't grab it
2189
self._log_files.add(log_file_path)
2191
def _subprocess_log_cleanup(self):
2192
for count, log_file_path in enumerate(self._log_files):
2193
# We use buffer_now=True to avoid holding the file open beyond
2194
# the life of this function, which might interfere with e.g.
2195
# cleaning tempdirs on Windows.
2196
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2197
#detail_content = content.content_from_file(
2198
# log_file_path, buffer_now=True)
2199
with open(log_file_path, 'rb') as log_file:
2200
log_file_bytes = log_file.read()
2201
detail_content = content.Content(content.ContentType("text",
2202
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2203
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2206
def _popen(self, *args, **kwargs):
2207
"""Place a call to Popen.
2209
Allows tests to override this method to intercept the calls made to
2210
Popen for introspection.
2212
return subprocess.Popen(*args, **kwargs)
2214
def get_source_path(self):
2215
"""Return the path of the directory containing bzrlib."""
2216
return os.path.dirname(os.path.dirname(bzrlib.__file__))
2218
def get_bzr_path(self):
2219
"""Return the path of the 'bzr' executable for this test suite."""
2220
bzr_path = os.path.join(self.get_source_path(), "bzr")
2221
if not os.path.isfile(bzr_path):
2222
# We are probably installed. Assume sys.argv is the right file
2223
bzr_path = sys.argv[0]
2226
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
2227
universal_newlines=False, process_args=None):
2228
"""Finish the execution of process.
2230
:param process: the Popen object returned from start_bzr_subprocess.
2231
:param retcode: The status code that is expected. Defaults to 0. If
2232
None is supplied, the status code is not checked.
2233
:param send_signal: an optional signal to send to the process.
2234
:param universal_newlines: Convert CRLF => LF
2235
:returns: (stdout, stderr)
2237
if send_signal is not None:
2238
os.kill(process.pid, send_signal)
2239
out, err = process.communicate()
2241
if universal_newlines:
2242
out = out.replace('\r\n', '\n')
2243
err = err.replace('\r\n', '\n')
2245
if retcode is not None and retcode != process.returncode:
2246
if process_args is None:
2247
process_args = "(unknown args)"
2248
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2249
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2250
self.fail('Command bzr %s failed with retcode %s != %s'
2251
% (process_args, retcode, process.returncode))
2254
def check_tree_shape(self, tree, shape):
2255
"""Compare a tree to a list of expected names.
401
retcode = kwargs.pop('retcode', 0)
402
return self.run_bzr_captured(args, retcode)
404
def check_inventory_shape(self, inv, shape):
405
"""Compare an inventory to a list of expected names.
2257
407
Fail if they are not precisely equal.
2260
410
shape = list(shape) # copy
2261
for path, ie in tree.iter_entries_by_dir():
411
for path, ie in inv.entries():
2262
412
name = path.replace('\\', '/')
2263
if ie.kind == 'directory':
2264
414
name = name + '/'
2266
pass # ignore root entry
2268
416
shape.remove(name)
2270
418
extras.append(name)
2705
467
All test cases create their own directory within that. If the
2706
468
tests complete successfully, the directory is removed.
2708
:ivar test_base_dir: The path of the top-level directory for this
2709
test, which contains a home directory and a work directory.
2711
:ivar test_home_dir: An initially empty directory under test_base_dir
2712
which is used as $HOME for this test.
2714
:ivar test_dir: A directory under test_base_dir used as the current
2715
directory when the test proper is run.
470
InTempDir is an old alias for FunctionalTestCase.
2718
475
OVERRIDE_PYTHON = 'python'
2721
super(TestCaseInTempDir, self).setUp()
2722
# Remove the protection set in isolated_environ, we have a proper
2723
# access to disk resources now.
2724
self.overrideEnv('BZR_LOG', None)
2726
477
def check_file_contents(self, filename, expect):
2727
478
self.log("check contents of file %s" % filename)
479
contents = file(filename, 'r').read()
2733
480
if contents != expect:
2734
481
self.log("expected: %r" % expect)
2735
482
self.log("actually: %r" % contents)
2736
483
self.fail("contents of %s not as expected" % filename)
2738
def _getTestDirPrefix(self):
2739
# create a directory within the top level test directory
2740
if sys.platform in ('win32', 'cygwin'):
2741
name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
2742
# windows is likely to have path-length limits so use a short name
2743
name_prefix = name_prefix[-30:]
2745
name_prefix = re.sub('[/]', '_', self.id())
2748
def makeAndChdirToTestDir(self):
2749
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2751
For TestCaseInTempDir we create a temporary directory based on the test
2752
name and then create two subdirs - test and home under it.
2754
name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2755
self._getTestDirPrefix())
2757
for i in range(100):
2758
if os.path.exists(name):
2759
name = name_prefix + '_' + str(i)
2761
# now create test and home directories within this dir
2762
self.test_base_dir = name
2763
self.addCleanup(self.deleteTestDir)
2764
os.mkdir(self.test_base_dir)
2766
self.permit_dir(self.test_base_dir)
2767
# 'sprouting' and 'init' of a branch both walk up the tree to find
2768
# stacking policy to honour; create a bzr dir with an unshared
2769
# repository (but not a branch - our code would be trying to escape
2770
# then!) to stop them, and permit it to be read.
2771
# control = bzrdir.BzrDir.create(self.test_base_dir)
2772
# control.create_repository()
2773
self.test_home_dir = self.test_base_dir + '/home'
2774
os.mkdir(self.test_home_dir)
2775
self.test_dir = self.test_base_dir + '/work'
485
def _make_test_root(self):
486
if TestCaseInTempDir.TEST_ROOT is not None:
490
root = u'test%04d.tmp' % i
494
if e.errno == errno.EEXIST:
499
# successfully created
500
TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
502
# make a fake bzr directory there to prevent any tests propagating
503
# up onto the source directory's real branch
504
os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
507
super(TestCaseInTempDir, self).setUp()
508
self._make_test_root()
509
_currentdir = os.getcwdu()
510
short_id = self.id().replace('bzrlib.tests.', '') \
511
.replace('__main__.', '')
512
self.test_dir = os.path.join(self.TEST_ROOT, short_id)
2776
513
os.mkdir(self.test_dir)
2777
514
os.chdir(self.test_dir)
2778
# put name of test inside
2779
f = file(self.test_base_dir + '/name', 'w')
2785
def deleteTestDir(self):
2786
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2787
_rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2789
def build_tree(self, shape, line_endings='binary', transport=None):
515
os.environ['HOME'] = self.test_dir
516
def _leaveDirectory():
517
os.chdir(_currentdir)
518
self.addCleanup(_leaveDirectory)
520
def build_tree(self, shape, line_endings='native'):
2790
521
"""Build a test tree according to a pattern.
2792
523
shape is a sequence of file specifications. If the final
2793
524
character is '/', a directory is created.
2795
This assumes that all the elements in the tree being built are new.
2797
526
This doesn't add anything to a branch.
2799
:type shape: list or tuple.
2800
527
:param line_endings: Either 'binary' or 'native'
2801
in binary mode, exact contents are written in native mode, the
2802
line endings match the default platform endings.
2803
:param transport: A transport to write to, for building trees on VFS's.
2804
If the transport is readonly or None, "." is opened automatically.
528
in binary mode, exact contents are written
529
in native mode, the line endings match the
530
default platform endings.
2807
if type(shape) not in (list, tuple):
2808
raise AssertionError("Parameter 'shape' should be "
2809
"a list or a tuple. Got %r instead" % (shape,))
2810
# It's OK to just create them using forward slashes on windows.
2811
if transport is None or transport.is_readonly():
2812
transport = _mod_transport.get_transport(".")
532
# XXX: It's OK to just create them using forward slashes on windows?
2813
533
for name in shape:
2814
self.assertIsInstance(name, basestring)
534
self.assert_(isinstance(name, basestring))
2815
535
if name[-1] == '/':
2816
transport.mkdir(urlutils.escape(name[:-1]))
2818
538
if line_endings == 'binary':
2820
540
elif line_endings == 'native':
2823
raise errors.BzrError(
2824
'Invalid line ending request %r' % line_endings)
2825
content = "contents of %s%s" % (name.encode('utf-8'), end)
2826
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2828
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2830
def assertInWorkingTree(self, path, root_path='.', tree=None):
2831
"""Assert whether path or paths are in the WorkingTree"""
2833
tree = workingtree.WorkingTree.open(root_path)
2834
if not isinstance(path, basestring):
2836
self.assertInWorkingTree(p, tree=tree)
2838
self.assertIsNot(tree.path2id(path), None,
2839
path+' not in working tree.')
2841
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2842
"""Assert whether path or paths are not in the WorkingTree"""
2844
tree = workingtree.WorkingTree.open(root_path)
2845
if not isinstance(path, basestring):
2847
self.assertNotInWorkingTree(p,tree=tree)
2849
self.assertIs(tree.path2id(path), None, path+' in working tree.')
2852
class TestCaseWithTransport(TestCaseInTempDir):
2853
"""A test case that provides get_url and get_readonly_url facilities.
2855
These back onto two transport servers, one for readonly access and one for
2858
If no explicit class is provided for readonly access, a
2859
ReadonlyTransportDecorator is used instead which allows the use of non disk
2860
based read write transports.
2862
If an explicit class is provided for readonly access, that server and the
2863
readwrite one must both define get_url() as resolving to os.getcwd().
2866
def get_vfs_only_server(self):
2867
"""See TestCaseWithMemoryTransport.
2869
This is useful for some tests with specific servers that need
2872
if self.__vfs_server is None:
2873
self.__vfs_server = self.vfs_transport_factory()
2874
self.start_server(self.__vfs_server)
2875
return self.__vfs_server
2877
def make_branch_and_tree(self, relpath, format=None):
2878
"""Create a branch on the transport and a tree locally.
2880
If the transport is not a LocalTransport, the Tree can't be created on
2881
the transport. In that case if the vfs_transport_factory is
2882
LocalURLServer the working tree is created in the local
2883
directory backing the transport, and the returned tree's branch and
2884
repository will also be accessed locally. Otherwise a lightweight
2885
checkout is created and returned.
2887
We do this because we can't physically create a tree in the local
2888
path, with a branch reference to the transport_factory url, and
2889
a branch + repository in the vfs_transport, unless the vfs_transport
2890
namespace is distinct from the local disk - the two branch objects
2891
would collide. While we could construct a tree with its branch object
2892
pointing at the transport_factory transport in memory, reopening it
2893
would behaving unexpectedly, and has in the past caused testing bugs
2894
when we tried to do it that way.
2896
:param format: The BzrDirFormat.
2897
:returns: the WorkingTree.
2899
# TODO: always use the local disk path for the working tree,
2900
# this obviously requires a format that supports branch references
2901
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2903
b = self.make_branch(relpath, format=format)
2905
return b.bzrdir.create_workingtree()
2906
except errors.NotLocalUrl:
2907
# We can only make working trees locally at the moment. If the
2908
# transport can't support them, then we keep the non-disk-backed
2909
# branch and create a local checkout.
2910
if self.vfs_transport_factory is test_server.LocalURLServer:
2911
# the branch is colocated on disk, we cannot create a checkout.
2912
# hopefully callers will expect this.
2913
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2914
wt = local_controldir.create_workingtree()
2915
if wt.branch._format != b._format:
2917
# Make sure that assigning to wt._branch fixes wt.branch,
2918
# in case the implementation details of workingtree objects
2920
self.assertIs(b, wt.branch)
2923
return b.create_checkout(relpath, lightweight=True)
2925
def assertIsDirectory(self, relpath, transport):
2926
"""Assert that relpath within transport is a directory.
2928
This may not be possible on all transports; in that case it propagates
2929
a TransportNotPossible.
2932
mode = transport.stat(relpath).st_mode
2933
except errors.NoSuchFile:
2934
self.fail("path %s is not a directory; no such file"
2936
if not stat.S_ISDIR(mode):
2937
self.fail("path %s is not a directory; has mode %#o"
2940
def assertTreesEqual(self, left, right):
2941
"""Check that left and right have the same content and properties."""
2942
# we use a tree delta to check for equality of the content, and we
2943
# manually check for equality of other things such as the parents list.
2944
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2945
differences = left.changes_from(right)
2946
self.assertFalse(differences.has_changed(),
2947
"Trees %r and %r are different: %r" % (left, right, differences))
2950
super(TestCaseWithTransport, self).setUp()
2951
self.__vfs_server = None
2953
def disable_missing_extensions_warning(self):
2954
"""Some tests expect a precise stderr content.
2956
There is no point in forcing them to duplicate the extension related
2959
config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
2962
class ChrootedTestCase(TestCaseWithTransport):
2963
"""A support class that provides readonly urls outside the local namespace.
2965
This is done by checking if self.transport_server is a MemoryServer. if it
2966
is then we are chrooted already, if it is not then an HttpServer is used
2969
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2970
be used without needed to redo it when a different
2971
subclass is in use ?
2975
from bzrlib.tests import http_server
2976
super(ChrootedTestCase, self).setUp()
2977
if not self.vfs_transport_factory == memory.MemoryServer:
2978
self.transport_readonly_server = http_server.HttpServer
2981
def condition_id_re(pattern):
2982
"""Create a condition filter which performs a re check on a test's id.
2984
:param pattern: A regular expression string.
2985
:return: A callable that returns True if the re matches.
2987
filter_re = re.compile(pattern, 0)
2988
def condition(test):
2990
return filter_re.search(test_id)
2994
def condition_isinstance(klass_or_klass_list):
2995
"""Create a condition filter which returns isinstance(param, klass).
2997
:return: A callable which when called with one parameter obj return the
2998
result of isinstance(obj, klass_or_klass_list).
3001
return isinstance(obj, klass_or_klass_list)
3005
def condition_id_in_list(id_list):
3006
"""Create a condition filter which verify that test's id in a list.
3008
:param id_list: A TestIdList object.
3009
:return: A callable that returns True if the test's id appears in the list.
3011
def condition(test):
3012
return id_list.includes(test.id())
3016
def condition_id_startswith(starts):
3017
"""Create a condition filter verifying that test's id starts with a string.
3019
:param starts: A list of string.
3020
:return: A callable that returns True if the test's id starts with one of
3023
def condition(test):
3024
for start in starts:
3025
if test.id().startswith(start):
3031
def exclude_tests_by_condition(suite, condition):
3032
"""Create a test suite which excludes some tests from suite.
3034
:param suite: The suite to get tests from.
3035
:param condition: A callable whose result evaluates True when called with a
3036
test case which should be excluded from the result.
3037
:return: A suite which contains the tests found in suite that fail
3041
for test in iter_suite_tests(suite):
3042
if not condition(test):
3044
return TestUtil.TestSuite(result)
3047
def filter_suite_by_condition(suite, condition):
3048
"""Create a test suite by filtering another one.
3050
:param suite: The source suite.
3051
:param condition: A callable whose result evaluates True when called with a
3052
test case which should be included in the result.
3053
:return: A suite which contains the tests found in suite that pass
3057
for test in iter_suite_tests(suite):
3060
return TestUtil.TestSuite(result)
543
raise BzrError('Invalid line ending request %r' % (line_endings,))
544
print >>f, "contents of", name
547
def build_tree_contents(self, shape):
548
build_tree_contents(shape)
550
def failUnlessExists(self, path):
551
"""Fail unless path, which may be abs or relative, exists."""
552
self.failUnless(osutils.lexists(path))
554
def assertFileEqual(self, content, path):
555
"""Fail if path does not contain 'content'."""
556
self.failUnless(osutils.lexists(path))
557
self.assertEqualDiff(content, open(path, 'r').read())
560
class MetaTestLog(TestCase):
561
def test_logging(self):
562
"""Test logs are captured when a test fails."""
563
self.log('a test message')
564
self._log_file.flush()
565
self.assertContainsRe(self._get_log(), 'a test message\n')
3063
568
def filter_suite_by_re(suite, pattern):
3064
"""Create a test suite by filtering another one.
3066
:param suite: the source suite
3067
:param pattern: pattern that names must match
3068
:returns: the newly created suite
3070
condition = condition_id_re(pattern)
3071
result_suite = filter_suite_by_condition(suite, condition)
3075
def filter_suite_by_id_list(suite, test_id_list):
3076
"""Create a test suite by filtering another one.
3078
:param suite: The source suite.
3079
:param test_id_list: A list of the test ids to keep as strings.
3080
:returns: the newly created suite
3082
condition = condition_id_in_list(test_id_list)
3083
result_suite = filter_suite_by_condition(suite, condition)
3087
def filter_suite_by_id_startswith(suite, start):
3088
"""Create a test suite by filtering another one.
3090
:param suite: The source suite.
3091
:param start: A list of string the test id must start with one of.
3092
:returns: the newly created suite
3094
condition = condition_id_startswith(start)
3095
result_suite = filter_suite_by_condition(suite, condition)
3099
def exclude_tests_by_re(suite, pattern):
3100
"""Create a test suite which excludes some tests from suite.
3102
:param suite: The suite to get tests from.
3103
:param pattern: A regular expression string. Test ids that match this
3104
pattern will be excluded from the result.
3105
:return: A TestSuite that contains all the tests from suite without the
3106
tests that matched pattern. The order of tests is the same as it was in
3109
return exclude_tests_by_condition(suite, condition_id_re(pattern))
3112
def preserve_input(something):
3113
"""A helper for performing test suite transformation chains.
3115
:param something: Anything you want to preserve.
3121
def randomize_suite(suite):
3122
"""Return a new TestSuite with suite's tests in random order.
3124
The tests in the input suite are flattened into a single suite in order to
3125
accomplish this. Any nested TestSuites are removed to provide global
3128
tests = list(iter_suite_tests(suite))
3129
random.shuffle(tests)
3130
return TestUtil.TestSuite(tests)
3133
def split_suite_by_condition(suite, condition):
3134
"""Split a test suite into two by a condition.
3136
:param suite: The suite to split.
3137
:param condition: The condition to match on. Tests that match this
3138
condition are returned in the first test suite, ones that do not match
3139
are in the second suite.
3140
:return: A tuple of two test suites, where the first contains tests from
3141
suite matching the condition, and the second contains the remainder
3142
from suite. The order within each output suite is the same as it was in
569
result = TestUtil.TestSuite()
570
filter_re = re.compile(pattern)
3147
571
for test in iter_suite_tests(suite):
3149
matched.append(test)
3151
did_not_match.append(test)
3152
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
3155
def split_suite_by_re(suite, pattern):
3156
"""Split a test suite into two by a regular expression.
3158
:param suite: The suite to split.
3159
:param pattern: A regular expression string. Test ids that match this
3160
pattern will be in the first test suite returned, and the others in the
3161
second test suite returned.
3162
:return: A tuple of two test suites, where the first contains tests from
3163
suite matching pattern, and the second contains the remainder from
3164
suite. The order within each output suite is the same as it was in
3167
return split_suite_by_condition(suite, condition_id_re(pattern))
572
if filter_re.search(test.id()):
3170
577
def run_suite(suite, name='test', verbose=False, pattern=".*",
3171
stop_on_failure=False,
3172
transport=None, lsprof_timed=None, bench_history=None,
3173
matching_tests_first=None,
3176
exclude_pattern=None,
3179
suite_decorators=None,
3181
result_decorators=None,
3183
"""Run a test suite for bzr selftest.
3185
:param runner_class: The class of runner to use. Must support the
3186
constructor arguments passed by run_suite which are more than standard
3188
:return: A boolean indicating success.
3190
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
578
stop_on_failure=False, keep_output=False):
579
TestCaseInTempDir._TEST_NAME = name
3195
if runner_class is None:
3196
runner_class = TextTestRunner
3199
runner = runner_class(stream=stream,
584
runner = TextTestRunner(stream=sys.stdout,
3201
verbosity=verbosity,
3202
bench_history=bench_history,
3204
result_decorators=result_decorators,
3206
587
runner.stop_on_failure=stop_on_failure
3207
# built in decorator factories:
3209
random_order(random_seed, runner),
3210
exclude_tests(exclude_pattern),
3212
if matching_tests_first:
3213
decorators.append(tests_first(pattern))
3215
decorators.append(filter_tests(pattern))
3216
if suite_decorators:
3217
decorators.extend(suite_decorators)
3218
# tell the result object how many tests will be running: (except if
3219
# --parallel=fork is being used. Robert said he will provide a better
3220
# progress design later -- vila 20090817)
3221
if fork_decorator not in decorators:
3222
decorators.append(CountingDecorator)
3223
for decorator in decorators:
3224
suite = decorator(suite)
3226
# Done after test suite decoration to allow randomisation etc
3227
# to take effect, though that is of marginal benefit.
3229
stream.write("Listing tests only ...\n")
3230
for t in iter_suite_tests(suite):
3231
stream.write("%s\n" % (t.id()))
589
suite = filter_suite_by_re(suite, pattern)
3233
590
result = runner.run(suite)
3235
return result.wasStrictlySuccessful()
591
# This is still a little bogus,
592
# but only a little. Folk not using our testrunner will
593
# have to delete their temp directories themselves.
594
if result.wasSuccessful() or not keep_output:
595
if TestCaseInTempDir.TEST_ROOT is not None:
596
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
3237
return result.wasSuccessful()
3240
# A registry where get() returns a suite decorator.
3241
parallel_registry = registry.Registry()
3244
def fork_decorator(suite):
3245
if getattr(os, "fork", None) is None:
3246
raise errors.BzrCommandError("platform does not support fork,"
3247
" try --parallel=subprocess instead.")
3248
concurrency = osutils.local_concurrency()
3249
if concurrency == 1:
3251
from testtools import ConcurrentTestSuite
3252
return ConcurrentTestSuite(suite, fork_for_tests)
3253
parallel_registry.register('fork', fork_decorator)
3256
def subprocess_decorator(suite):
3257
concurrency = osutils.local_concurrency()
3258
if concurrency == 1:
3260
from testtools import ConcurrentTestSuite
3261
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3262
parallel_registry.register('subprocess', subprocess_decorator)
3265
def exclude_tests(exclude_pattern):
3266
"""Return a test suite decorator that excludes tests."""
3267
if exclude_pattern is None:
3268
return identity_decorator
3269
def decorator(suite):
3270
return ExcludeDecorator(suite, exclude_pattern)
3274
def filter_tests(pattern):
3276
return identity_decorator
3277
def decorator(suite):
3278
return FilterTestsDecorator(suite, pattern)
3282
def random_order(random_seed, runner):
3283
"""Return a test suite decorator factory for randomising tests order.
3285
:param random_seed: now, a string which casts to a long, or a long.
3286
:param runner: A test runner with a stream attribute to report on.
3288
if random_seed is None:
3289
return identity_decorator
3290
def decorator(suite):
3291
return RandomDecorator(suite, random_seed, runner.stream)
3295
def tests_first(pattern):
3297
return identity_decorator
3298
def decorator(suite):
3299
return TestFirstDecorator(suite, pattern)
3303
def identity_decorator(suite):
3308
class TestDecorator(TestUtil.TestSuite):
3309
"""A decorator for TestCase/TestSuite objects.
3311
Usually, subclasses should override __iter__(used when flattening test
3312
suites), which we do to filter, reorder, parallelise and so on, run() and
3316
def __init__(self, suite):
3317
TestUtil.TestSuite.__init__(self)
3320
def countTestCases(self):
3323
cases += test.countTestCases()
3330
def run(self, result):
3331
# Use iteration on self, not self._tests, to allow subclasses to hook
3334
if result.shouldStop:
3340
class CountingDecorator(TestDecorator):
3341
"""A decorator which calls result.progress(self.countTestCases)."""
3343
def run(self, result):
3344
progress_method = getattr(result, 'progress', None)
3345
if callable(progress_method):
3346
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3347
return super(CountingDecorator, self).run(result)
3350
class ExcludeDecorator(TestDecorator):
3351
"""A decorator which excludes test matching an exclude pattern."""
3353
def __init__(self, suite, exclude_pattern):
3354
TestDecorator.__init__(self, suite)
3355
self.exclude_pattern = exclude_pattern
3356
self.excluded = False
3360
return iter(self._tests)
3361
self.excluded = True
3362
suite = exclude_tests_by_re(self, self.exclude_pattern)
3364
self.addTests(suite)
3365
return iter(self._tests)
3368
class FilterTestsDecorator(TestDecorator):
3369
"""A decorator which filters tests to those matching a pattern."""
3371
def __init__(self, suite, pattern):
3372
TestDecorator.__init__(self, suite)
3373
self.pattern = pattern
3374
self.filtered = False
3378
return iter(self._tests)
3379
self.filtered = True
3380
suite = filter_suite_by_re(self, self.pattern)
3382
self.addTests(suite)
3383
return iter(self._tests)
3386
class RandomDecorator(TestDecorator):
3387
"""A decorator which randomises the order of its tests."""
3389
def __init__(self, suite, random_seed, stream):
3390
TestDecorator.__init__(self, suite)
3391
self.random_seed = random_seed
3392
self.randomised = False
3393
self.stream = stream
3397
return iter(self._tests)
3398
self.randomised = True
3399
self.stream.write("Randomizing test order using seed %s\n\n" %
3400
(self.actual_seed()))
3401
# Initialise the random number generator.
3402
random.seed(self.actual_seed())
3403
suite = randomize_suite(self)
3405
self.addTests(suite)
3406
return iter(self._tests)
3408
def actual_seed(self):
3409
if self.random_seed == "now":
3410
# We convert the seed to a long to make it reuseable across
3411
# invocations (because the user can reenter it).
3412
self.random_seed = long(time.time())
3414
# Convert the seed to a long if we can
3416
self.random_seed = long(self.random_seed)
3419
return self.random_seed
3422
class TestFirstDecorator(TestDecorator):
3423
"""A decorator which moves named tests to the front."""
3425
def __init__(self, suite, pattern):
3426
TestDecorator.__init__(self, suite)
3427
self.pattern = pattern
3428
self.filtered = False
3432
return iter(self._tests)
3433
self.filtered = True
3434
suites = split_suite_by_re(self, self.pattern)
3436
self.addTests(suites)
3437
return iter(self._tests)
3440
def partition_tests(suite, count):
3441
"""Partition suite into count lists of tests."""
3442
# This just assigns tests in a round-robin fashion. On one hand this
3443
# splits up blocks of related tests that might run faster if they shared
3444
# resources, but on the other it avoids assigning blocks of slow tests to
3445
# just one partition. So the slowest partition shouldn't be much slower
3447
partitions = [list() for i in range(count)]
3448
tests = iter_suite_tests(suite)
3449
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3450
partition.append(test)
3454
def workaround_zealous_crypto_random():
3455
"""Crypto.Random want to help us being secure, but we don't care here.
3457
This workaround some test failure related to the sftp server. Once paramiko
3458
stop using the controversial API in Crypto.Random, we may get rid of it.
3461
from Crypto.Random import atfork
3467
def fork_for_tests(suite):
3468
"""Take suite and start up one runner per CPU by forking()
3470
:return: An iterable of TestCase-like objects which can each have
3471
run(result) called on them to feed tests to result.
3473
concurrency = osutils.local_concurrency()
3475
from subunit import TestProtocolClient, ProtocolTestCase
3476
from subunit.test_results import AutoTimingTestResultDecorator
3477
class TestInOtherProcess(ProtocolTestCase):
3478
# Should be in subunit, I think. RBC.
3479
def __init__(self, stream, pid):
3480
ProtocolTestCase.__init__(self, stream)
3483
def run(self, result):
3485
ProtocolTestCase.run(self, result)
3487
os.waitpid(self.pid, 0)
3489
test_blocks = partition_tests(suite, concurrency)
3490
for process_tests in test_blocks:
3491
process_suite = TestUtil.TestSuite()
3492
process_suite.addTests(process_tests)
3493
c2pread, c2pwrite = os.pipe()
3496
workaround_zealous_crypto_random()
3499
# Leave stderr and stdout open so we can see test noise
3500
# Close stdin so that the child goes away if it decides to
3501
# read from stdin (otherwise its a roulette to see what
3502
# child actually gets keystrokes for pdb etc).
3505
stream = os.fdopen(c2pwrite, 'wb', 1)
3506
subunit_result = AutoTimingTestResultDecorator(
3507
TestProtocolClient(stream))
3508
process_suite.run(subunit_result)
3513
stream = os.fdopen(c2pread, 'rb', 1)
3514
test = TestInOtherProcess(stream, pid)
3519
def reinvoke_for_tests(suite):
3520
"""Take suite and start up one runner per CPU using subprocess().
3522
:return: An iterable of TestCase-like objects which can each have
3523
run(result) called on them to feed tests to result.
3525
concurrency = osutils.local_concurrency()
3527
from subunit import ProtocolTestCase
3528
class TestInSubprocess(ProtocolTestCase):
3529
def __init__(self, process, name):
3530
ProtocolTestCase.__init__(self, process.stdout)
3531
self.process = process
3532
self.process.stdin.close()
3535
def run(self, result):
3537
ProtocolTestCase.run(self, result)
3540
os.unlink(self.name)
3541
# print "pid %d finished" % finished_process
3542
test_blocks = partition_tests(suite, concurrency)
3543
for process_tests in test_blocks:
3544
# ugly; currently reimplement rather than reuses TestCase methods.
3545
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3546
if not os.path.isfile(bzr_path):
3547
# We are probably installed. Assume sys.argv is the right file
3548
bzr_path = sys.argv[0]
3549
bzr_path = [bzr_path]
3550
if sys.platform == "win32":
3551
# if we're on windows, we can't execute the bzr script directly
3552
bzr_path = [sys.executable] + bzr_path
3553
fd, test_list_file_name = tempfile.mkstemp()
3554
test_list_file = os.fdopen(fd, 'wb', 1)
3555
for test in process_tests:
3556
test_list_file.write(test.id() + '\n')
3557
test_list_file.close()
3559
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3561
if '--no-plugins' in sys.argv:
3562
argv.append('--no-plugins')
3563
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3564
# noise on stderr it can interrupt the subunit protocol.
3565
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3566
stdout=subprocess.PIPE,
3567
stderr=subprocess.PIPE,
3569
test = TestInSubprocess(process, test_list_file_name)
3572
os.unlink(test_list_file_name)
3577
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3578
"""Generate profiling data for all activity between start and success.
3580
The profile data is appended to the test's _benchcalls attribute and can
3581
be accessed by the forwarded-to TestResult.
3583
While it might be cleaner do accumulate this in stopTest, addSuccess is
3584
where our existing output support for lsprof is, and this class aims to
3585
fit in with that: while it could be moved it's not necessary to accomplish
3586
test profiling, nor would it be dramatically cleaner.
3589
def startTest(self, test):
3590
self.profiler = bzrlib.lsprof.BzrProfiler()
3591
# Prevent deadlocks in tests that use lsprof: those tests will
3593
bzrlib.lsprof.BzrProfiler.profiler_block = 0
3594
self.profiler.start()
3595
testtools.ExtendedToOriginalDecorator.startTest(self, test)
3597
def addSuccess(self, test):
3598
stats = self.profiler.stop()
3600
calls = test._benchcalls
3601
except AttributeError:
3602
test._benchcalls = []
3603
calls = test._benchcalls
3604
calls.append(((test.id(), "", ""), stats))
3605
testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
3607
def stopTest(self, test):
3608
testtools.ExtendedToOriginalDecorator.stopTest(self, test)
3609
self.profiler = None
3612
# Controlled by "bzr selftest -E=..." option
3613
# Currently supported:
3614
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3615
# preserves any flags supplied at the command line.
3616
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3617
# rather than failing tests. And no longer raise
3618
# LockContention when fctnl locks are not being used
3619
# with proper exclusion rules.
3620
# -Ethreads Will display thread ident at creation/join time to
3621
# help track thread leaks
3623
# -Econfig_stats Will collect statistics using addDetail
3624
selftest_debug_flags = set()
598
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
599
return result.wasSuccessful()
3627
602
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3629
test_suite_factory=None,
3632
matching_tests_first=None,
3635
exclude_pattern=None,
3641
suite_decorators=None,
3645
604
"""Run the whole test suite under the enhanced runner"""
3646
# XXX: Very ugly way to do this...
3647
# Disable warning about old formats because we don't want it to disturb
3648
# any blackbox tests.
3649
from bzrlib import repository
3650
repository._deprecation_warning_done = True
3652
global default_transport
3653
if transport is None:
3654
transport = default_transport
3655
old_transport = default_transport
3656
default_transport = transport
3657
global selftest_debug_flags
3658
old_debug_flags = selftest_debug_flags
3659
if debug_flags is not None:
3660
selftest_debug_flags = set(debug_flags)
3662
if load_list is None:
3665
keep_only = load_test_id_list(load_list)
3667
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3668
for start in starting_with]
3669
if test_suite_factory is None:
3670
# Reduce loading time by loading modules based on the starting_with
3672
suite = test_suite(keep_only, starting_with)
3674
suite = test_suite_factory()
3676
# But always filter as requested.
3677
suite = filter_suite_by_id_startswith(suite, starting_with)
3678
result_decorators = []
3680
result_decorators.append(ProfileResult)
3681
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3682
stop_on_failure=stop_on_failure,
3683
transport=transport,
3684
lsprof_timed=lsprof_timed,
3685
bench_history=bench_history,
3686
matching_tests_first=matching_tests_first,
3687
list_only=list_only,
3688
random_seed=random_seed,
3689
exclude_pattern=exclude_pattern,
3691
runner_class=runner_class,
3692
suite_decorators=suite_decorators,
3694
result_decorators=result_decorators,
3697
default_transport = old_transport
3698
selftest_debug_flags = old_debug_flags
3701
def load_test_id_list(file_name):
3702
"""Load a test id list from a text file.
3704
The format is one test id by line. No special care is taken to impose
3705
strict rules, these test ids are used to filter the test suite so a test id
3706
that do not match an existing test will do no harm. This allows user to add
3707
comments, leave blank lines, etc.
3711
ftest = open(file_name, 'rt')
3713
if e.errno != errno.ENOENT:
3716
raise errors.NoSuchFile(file_name)
3718
for test_name in ftest.readlines():
3719
test_list.append(test_name.strip())
3724
def suite_matches_id_list(test_suite, id_list):
3725
"""Warns about tests not appearing or appearing more than once.
3727
:param test_suite: A TestSuite object.
3728
:param test_id_list: The list of test ids that should be found in
3731
:return: (absents, duplicates) absents is a list containing the test found
3732
in id_list but not in test_suite, duplicates is a list containing the
3733
test found multiple times in test_suite.
3735
When using a prefined test id list, it may occurs that some tests do not
3736
exist anymore or that some tests use the same id. This function warns the
3737
tester about potential problems in his workflow (test lists are volatile)
3738
or in the test suite itself (using the same id for several tests does not
3739
help to localize defects).
3741
# Build a dict counting id occurrences
3743
for test in iter_suite_tests(test_suite):
3745
tests[id] = tests.get(id, 0) + 1
3750
occurs = tests.get(id, 0)
3752
not_found.append(id)
3754
duplicates.append(id)
3756
return not_found, duplicates
3759
class TestIdList(object):
3760
"""Test id list to filter a test suite.
3762
Relying on the assumption that test ids are built as:
3763
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3764
notation, this class offers methods to :
3765
- avoid building a test suite for modules not refered to in the test list,
3766
- keep only the tests listed from the module test suite.
3769
def __init__(self, test_id_list):
3770
# When a test suite needs to be filtered against us we compare test ids
3771
# for equality, so a simple dict offers a quick and simple solution.
3772
self.tests = dict().fromkeys(test_id_list, True)
3774
# While unittest.TestCase have ids like:
3775
# <module>.<class>.<method>[(<param+)],
3776
# doctest.DocTestCase can have ids like:
3779
# <module>.<function>
3780
# <module>.<class>.<method>
3782
# Since we can't predict a test class from its name only, we settle on
3783
# a simple constraint: a test id always begins with its module name.
3786
for test_id in test_id_list:
3787
parts = test_id.split('.')
3788
mod_name = parts.pop(0)
3789
modules[mod_name] = True
3791
mod_name += '.' + part
3792
modules[mod_name] = True
3793
self.modules = modules
3795
def refers_to(self, module_name):
3796
"""Is there tests for the module or one of its sub modules."""
3797
return self.modules.has_key(module_name)
3799
def includes(self, test_id):
3800
return self.tests.has_key(test_id)
3803
class TestPrefixAliasRegistry(registry.Registry):
3804
"""A registry for test prefix aliases.
3806
This helps implement shorcuts for the --starting-with selftest
3807
option. Overriding existing prefixes is not allowed but not fatal (a
3808
warning will be emitted).
3811
def register(self, key, obj, help=None, info=None,
3812
override_existing=False):
3813
"""See Registry.register.
3815
Trying to override an existing alias causes a warning to be emitted,
3816
not a fatal execption.
3819
super(TestPrefixAliasRegistry, self).register(
3820
key, obj, help=help, info=info, override_existing=False)
3822
actual = self.get(key)
3824
'Test prefix alias %s is already used for %s, ignoring %s'
3825
% (key, actual, obj))
3827
def resolve_alias(self, id_start):
3828
"""Replace the alias by the prefix in the given string.
3830
Using an unknown prefix is an error to help catching typos.
3832
parts = id_start.split('.')
3834
parts[0] = self.get(parts[0])
3836
raise errors.BzrCommandError(
3837
'%s is not a known test prefix alias' % parts[0])
3838
return '.'.join(parts)
3841
test_prefix_alias_registry = TestPrefixAliasRegistry()
3842
"""Registry of test prefix aliases."""
3845
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3846
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3847
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3849
# Obvious highest levels prefixes, feel free to add your own via a plugin
3850
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3851
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3852
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3853
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3854
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3857
def _test_suite_testmod_names():
3858
"""Return the standard list of test module names to test."""
3861
'bzrlib.tests.blackbox',
3862
'bzrlib.tests.commands',
3863
'bzrlib.tests.doc_generate',
3864
'bzrlib.tests.per_branch',
3865
'bzrlib.tests.per_bzrdir',
3866
'bzrlib.tests.per_controldir',
3867
'bzrlib.tests.per_controldir_colo',
3868
'bzrlib.tests.per_foreign_vcs',
3869
'bzrlib.tests.per_interrepository',
3870
'bzrlib.tests.per_intertree',
3871
'bzrlib.tests.per_inventory',
3872
'bzrlib.tests.per_interbranch',
3873
'bzrlib.tests.per_lock',
3874
'bzrlib.tests.per_merger',
3875
'bzrlib.tests.per_transport',
3876
'bzrlib.tests.per_tree',
3877
'bzrlib.tests.per_pack_repository',
3878
'bzrlib.tests.per_repository',
3879
'bzrlib.tests.per_repository_chk',
3880
'bzrlib.tests.per_repository_reference',
3881
'bzrlib.tests.per_repository_vf',
3882
'bzrlib.tests.per_uifactory',
3883
'bzrlib.tests.per_versionedfile',
3884
'bzrlib.tests.per_workingtree',
3885
'bzrlib.tests.test__annotator',
3886
'bzrlib.tests.test__bencode',
3887
'bzrlib.tests.test__btree_serializer',
3888
'bzrlib.tests.test__chk_map',
3889
'bzrlib.tests.test__dirstate_helpers',
3890
'bzrlib.tests.test__groupcompress',
3891
'bzrlib.tests.test__known_graph',
3892
'bzrlib.tests.test__rio',
3893
'bzrlib.tests.test__simple_set',
3894
'bzrlib.tests.test__static_tuple',
3895
'bzrlib.tests.test__walkdirs_win32',
3896
'bzrlib.tests.test_ancestry',
3897
'bzrlib.tests.test_annotate',
3898
'bzrlib.tests.test_api',
3899
'bzrlib.tests.test_atomicfile',
3900
'bzrlib.tests.test_bad_files',
3901
'bzrlib.tests.test_bisect_multi',
3902
'bzrlib.tests.test_branch',
3903
'bzrlib.tests.test_branchbuilder',
3904
'bzrlib.tests.test_btree_index',
3905
'bzrlib.tests.test_bugtracker',
3906
'bzrlib.tests.test_bundle',
3907
'bzrlib.tests.test_bzrdir',
3908
'bzrlib.tests.test__chunks_to_lines',
3909
'bzrlib.tests.test_cache_utf8',
3910
'bzrlib.tests.test_chk_map',
3911
'bzrlib.tests.test_chk_serializer',
3912
'bzrlib.tests.test_chunk_writer',
3913
'bzrlib.tests.test_clean_tree',
3914
'bzrlib.tests.test_cleanup',
3915
'bzrlib.tests.test_cmdline',
3916
'bzrlib.tests.test_commands',
3917
'bzrlib.tests.test_commit',
3918
'bzrlib.tests.test_commit_merge',
3919
'bzrlib.tests.test_config',
3920
'bzrlib.tests.test_conflicts',
3921
'bzrlib.tests.test_controldir',
3922
'bzrlib.tests.test_counted_lock',
3923
'bzrlib.tests.test_crash',
3924
'bzrlib.tests.test_decorators',
3925
'bzrlib.tests.test_delta',
3926
'bzrlib.tests.test_debug',
3927
'bzrlib.tests.test_diff',
3928
'bzrlib.tests.test_directory_service',
3929
'bzrlib.tests.test_dirstate',
3930
'bzrlib.tests.test_email_message',
3931
'bzrlib.tests.test_eol_filters',
3932
'bzrlib.tests.test_errors',
3933
'bzrlib.tests.test_export',
3934
'bzrlib.tests.test_export_pot',
3935
'bzrlib.tests.test_extract',
3936
'bzrlib.tests.test_fetch',
3937
'bzrlib.tests.test_fixtures',
3938
'bzrlib.tests.test_fifo_cache',
3939
'bzrlib.tests.test_filters',
3940
'bzrlib.tests.test_ftp_transport',
3941
'bzrlib.tests.test_foreign',
3942
'bzrlib.tests.test_generate_docs',
3943
'bzrlib.tests.test_generate_ids',
3944
'bzrlib.tests.test_globbing',
3945
'bzrlib.tests.test_gpg',
3946
'bzrlib.tests.test_graph',
3947
'bzrlib.tests.test_groupcompress',
3948
'bzrlib.tests.test_hashcache',
3949
'bzrlib.tests.test_help',
3950
'bzrlib.tests.test_hooks',
3951
'bzrlib.tests.test_http',
3952
'bzrlib.tests.test_http_response',
3953
'bzrlib.tests.test_https_ca_bundle',
3954
'bzrlib.tests.test_i18n',
3955
'bzrlib.tests.test_identitymap',
3956
'bzrlib.tests.test_ignores',
3957
'bzrlib.tests.test_index',
3958
'bzrlib.tests.test_import_tariff',
3959
'bzrlib.tests.test_info',
3960
'bzrlib.tests.test_inv',
3961
'bzrlib.tests.test_inventory_delta',
3962
'bzrlib.tests.test_knit',
3963
'bzrlib.tests.test_lazy_import',
3964
'bzrlib.tests.test_lazy_regex',
3965
'bzrlib.tests.test_library_state',
3966
'bzrlib.tests.test_lock',
3967
'bzrlib.tests.test_lockable_files',
3968
'bzrlib.tests.test_lockdir',
3969
'bzrlib.tests.test_log',
3970
'bzrlib.tests.test_lru_cache',
3971
'bzrlib.tests.test_lsprof',
3972
'bzrlib.tests.test_mail_client',
3973
'bzrlib.tests.test_matchers',
3974
'bzrlib.tests.test_memorytree',
3975
'bzrlib.tests.test_merge',
3976
'bzrlib.tests.test_merge3',
3977
'bzrlib.tests.test_merge_core',
3978
'bzrlib.tests.test_merge_directive',
3979
'bzrlib.tests.test_mergetools',
3980
'bzrlib.tests.test_missing',
3981
'bzrlib.tests.test_msgeditor',
3982
'bzrlib.tests.test_multiparent',
3983
'bzrlib.tests.test_mutabletree',
3984
'bzrlib.tests.test_nonascii',
3985
'bzrlib.tests.test_options',
3986
'bzrlib.tests.test_osutils',
3987
'bzrlib.tests.test_osutils_encodings',
3988
'bzrlib.tests.test_pack',
3989
'bzrlib.tests.test_patch',
3990
'bzrlib.tests.test_patches',
3991
'bzrlib.tests.test_permissions',
3992
'bzrlib.tests.test_plugins',
3993
'bzrlib.tests.test_progress',
3994
'bzrlib.tests.test_pyutils',
3995
'bzrlib.tests.test_read_bundle',
3996
'bzrlib.tests.test_reconcile',
3997
'bzrlib.tests.test_reconfigure',
3998
'bzrlib.tests.test_registry',
3999
'bzrlib.tests.test_remote',
4000
'bzrlib.tests.test_rename_map',
4001
'bzrlib.tests.test_repository',
4002
'bzrlib.tests.test_revert',
4003
'bzrlib.tests.test_revision',
4004
'bzrlib.tests.test_revisionspec',
4005
'bzrlib.tests.test_revisiontree',
4006
'bzrlib.tests.test_rio',
4007
'bzrlib.tests.test_rules',
4008
'bzrlib.tests.test_sampler',
4009
'bzrlib.tests.test_scenarios',
4010
'bzrlib.tests.test_script',
4011
'bzrlib.tests.test_selftest',
4012
'bzrlib.tests.test_serializer',
4013
'bzrlib.tests.test_setup',
4014
'bzrlib.tests.test_sftp_transport',
4015
'bzrlib.tests.test_shelf',
4016
'bzrlib.tests.test_shelf_ui',
4017
'bzrlib.tests.test_smart',
4018
'bzrlib.tests.test_smart_add',
4019
'bzrlib.tests.test_smart_request',
4020
'bzrlib.tests.test_smart_transport',
4021
'bzrlib.tests.test_smtp_connection',
4022
'bzrlib.tests.test_source',
4023
'bzrlib.tests.test_ssh_transport',
4024
'bzrlib.tests.test_status',
4025
'bzrlib.tests.test_store',
4026
'bzrlib.tests.test_strace',
4027
'bzrlib.tests.test_subsume',
4028
'bzrlib.tests.test_switch',
4029
'bzrlib.tests.test_symbol_versioning',
4030
'bzrlib.tests.test_tag',
4031
'bzrlib.tests.test_test_server',
4032
'bzrlib.tests.test_testament',
4033
'bzrlib.tests.test_textfile',
4034
'bzrlib.tests.test_textmerge',
4035
'bzrlib.tests.test_cethread',
4036
'bzrlib.tests.test_timestamp',
4037
'bzrlib.tests.test_trace',
4038
'bzrlib.tests.test_transactions',
4039
'bzrlib.tests.test_transform',
4040
'bzrlib.tests.test_transport',
4041
'bzrlib.tests.test_transport_log',
4042
'bzrlib.tests.test_tree',
4043
'bzrlib.tests.test_treebuilder',
4044
'bzrlib.tests.test_treeshape',
4045
'bzrlib.tests.test_tsort',
4046
'bzrlib.tests.test_tuned_gzip',
4047
'bzrlib.tests.test_ui',
4048
'bzrlib.tests.test_uncommit',
4049
'bzrlib.tests.test_upgrade',
4050
'bzrlib.tests.test_upgrade_stacked',
4051
'bzrlib.tests.test_urlutils',
4052
'bzrlib.tests.test_utextwrap',
4053
'bzrlib.tests.test_version',
4054
'bzrlib.tests.test_version_info',
4055
'bzrlib.tests.test_versionedfile',
4056
'bzrlib.tests.test_weave',
4057
'bzrlib.tests.test_whitebox',
4058
'bzrlib.tests.test_win32utils',
4059
'bzrlib.tests.test_workingtree',
4060
'bzrlib.tests.test_workingtree_4',
4061
'bzrlib.tests.test_wsgi',
4062
'bzrlib.tests.test_xml',
4066
def _test_suite_modules_to_doctest():
4067
"""Return the list of modules to doctest."""
4069
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
4073
'bzrlib.branchbuilder',
4074
'bzrlib.decorators',
4076
'bzrlib.iterablefile',
4081
'bzrlib.symbol_versioning',
4083
'bzrlib.tests.fixtures',
4085
'bzrlib.transport.http',
4086
'bzrlib.version_info_formats.format_custom',
4090
def test_suite(keep_only=None, starting_with=None):
4091
"""Build and return TestSuite for the whole of bzrlib.
4093
:param keep_only: A list of test ids limiting the suite returned.
4095
:param starting_with: An id limiting the suite returned to the tests
4098
This function can be replaced if you need to change the default test
4099
suite on a global basis, but it is not encouraged.
4102
loader = TestUtil.TestLoader()
4104
if keep_only is not None:
4105
id_filter = TestIdList(keep_only)
4107
# We take precedence over keep_only because *at loading time* using
4108
# both options means we will load less tests for the same final result.
4109
def interesting_module(name):
4110
for start in starting_with:
4112
# Either the module name starts with the specified string
4113
name.startswith(start)
4114
# or it may contain tests starting with the specified string
4115
or start.startswith(name)
4119
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
4121
elif keep_only is not None:
4122
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
4123
def interesting_module(name):
4124
return id_filter.refers_to(name)
4127
loader = TestUtil.TestLoader()
4128
def interesting_module(name):
4129
# No filtering, all modules are interesting
4132
suite = loader.suiteClass()
4134
# modules building their suite with loadTestsFromModuleNames
4135
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
4137
for mod in _test_suite_modules_to_doctest():
4138
if not interesting_module(mod):
4139
# No tests to keep here, move along
4142
# note that this really does mean "report only" -- doctest
4143
# still runs the rest of the examples
4144
doc_suite = IsolatedDocTestSuite(
4145
mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
4146
except ValueError, e:
4147
print '**failed to get doctest for: %s\n%s' % (mod, e)
4149
if len(doc_suite._tests) == 0:
4150
raise errors.BzrError("no doctests found in %s" % (mod,))
4151
suite.addTest(doc_suite)
4153
default_encoding = sys.getdefaultencoding()
4154
for name, plugin in _mod_plugin.plugins().items():
4155
if not interesting_module(plugin.module.__name__):
4157
plugin_suite = plugin.test_suite()
4158
# We used to catch ImportError here and turn it into just a warning,
4159
# but really if you don't have --no-plugins this should be a failure.
4160
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
4161
if plugin_suite is None:
4162
plugin_suite = plugin.load_plugin_tests(loader)
4163
if plugin_suite is not None:
4164
suite.addTest(plugin_suite)
4165
if default_encoding != sys.getdefaultencoding():
4167
'Plugin "%s" tried to reset default encoding to: %s', name,
4168
sys.getdefaultencoding())
4170
sys.setdefaultencoding(default_encoding)
4172
if keep_only is not None:
4173
# Now that the referred modules have loaded their tests, keep only the
4175
suite = filter_suite_by_id_list(suite, id_filter)
4176
# Do some sanity checks on the id_list filtering
4177
not_found, duplicates = suite_matches_id_list(suite, keep_only)
4179
# The tester has used both keep_only and starting_with, so he is
4180
# already aware that some tests are excluded from the list, there
4181
# is no need to tell him which.
4184
# Some tests mentioned in the list are not in the test suite. The
4185
# list may be out of date, report to the tester.
4186
for id in not_found:
4187
trace.warning('"%s" not found in the test suite', id)
4188
for id in duplicates:
4189
trace.warning('"%s" is used as an id by several tests', id)
605
return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
606
stop_on_failure=stop_on_failure, keep_output=keep_output)
610
"""Build and return TestSuite for the whole program."""
611
from doctest import DocTestSuite
613
global MODULES_TO_DOCTEST
615
# FIXME: If these fail to load, e.g. because of a syntax error, the
616
# exception is hidden by unittest. Sucks. Should either fix that or
617
# perhaps import them and pass them to unittest as modules.
619
['bzrlib.tests.MetaTestLog',
620
'bzrlib.tests.test_api',
621
'bzrlib.tests.test_gpg',
622
'bzrlib.tests.test_identitymap',
623
'bzrlib.tests.test_inv',
624
'bzrlib.tests.test_ancestry',
625
'bzrlib.tests.test_commit',
626
'bzrlib.tests.test_command',
627
'bzrlib.tests.test_commit_merge',
628
'bzrlib.tests.test_config',
629
'bzrlib.tests.test_merge3',
630
'bzrlib.tests.test_merge',
631
'bzrlib.tests.test_hashcache',
632
'bzrlib.tests.test_status',
633
'bzrlib.tests.test_log',
634
'bzrlib.tests.test_revisionnamespaces',
635
'bzrlib.tests.test_branch',
636
'bzrlib.tests.test_revision',
637
'bzrlib.tests.test_revision_info',
638
'bzrlib.tests.test_merge_core',
639
'bzrlib.tests.test_smart_add',
640
'bzrlib.tests.test_bad_files',
641
'bzrlib.tests.test_diff',
642
'bzrlib.tests.test_parent',
643
'bzrlib.tests.test_xml',
644
'bzrlib.tests.test_weave',
645
'bzrlib.tests.test_fetch',
646
'bzrlib.tests.test_whitebox',
647
'bzrlib.tests.test_store',
648
'bzrlib.tests.test_sampler',
649
'bzrlib.tests.test_transactions',
650
'bzrlib.tests.test_transport',
651
'bzrlib.tests.test_sftp',
652
'bzrlib.tests.test_graph',
653
'bzrlib.tests.test_workingtree',
654
'bzrlib.tests.test_upgrade',
655
'bzrlib.tests.test_uncommit',
656
'bzrlib.tests.test_conflicts',
657
'bzrlib.tests.test_testament',
658
'bzrlib.tests.test_annotate',
659
'bzrlib.tests.test_revprops',
660
'bzrlib.tests.test_options',
661
'bzrlib.tests.test_http',
662
'bzrlib.tests.test_nonascii',
663
'bzrlib.tests.test_reweave',
664
'bzrlib.tests.test_tsort',
665
'bzrlib.tests.test_trace',
666
'bzrlib.tests.test_rio',
669
TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
670
print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
673
suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
674
for package in packages_to_test():
675
suite.addTest(package.test_suite())
676
for m in MODULES_TO_TEST:
677
suite.addTest(TestLoader().loadTestsFromModule(m))
678
for m in (MODULES_TO_DOCTEST):
679
suite.addTest(DocTestSuite(m))
680
for p in bzrlib.plugin.all_plugins:
681
if hasattr(p, 'test_suite'):
682
suite.addTest(p.test_suite())
4194
def multiply_scenarios(*scenarios):
4195
"""Multiply two or more iterables of scenarios.
4197
It is safe to pass scenario generators or iterators.
4199
:returns: A list of compound scenarios: the cross-product of all
4200
scenarios, with the names concatenated and the parameters
4203
return reduce(_multiply_two_scenarios, map(list, scenarios))
4206
def _multiply_two_scenarios(scenarios_left, scenarios_right):
4207
"""Multiply two sets of scenarios.
4209
:returns: the cartesian product of the two sets of scenarios, that is
4210
a scenario for every possible combination of a left scenario and a
4214
('%s,%s' % (left_name, right_name),
4215
dict(left_dict.items() + right_dict.items()))
4216
for left_name, left_dict in scenarios_left
4217
for right_name, right_dict in scenarios_right]
4220
def multiply_tests(tests, scenarios, result):
4221
"""Multiply tests_list by scenarios into result.
4223
This is the core workhorse for test parameterisation.
4225
Typically the load_tests() method for a per-implementation test suite will
4226
call multiply_tests and return the result.
4228
:param tests: The tests to parameterise.
4229
:param scenarios: The scenarios to apply: pairs of (scenario_name,
4230
scenario_param_dict).
4231
:param result: A TestSuite to add created tests to.
4233
This returns the passed in result TestSuite with the cross product of all
4234
the tests repeated once for each scenario. Each test is adapted by adding
4235
the scenario name at the end of its id(), and updating the test object's
4236
__dict__ with the scenario_param_dict.
4238
>>> import bzrlib.tests.test_sampler
4239
>>> r = multiply_tests(
4240
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4241
... [('one', dict(param=1)),
4242
... ('two', dict(param=2))],
4243
... TestUtil.TestSuite())
4244
>>> tests = list(iter_suite_tests(r))
4248
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4254
for test in iter_suite_tests(tests):
4255
apply_scenarios(test, scenarios, result)
4259
def apply_scenarios(test, scenarios, result):
4260
"""Apply the scenarios in scenarios to test and add to result.
4262
:param test: The test to apply scenarios to.
4263
:param scenarios: An iterable of scenarios to apply to test.
4265
:seealso: apply_scenario
4267
for scenario in scenarios:
4268
result.addTest(apply_scenario(test, scenario))
4272
def apply_scenario(test, scenario):
4273
"""Copy test and apply scenario to it.
4275
:param test: A test to adapt.
4276
:param scenario: A tuple describing the scenarion.
4277
The first element of the tuple is the new test id.
4278
The second element is a dict containing attributes to set on the
4280
:return: The adapted test.
4282
new_id = "%s(%s)" % (test.id(), scenario[0])
4283
new_test = clone_test(test, new_id)
4284
for name, value in scenario[1].items():
4285
setattr(new_test, name, value)
4289
def clone_test(test, new_id):
4290
"""Clone a test giving it a new id.
4292
:param test: The test to clone.
4293
:param new_id: The id to assign to it.
4294
:return: The new test.
4296
new_test = copy.copy(test)
4297
new_test.id = lambda: new_id
4298
# XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4299
# causes cloned tests to share the 'details' dict. This makes it hard to
4300
# read the test output for parameterized tests, because tracebacks will be
4301
# associated with irrelevant tests.
4303
details = new_test._TestCase__details
4304
except AttributeError:
4305
# must be a different version of testtools than expected. Do nothing.
4308
# Reset the '__details' dict.
4309
new_test._TestCase__details = {}
4313
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4315
"""Helper for permutating tests against an extension module.
4317
This is meant to be used inside a modules 'load_tests()' function. It will
4318
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4319
against both implementations. Setting 'test.module' to the appropriate
4320
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4322
:param standard_tests: A test suite to permute
4323
:param loader: A TestLoader
4324
:param py_module_name: The python path to a python module that can always
4325
be loaded, and will be considered the 'python' implementation. (eg
4326
'bzrlib._chk_map_py')
4327
:param ext_module_name: The python path to an extension module. If the
4328
module cannot be loaded, a single test will be added, which notes that
4329
the module is not available. If it can be loaded, all standard_tests
4330
will be run against that module.
4331
:return: (suite, feature) suite is a test-suite that has all the permuted
4332
tests. feature is the Feature object that can be used to determine if
4333
the module is available.
4336
py_module = pyutils.get_named_object(py_module_name)
4338
('python', {'module': py_module}),
4340
suite = loader.suiteClass()
4341
feature = ModuleAvailableFeature(ext_module_name)
4342
if feature.available():
4343
scenarios.append(('C', {'module': feature.module}))
4345
# the compiled module isn't available, so we add a failing test
4346
class FailWithoutFeature(TestCase):
4347
def test_fail(self):
4348
self.requireFeature(feature)
4349
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4350
result = multiply_tests(standard_tests, scenarios, suite)
4351
return result, feature
4354
def _rmtree_temp_dir(dirname, test_id=None):
4355
# If LANG=C we probably have created some bogus paths
4356
# which rmtree(unicode) will fail to delete
4357
# so make sure we are using rmtree(str) to delete everything
4358
# except on win32, where rmtree(str) will fail
4359
# since it doesn't have the property of byte-stream paths
4360
# (they are either ascii or mbcs)
4361
if sys.platform == 'win32':
4362
# make sure we are using the unicode win32 api
4363
dirname = unicode(dirname)
4365
dirname = dirname.encode(sys.getfilesystemencoding())
4367
osutils.rmtree(dirname)
4369
# We don't want to fail here because some useful display will be lost
4370
# otherwise. Polluting the tmp dir is bad, but not giving all the
4371
# possible info to the test runner is even worse.
4373
ui.ui_factory.clear_term()
4374
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4375
# Ugly, but the last thing we want here is fail, so bear with it.
4376
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4377
).encode('ascii', 'replace')
4378
sys.stderr.write('Unable to remove testing dir %s\n%s'
4379
% (os.path.basename(dirname), printable_e))
4382
class Feature(object):
4383
"""An operating system Feature."""
4386
self._available = None
4388
def available(self):
4389
"""Is the feature available?
4391
:return: True if the feature is available.
4393
if self._available is None:
4394
self._available = self._probe()
4395
return self._available
4398
"""Implement this method in concrete features.
4400
:return: True if the feature is available.
4402
raise NotImplementedError
4405
if getattr(self, 'feature_name', None):
4406
return self.feature_name()
4407
return self.__class__.__name__
4410
class _SymlinkFeature(Feature):
4413
return osutils.has_symlinks()
4415
def feature_name(self):
4418
SymlinkFeature = _SymlinkFeature()
4421
class _HardlinkFeature(Feature):
4424
return osutils.has_hardlinks()
4426
def feature_name(self):
4429
HardlinkFeature = _HardlinkFeature()
4432
class _OsFifoFeature(Feature):
4435
return getattr(os, 'mkfifo', None)
4437
def feature_name(self):
4438
return 'filesystem fifos'
4440
OsFifoFeature = _OsFifoFeature()
4443
class _UnicodeFilenameFeature(Feature):
4444
"""Does the filesystem support Unicode filenames?"""
4448
# Check for character combinations unlikely to be covered by any
4449
# single non-unicode encoding. We use the characters
4450
# - greek small letter alpha (U+03B1) and
4451
# - braille pattern dots-123456 (U+283F).
4452
os.stat(u'\u03b1\u283f')
4453
except UnicodeEncodeError:
4455
except (IOError, OSError):
4456
# The filesystem allows the Unicode filename but the file doesn't
4460
# The filesystem allows the Unicode filename and the file exists,
4464
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4467
class _CompatabilityThunkFeature(Feature):
4468
"""This feature is just a thunk to another feature.
4470
It issues a deprecation warning if it is accessed, to let you know that you
4471
should really use a different feature.
4474
def __init__(self, dep_version, module, name,
4475
replacement_name, replacement_module=None):
4476
super(_CompatabilityThunkFeature, self).__init__()
4477
self._module = module
4478
if replacement_module is None:
4479
replacement_module = module
4480
self._replacement_module = replacement_module
4482
self._replacement_name = replacement_name
4483
self._dep_version = dep_version
4484
self._feature = None
4487
if self._feature is None:
4488
depr_msg = self._dep_version % ('%s.%s'
4489
% (self._module, self._name))
4490
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4491
self._replacement_name)
4492
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4493
# Import the new feature and use it as a replacement for the
4495
self._feature = pyutils.get_named_object(
4496
self._replacement_module, self._replacement_name)
4500
return self._feature._probe()
4503
class ModuleAvailableFeature(Feature):
4504
"""This is a feature than describes a module we want to be available.
4506
Declare the name of the module in __init__(), and then after probing, the
4507
module will be available as 'self.module'.
4509
:ivar module: The module if it is available, else None.
4512
def __init__(self, module_name):
4513
super(ModuleAvailableFeature, self).__init__()
4514
self.module_name = module_name
4518
self._module = __import__(self.module_name, {}, {}, [''])
4525
if self.available(): # Make sure the probe has been done
4529
def feature_name(self):
4530
return self.module_name
4533
def probe_unicode_in_user_encoding():
4534
"""Try to encode several unicode strings to use in unicode-aware tests.
4535
Return first successfull match.
4537
:return: (unicode value, encoded plain string value) or (None, None)
4539
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4540
for uni_val in possible_vals:
4542
str_val = uni_val.encode(osutils.get_user_encoding())
4543
except UnicodeEncodeError:
4544
# Try a different character
4547
return uni_val, str_val
4551
def probe_bad_non_ascii(encoding):
4552
"""Try to find [bad] character with code [128..255]
4553
that cannot be decoded to unicode in some encoding.
4554
Return None if all non-ascii characters is valid
4557
for i in xrange(128, 256):
4560
char.decode(encoding)
4561
except UnicodeDecodeError:
4566
class _HTTPSServerFeature(Feature):
4567
"""Some tests want an https Server, check if one is available.
4569
Right now, the only way this is available is under python2.6 which provides
4580
def feature_name(self):
4581
return 'HTTPSServer'
4584
HTTPSServerFeature = _HTTPSServerFeature()
4587
class _UnicodeFilename(Feature):
4588
"""Does the filesystem support Unicode filenames?"""
4593
except UnicodeEncodeError:
4595
except (IOError, OSError):
4596
# The filesystem allows the Unicode filename but the file doesn't
4600
# The filesystem allows the Unicode filename and the file exists,
4604
UnicodeFilename = _UnicodeFilename()
4607
class _ByteStringNamedFilesystem(Feature):
4608
"""Is the filesystem based on bytes?"""
4611
if os.name == "posix":
4615
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4618
class _UTF8Filesystem(Feature):
4619
"""Is the filesystem UTF-8?"""
4622
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4626
UTF8Filesystem = _UTF8Filesystem()
4629
class _BreakinFeature(Feature):
4630
"""Does this platform support the breakin feature?"""
4633
from bzrlib import breakin
4634
if breakin.determine_signal() is None:
4636
if sys.platform == 'win32':
4637
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4638
# We trigger SIGBREAK via a Console api so we need ctypes to
4639
# access the function
4646
def feature_name(self):
4647
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4650
BreakinFeature = _BreakinFeature()
4653
class _CaseInsCasePresFilenameFeature(Feature):
4654
"""Is the file-system case insensitive, but case-preserving?"""
4657
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4659
# first check truly case-preserving for created files, then check
4660
# case insensitive when opening existing files.
4661
name = osutils.normpath(name)
4662
base, rel = osutils.split(name)
4663
found_rel = osutils.canonical_relpath(base, name)
4664
return (found_rel == rel
4665
and os.path.isfile(name.upper())
4666
and os.path.isfile(name.lower()))
4671
def feature_name(self):
4672
return "case-insensitive case-preserving filesystem"
4674
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4677
class _CaseInsensitiveFilesystemFeature(Feature):
4678
"""Check if underlying filesystem is case-insensitive but *not* case
4681
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4682
# more likely to be case preserving, so this case is rare.
4685
if CaseInsCasePresFilenameFeature.available():
4688
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4689
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4690
TestCaseWithMemoryTransport.TEST_ROOT = root
4692
root = TestCaseWithMemoryTransport.TEST_ROOT
4693
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4695
name_a = osutils.pathjoin(tdir, 'a')
4696
name_A = osutils.pathjoin(tdir, 'A')
4698
result = osutils.isdir(name_A)
4699
_rmtree_temp_dir(tdir)
4702
def feature_name(self):
4703
return 'case-insensitive filesystem'
4705
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4708
class _CaseSensitiveFilesystemFeature(Feature):
4711
if CaseInsCasePresFilenameFeature.available():
4713
elif CaseInsensitiveFilesystemFeature.available():
4718
def feature_name(self):
4719
return 'case-sensitive filesystem'
4721
# new coding style is for feature instances to be lowercase
4722
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4725
# Only define SubUnitBzrRunner if subunit is available.
4727
from subunit import TestProtocolClient
4728
from subunit.test_results import AutoTimingTestResultDecorator
4729
class SubUnitBzrProtocolClient(TestProtocolClient):
4731
def addSuccess(self, test, details=None):
4732
# The subunit client always includes the details in the subunit
4733
# stream, but we don't want to include it in ours.
4734
if details is not None and 'log' in details:
4736
return super(SubUnitBzrProtocolClient, self).addSuccess(
4739
class SubUnitBzrRunner(TextTestRunner):
4740
def run(self, test):
4741
result = AutoTimingTestResultDecorator(
4742
SubUnitBzrProtocolClient(self.stream))
4748
class _PosixPermissionsFeature(Feature):
4752
# create temporary file and check if specified perms are maintained.
4755
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4756
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4759
osutils.chmod_if_possible(name, write_perms)
4761
read_perms = os.stat(name).st_mode & 0777
4763
return (write_perms == read_perms)
4765
return (os.name == 'posix') and has_perms()
4767
def feature_name(self):
4768
return 'POSIX permissions support'
4770
posix_permissions_feature = _PosixPermissionsFeature()