1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
# TODO: Perhaps there should be an API to find out if bzr running under the
20
# test suite -- some plugins might want to avoid making intrusive changes if
21
# this is the case. However, we want behaviour under to test to diverge as
22
# little as possible, so this should be used rarely if it's added at all.
23
# (Suggestion from j-a-meinel, 2005-11-24)
25
# NOTE: Some classes in here use camelCaseNaming() rather than
26
# underscore_naming(). That's for consistency with unittest; it's not the
27
# general style of bzrlib. Please continue that consistency when adding e.g.
28
# new assertFoo() methods.
33
from cStringIO import StringIO
56
# nb: check this before importing anything else from within it
57
_testtools_version = getattr(testtools, '__version__', ())
58
if _testtools_version < (0, 9, 5):
59
raise ImportError("need at least testtools 0.9.5: %s is %r"
60
% (testtools.__file__, _testtools_version))
61
from testtools import content
68
commands as _mod_commands,
77
plugin as _mod_plugin,
84
transport as _mod_transport,
90
# lsprof not available
92
from bzrlib.smart import client, request
93
from bzrlib.transport import (
97
from bzrlib.tests import (
102
from bzrlib.ui import NullProgressView
103
from bzrlib.ui.text import TextUIFactory
105
# Mark this python module as being part of the implementation
106
# of unittest: this gives us better tracebacks where the last
107
# shown frame is the test code, not our assertXYZ.
110
default_transport = test_server.LocalURLServer
113
_unitialized_attr = object()
114
"""A sentinel needed to act as a default value in a method signature."""
117
# Subunit result codes, defined here to prevent a hard dependency on subunit.
121
# These are intentionally brought into this namespace. That way plugins, etc
122
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
123
TestSuite = TestUtil.TestSuite
124
TestLoader = TestUtil.TestLoader
126
# Tests should run in a clean and clearly defined environment. The goal is to
127
# keep them isolated from the running environment as mush as possible. The test
128
# framework ensures the variables defined below are set (or deleted if the
129
# value is None) before a test is run and reset to their original value after
130
# the test is run. Generally if some code depends on an environment variable,
131
# the tests should start without this variable in the environment. There are a
132
# few exceptions but you shouldn't violate this rule lightly.
136
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
137
# tests do check our impls match APPDATA
138
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
142
'BZREMAIL': None, # may still be present in the environment
143
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
144
'BZR_PROGRESS_BAR': None,
146
'BZR_PLUGIN_PATH': None,
147
'BZR_DISABLE_PLUGINS': None,
148
'BZR_PLUGINS_AT': None,
149
'BZR_CONCURRENCY': None,
150
# Make sure that any text ui tests are consistent regardless of
151
# the environment the test case is run in; you may want tests that
152
# test other combinations. 'dumb' is a reasonable guess for tests
153
# going to a pipe or a StringIO.
159
'SSH_AUTH_SOCK': None,
169
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
170
# least. If you do (care), please update this comment
174
'BZR_REMOTE_PATH': None,
175
# Generally speaking, we don't want apport reporting on crashes in
176
# the test envirnoment unless we're specifically testing apport,
177
# so that it doesn't leak into the real system environment. We
178
# use an env var so it propagates to subprocesses.
179
'APPORT_DISABLE': '1',
183
def override_os_environ(test, env=None):
184
"""Modify os.environ keeping a copy.
186
:param test: A test instance
188
:param env: A dict containing variable definitions to be installed
191
env = isolated_environ
192
test._original_os_environ = dict([(var, value)
193
for var, value in os.environ.iteritems()])
194
for var, value in env.iteritems():
195
osutils.set_or_unset_env(var, value)
196
if var not in test._original_os_environ:
197
# The var is new, add it with a value of None, so
198
# restore_os_environ will delete it
199
test._original_os_environ[var] = None
202
def restore_os_environ(test):
203
"""Restore os.environ to its original state.
205
:param test: A test instance previously passed to override_os_environ.
207
for var, value in test._original_os_environ.iteritems():
208
# Restore the original value (or delete it if the value has been set to
209
# None in override_os_environ).
210
osutils.set_or_unset_env(var, value)
213
class ExtendedTestResult(testtools.TextTestResult):
214
"""Accepts, reports and accumulates the results of running tests.
216
Compared to the unittest version this class adds support for
217
profiling, benchmarking, stopping as soon as a test fails, and
218
skipping tests. There are further-specialized subclasses for
219
different types of display.
221
When a test finishes, in whatever way, it calls one of the addSuccess,
222
addFailure or addError classes. These in turn may redirect to a more
223
specific case for the special test results supported by our extended
226
Note that just one of these objects is fed the results from many tests.
231
def __init__(self, stream, descriptions, verbosity,
235
"""Construct new TestResult.
237
:param bench_history: Optionally, a writable file object to accumulate
240
testtools.TextTestResult.__init__(self, stream)
241
if bench_history is not None:
242
from bzrlib.version import _get_bzr_source_tree
243
src_tree = _get_bzr_source_tree()
246
revision_id = src_tree.get_parent_ids()[0]
248
# XXX: if this is a brand new tree, do the same as if there
252
# XXX: If there's no branch, what should we do?
254
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
255
self._bench_history = bench_history
256
self.ui = ui.ui_factory
259
self.failure_count = 0
260
self.known_failure_count = 0
262
self.not_applicable_count = 0
263
self.unsupported = {}
265
self._overall_start_time = time.time()
266
self._strict = strict
267
self._first_thread_leaker_id = None
268
self._tests_leaking_threads_count = 0
269
self._traceback_from_test = None
271
def stopTestRun(self):
274
stopTime = time.time()
275
timeTaken = stopTime - self.startTime
276
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
277
# the parent class method is similar have to duplicate
278
self._show_list('ERROR', self.errors)
279
self._show_list('FAIL', self.failures)
280
self.stream.write(self.sep2)
281
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
282
run, run != 1 and "s" or "", timeTaken))
283
if not self.wasSuccessful():
284
self.stream.write("FAILED (")
285
failed, errored = map(len, (self.failures, self.errors))
287
self.stream.write("failures=%d" % failed)
289
if failed: self.stream.write(", ")
290
self.stream.write("errors=%d" % errored)
291
if self.known_failure_count:
292
if failed or errored: self.stream.write(", ")
293
self.stream.write("known_failure_count=%d" %
294
self.known_failure_count)
295
self.stream.write(")\n")
297
if self.known_failure_count:
298
self.stream.write("OK (known_failures=%d)\n" %
299
self.known_failure_count)
301
self.stream.write("OK\n")
302
if self.skip_count > 0:
303
skipped = self.skip_count
304
self.stream.write('%d test%s skipped\n' %
305
(skipped, skipped != 1 and "s" or ""))
307
for feature, count in sorted(self.unsupported.items()):
308
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
311
ok = self.wasStrictlySuccessful()
313
ok = self.wasSuccessful()
314
if self._first_thread_leaker_id:
316
'%s is leaking threads among %d leaking tests.\n' % (
317
self._first_thread_leaker_id,
318
self._tests_leaking_threads_count))
319
# We don't report the main thread as an active one.
321
'%d non-main threads were left active in the end.\n'
322
% (len(self._active_threads) - 1))
324
def getDescription(self, test):
327
def _extractBenchmarkTime(self, testCase, details=None):
328
"""Add a benchmark time for the current test case."""
329
if details and 'benchtime' in details:
330
return float(''.join(details['benchtime'].iter_bytes()))
331
return getattr(testCase, "_benchtime", None)
333
def _elapsedTestTimeString(self):
334
"""Return a time string for the overall time the current test has taken."""
335
return self._formatTime(self._delta_to_float(
336
self._now() - self._start_datetime))
338
def _testTimeString(self, testCase):
339
benchmark_time = self._extractBenchmarkTime(testCase)
340
if benchmark_time is not None:
341
return self._formatTime(benchmark_time) + "*"
343
return self._elapsedTestTimeString()
345
def _formatTime(self, seconds):
346
"""Format seconds as milliseconds with leading spaces."""
347
# some benchmarks can take thousands of seconds to run, so we need 8
349
return "%8dms" % (1000 * seconds)
351
def _shortened_test_description(self, test):
353
what = re.sub(r'^bzrlib\.tests\.', '', what)
356
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
357
# multiple times in a row, because the handler is added for
358
# each test but the container list is shared between cases.
359
# See lp:498869 lp:625574 and lp:637725 for background.
360
def _record_traceback_from_test(self, exc_info):
361
"""Store the traceback from passed exc_info tuple till"""
362
self._traceback_from_test = exc_info[2]
364
def startTest(self, test):
365
super(ExtendedTestResult, self).startTest(test)
369
self.report_test_start(test)
370
test.number = self.count
371
self._recordTestStartTime()
372
# Make testtools cases give us the real traceback on failure
373
addOnException = getattr(test, "addOnException", None)
374
if addOnException is not None:
375
addOnException(self._record_traceback_from_test)
376
# Only check for thread leaks on bzrlib derived test cases
377
if isinstance(test, TestCase):
378
test.addCleanup(self._check_leaked_threads, test)
380
def startTests(self):
381
self.report_tests_starting()
382
self._active_threads = threading.enumerate()
384
def stopTest(self, test):
385
self._traceback_from_test = None
387
def _check_leaked_threads(self, test):
388
"""See if any threads have leaked since last call
390
A sample of live threads is stored in the _active_threads attribute,
391
when this method runs it compares the current live threads and any not
392
in the previous sample are treated as having leaked.
394
now_active_threads = set(threading.enumerate())
395
threads_leaked = now_active_threads.difference(self._active_threads)
397
self._report_thread_leak(test, threads_leaked, now_active_threads)
398
self._tests_leaking_threads_count += 1
399
if self._first_thread_leaker_id is None:
400
self._first_thread_leaker_id = test.id()
401
self._active_threads = now_active_threads
403
def _recordTestStartTime(self):
404
"""Record that a test has started."""
405
self._start_datetime = self._now()
407
def addError(self, test, err):
408
"""Tell result that test finished with an error.
410
Called from the TestCase run() method when the test
411
fails with an unexpected error.
413
self._post_mortem(self._traceback_from_test)
414
super(ExtendedTestResult, self).addError(test, err)
415
self.error_count += 1
416
self.report_error(test, err)
420
def addFailure(self, test, err):
421
"""Tell result that test failed.
423
Called from the TestCase run() method when the test
424
fails because e.g. an assert() method failed.
426
self._post_mortem(self._traceback_from_test)
427
super(ExtendedTestResult, self).addFailure(test, err)
428
self.failure_count += 1
429
self.report_failure(test, err)
433
def addSuccess(self, test, details=None):
434
"""Tell result that test completed successfully.
436
Called from the TestCase run()
438
if self._bench_history is not None:
439
benchmark_time = self._extractBenchmarkTime(test, details)
440
if benchmark_time is not None:
441
self._bench_history.write("%s %s\n" % (
442
self._formatTime(benchmark_time),
444
self.report_success(test)
445
super(ExtendedTestResult, self).addSuccess(test)
446
test._log_contents = ''
448
def addExpectedFailure(self, test, err):
449
self.known_failure_count += 1
450
self.report_known_failure(test, err)
452
def addNotSupported(self, test, feature):
453
"""The test will not be run because of a missing feature.
455
# this can be called in two different ways: it may be that the
456
# test started running, and then raised (through requireFeature)
457
# UnavailableFeature. Alternatively this method can be called
458
# while probing for features before running the test code proper; in
459
# that case we will see startTest and stopTest, but the test will
460
# never actually run.
461
self.unsupported.setdefault(str(feature), 0)
462
self.unsupported[str(feature)] += 1
463
self.report_unsupported(test, feature)
465
def addSkip(self, test, reason):
466
"""A test has not run for 'reason'."""
468
self.report_skip(test, reason)
470
def addNotApplicable(self, test, reason):
471
self.not_applicable_count += 1
472
self.report_not_applicable(test, reason)
474
def _post_mortem(self, tb=None):
475
"""Start a PDB post mortem session."""
476
if os.environ.get('BZR_TEST_PDB', None):
480
def progress(self, offset, whence):
481
"""The test is adjusting the count of tests to run."""
482
if whence == SUBUNIT_SEEK_SET:
483
self.num_tests = offset
484
elif whence == SUBUNIT_SEEK_CUR:
485
self.num_tests += offset
487
raise errors.BzrError("Unknown whence %r" % whence)
489
def report_tests_starting(self):
490
"""Display information before the test run begins"""
491
if getattr(sys, 'frozen', None) is None:
492
bzr_path = osutils.realpath(sys.argv[0])
494
bzr_path = sys.executable
496
'bzr selftest: %s\n' % (bzr_path,))
499
bzrlib.__path__[0],))
501
' bzr-%s python-%s %s\n' % (
502
bzrlib.version_string,
503
bzrlib._format_version_tuple(sys.version_info),
504
platform.platform(aliased=1),
506
self.stream.write('\n')
508
def report_test_start(self, test):
509
"""Display information on the test just about to be run"""
511
def _report_thread_leak(self, test, leaked_threads, active_threads):
512
"""Display information on a test that leaked one or more threads"""
513
# GZ 2010-09-09: A leak summary reported separately from the general
514
# thread debugging would be nice. Tests under subunit
515
# need something not using stream, perhaps adding a
516
# testtools details object would be fitting.
517
if 'threads' in selftest_debug_flags:
518
self.stream.write('%s is leaking, active is now %d\n' %
519
(test.id(), len(active_threads)))
521
def startTestRun(self):
522
self.startTime = time.time()
524
def report_success(self, test):
527
def wasStrictlySuccessful(self):
528
if self.unsupported or self.known_failure_count:
530
return self.wasSuccessful()
533
class TextTestResult(ExtendedTestResult):
534
"""Displays progress and results of tests in text form"""
536
def __init__(self, stream, descriptions, verbosity,
541
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
542
bench_history, strict)
543
# We no longer pass them around, but just rely on the UIFactory stack
546
warnings.warn("Passing pb to TextTestResult is deprecated")
547
self.pb = self.ui.nested_progress_bar()
548
self.pb.show_pct = False
549
self.pb.show_spinner = False
550
self.pb.show_eta = False,
551
self.pb.show_count = False
552
self.pb.show_bar = False
553
self.pb.update_latency = 0
554
self.pb.show_transport_activity = False
556
def stopTestRun(self):
557
# called when the tests that are going to run have run
560
super(TextTestResult, self).stopTestRun()
562
def report_tests_starting(self):
563
super(TextTestResult, self).report_tests_starting()
564
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
566
def _progress_prefix_text(self):
567
# the longer this text, the less space we have to show the test
569
a = '[%d' % self.count # total that have been run
570
# tests skipped as known not to be relevant are not important enough
572
## if self.skip_count:
573
## a += ', %d skip' % self.skip_count
574
## if self.known_failure_count:
575
## a += '+%dX' % self.known_failure_count
577
a +='/%d' % self.num_tests
579
runtime = time.time() - self._overall_start_time
581
a += '%dm%ds' % (runtime / 60, runtime % 60)
584
total_fail_count = self.error_count + self.failure_count
586
a += ', %d failed' % total_fail_count
587
# if self.unsupported:
588
# a += ', %d missing' % len(self.unsupported)
592
def report_test_start(self, test):
594
self._progress_prefix_text()
596
+ self._shortened_test_description(test))
598
def _test_description(self, test):
599
return self._shortened_test_description(test)
601
def report_error(self, test, err):
602
self.stream.write('ERROR: %s\n %s\n' % (
603
self._test_description(test),
607
def report_failure(self, test, err):
608
self.stream.write('FAIL: %s\n %s\n' % (
609
self._test_description(test),
613
def report_known_failure(self, test, err):
616
def report_skip(self, test, reason):
619
def report_not_applicable(self, test, reason):
622
def report_unsupported(self, test, feature):
623
"""test cannot be run because feature is missing."""
626
class VerboseTestResult(ExtendedTestResult):
627
"""Produce long output, with one line per test run plus times"""
629
def _ellipsize_to_right(self, a_string, final_width):
630
"""Truncate and pad a string, keeping the right hand side"""
631
if len(a_string) > final_width:
632
result = '...' + a_string[3-final_width:]
635
return result.ljust(final_width)
637
def report_tests_starting(self):
638
self.stream.write('running %d tests...\n' % self.num_tests)
639
super(VerboseTestResult, self).report_tests_starting()
641
def report_test_start(self, test):
642
name = self._shortened_test_description(test)
643
width = osutils.terminal_width()
644
if width is not None:
645
# width needs space for 6 char status, plus 1 for slash, plus an
646
# 11-char time string, plus a trailing blank
647
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
649
self.stream.write(self._ellipsize_to_right(name, width-18))
651
self.stream.write(name)
654
def _error_summary(self, err):
656
return '%s%s' % (indent, err[1])
658
def report_error(self, test, err):
659
self.stream.write('ERROR %s\n%s\n'
660
% (self._testTimeString(test),
661
self._error_summary(err)))
663
def report_failure(self, test, err):
664
self.stream.write(' FAIL %s\n%s\n'
665
% (self._testTimeString(test),
666
self._error_summary(err)))
668
def report_known_failure(self, test, err):
669
self.stream.write('XFAIL %s\n%s\n'
670
% (self._testTimeString(test),
671
self._error_summary(err)))
673
def report_success(self, test):
674
self.stream.write(' OK %s\n' % self._testTimeString(test))
675
for bench_called, stats in getattr(test, '_benchcalls', []):
676
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
677
stats.pprint(file=self.stream)
678
# flush the stream so that we get smooth output. This verbose mode is
679
# used to show the output in PQM.
682
def report_skip(self, test, reason):
683
self.stream.write(' SKIP %s\n%s\n'
684
% (self._testTimeString(test), reason))
686
def report_not_applicable(self, test, reason):
687
self.stream.write(' N/A %s\n %s\n'
688
% (self._testTimeString(test), reason))
690
def report_unsupported(self, test, feature):
691
"""test cannot be run because feature is missing."""
692
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
693
%(self._testTimeString(test), feature))
696
class TextTestRunner(object):
697
stop_on_failure = False
705
result_decorators=None,
707
"""Create a TextTestRunner.
709
:param result_decorators: An optional list of decorators to apply
710
to the result object being used by the runner. Decorators are
711
applied left to right - the first element in the list is the
714
# stream may know claim to know to write unicode strings, but in older
715
# pythons this goes sufficiently wrong that it is a bad idea. (
716
# specifically a built in file with encoding 'UTF-8' will still try
717
# to encode using ascii.
718
new_encoding = osutils.get_terminal_encoding()
719
codec = codecs.lookup(new_encoding)
720
if type(codec) is tuple:
724
encode = codec.encode
725
# GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
726
# so should swap to the plain codecs.StreamWriter
727
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
729
stream.encoding = new_encoding
731
self.descriptions = descriptions
732
self.verbosity = verbosity
733
self._bench_history = bench_history
734
self._strict = strict
735
self._result_decorators = result_decorators or []
738
"Run the given test case or test suite."
739
if self.verbosity == 1:
740
result_class = TextTestResult
741
elif self.verbosity >= 2:
742
result_class = VerboseTestResult
743
original_result = result_class(self.stream,
746
bench_history=self._bench_history,
749
# Signal to result objects that look at stop early policy to stop,
750
original_result.stop_early = self.stop_on_failure
751
result = original_result
752
for decorator in self._result_decorators:
753
result = decorator(result)
754
result.stop_early = self.stop_on_failure
755
result.startTestRun()
760
# higher level code uses our extended protocol to determine
761
# what exit code to give.
762
return original_result
765
def iter_suite_tests(suite):
766
"""Return all tests in a suite, recursing through nested suites"""
767
if isinstance(suite, unittest.TestCase):
769
elif isinstance(suite, unittest.TestSuite):
771
for r in iter_suite_tests(item):
774
raise Exception('unknown type %r for object %r'
775
% (type(suite), suite))
778
TestSkipped = testtools.testcase.TestSkipped
781
class TestNotApplicable(TestSkipped):
782
"""A test is not applicable to the situation where it was run.
784
This is only normally raised by parameterized tests, if they find that
785
the instance they're constructed upon does not support one aspect
790
# traceback._some_str fails to format exceptions that have the default
791
# __str__ which does an implicit ascii conversion. However, repr() on those
792
# objects works, for all that its not quite what the doctor may have ordered.
793
def _clever_some_str(value):
798
return repr(value).replace('\\n', '\n')
800
return '<unprintable %s object>' % type(value).__name__
802
traceback._some_str = _clever_some_str
805
# deprecated - use self.knownFailure(), or self.expectFailure.
806
KnownFailure = testtools.testcase._ExpectedFailure
809
class UnavailableFeature(Exception):
810
"""A feature required for this test was not available.
812
This can be considered a specialised form of SkippedTest.
814
The feature should be used to construct the exception.
818
class StringIOWrapper(object):
819
"""A wrapper around cStringIO which just adds an encoding attribute.
821
Internally we can check sys.stdout to see what the output encoding
822
should be. However, cStringIO has no encoding attribute that we can
823
set. So we wrap it instead.
828
def __init__(self, s=None):
830
self.__dict__['_cstring'] = StringIO(s)
832
self.__dict__['_cstring'] = StringIO()
834
def __getattr__(self, name, getattr=getattr):
835
return getattr(self.__dict__['_cstring'], name)
837
def __setattr__(self, name, val):
838
if name == 'encoding':
839
self.__dict__['encoding'] = val
841
return setattr(self._cstring, name, val)
844
class TestUIFactory(TextUIFactory):
845
"""A UI Factory for testing.
847
Hide the progress bar but emit note()s.
849
Allows get_password to be tested without real tty attached.
851
See also CannedInputUIFactory which lets you provide programmatic input in
854
# TODO: Capture progress events at the model level and allow them to be
855
# observed by tests that care.
857
# XXX: Should probably unify more with CannedInputUIFactory or a
858
# particular configuration of TextUIFactory, or otherwise have a clearer
859
# idea of how they're supposed to be different.
860
# See https://bugs.launchpad.net/bzr/+bug/408213
862
def __init__(self, stdout=None, stderr=None, stdin=None):
863
if stdin is not None:
864
# We use a StringIOWrapper to be able to test various
865
# encodings, but the user is still responsible to
866
# encode the string and to set the encoding attribute
867
# of StringIOWrapper.
868
stdin = StringIOWrapper(stdin)
869
super(TestUIFactory, self).__init__(stdin, stdout, stderr)
871
def get_non_echoed_password(self):
872
"""Get password from stdin without trying to handle the echo mode"""
873
password = self.stdin.readline()
876
if password[-1] == '\n':
877
password = password[:-1]
880
def make_progress_view(self):
881
return NullProgressView()
884
def isolated_doctest_setUp(test):
885
override_os_environ(test)
888
def isolated_doctest_tearDown(test):
889
restore_os_environ(test)
892
def IsolatedDocTestSuite(*args, **kwargs):
893
"""Overrides doctest.DocTestSuite to handle isolation.
895
The method is really a factory and users are expected to use it as such.
898
kwargs['setUp'] = isolated_doctest_setUp
899
kwargs['tearDown'] = isolated_doctest_tearDown
900
return doctest.DocTestSuite(*args, **kwargs)
903
class TestCase(testtools.TestCase):
904
"""Base class for bzr unit tests.
906
Tests that need access to disk resources should subclass
907
TestCaseInTempDir not TestCase.
909
Error and debug log messages are redirected from their usual
910
location into a temporary file, the contents of which can be
911
retrieved by _get_log(). We use a real OS file, not an in-memory object,
912
so that it can also capture file IO. When the test completes this file
913
is read into memory and removed from disk.
915
There are also convenience functions to invoke bzr's command-line
916
routine, and to build and check bzr trees.
918
In addition to the usual method of overriding tearDown(), this class also
919
allows subclasses to register cleanup functions via addCleanup, which are
920
run in order as the object is torn down. It's less likely this will be
921
accidentally overlooked.
925
# record lsprof data when performing benchmark calls.
926
_gather_lsprof_in_benchmarks = False
928
def __init__(self, methodName='testMethod'):
929
super(TestCase, self).__init__(methodName)
930
self._directory_isolation = True
931
self.exception_handlers.insert(0,
932
(UnavailableFeature, self._do_unsupported_or_skip))
933
self.exception_handlers.insert(0,
934
(TestNotApplicable, self._do_not_applicable))
937
super(TestCase, self).setUp()
938
for feature in getattr(self, '_test_needs_features', []):
939
self.requireFeature(feature)
940
self._log_contents = None
941
self.addDetail("log", content.Content(content.ContentType("text",
942
"plain", {"charset": "utf8"}),
943
lambda:[self._get_log(keep_log_file=True)]))
944
self._cleanEnvironment()
947
self._benchcalls = []
948
self._benchtime = None
950
self._track_transports()
952
self._clear_debug_flags()
953
# Isolate global verbosity level, to make sure it's reproducible
954
# between tests. We should get rid of this altogether: bug 656694. --
956
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
957
# Isolate config option expansion until its default value for bzrlib is
958
# settled on or a the FIXME associated with _get_expand_default_value
959
# is addressed -- vila 20110219
960
self.overrideAttr(config, '_expand_default_value', None)
965
pdb.Pdb().set_trace(sys._getframe().f_back)
967
def discardDetail(self, name):
968
"""Extend the addDetail, getDetails api so we can remove a detail.
970
eg. bzr always adds the 'log' detail at startup, but we don't want to
971
include it for skipped, xfail, etc tests.
973
It is safe to call this for a detail that doesn't exist, in case this
974
gets called multiple times.
976
# We cheat. details is stored in __details which means we shouldn't
977
# touch it. but getDetails() returns the dict directly, so we can
979
details = self.getDetails()
983
def _clear_debug_flags(self):
984
"""Prevent externally set debug flags affecting tests.
986
Tests that want to use debug flags can just set them in the
987
debug_flags set during setup/teardown.
989
# Start with a copy of the current debug flags we can safely modify.
990
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
991
if 'allow_debug' not in selftest_debug_flags:
992
debug.debug_flags.clear()
993
if 'disable_lock_checks' not in selftest_debug_flags:
994
debug.debug_flags.add('strict_locks')
996
def _clear_hooks(self):
997
# prevent hooks affecting tests
998
known_hooks = hooks.known_hooks
999
self._preserved_hooks = {}
1000
for key, (parent, name) in known_hooks.iter_parent_objects():
1001
current_hooks = getattr(parent, name)
1002
self._preserved_hooks[parent] = (name, current_hooks)
1003
self._preserved_lazy_hooks = hooks._lazy_hooks
1004
hooks._lazy_hooks = {}
1005
self.addCleanup(self._restoreHooks)
1006
for key, (parent, name) in known_hooks.iter_parent_objects():
1007
factory = known_hooks.get(key)
1008
setattr(parent, name, factory())
1009
# this hook should always be installed
1010
request._install_hook()
1012
def disable_directory_isolation(self):
1013
"""Turn off directory isolation checks."""
1014
self._directory_isolation = False
1016
def enable_directory_isolation(self):
1017
"""Enable directory isolation checks."""
1018
self._directory_isolation = True
1020
def _silenceUI(self):
1021
"""Turn off UI for duration of test"""
1022
# by default the UI is off; tests can turn it on if they want it.
1023
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
1025
def _check_locks(self):
1026
"""Check that all lock take/release actions have been paired."""
1027
# We always check for mismatched locks. If a mismatch is found, we
1028
# fail unless -Edisable_lock_checks is supplied to selftest, in which
1029
# case we just print a warning.
1031
acquired_locks = [lock for action, lock in self._lock_actions
1032
if action == 'acquired']
1033
released_locks = [lock for action, lock in self._lock_actions
1034
if action == 'released']
1035
broken_locks = [lock for action, lock in self._lock_actions
1036
if action == 'broken']
1037
# trivially, given the tests for lock acquistion and release, if we
1038
# have as many in each list, it should be ok. Some lock tests also
1039
# break some locks on purpose and should be taken into account by
1040
# considering that breaking a lock is just a dirty way of releasing it.
1041
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1042
message = ('Different number of acquired and '
1043
'released or broken locks. (%s, %s + %s)' %
1044
(acquired_locks, released_locks, broken_locks))
1045
if not self._lock_check_thorough:
1046
# Rather than fail, just warn
1047
print "Broken test %s: %s" % (self, message)
1051
def _track_locks(self):
1052
"""Track lock activity during tests."""
1053
self._lock_actions = []
1054
if 'disable_lock_checks' in selftest_debug_flags:
1055
self._lock_check_thorough = False
1057
self._lock_check_thorough = True
1059
self.addCleanup(self._check_locks)
1060
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
1061
self._lock_acquired, None)
1062
_mod_lock.Lock.hooks.install_named_hook('lock_released',
1063
self._lock_released, None)
1064
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
1065
self._lock_broken, None)
1067
def _lock_acquired(self, result):
1068
self._lock_actions.append(('acquired', result))
1070
def _lock_released(self, result):
1071
self._lock_actions.append(('released', result))
1073
def _lock_broken(self, result):
1074
self._lock_actions.append(('broken', result))
1076
def permit_dir(self, name):
1077
"""Permit a directory to be used by this test. See permit_url."""
1078
name_transport = _mod_transport.get_transport(name)
1079
self.permit_url(name)
1080
self.permit_url(name_transport.base)
1082
def permit_url(self, url):
1083
"""Declare that url is an ok url to use in this test.
1085
Do this for memory transports, temporary test directory etc.
1087
Do not do this for the current working directory, /tmp, or any other
1088
preexisting non isolated url.
1090
if not url.endswith('/'):
1092
self._bzr_selftest_roots.append(url)
1094
def permit_source_tree_branch_repo(self):
1095
"""Permit the source tree bzr is running from to be opened.
1097
Some code such as bzrlib.version attempts to read from the bzr branch
1098
that bzr is executing from (if any). This method permits that directory
1099
to be used in the test suite.
1101
path = self.get_source_path()
1102
self.record_directory_isolation()
1105
workingtree.WorkingTree.open(path)
1106
except (errors.NotBranchError, errors.NoWorkingTree):
1107
raise TestSkipped('Needs a working tree of bzr sources')
1109
self.enable_directory_isolation()
1111
def _preopen_isolate_transport(self, transport):
1112
"""Check that all transport openings are done in the test work area."""
1113
while isinstance(transport, pathfilter.PathFilteringTransport):
1114
# Unwrap pathfiltered transports
1115
transport = transport.server.backing_transport.clone(
1116
transport._filter('.'))
1117
url = transport.base
1118
# ReadonlySmartTCPServer_for_testing decorates the backing transport
1119
# urls it is given by prepending readonly+. This is appropriate as the
1120
# client shouldn't know that the server is readonly (or not readonly).
1121
# We could register all servers twice, with readonly+ prepending, but
1122
# that makes for a long list; this is about the same but easier to
1124
if url.startswith('readonly+'):
1125
url = url[len('readonly+'):]
1126
self._preopen_isolate_url(url)
1128
def _preopen_isolate_url(self, url):
1129
if not self._directory_isolation:
1131
if self._directory_isolation == 'record':
1132
self._bzr_selftest_roots.append(url)
1134
# This prevents all transports, including e.g. sftp ones backed on disk
1135
# from working unless they are explicitly granted permission. We then
1136
# depend on the code that sets up test transports to check that they are
1137
# appropriately isolated and enable their use by calling
1138
# self.permit_transport()
1139
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1140
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1141
% (url, self._bzr_selftest_roots))
1143
def record_directory_isolation(self):
1144
"""Gather accessed directories to permit later access.
1146
This is used for tests that access the branch bzr is running from.
1148
self._directory_isolation = "record"
1150
def start_server(self, transport_server, backing_server=None):
1151
"""Start transport_server for this test.
1153
This starts the server, registers a cleanup for it and permits the
1154
server's urls to be used.
1156
if backing_server is None:
1157
transport_server.start_server()
1159
transport_server.start_server(backing_server)
1160
self.addCleanup(transport_server.stop_server)
1161
# Obtain a real transport because if the server supplies a password, it
1162
# will be hidden from the base on the client side.
1163
t = _mod_transport.get_transport(transport_server.get_url())
1164
# Some transport servers effectively chroot the backing transport;
1165
# others like SFTPServer don't - users of the transport can walk up the
1166
# transport to read the entire backing transport. This wouldn't matter
1167
# except that the workdir tests are given - and that they expect the
1168
# server's url to point at - is one directory under the safety net. So
1169
# Branch operations into the transport will attempt to walk up one
1170
# directory. Chrooting all servers would avoid this but also mean that
1171
# we wouldn't be testing directly against non-root urls. Alternatively
1172
# getting the test framework to start the server with a backing server
1173
# at the actual safety net directory would work too, but this then
1174
# means that the self.get_url/self.get_transport methods would need
1175
# to transform all their results. On balance its cleaner to handle it
1176
# here, and permit a higher url when we have one of these transports.
1177
if t.base.endswith('/work/'):
1178
# we have safety net/test root/work
1179
t = t.clone('../..')
1180
elif isinstance(transport_server,
1181
test_server.SmartTCPServer_for_testing):
1182
# The smart server adds a path similar to work, which is traversed
1183
# up from by the client. But the server is chrooted - the actual
1184
# backing transport is not escaped from, and VFS requests to the
1185
# root will error (because they try to escape the chroot).
1187
while t2.base != t.base:
1190
self.permit_url(t.base)
1192
def _track_transports(self):
1193
"""Install checks for transport usage."""
1194
# TestCase has no safe place it can write to.
1195
self._bzr_selftest_roots = []
1196
# Currently the easiest way to be sure that nothing is going on is to
1197
# hook into bzr dir opening. This leaves a small window of error for
1198
# transport tests, but they are well known, and we can improve on this
1200
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1201
self._preopen_isolate_transport, "Check bzr directories are safe.")
1203
def _ndiff_strings(self, a, b):
1204
"""Return ndiff between two strings containing lines.
1206
A trailing newline is added if missing to make the strings
1208
if b and b[-1] != '\n':
1210
if a and a[-1] != '\n':
1212
difflines = difflib.ndiff(a.splitlines(True),
1214
linejunk=lambda x: False,
1215
charjunk=lambda x: False)
1216
return ''.join(difflines)
1218
def assertEqual(self, a, b, message=''):
1222
except UnicodeError, e:
1223
# If we can't compare without getting a UnicodeError, then
1224
# obviously they are different
1225
trace.mutter('UnicodeError: %s', e)
1228
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1230
pprint.pformat(a), pprint.pformat(b)))
1232
assertEquals = assertEqual
1234
def assertEqualDiff(self, a, b, message=None):
1235
"""Assert two texts are equal, if not raise an exception.
1237
This is intended for use with multi-line strings where it can
1238
be hard to find the differences by eye.
1240
# TODO: perhaps override assertEquals to call this for strings?
1244
message = "texts not equal:\n"
1246
message = 'first string is missing a final newline.\n'
1248
message = 'second string is missing a final newline.\n'
1249
raise AssertionError(message +
1250
self._ndiff_strings(a, b))
1252
def assertEqualMode(self, mode, mode_test):
1253
self.assertEqual(mode, mode_test,
1254
'mode mismatch %o != %o' % (mode, mode_test))
1256
def assertEqualStat(self, expected, actual):
1257
"""assert that expected and actual are the same stat result.
1259
:param expected: A stat result.
1260
:param actual: A stat result.
1261
:raises AssertionError: If the expected and actual stat values differ
1262
other than by atime.
1264
self.assertEqual(expected.st_size, actual.st_size,
1265
'st_size did not match')
1266
self.assertEqual(expected.st_mtime, actual.st_mtime,
1267
'st_mtime did not match')
1268
self.assertEqual(expected.st_ctime, actual.st_ctime,
1269
'st_ctime did not match')
1270
if sys.platform == 'win32':
1271
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1272
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1273
# odd. We just force it to always be 0 to avoid any problems.
1274
self.assertEqual(0, expected.st_dev)
1275
self.assertEqual(0, actual.st_dev)
1276
self.assertEqual(0, expected.st_ino)
1277
self.assertEqual(0, actual.st_ino)
1279
self.assertEqual(expected.st_dev, actual.st_dev,
1280
'st_dev did not match')
1281
self.assertEqual(expected.st_ino, actual.st_ino,
1282
'st_ino did not match')
1283
self.assertEqual(expected.st_mode, actual.st_mode,
1284
'st_mode did not match')
1286
def assertLength(self, length, obj_with_len):
1287
"""Assert that obj_with_len is of length length."""
1288
if len(obj_with_len) != length:
1289
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1290
length, len(obj_with_len), obj_with_len))
1292
def assertLogsError(self, exception_class, func, *args, **kwargs):
1293
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1296
orig_log_exception_quietly = trace.log_exception_quietly
1299
orig_log_exception_quietly()
1300
captured.append(sys.exc_info())
1301
trace.log_exception_quietly = capture
1302
func(*args, **kwargs)
1304
trace.log_exception_quietly = orig_log_exception_quietly
1305
self.assertLength(1, captured)
1306
err = captured[0][1]
1307
self.assertIsInstance(err, exception_class)
1310
def assertPositive(self, val):
1311
"""Assert that val is greater than 0."""
1312
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1314
def assertNegative(self, val):
1315
"""Assert that val is less than 0."""
1316
self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
1318
def assertStartsWith(self, s, prefix):
1319
if not s.startswith(prefix):
1320
raise AssertionError('string %r does not start with %r' % (s, prefix))
1322
def assertEndsWith(self, s, suffix):
1323
"""Asserts that s ends with suffix."""
1324
if not s.endswith(suffix):
1325
raise AssertionError('string %r does not end with %r' % (s, suffix))
1327
def assertContainsRe(self, haystack, needle_re, flags=0):
1328
"""Assert that a contains something matching a regular expression."""
1329
if not re.search(needle_re, haystack, flags):
1330
if '\n' in haystack or len(haystack) > 60:
1331
# a long string, format it in a more readable way
1332
raise AssertionError(
1333
'pattern "%s" not found in\n"""\\\n%s"""\n'
1334
% (needle_re, haystack))
1336
raise AssertionError('pattern "%s" not found in "%s"'
1337
% (needle_re, haystack))
1339
def assertNotContainsRe(self, haystack, needle_re, flags=0):
1340
"""Assert that a does not match a regular expression"""
1341
if re.search(needle_re, haystack, flags):
1342
raise AssertionError('pattern "%s" found in "%s"'
1343
% (needle_re, haystack))
1345
def assertContainsString(self, haystack, needle):
1346
if haystack.find(needle) == -1:
1347
self.fail("string %r not found in '''%s'''" % (needle, haystack))
1349
def assertNotContainsString(self, haystack, needle):
1350
if haystack.find(needle) != -1:
1351
self.fail("string %r found in '''%s'''" % (needle, haystack))
1353
def assertSubset(self, sublist, superlist):
1354
"""Assert that every entry in sublist is present in superlist."""
1355
missing = set(sublist) - set(superlist)
1356
if len(missing) > 0:
1357
raise AssertionError("value(s) %r not present in container %r" %
1358
(missing, superlist))
1360
def assertListRaises(self, excClass, func, *args, **kwargs):
1361
"""Fail unless excClass is raised when the iterator from func is used.
1363
Many functions can return generators this makes sure
1364
to wrap them in a list() call to make sure the whole generator
1365
is run, and that the proper exception is raised.
1368
list(func(*args, **kwargs))
1372
if getattr(excClass,'__name__', None) is not None:
1373
excName = excClass.__name__
1375
excName = str(excClass)
1376
raise self.failureException, "%s not raised" % excName
1378
def assertRaises(self, excClass, callableObj, *args, **kwargs):
1379
"""Assert that a callable raises a particular exception.
1381
:param excClass: As for the except statement, this may be either an
1382
exception class, or a tuple of classes.
1383
:param callableObj: A callable, will be passed ``*args`` and
1386
Returns the exception so that you can examine it.
1389
callableObj(*args, **kwargs)
1393
if getattr(excClass,'__name__', None) is not None:
1394
excName = excClass.__name__
1397
excName = str(excClass)
1398
raise self.failureException, "%s not raised" % excName
1400
def assertIs(self, left, right, message=None):
1401
if not (left is right):
1402
if message is not None:
1403
raise AssertionError(message)
1405
raise AssertionError("%r is not %r." % (left, right))
1407
def assertIsNot(self, left, right, message=None):
1409
if message is not None:
1410
raise AssertionError(message)
1412
raise AssertionError("%r is %r." % (left, right))
1414
def assertTransportMode(self, transport, path, mode):
1415
"""Fail if a path does not have mode "mode".
1417
If modes are not supported on this transport, the assertion is ignored.
1419
if not transport._can_roundtrip_unix_modebits():
1421
path_stat = transport.stat(path)
1422
actual_mode = stat.S_IMODE(path_stat.st_mode)
1423
self.assertEqual(mode, actual_mode,
1424
'mode of %r incorrect (%s != %s)'
1425
% (path, oct(mode), oct(actual_mode)))
1427
def assertIsSameRealPath(self, path1, path2):
1428
"""Fail if path1 and path2 points to different files"""
1429
self.assertEqual(osutils.realpath(path1),
1430
osutils.realpath(path2),
1431
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1433
def assertIsInstance(self, obj, kls, msg=None):
1434
"""Fail if obj is not an instance of kls
1436
:param msg: Supplementary message to show if the assertion fails.
1438
if not isinstance(obj, kls):
1439
m = "%r is an instance of %s rather than %s" % (
1440
obj, obj.__class__, kls)
1445
def assertFileEqual(self, content, path):
1446
"""Fail if path does not contain 'content'."""
1447
self.assertPathExists(path)
1448
f = file(path, 'rb')
1453
self.assertEqualDiff(content, s)
1455
def assertDocstring(self, expected_docstring, obj):
1456
"""Fail if obj does not have expected_docstring"""
1458
# With -OO the docstring should be None instead
1459
self.assertIs(obj.__doc__, None)
1461
self.assertEqual(expected_docstring, obj.__doc__)
1463
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1464
def failUnlessExists(self, path):
1465
return self.assertPathExists(path)
1467
def assertPathExists(self, path):
1468
"""Fail unless path or paths, which may be abs or relative, exist."""
1469
if not isinstance(path, basestring):
1471
self.assertPathExists(p)
1473
self.assertTrue(osutils.lexists(path),
1474
path + " does not exist")
1476
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1477
def failIfExists(self, path):
1478
return self.assertPathDoesNotExist(path)
1480
def assertPathDoesNotExist(self, path):
1481
"""Fail if path or paths, which may be abs or relative, exist."""
1482
if not isinstance(path, basestring):
1484
self.assertPathDoesNotExist(p)
1486
self.assertFalse(osutils.lexists(path),
1489
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1490
"""A helper for callDeprecated and applyDeprecated.
1492
:param a_callable: A callable to call.
1493
:param args: The positional arguments for the callable
1494
:param kwargs: The keyword arguments for the callable
1495
:return: A tuple (warnings, result). result is the result of calling
1496
a_callable(``*args``, ``**kwargs``).
1499
def capture_warnings(msg, cls=None, stacklevel=None):
1500
# we've hooked into a deprecation specific callpath,
1501
# only deprecations should getting sent via it.
1502
self.assertEqual(cls, DeprecationWarning)
1503
local_warnings.append(msg)
1504
original_warning_method = symbol_versioning.warn
1505
symbol_versioning.set_warning_method(capture_warnings)
1507
result = a_callable(*args, **kwargs)
1509
symbol_versioning.set_warning_method(original_warning_method)
1510
return (local_warnings, result)
1512
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1513
"""Call a deprecated callable without warning the user.
1515
Note that this only captures warnings raised by symbol_versioning.warn,
1516
not other callers that go direct to the warning module.
1518
To test that a deprecated method raises an error, do something like
1521
self.assertRaises(errors.ReservedId,
1522
self.applyDeprecated,
1523
deprecated_in((1, 5, 0)),
1527
:param deprecation_format: The deprecation format that the callable
1528
should have been deprecated with. This is the same type as the
1529
parameter to deprecated_method/deprecated_function. If the
1530
callable is not deprecated with this format, an assertion error
1532
:param a_callable: A callable to call. This may be a bound method or
1533
a regular function. It will be called with ``*args`` and
1535
:param args: The positional arguments for the callable
1536
:param kwargs: The keyword arguments for the callable
1537
:return: The result of a_callable(``*args``, ``**kwargs``)
1539
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1541
expected_first_warning = symbol_versioning.deprecation_string(
1542
a_callable, deprecation_format)
1543
if len(call_warnings) == 0:
1544
self.fail("No deprecation warning generated by call to %s" %
1546
self.assertEqual(expected_first_warning, call_warnings[0])
1549
def callCatchWarnings(self, fn, *args, **kw):
1550
"""Call a callable that raises python warnings.
1552
The caller's responsible for examining the returned warnings.
1554
If the callable raises an exception, the exception is not
1555
caught and propagates up to the caller. In that case, the list
1556
of warnings is not available.
1558
:returns: ([warning_object, ...], fn_result)
1560
# XXX: This is not perfect, because it completely overrides the
1561
# warnings filters, and some code may depend on suppressing particular
1562
# warnings. It's the easiest way to insulate ourselves from -Werror,
1563
# though. -- Andrew, 20071062
1565
def _catcher(message, category, filename, lineno, file=None, line=None):
1566
# despite the name, 'message' is normally(?) a Warning subclass
1568
wlist.append(message)
1569
saved_showwarning = warnings.showwarning
1570
saved_filters = warnings.filters
1572
warnings.showwarning = _catcher
1573
warnings.filters = []
1574
result = fn(*args, **kw)
1576
warnings.showwarning = saved_showwarning
1577
warnings.filters = saved_filters
1578
return wlist, result
1580
def callDeprecated(self, expected, callable, *args, **kwargs):
1581
"""Assert that a callable is deprecated in a particular way.
1583
This is a very precise test for unusual requirements. The
1584
applyDeprecated helper function is probably more suited for most tests
1585
as it allows you to simply specify the deprecation format being used
1586
and will ensure that that is issued for the function being called.
1588
Note that this only captures warnings raised by symbol_versioning.warn,
1589
not other callers that go direct to the warning module. To catch
1590
general warnings, use callCatchWarnings.
1592
:param expected: a list of the deprecation warnings expected, in order
1593
:param callable: The callable to call
1594
:param args: The positional arguments for the callable
1595
:param kwargs: The keyword arguments for the callable
1597
call_warnings, result = self._capture_deprecation_warnings(callable,
1599
self.assertEqual(expected, call_warnings)
1602
def _startLogFile(self):
1603
"""Send bzr and test log messages to a temporary file.
1605
The file is removed as the test is torn down.
1607
self._log_file = StringIO()
1608
self._log_memento = trace.push_log_file(self._log_file)
1609
self.addCleanup(self._finishLogFile)
1611
def _finishLogFile(self):
1612
"""Finished with the log file.
1614
Close the file and delete it, unless setKeepLogfile was called.
1616
if trace._trace_file:
1617
# flush the log file, to get all content
1618
trace._trace_file.flush()
1619
trace.pop_log_file(self._log_memento)
1620
# Cache the log result and delete the file on disk
1621
self._get_log(False)
1623
def thisFailsStrictLockCheck(self):
1624
"""It is known that this test would fail with -Dstrict_locks.
1626
By default, all tests are run with strict lock checking unless
1627
-Edisable_lock_checks is supplied. However there are some tests which
1628
we know fail strict locks at this point that have not been fixed.
1629
They should call this function to disable the strict checking.
1631
This should be used sparingly, it is much better to fix the locking
1632
issues rather than papering over the problem by calling this function.
1634
debug.debug_flags.discard('strict_locks')
1636
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1637
"""Overrides an object attribute restoring it after the test.
1639
:param obj: The object that will be mutated.
1641
:param attr_name: The attribute name we want to preserve/override in
1644
:param new: The optional value we want to set the attribute to.
1646
:returns: The actual attr value.
1648
value = getattr(obj, attr_name)
1649
# The actual value is captured by the call below
1650
self.addCleanup(setattr, obj, attr_name, value)
1651
if new is not _unitialized_attr:
1652
setattr(obj, attr_name, new)
1655
def overrideEnv(self, name, new):
1656
"""Set an environment variable, and reset it after the test.
1658
:param name: The environment variable name.
1660
:param new: The value to set the variable to. If None, the
1661
variable is deleted from the environment.
1663
:returns: The actual variable value.
1665
value = osutils.set_or_unset_env(name, new)
1666
self.addCleanup(osutils.set_or_unset_env, name, value)
1669
def _cleanEnvironment(self):
1670
for name, value in isolated_environ.iteritems():
1671
self.overrideEnv(name, value)
1673
def _restoreHooks(self):
1674
for klass, (name, hooks) in self._preserved_hooks.items():
1675
setattr(klass, name, hooks)
1676
hooks._lazy_hooks = self._preserved_lazy_hooks
1678
def knownFailure(self, reason):
1679
"""This test has failed for some known reason."""
1680
raise KnownFailure(reason)
1682
def _suppress_log(self):
1683
"""Remove the log info from details."""
1684
self.discardDetail('log')
1686
def _do_skip(self, result, reason):
1687
self._suppress_log()
1688
addSkip = getattr(result, 'addSkip', None)
1689
if not callable(addSkip):
1690
result.addSuccess(result)
1692
addSkip(self, reason)
1695
def _do_known_failure(self, result, e):
1696
self._suppress_log()
1697
err = sys.exc_info()
1698
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1699
if addExpectedFailure is not None:
1700
addExpectedFailure(self, err)
1702
result.addSuccess(self)
1705
def _do_not_applicable(self, result, e):
1707
reason = 'No reason given'
1710
self._suppress_log ()
1711
addNotApplicable = getattr(result, 'addNotApplicable', None)
1712
if addNotApplicable is not None:
1713
result.addNotApplicable(self, reason)
1715
self._do_skip(result, reason)
1718
def _report_skip(self, result, err):
1719
"""Override the default _report_skip.
1721
We want to strip the 'log' detail. If we waint until _do_skip, it has
1722
already been formatted into the 'reason' string, and we can't pull it
1725
self._suppress_log()
1726
super(TestCase, self)._report_skip(self, result, err)
1729
def _report_expected_failure(self, result, err):
1732
See _report_skip for motivation.
1734
self._suppress_log()
1735
super(TestCase, self)._report_expected_failure(self, result, err)
1738
def _do_unsupported_or_skip(self, result, e):
1740
self._suppress_log()
1741
addNotSupported = getattr(result, 'addNotSupported', None)
1742
if addNotSupported is not None:
1743
result.addNotSupported(self, reason)
1745
self._do_skip(result, reason)
1747
def time(self, callable, *args, **kwargs):
1748
"""Run callable and accrue the time it takes to the benchmark time.
1750
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1751
this will cause lsprofile statistics to be gathered and stored in
1754
if self._benchtime is None:
1755
self.addDetail('benchtime', content.Content(content.ContentType(
1756
"text", "plain"), lambda:[str(self._benchtime)]))
1760
if not self._gather_lsprof_in_benchmarks:
1761
return callable(*args, **kwargs)
1763
# record this benchmark
1764
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1766
self._benchcalls.append(((callable, args, kwargs), stats))
1769
self._benchtime += time.time() - start
1771
def log(self, *args):
1774
def _get_log(self, keep_log_file=False):
1775
"""Internal helper to get the log from bzrlib.trace for this test.
1777
Please use self.getDetails, or self.get_log to access this in test case
1780
:param keep_log_file: When True, if the log is still a file on disk
1781
leave it as a file on disk. When False, if the log is still a file
1782
on disk, the log file is deleted and the log preserved as
1784
:return: A string containing the log.
1786
if self._log_contents is not None:
1788
self._log_contents.decode('utf8')
1789
except UnicodeDecodeError:
1790
unicodestr = self._log_contents.decode('utf8', 'replace')
1791
self._log_contents = unicodestr.encode('utf8')
1792
return self._log_contents
1793
if self._log_file is not None:
1794
log_contents = self._log_file.getvalue()
1796
log_contents.decode('utf8')
1797
except UnicodeDecodeError:
1798
unicodestr = log_contents.decode('utf8', 'replace')
1799
log_contents = unicodestr.encode('utf8')
1800
if not keep_log_file:
1801
self._log_file = None
1802
# Permit multiple calls to get_log until we clean it up in
1804
self._log_contents = log_contents
1807
return "No log file content."
1810
"""Get a unicode string containing the log from bzrlib.trace.
1812
Undecodable characters are replaced.
1814
return u"".join(self.getDetails()['log'].iter_text())
1816
def requireFeature(self, feature):
1817
"""This test requires a specific feature is available.
1819
:raises UnavailableFeature: When feature is not available.
1821
if not feature.available():
1822
raise UnavailableFeature(feature)
1824
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1826
"""Run bazaar command line, splitting up a string command line."""
1827
if isinstance(args, basestring):
1828
# shlex don't understand unicode strings,
1829
# so args should be plain string (bialix 20070906)
1830
args = list(shlex.split(str(args)))
1831
return self._run_bzr_core(args, retcode=retcode,
1832
encoding=encoding, stdin=stdin, working_dir=working_dir,
1835
def _run_bzr_core(self, args, retcode, encoding, stdin,
1837
# Clear chk_map page cache, because the contents are likely to mask
1839
chk_map.clear_cache()
1840
if encoding is None:
1841
encoding = osutils.get_user_encoding()
1842
stdout = StringIOWrapper()
1843
stderr = StringIOWrapper()
1844
stdout.encoding = encoding
1845
stderr.encoding = encoding
1847
self.log('run bzr: %r', args)
1848
# FIXME: don't call into logging here
1849
handler = logging.StreamHandler(stderr)
1850
handler.setLevel(logging.INFO)
1851
logger = logging.getLogger('')
1852
logger.addHandler(handler)
1853
old_ui_factory = ui.ui_factory
1854
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1857
if working_dir is not None:
1858
cwd = osutils.getcwd()
1859
os.chdir(working_dir)
1863
result = self.apply_redirected(
1864
ui.ui_factory.stdin,
1866
_mod_commands.run_bzr_catch_user_errors,
1868
except KeyboardInterrupt:
1869
# Reraise KeyboardInterrupt with contents of redirected stdout
1870
# and stderr as arguments, for tests which are interested in
1871
# stdout and stderr and are expecting the exception.
1872
out = stdout.getvalue()
1873
err = stderr.getvalue()
1875
self.log('output:\n%r', out)
1877
self.log('errors:\n%r', err)
1878
raise KeyboardInterrupt(out, err)
1880
logger.removeHandler(handler)
1881
ui.ui_factory = old_ui_factory
1885
out = stdout.getvalue()
1886
err = stderr.getvalue()
1888
self.log('output:\n%r', out)
1890
self.log('errors:\n%r', err)
1891
if retcode is not None:
1892
self.assertEquals(retcode, result,
1893
message='Unexpected return code')
1894
return result, out, err
1896
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1897
working_dir=None, error_regexes=[], output_encoding=None):
1898
"""Invoke bzr, as if it were run from the command line.
1900
The argument list should not include the bzr program name - the
1901
first argument is normally the bzr command. Arguments may be
1902
passed in three ways:
1904
1- A list of strings, eg ["commit", "a"]. This is recommended
1905
when the command contains whitespace or metacharacters, or
1906
is built up at run time.
1908
2- A single string, eg "add a". This is the most convenient
1909
for hardcoded commands.
1911
This runs bzr through the interface that catches and reports
1912
errors, and with logging set to something approximating the
1913
default, so that error reporting can be checked.
1915
This should be the main method for tests that want to exercise the
1916
overall behavior of the bzr application (rather than a unit test
1917
or a functional test of the library.)
1919
This sends the stdout/stderr results into the test's log,
1920
where it may be useful for debugging. See also run_captured.
1922
:keyword stdin: A string to be used as stdin for the command.
1923
:keyword retcode: The status code the command should return;
1925
:keyword working_dir: The directory to run the command in
1926
:keyword error_regexes: A list of expected error messages. If
1927
specified they must be seen in the error output of the command.
1929
retcode, out, err = self._run_bzr_autosplit(
1934
working_dir=working_dir,
1936
self.assertIsInstance(error_regexes, (list, tuple))
1937
for regex in error_regexes:
1938
self.assertContainsRe(err, regex)
1941
def run_bzr_error(self, error_regexes, *args, **kwargs):
1942
"""Run bzr, and check that stderr contains the supplied regexes
1944
:param error_regexes: Sequence of regular expressions which
1945
must each be found in the error output. The relative ordering
1947
:param args: command-line arguments for bzr
1948
:param kwargs: Keyword arguments which are interpreted by run_bzr
1949
This function changes the default value of retcode to be 3,
1950
since in most cases this is run when you expect bzr to fail.
1952
:return: (out, err) The actual output of running the command (in case
1953
you want to do more inspection)
1957
# Make sure that commit is failing because there is nothing to do
1958
self.run_bzr_error(['no changes to commit'],
1959
['commit', '-m', 'my commit comment'])
1960
# Make sure --strict is handling an unknown file, rather than
1961
# giving us the 'nothing to do' error
1962
self.build_tree(['unknown'])
1963
self.run_bzr_error(['Commit refused because there are unknown files'],
1964
['commit', --strict', '-m', 'my commit comment'])
1966
kwargs.setdefault('retcode', 3)
1967
kwargs['error_regexes'] = error_regexes
1968
out, err = self.run_bzr(*args, **kwargs)
1971
def run_bzr_subprocess(self, *args, **kwargs):
1972
"""Run bzr in a subprocess for testing.
1974
This starts a new Python interpreter and runs bzr in there.
1975
This should only be used for tests that have a justifiable need for
1976
this isolation: e.g. they are testing startup time, or signal
1977
handling, or early startup code, etc. Subprocess code can't be
1978
profiled or debugged so easily.
1980
:keyword retcode: The status code that is expected. Defaults to 0. If
1981
None is supplied, the status code is not checked.
1982
:keyword env_changes: A dictionary which lists changes to environment
1983
variables. A value of None will unset the env variable.
1984
The values must be strings. The change will only occur in the
1985
child, so you don't need to fix the environment after running.
1986
:keyword universal_newlines: Convert CRLF => LF
1987
:keyword allow_plugins: By default the subprocess is run with
1988
--no-plugins to ensure test reproducibility. Also, it is possible
1989
for system-wide plugins to create unexpected output on stderr,
1990
which can cause unnecessary test failures.
1992
env_changes = kwargs.get('env_changes', {})
1993
working_dir = kwargs.get('working_dir', None)
1994
allow_plugins = kwargs.get('allow_plugins', False)
1996
if isinstance(args[0], list):
1998
elif isinstance(args[0], basestring):
1999
args = list(shlex.split(args[0]))
2001
raise ValueError("passing varargs to run_bzr_subprocess")
2002
process = self.start_bzr_subprocess(args, env_changes=env_changes,
2003
working_dir=working_dir,
2004
allow_plugins=allow_plugins)
2005
# We distinguish between retcode=None and retcode not passed.
2006
supplied_retcode = kwargs.get('retcode', 0)
2007
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
2008
universal_newlines=kwargs.get('universal_newlines', False),
2011
def start_bzr_subprocess(self, process_args, env_changes=None,
2012
skip_if_plan_to_signal=False,
2014
allow_plugins=False):
2015
"""Start bzr in a subprocess for testing.
2017
This starts a new Python interpreter and runs bzr in there.
2018
This should only be used for tests that have a justifiable need for
2019
this isolation: e.g. they are testing startup time, or signal
2020
handling, or early startup code, etc. Subprocess code can't be
2021
profiled or debugged so easily.
2023
:param process_args: a list of arguments to pass to the bzr executable,
2024
for example ``['--version']``.
2025
:param env_changes: A dictionary which lists changes to environment
2026
variables. A value of None will unset the env variable.
2027
The values must be strings. The change will only occur in the
2028
child, so you don't need to fix the environment after running.
2029
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2030
doesn't support signalling subprocesses.
2031
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2033
:returns: Popen object for the started process.
2035
if skip_if_plan_to_signal:
2036
if os.name != "posix":
2037
raise TestSkipped("Sending signals not supported")
2039
if env_changes is None:
2043
def cleanup_environment():
2044
for env_var, value in env_changes.iteritems():
2045
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
2047
def restore_environment():
2048
for env_var, value in old_env.iteritems():
2049
osutils.set_or_unset_env(env_var, value)
2051
bzr_path = self.get_bzr_path()
2054
if working_dir is not None:
2055
cwd = osutils.getcwd()
2056
os.chdir(working_dir)
2059
# win32 subprocess doesn't support preexec_fn
2060
# so we will avoid using it on all platforms, just to
2061
# make sure the code path is used, and we don't break on win32
2062
cleanup_environment()
2063
command = [sys.executable]
2064
# frozen executables don't need the path to bzr
2065
if getattr(sys, "frozen", None) is None:
2066
command.append(bzr_path)
2067
if not allow_plugins:
2068
command.append('--no-plugins')
2069
command.extend(process_args)
2070
process = self._popen(command, stdin=subprocess.PIPE,
2071
stdout=subprocess.PIPE,
2072
stderr=subprocess.PIPE)
2074
restore_environment()
2080
def _popen(self, *args, **kwargs):
2081
"""Place a call to Popen.
2083
Allows tests to override this method to intercept the calls made to
2084
Popen for introspection.
2086
return subprocess.Popen(*args, **kwargs)
2088
def get_source_path(self):
2089
"""Return the path of the directory containing bzrlib."""
2090
return os.path.dirname(os.path.dirname(bzrlib.__file__))
2092
def get_bzr_path(self):
2093
"""Return the path of the 'bzr' executable for this test suite."""
2094
bzr_path = os.path.join(self.get_source_path(), "bzr")
2095
if not os.path.isfile(bzr_path):
2096
# We are probably installed. Assume sys.argv is the right file
2097
bzr_path = sys.argv[0]
2100
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
2101
universal_newlines=False, process_args=None):
2102
"""Finish the execution of process.
2104
:param process: the Popen object returned from start_bzr_subprocess.
2105
:param retcode: The status code that is expected. Defaults to 0. If
2106
None is supplied, the status code is not checked.
2107
:param send_signal: an optional signal to send to the process.
2108
:param universal_newlines: Convert CRLF => LF
2109
:returns: (stdout, stderr)
2111
if send_signal is not None:
2112
os.kill(process.pid, send_signal)
2113
out, err = process.communicate()
2115
if universal_newlines:
2116
out = out.replace('\r\n', '\n')
2117
err = err.replace('\r\n', '\n')
2119
if retcode is not None and retcode != process.returncode:
2120
if process_args is None:
2121
process_args = "(unknown args)"
2122
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2123
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2124
self.fail('Command bzr %s failed with retcode %s != %s'
2125
% (process_args, retcode, process.returncode))
2128
def check_inventory_shape(self, inv, shape):
2129
"""Compare an inventory to a list of expected names.
2131
Fail if they are not precisely equal.
2134
shape = list(shape) # copy
2135
for path, ie in inv.entries():
2136
name = path.replace('\\', '/')
2137
if ie.kind == 'directory':
2144
self.fail("expected paths not found in inventory: %r" % shape)
2146
self.fail("unexpected paths found in inventory: %r" % extras)
2148
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2149
a_callable=None, *args, **kwargs):
2150
"""Call callable with redirected std io pipes.
2152
Returns the return code."""
2153
if not callable(a_callable):
2154
raise ValueError("a_callable must be callable.")
2156
stdin = StringIO("")
2158
if getattr(self, "_log_file", None) is not None:
2159
stdout = self._log_file
2163
if getattr(self, "_log_file", None is not None):
2164
stderr = self._log_file
2167
real_stdin = sys.stdin
2168
real_stdout = sys.stdout
2169
real_stderr = sys.stderr
2174
return a_callable(*args, **kwargs)
2176
sys.stdout = real_stdout
2177
sys.stderr = real_stderr
2178
sys.stdin = real_stdin
2180
def reduceLockdirTimeout(self):
2181
"""Reduce the default lock timeout for the duration of the test, so that
2182
if LockContention occurs during a test, it does so quickly.
2184
Tests that expect to provoke LockContention errors should call this.
2186
self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2188
def make_utf8_encoded_stringio(self, encoding_type=None):
2189
"""Return a StringIOWrapper instance, that will encode Unicode
2192
if encoding_type is None:
2193
encoding_type = 'strict'
2195
output_encoding = 'utf-8'
2196
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
2197
sio.encoding = output_encoding
2200
def disable_verb(self, verb):
2201
"""Disable a smart server verb for one test."""
2202
from bzrlib.smart import request
2203
request_handlers = request.request_handlers
2204
orig_method = request_handlers.get(verb)
2205
request_handlers.remove(verb)
2206
self.addCleanup(request_handlers.register, verb, orig_method)
2209
class CapturedCall(object):
2210
"""A helper for capturing smart server calls for easy debug analysis."""
2212
def __init__(self, params, prefix_length):
2213
"""Capture the call with params and skip prefix_length stack frames."""
2216
# The last 5 frames are the __init__, the hook frame, and 3 smart
2217
# client frames. Beyond this we could get more clever, but this is good
2219
stack = traceback.extract_stack()[prefix_length:-5]
2220
self.stack = ''.join(traceback.format_list(stack))
2223
return self.call.method
2226
return self.call.method
2232
class TestCaseWithMemoryTransport(TestCase):
2233
"""Common test class for tests that do not need disk resources.
2235
Tests that need disk resources should derive from TestCaseWithTransport.
2237
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2239
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2240
a directory which does not exist. This serves to help ensure test isolation
2241
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2242
must exist. However, TestCaseWithMemoryTransport does not offer local
2243
file defaults for the transport in tests, nor does it obey the command line
2244
override, so tests that accidentally write to the common directory should
2247
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2248
a .bzr directory that stops us ascending higher into the filesystem.
2254
def __init__(self, methodName='runTest'):
2255
# allow test parameterization after test construction and before test
2256
# execution. Variables that the parameterizer sets need to be
2257
# ones that are not set by setUp, or setUp will trash them.
2258
super(TestCaseWithMemoryTransport, self).__init__(methodName)
2259
self.vfs_transport_factory = default_transport
2260
self.transport_server = None
2261
self.transport_readonly_server = None
2262
self.__vfs_server = None
2264
def get_transport(self, relpath=None):
2265
"""Return a writeable transport.
2267
This transport is for the test scratch space relative to
2270
:param relpath: a path relative to the base url.
2272
t = _mod_transport.get_transport(self.get_url(relpath))
2273
self.assertFalse(t.is_readonly())
2276
def get_readonly_transport(self, relpath=None):
2277
"""Return a readonly transport for the test scratch space
2279
This can be used to test that operations which should only need
2280
readonly access in fact do not try to write.
2282
:param relpath: a path relative to the base url.
2284
t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2285
self.assertTrue(t.is_readonly())
2288
def create_transport_readonly_server(self):
2289
"""Create a transport server from class defined at init.
2291
This is mostly a hook for daughter classes.
2293
return self.transport_readonly_server()
2295
def get_readonly_server(self):
2296
"""Get the server instance for the readonly transport
2298
This is useful for some tests with specific servers to do diagnostics.
2300
if self.__readonly_server is None:
2301
if self.transport_readonly_server is None:
2302
# readonly decorator requested
2303
self.__readonly_server = test_server.ReadonlyServer()
2305
# explicit readonly transport.
2306
self.__readonly_server = self.create_transport_readonly_server()
2307
self.start_server(self.__readonly_server,
2308
self.get_vfs_only_server())
2309
return self.__readonly_server
2311
def get_readonly_url(self, relpath=None):
2312
"""Get a URL for the readonly transport.
2314
This will either be backed by '.' or a decorator to the transport
2315
used by self.get_url()
2316
relpath provides for clients to get a path relative to the base url.
2317
These should only be downwards relative, not upwards.
2319
base = self.get_readonly_server().get_url()
2320
return self._adjust_url(base, relpath)
2322
def get_vfs_only_server(self):
2323
"""Get the vfs only read/write server instance.
2325
This is useful for some tests with specific servers that need
2328
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
2329
is no means to override it.
2331
if self.__vfs_server is None:
2332
self.__vfs_server = memory.MemoryServer()
2333
self.start_server(self.__vfs_server)
2334
return self.__vfs_server
2336
def get_server(self):
2337
"""Get the read/write server instance.
2339
This is useful for some tests with specific servers that need
2342
This is built from the self.transport_server factory. If that is None,
2343
then the self.get_vfs_server is returned.
2345
if self.__server is None:
2346
if (self.transport_server is None or self.transport_server is
2347
self.vfs_transport_factory):
2348
self.__server = self.get_vfs_only_server()
2350
# bring up a decorated means of access to the vfs only server.
2351
self.__server = self.transport_server()
2352
self.start_server(self.__server, self.get_vfs_only_server())
2353
return self.__server
2355
def _adjust_url(self, base, relpath):
2356
"""Get a URL (or maybe a path) for the readwrite transport.
2358
This will either be backed by '.' or to an equivalent non-file based
2360
relpath provides for clients to get a path relative to the base url.
2361
These should only be downwards relative, not upwards.
2363
if relpath is not None and relpath != '.':
2364
if not base.endswith('/'):
2366
# XXX: Really base should be a url; we did after all call
2367
# get_url()! But sometimes it's just a path (from
2368
# LocalAbspathServer), and it'd be wrong to append urlescaped data
2369
# to a non-escaped local path.
2370
if base.startswith('./') or base.startswith('/'):
2373
base += urlutils.escape(relpath)
2376
def get_url(self, relpath=None):
2377
"""Get a URL (or maybe a path) for the readwrite transport.
2379
This will either be backed by '.' or to an equivalent non-file based
2381
relpath provides for clients to get a path relative to the base url.
2382
These should only be downwards relative, not upwards.
2384
base = self.get_server().get_url()
2385
return self._adjust_url(base, relpath)
2387
def get_vfs_only_url(self, relpath=None):
2388
"""Get a URL (or maybe a path for the plain old vfs transport.
2390
This will never be a smart protocol. It always has all the
2391
capabilities of the local filesystem, but it might actually be a
2392
MemoryTransport or some other similar virtual filesystem.
2394
This is the backing transport (if any) of the server returned by
2395
get_url and get_readonly_url.
2397
:param relpath: provides for clients to get a path relative to the base
2398
url. These should only be downwards relative, not upwards.
2401
base = self.get_vfs_only_server().get_url()
2402
return self._adjust_url(base, relpath)
2404
def _create_safety_net(self):
2405
"""Make a fake bzr directory.
2407
This prevents any tests propagating up onto the TEST_ROOT directory's
2410
root = TestCaseWithMemoryTransport.TEST_ROOT
2411
bzrdir.BzrDir.create_standalone_workingtree(root)
2413
def _check_safety_net(self):
2414
"""Check that the safety .bzr directory have not been touched.
2416
_make_test_root have created a .bzr directory to prevent tests from
2417
propagating. This method ensures than a test did not leaked.
2419
root = TestCaseWithMemoryTransport.TEST_ROOT
2420
self.permit_url(_mod_transport.get_transport(root).base)
2421
wt = workingtree.WorkingTree.open(root)
2422
last_rev = wt.last_revision()
2423
if last_rev != 'null:':
2424
# The current test have modified the /bzr directory, we need to
2425
# recreate a new one or all the followng tests will fail.
2426
# If you need to inspect its content uncomment the following line
2427
# import pdb; pdb.set_trace()
2428
_rmtree_temp_dir(root + '/.bzr', test_id=self.id())
2429
self._create_safety_net()
2430
raise AssertionError('%s/.bzr should not be modified' % root)
2432
def _make_test_root(self):
2433
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2434
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
2435
root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
2437
TestCaseWithMemoryTransport.TEST_ROOT = root
2439
self._create_safety_net()
2441
# The same directory is used by all tests, and we're not
2442
# specifically told when all tests are finished. This will do.
2443
atexit.register(_rmtree_temp_dir, root)
2445
self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2446
self.addCleanup(self._check_safety_net)
2448
def makeAndChdirToTestDir(self):
2449
"""Create a temporary directories for this one test.
2451
This must set self.test_home_dir and self.test_dir and chdir to
2454
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
2456
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2457
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2458
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2459
self.permit_dir(self.test_dir)
2461
def make_branch(self, relpath, format=None):
2462
"""Create a branch on the transport at relpath."""
2463
repo = self.make_repository(relpath, format=format)
2464
return repo.bzrdir.create_branch()
2466
def make_bzrdir(self, relpath, format=None):
2468
# might be a relative or absolute path
2469
maybe_a_url = self.get_url(relpath)
2470
segments = maybe_a_url.rsplit('/', 1)
2471
t = _mod_transport.get_transport(maybe_a_url)
2472
if len(segments) > 1 and segments[-1] not in ('', '.'):
2476
if isinstance(format, basestring):
2477
format = bzrdir.format_registry.make_bzrdir(format)
2478
return format.initialize_on_transport(t)
2479
except errors.UninitializableFormat:
2480
raise TestSkipped("Format %s is not initializable." % format)
2482
def make_repository(self, relpath, shared=False, format=None):
2483
"""Create a repository on our default transport at relpath.
2485
Note that relpath must be a relative path, not a full url.
2487
# FIXME: If you create a remoterepository this returns the underlying
2488
# real format, which is incorrect. Actually we should make sure that
2489
# RemoteBzrDir returns a RemoteRepository.
2490
# maybe mbp 20070410
2491
made_control = self.make_bzrdir(relpath, format=format)
2492
return made_control.create_repository(shared=shared)
2494
def make_smart_server(self, path, backing_server=None):
2495
if backing_server is None:
2496
backing_server = self.get_server()
2497
smart_server = test_server.SmartTCPServer_for_testing()
2498
self.start_server(smart_server, backing_server)
2499
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2501
return remote_transport
2503
def make_branch_and_memory_tree(self, relpath, format=None):
2504
"""Create a branch on the default transport and a MemoryTree for it."""
2505
b = self.make_branch(relpath, format=format)
2506
return memorytree.MemoryTree.create_on_branch(b)
2508
def make_branch_builder(self, relpath, format=None):
2509
branch = self.make_branch(relpath, format=format)
2510
return branchbuilder.BranchBuilder(branch=branch)
2512
def overrideEnvironmentForTesting(self):
2513
test_home_dir = self.test_home_dir
2514
if isinstance(test_home_dir, unicode):
2515
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2516
self.overrideEnv('HOME', test_home_dir)
2517
self.overrideEnv('BZR_HOME', test_home_dir)
2520
super(TestCaseWithMemoryTransport, self).setUp()
2521
# Ensure that ConnectedTransport doesn't leak sockets
2522
def get_transport_with_cleanup(*args, **kwargs):
2523
t = orig_get_transport(*args, **kwargs)
2524
if isinstance(t, _mod_transport.ConnectedTransport):
2525
self.addCleanup(t.disconnect)
2528
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2529
get_transport_with_cleanup)
2530
self._make_test_root()
2531
self.addCleanup(os.chdir, os.getcwdu())
2532
self.makeAndChdirToTestDir()
2533
self.overrideEnvironmentForTesting()
2534
self.__readonly_server = None
2535
self.__server = None
2536
self.reduceLockdirTimeout()
2538
def setup_smart_server_with_call_log(self):
2539
"""Sets up a smart server as the transport server with a call log."""
2540
self.transport_server = test_server.SmartTCPServer_for_testing
2541
self.hpss_calls = []
2543
# Skip the current stack down to the caller of
2544
# setup_smart_server_with_call_log
2545
prefix_length = len(traceback.extract_stack()) - 2
2546
def capture_hpss_call(params):
2547
self.hpss_calls.append(
2548
CapturedCall(params, prefix_length))
2549
client._SmartClient.hooks.install_named_hook(
2550
'call', capture_hpss_call, None)
2552
def reset_smart_call_log(self):
2553
self.hpss_calls = []
2556
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2557
"""Derived class that runs a test within a temporary directory.
2559
This is useful for tests that need to create a branch, etc.
2561
The directory is created in a slightly complex way: for each
2562
Python invocation, a new temporary top-level directory is created.
2563
All test cases create their own directory within that. If the
2564
tests complete successfully, the directory is removed.
2566
:ivar test_base_dir: The path of the top-level directory for this
2567
test, which contains a home directory and a work directory.
2569
:ivar test_home_dir: An initially empty directory under test_base_dir
2570
which is used as $HOME for this test.
2572
:ivar test_dir: A directory under test_base_dir used as the current
2573
directory when the test proper is run.
2576
OVERRIDE_PYTHON = 'python'
2578
def check_file_contents(self, filename, expect):
2579
self.log("check contents of file %s" % filename)
2585
if contents != expect:
2586
self.log("expected: %r" % expect)
2587
self.log("actually: %r" % contents)
2588
self.fail("contents of %s not as expected" % filename)
2590
def _getTestDirPrefix(self):
2591
# create a directory within the top level test directory
2592
if sys.platform in ('win32', 'cygwin'):
2593
name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
2594
# windows is likely to have path-length limits so use a short name
2595
name_prefix = name_prefix[-30:]
2597
name_prefix = re.sub('[/]', '_', self.id())
2600
def makeAndChdirToTestDir(self):
2601
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2603
For TestCaseInTempDir we create a temporary directory based on the test
2604
name and then create two subdirs - test and home under it.
2606
name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2607
self._getTestDirPrefix())
2609
for i in range(100):
2610
if os.path.exists(name):
2611
name = name_prefix + '_' + str(i)
2613
# now create test and home directories within this dir
2614
self.test_base_dir = name
2615
self.addCleanup(self.deleteTestDir)
2616
os.mkdir(self.test_base_dir)
2618
self.permit_dir(self.test_base_dir)
2619
# 'sprouting' and 'init' of a branch both walk up the tree to find
2620
# stacking policy to honour; create a bzr dir with an unshared
2621
# repository (but not a branch - our code would be trying to escape
2622
# then!) to stop them, and permit it to be read.
2623
# control = bzrdir.BzrDir.create(self.test_base_dir)
2624
# control.create_repository()
2625
self.test_home_dir = self.test_base_dir + '/home'
2626
os.mkdir(self.test_home_dir)
2627
self.test_dir = self.test_base_dir + '/work'
2628
os.mkdir(self.test_dir)
2629
os.chdir(self.test_dir)
2630
# put name of test inside
2631
f = file(self.test_base_dir + '/name', 'w')
2637
def deleteTestDir(self):
2638
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2639
_rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2641
def build_tree(self, shape, line_endings='binary', transport=None):
2642
"""Build a test tree according to a pattern.
2644
shape is a sequence of file specifications. If the final
2645
character is '/', a directory is created.
2647
This assumes that all the elements in the tree being built are new.
2649
This doesn't add anything to a branch.
2651
:type shape: list or tuple.
2652
:param line_endings: Either 'binary' or 'native'
2653
in binary mode, exact contents are written in native mode, the
2654
line endings match the default platform endings.
2655
:param transport: A transport to write to, for building trees on VFS's.
2656
If the transport is readonly or None, "." is opened automatically.
2659
if type(shape) not in (list, tuple):
2660
raise AssertionError("Parameter 'shape' should be "
2661
"a list or a tuple. Got %r instead" % (shape,))
2662
# It's OK to just create them using forward slashes on windows.
2663
if transport is None or transport.is_readonly():
2664
transport = _mod_transport.get_transport(".")
2666
self.assertIsInstance(name, basestring)
2668
transport.mkdir(urlutils.escape(name[:-1]))
2670
if line_endings == 'binary':
2672
elif line_endings == 'native':
2675
raise errors.BzrError(
2676
'Invalid line ending request %r' % line_endings)
2677
content = "contents of %s%s" % (name.encode('utf-8'), end)
2678
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2680
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2682
def assertInWorkingTree(self, path, root_path='.', tree=None):
2683
"""Assert whether path or paths are in the WorkingTree"""
2685
tree = workingtree.WorkingTree.open(root_path)
2686
if not isinstance(path, basestring):
2688
self.assertInWorkingTree(p, tree=tree)
2690
self.assertIsNot(tree.path2id(path), None,
2691
path+' not in working tree.')
2693
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2694
"""Assert whether path or paths are not in the WorkingTree"""
2696
tree = workingtree.WorkingTree.open(root_path)
2697
if not isinstance(path, basestring):
2699
self.assertNotInWorkingTree(p,tree=tree)
2701
self.assertIs(tree.path2id(path), None, path+' in working tree.')
2704
class TestCaseWithTransport(TestCaseInTempDir):
2705
"""A test case that provides get_url and get_readonly_url facilities.
2707
These back onto two transport servers, one for readonly access and one for
2710
If no explicit class is provided for readonly access, a
2711
ReadonlyTransportDecorator is used instead which allows the use of non disk
2712
based read write transports.
2714
If an explicit class is provided for readonly access, that server and the
2715
readwrite one must both define get_url() as resolving to os.getcwd().
2718
def get_vfs_only_server(self):
2719
"""See TestCaseWithMemoryTransport.
2721
This is useful for some tests with specific servers that need
2724
if self.__vfs_server is None:
2725
self.__vfs_server = self.vfs_transport_factory()
2726
self.start_server(self.__vfs_server)
2727
return self.__vfs_server
2729
def make_branch_and_tree(self, relpath, format=None):
2730
"""Create a branch on the transport and a tree locally.
2732
If the transport is not a LocalTransport, the Tree can't be created on
2733
the transport. In that case if the vfs_transport_factory is
2734
LocalURLServer the working tree is created in the local
2735
directory backing the transport, and the returned tree's branch and
2736
repository will also be accessed locally. Otherwise a lightweight
2737
checkout is created and returned.
2739
We do this because we can't physically create a tree in the local
2740
path, with a branch reference to the transport_factory url, and
2741
a branch + repository in the vfs_transport, unless the vfs_transport
2742
namespace is distinct from the local disk - the two branch objects
2743
would collide. While we could construct a tree with its branch object
2744
pointing at the transport_factory transport in memory, reopening it
2745
would behaving unexpectedly, and has in the past caused testing bugs
2746
when we tried to do it that way.
2748
:param format: The BzrDirFormat.
2749
:returns: the WorkingTree.
2751
# TODO: always use the local disk path for the working tree,
2752
# this obviously requires a format that supports branch references
2753
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2755
b = self.make_branch(relpath, format=format)
2757
return b.bzrdir.create_workingtree()
2758
except errors.NotLocalUrl:
2759
# We can only make working trees locally at the moment. If the
2760
# transport can't support them, then we keep the non-disk-backed
2761
# branch and create a local checkout.
2762
if self.vfs_transport_factory is test_server.LocalURLServer:
2763
# the branch is colocated on disk, we cannot create a checkout.
2764
# hopefully callers will expect this.
2765
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2766
wt = local_controldir.create_workingtree()
2767
if wt.branch._format != b._format:
2769
# Make sure that assigning to wt._branch fixes wt.branch,
2770
# in case the implementation details of workingtree objects
2772
self.assertIs(b, wt.branch)
2775
return b.create_checkout(relpath, lightweight=True)
2777
def assertIsDirectory(self, relpath, transport):
2778
"""Assert that relpath within transport is a directory.
2780
This may not be possible on all transports; in that case it propagates
2781
a TransportNotPossible.
2784
mode = transport.stat(relpath).st_mode
2785
except errors.NoSuchFile:
2786
self.fail("path %s is not a directory; no such file"
2788
if not stat.S_ISDIR(mode):
2789
self.fail("path %s is not a directory; has mode %#o"
2792
def assertTreesEqual(self, left, right):
2793
"""Check that left and right have the same content and properties."""
2794
# we use a tree delta to check for equality of the content, and we
2795
# manually check for equality of other things such as the parents list.
2796
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2797
differences = left.changes_from(right)
2798
self.assertFalse(differences.has_changed(),
2799
"Trees %r and %r are different: %r" % (left, right, differences))
2802
super(TestCaseWithTransport, self).setUp()
2803
self.__vfs_server = None
2805
def disable_missing_extensions_warning(self):
2806
"""Some tests expect a precise stderr content.
2808
There is no point in forcing them to duplicate the extension related
2811
config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
2814
class ChrootedTestCase(TestCaseWithTransport):
2815
"""A support class that provides readonly urls outside the local namespace.
2817
This is done by checking if self.transport_server is a MemoryServer. if it
2818
is then we are chrooted already, if it is not then an HttpServer is used
2821
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2822
be used without needed to redo it when a different
2823
subclass is in use ?
2827
from bzrlib.tests import http_server
2828
super(ChrootedTestCase, self).setUp()
2829
if not self.vfs_transport_factory == memory.MemoryServer:
2830
self.transport_readonly_server = http_server.HttpServer
2833
def condition_id_re(pattern):
2834
"""Create a condition filter which performs a re check on a test's id.
2836
:param pattern: A regular expression string.
2837
:return: A callable that returns True if the re matches.
2839
filter_re = re.compile(pattern, 0)
2840
def condition(test):
2842
return filter_re.search(test_id)
2846
def condition_isinstance(klass_or_klass_list):
2847
"""Create a condition filter which returns isinstance(param, klass).
2849
:return: A callable which when called with one parameter obj return the
2850
result of isinstance(obj, klass_or_klass_list).
2853
return isinstance(obj, klass_or_klass_list)
2857
def condition_id_in_list(id_list):
2858
"""Create a condition filter which verify that test's id in a list.
2860
:param id_list: A TestIdList object.
2861
:return: A callable that returns True if the test's id appears in the list.
2863
def condition(test):
2864
return id_list.includes(test.id())
2868
def condition_id_startswith(starts):
2869
"""Create a condition filter verifying that test's id starts with a string.
2871
:param starts: A list of string.
2872
:return: A callable that returns True if the test's id starts with one of
2875
def condition(test):
2876
for start in starts:
2877
if test.id().startswith(start):
2883
def exclude_tests_by_condition(suite, condition):
2884
"""Create a test suite which excludes some tests from suite.
2886
:param suite: The suite to get tests from.
2887
:param condition: A callable whose result evaluates True when called with a
2888
test case which should be excluded from the result.
2889
:return: A suite which contains the tests found in suite that fail
2893
for test in iter_suite_tests(suite):
2894
if not condition(test):
2896
return TestUtil.TestSuite(result)
2899
def filter_suite_by_condition(suite, condition):
2900
"""Create a test suite by filtering another one.
2902
:param suite: The source suite.
2903
:param condition: A callable whose result evaluates True when called with a
2904
test case which should be included in the result.
2905
:return: A suite which contains the tests found in suite that pass
2909
for test in iter_suite_tests(suite):
2912
return TestUtil.TestSuite(result)
2915
def filter_suite_by_re(suite, pattern):
2916
"""Create a test suite by filtering another one.
2918
:param suite: the source suite
2919
:param pattern: pattern that names must match
2920
:returns: the newly created suite
2922
condition = condition_id_re(pattern)
2923
result_suite = filter_suite_by_condition(suite, condition)
2927
def filter_suite_by_id_list(suite, test_id_list):
2928
"""Create a test suite by filtering another one.
2930
:param suite: The source suite.
2931
:param test_id_list: A list of the test ids to keep as strings.
2932
:returns: the newly created suite
2934
condition = condition_id_in_list(test_id_list)
2935
result_suite = filter_suite_by_condition(suite, condition)
2939
def filter_suite_by_id_startswith(suite, start):
2940
"""Create a test suite by filtering another one.
2942
:param suite: The source suite.
2943
:param start: A list of string the test id must start with one of.
2944
:returns: the newly created suite
2946
condition = condition_id_startswith(start)
2947
result_suite = filter_suite_by_condition(suite, condition)
2951
def exclude_tests_by_re(suite, pattern):
2952
"""Create a test suite which excludes some tests from suite.
2954
:param suite: The suite to get tests from.
2955
:param pattern: A regular expression string. Test ids that match this
2956
pattern will be excluded from the result.
2957
:return: A TestSuite that contains all the tests from suite without the
2958
tests that matched pattern. The order of tests is the same as it was in
2961
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2964
def preserve_input(something):
2965
"""A helper for performing test suite transformation chains.
2967
:param something: Anything you want to preserve.
2973
def randomize_suite(suite):
2974
"""Return a new TestSuite with suite's tests in random order.
2976
The tests in the input suite are flattened into a single suite in order to
2977
accomplish this. Any nested TestSuites are removed to provide global
2980
tests = list(iter_suite_tests(suite))
2981
random.shuffle(tests)
2982
return TestUtil.TestSuite(tests)
2985
def split_suite_by_condition(suite, condition):
2986
"""Split a test suite into two by a condition.
2988
:param suite: The suite to split.
2989
:param condition: The condition to match on. Tests that match this
2990
condition are returned in the first test suite, ones that do not match
2991
are in the second suite.
2992
:return: A tuple of two test suites, where the first contains tests from
2993
suite matching the condition, and the second contains the remainder
2994
from suite. The order within each output suite is the same as it was in
2999
for test in iter_suite_tests(suite):
3001
matched.append(test)
3003
did_not_match.append(test)
3004
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
3007
def split_suite_by_re(suite, pattern):
3008
"""Split a test suite into two by a regular expression.
3010
:param suite: The suite to split.
3011
:param pattern: A regular expression string. Test ids that match this
3012
pattern will be in the first test suite returned, and the others in the
3013
second test suite returned.
3014
:return: A tuple of two test suites, where the first contains tests from
3015
suite matching pattern, and the second contains the remainder from
3016
suite. The order within each output suite is the same as it was in
3019
return split_suite_by_condition(suite, condition_id_re(pattern))
3022
def run_suite(suite, name='test', verbose=False, pattern=".*",
3023
stop_on_failure=False,
3024
transport=None, lsprof_timed=None, bench_history=None,
3025
matching_tests_first=None,
3028
exclude_pattern=None,
3031
suite_decorators=None,
3033
result_decorators=None,
3035
"""Run a test suite for bzr selftest.
3037
:param runner_class: The class of runner to use. Must support the
3038
constructor arguments passed by run_suite which are more than standard
3040
:return: A boolean indicating success.
3042
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
3047
if runner_class is None:
3048
runner_class = TextTestRunner
3051
runner = runner_class(stream=stream,
3053
verbosity=verbosity,
3054
bench_history=bench_history,
3056
result_decorators=result_decorators,
3058
runner.stop_on_failure=stop_on_failure
3059
# built in decorator factories:
3061
random_order(random_seed, runner),
3062
exclude_tests(exclude_pattern),
3064
if matching_tests_first:
3065
decorators.append(tests_first(pattern))
3067
decorators.append(filter_tests(pattern))
3068
if suite_decorators:
3069
decorators.extend(suite_decorators)
3070
# tell the result object how many tests will be running: (except if
3071
# --parallel=fork is being used. Robert said he will provide a better
3072
# progress design later -- vila 20090817)
3073
if fork_decorator not in decorators:
3074
decorators.append(CountingDecorator)
3075
for decorator in decorators:
3076
suite = decorator(suite)
3078
# Done after test suite decoration to allow randomisation etc
3079
# to take effect, though that is of marginal benefit.
3081
stream.write("Listing tests only ...\n")
3082
for t in iter_suite_tests(suite):
3083
stream.write("%s\n" % (t.id()))
3085
result = runner.run(suite)
3087
return result.wasStrictlySuccessful()
3089
return result.wasSuccessful()
3092
# A registry where get() returns a suite decorator.
3093
parallel_registry = registry.Registry()
3096
def fork_decorator(suite):
3097
if getattr(os, "fork", None) is None:
3098
raise errors.BzrCommandError("platform does not support fork,"
3099
" try --parallel=subprocess instead.")
3100
concurrency = osutils.local_concurrency()
3101
if concurrency == 1:
3103
from testtools import ConcurrentTestSuite
3104
return ConcurrentTestSuite(suite, fork_for_tests)
3105
parallel_registry.register('fork', fork_decorator)
3108
def subprocess_decorator(suite):
3109
concurrency = osutils.local_concurrency()
3110
if concurrency == 1:
3112
from testtools import ConcurrentTestSuite
3113
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3114
parallel_registry.register('subprocess', subprocess_decorator)
3117
def exclude_tests(exclude_pattern):
3118
"""Return a test suite decorator that excludes tests."""
3119
if exclude_pattern is None:
3120
return identity_decorator
3121
def decorator(suite):
3122
return ExcludeDecorator(suite, exclude_pattern)
3126
def filter_tests(pattern):
3128
return identity_decorator
3129
def decorator(suite):
3130
return FilterTestsDecorator(suite, pattern)
3134
def random_order(random_seed, runner):
3135
"""Return a test suite decorator factory for randomising tests order.
3137
:param random_seed: now, a string which casts to a long, or a long.
3138
:param runner: A test runner with a stream attribute to report on.
3140
if random_seed is None:
3141
return identity_decorator
3142
def decorator(suite):
3143
return RandomDecorator(suite, random_seed, runner.stream)
3147
def tests_first(pattern):
3149
return identity_decorator
3150
def decorator(suite):
3151
return TestFirstDecorator(suite, pattern)
3155
def identity_decorator(suite):
3160
class TestDecorator(TestUtil.TestSuite):
3161
"""A decorator for TestCase/TestSuite objects.
3163
Usually, subclasses should override __iter__(used when flattening test
3164
suites), which we do to filter, reorder, parallelise and so on, run() and
3168
def __init__(self, suite):
3169
TestUtil.TestSuite.__init__(self)
3172
def countTestCases(self):
3175
cases += test.countTestCases()
3182
def run(self, result):
3183
# Use iteration on self, not self._tests, to allow subclasses to hook
3186
if result.shouldStop:
3192
class CountingDecorator(TestDecorator):
3193
"""A decorator which calls result.progress(self.countTestCases)."""
3195
def run(self, result):
3196
progress_method = getattr(result, 'progress', None)
3197
if callable(progress_method):
3198
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3199
return super(CountingDecorator, self).run(result)
3202
class ExcludeDecorator(TestDecorator):
3203
"""A decorator which excludes test matching an exclude pattern."""
3205
def __init__(self, suite, exclude_pattern):
3206
TestDecorator.__init__(self, suite)
3207
self.exclude_pattern = exclude_pattern
3208
self.excluded = False
3212
return iter(self._tests)
3213
self.excluded = True
3214
suite = exclude_tests_by_re(self, self.exclude_pattern)
3216
self.addTests(suite)
3217
return iter(self._tests)
3220
class FilterTestsDecorator(TestDecorator):
3221
"""A decorator which filters tests to those matching a pattern."""
3223
def __init__(self, suite, pattern):
3224
TestDecorator.__init__(self, suite)
3225
self.pattern = pattern
3226
self.filtered = False
3230
return iter(self._tests)
3231
self.filtered = True
3232
suite = filter_suite_by_re(self, self.pattern)
3234
self.addTests(suite)
3235
return iter(self._tests)
3238
class RandomDecorator(TestDecorator):
3239
"""A decorator which randomises the order of its tests."""
3241
def __init__(self, suite, random_seed, stream):
3242
TestDecorator.__init__(self, suite)
3243
self.random_seed = random_seed
3244
self.randomised = False
3245
self.stream = stream
3249
return iter(self._tests)
3250
self.randomised = True
3251
self.stream.write("Randomizing test order using seed %s\n\n" %
3252
(self.actual_seed()))
3253
# Initialise the random number generator.
3254
random.seed(self.actual_seed())
3255
suite = randomize_suite(self)
3257
self.addTests(suite)
3258
return iter(self._tests)
3260
def actual_seed(self):
3261
if self.random_seed == "now":
3262
# We convert the seed to a long to make it reuseable across
3263
# invocations (because the user can reenter it).
3264
self.random_seed = long(time.time())
3266
# Convert the seed to a long if we can
3268
self.random_seed = long(self.random_seed)
3271
return self.random_seed
3274
class TestFirstDecorator(TestDecorator):
3275
"""A decorator which moves named tests to the front."""
3277
def __init__(self, suite, pattern):
3278
TestDecorator.__init__(self, suite)
3279
self.pattern = pattern
3280
self.filtered = False
3284
return iter(self._tests)
3285
self.filtered = True
3286
suites = split_suite_by_re(self, self.pattern)
3288
self.addTests(suites)
3289
return iter(self._tests)
3292
def partition_tests(suite, count):
3293
"""Partition suite into count lists of tests."""
3294
# This just assigns tests in a round-robin fashion. On one hand this
3295
# splits up blocks of related tests that might run faster if they shared
3296
# resources, but on the other it avoids assigning blocks of slow tests to
3297
# just one partition. So the slowest partition shouldn't be much slower
3299
partitions = [list() for i in range(count)]
3300
tests = iter_suite_tests(suite)
3301
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3302
partition.append(test)
3306
def workaround_zealous_crypto_random():
3307
"""Crypto.Random want to help us being secure, but we don't care here.
3309
This workaround some test failure related to the sftp server. Once paramiko
3310
stop using the controversial API in Crypto.Random, we may get rid of it.
3313
from Crypto.Random import atfork
3319
def fork_for_tests(suite):
3320
"""Take suite and start up one runner per CPU by forking()
3322
:return: An iterable of TestCase-like objects which can each have
3323
run(result) called on them to feed tests to result.
3325
concurrency = osutils.local_concurrency()
3327
from subunit import TestProtocolClient, ProtocolTestCase
3328
from subunit.test_results import AutoTimingTestResultDecorator
3329
class TestInOtherProcess(ProtocolTestCase):
3330
# Should be in subunit, I think. RBC.
3331
def __init__(self, stream, pid):
3332
ProtocolTestCase.__init__(self, stream)
3335
def run(self, result):
3337
ProtocolTestCase.run(self, result)
3339
os.waitpid(self.pid, 0)
3341
test_blocks = partition_tests(suite, concurrency)
3342
for process_tests in test_blocks:
3343
process_suite = TestUtil.TestSuite()
3344
process_suite.addTests(process_tests)
3345
c2pread, c2pwrite = os.pipe()
3348
workaround_zealous_crypto_random()
3351
# Leave stderr and stdout open so we can see test noise
3352
# Close stdin so that the child goes away if it decides to
3353
# read from stdin (otherwise its a roulette to see what
3354
# child actually gets keystrokes for pdb etc).
3357
stream = os.fdopen(c2pwrite, 'wb', 1)
3358
subunit_result = AutoTimingTestResultDecorator(
3359
TestProtocolClient(stream))
3360
process_suite.run(subunit_result)
3365
stream = os.fdopen(c2pread, 'rb', 1)
3366
test = TestInOtherProcess(stream, pid)
3371
def reinvoke_for_tests(suite):
3372
"""Take suite and start up one runner per CPU using subprocess().
3374
:return: An iterable of TestCase-like objects which can each have
3375
run(result) called on them to feed tests to result.
3377
concurrency = osutils.local_concurrency()
3379
from subunit import ProtocolTestCase
3380
class TestInSubprocess(ProtocolTestCase):
3381
def __init__(self, process, name):
3382
ProtocolTestCase.__init__(self, process.stdout)
3383
self.process = process
3384
self.process.stdin.close()
3387
def run(self, result):
3389
ProtocolTestCase.run(self, result)
3392
os.unlink(self.name)
3393
# print "pid %d finished" % finished_process
3394
test_blocks = partition_tests(suite, concurrency)
3395
for process_tests in test_blocks:
3396
# ugly; currently reimplement rather than reuses TestCase methods.
3397
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3398
if not os.path.isfile(bzr_path):
3399
# We are probably installed. Assume sys.argv is the right file
3400
bzr_path = sys.argv[0]
3401
bzr_path = [bzr_path]
3402
if sys.platform == "win32":
3403
# if we're on windows, we can't execute the bzr script directly
3404
bzr_path = [sys.executable] + bzr_path
3405
fd, test_list_file_name = tempfile.mkstemp()
3406
test_list_file = os.fdopen(fd, 'wb', 1)
3407
for test in process_tests:
3408
test_list_file.write(test.id() + '\n')
3409
test_list_file.close()
3411
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3413
if '--no-plugins' in sys.argv:
3414
argv.append('--no-plugins')
3415
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3416
# noise on stderr it can interrupt the subunit protocol.
3417
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3418
stdout=subprocess.PIPE,
3419
stderr=subprocess.PIPE,
3421
test = TestInSubprocess(process, test_list_file_name)
3424
os.unlink(test_list_file_name)
3429
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3430
"""Generate profiling data for all activity between start and success.
3432
The profile data is appended to the test's _benchcalls attribute and can
3433
be accessed by the forwarded-to TestResult.
3435
While it might be cleaner do accumulate this in stopTest, addSuccess is
3436
where our existing output support for lsprof is, and this class aims to
3437
fit in with that: while it could be moved it's not necessary to accomplish
3438
test profiling, nor would it be dramatically cleaner.
3441
def startTest(self, test):
3442
self.profiler = bzrlib.lsprof.BzrProfiler()
3443
# Prevent deadlocks in tests that use lsprof: those tests will
3445
bzrlib.lsprof.BzrProfiler.profiler_block = 0
3446
self.profiler.start()
3447
testtools.ExtendedToOriginalDecorator.startTest(self, test)
3449
def addSuccess(self, test):
3450
stats = self.profiler.stop()
3452
calls = test._benchcalls
3453
except AttributeError:
3454
test._benchcalls = []
3455
calls = test._benchcalls
3456
calls.append(((test.id(), "", ""), stats))
3457
testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
3459
def stopTest(self, test):
3460
testtools.ExtendedToOriginalDecorator.stopTest(self, test)
3461
self.profiler = None
3464
# Controlled by "bzr selftest -E=..." option
3465
# Currently supported:
3466
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3467
# preserves any flags supplied at the command line.
3468
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3469
# rather than failing tests. And no longer raise
3470
# LockContention when fctnl locks are not being used
3471
# with proper exclusion rules.
3472
# -Ethreads Will display thread ident at creation/join time to
3473
# help track thread leaks
3474
selftest_debug_flags = set()
3477
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3479
test_suite_factory=None,
3482
matching_tests_first=None,
3485
exclude_pattern=None,
3491
suite_decorators=None,
3495
"""Run the whole test suite under the enhanced runner"""
3496
# XXX: Very ugly way to do this...
3497
# Disable warning about old formats because we don't want it to disturb
3498
# any blackbox tests.
3499
from bzrlib import repository
3500
repository._deprecation_warning_done = True
3502
global default_transport
3503
if transport is None:
3504
transport = default_transport
3505
old_transport = default_transport
3506
default_transport = transport
3507
global selftest_debug_flags
3508
old_debug_flags = selftest_debug_flags
3509
if debug_flags is not None:
3510
selftest_debug_flags = set(debug_flags)
3512
if load_list is None:
3515
keep_only = load_test_id_list(load_list)
3517
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3518
for start in starting_with]
3519
if test_suite_factory is None:
3520
# Reduce loading time by loading modules based on the starting_with
3522
suite = test_suite(keep_only, starting_with)
3524
suite = test_suite_factory()
3526
# But always filter as requested.
3527
suite = filter_suite_by_id_startswith(suite, starting_with)
3528
result_decorators = []
3530
result_decorators.append(ProfileResult)
3531
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3532
stop_on_failure=stop_on_failure,
3533
transport=transport,
3534
lsprof_timed=lsprof_timed,
3535
bench_history=bench_history,
3536
matching_tests_first=matching_tests_first,
3537
list_only=list_only,
3538
random_seed=random_seed,
3539
exclude_pattern=exclude_pattern,
3541
runner_class=runner_class,
3542
suite_decorators=suite_decorators,
3544
result_decorators=result_decorators,
3547
default_transport = old_transport
3548
selftest_debug_flags = old_debug_flags
3551
def load_test_id_list(file_name):
3552
"""Load a test id list from a text file.
3554
The format is one test id by line. No special care is taken to impose
3555
strict rules, these test ids are used to filter the test suite so a test id
3556
that do not match an existing test will do no harm. This allows user to add
3557
comments, leave blank lines, etc.
3561
ftest = open(file_name, 'rt')
3563
if e.errno != errno.ENOENT:
3566
raise errors.NoSuchFile(file_name)
3568
for test_name in ftest.readlines():
3569
test_list.append(test_name.strip())
3574
def suite_matches_id_list(test_suite, id_list):
3575
"""Warns about tests not appearing or appearing more than once.
3577
:param test_suite: A TestSuite object.
3578
:param test_id_list: The list of test ids that should be found in
3581
:return: (absents, duplicates) absents is a list containing the test found
3582
in id_list but not in test_suite, duplicates is a list containing the
3583
test found multiple times in test_suite.
3585
When using a prefined test id list, it may occurs that some tests do not
3586
exist anymore or that some tests use the same id. This function warns the
3587
tester about potential problems in his workflow (test lists are volatile)
3588
or in the test suite itself (using the same id for several tests does not
3589
help to localize defects).
3591
# Build a dict counting id occurrences
3593
for test in iter_suite_tests(test_suite):
3595
tests[id] = tests.get(id, 0) + 1
3600
occurs = tests.get(id, 0)
3602
not_found.append(id)
3604
duplicates.append(id)
3606
return not_found, duplicates
3609
class TestIdList(object):
3610
"""Test id list to filter a test suite.
3612
Relying on the assumption that test ids are built as:
3613
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3614
notation, this class offers methods to :
3615
- avoid building a test suite for modules not refered to in the test list,
3616
- keep only the tests listed from the module test suite.
3619
def __init__(self, test_id_list):
3620
# When a test suite needs to be filtered against us we compare test ids
3621
# for equality, so a simple dict offers a quick and simple solution.
3622
self.tests = dict().fromkeys(test_id_list, True)
3624
# While unittest.TestCase have ids like:
3625
# <module>.<class>.<method>[(<param+)],
3626
# doctest.DocTestCase can have ids like:
3629
# <module>.<function>
3630
# <module>.<class>.<method>
3632
# Since we can't predict a test class from its name only, we settle on
3633
# a simple constraint: a test id always begins with its module name.
3636
for test_id in test_id_list:
3637
parts = test_id.split('.')
3638
mod_name = parts.pop(0)
3639
modules[mod_name] = True
3641
mod_name += '.' + part
3642
modules[mod_name] = True
3643
self.modules = modules
3645
def refers_to(self, module_name):
3646
"""Is there tests for the module or one of its sub modules."""
3647
return self.modules.has_key(module_name)
3649
def includes(self, test_id):
3650
return self.tests.has_key(test_id)
3653
class TestPrefixAliasRegistry(registry.Registry):
3654
"""A registry for test prefix aliases.
3656
This helps implement shorcuts for the --starting-with selftest
3657
option. Overriding existing prefixes is not allowed but not fatal (a
3658
warning will be emitted).
3661
def register(self, key, obj, help=None, info=None,
3662
override_existing=False):
3663
"""See Registry.register.
3665
Trying to override an existing alias causes a warning to be emitted,
3666
not a fatal execption.
3669
super(TestPrefixAliasRegistry, self).register(
3670
key, obj, help=help, info=info, override_existing=False)
3672
actual = self.get(key)
3674
'Test prefix alias %s is already used for %s, ignoring %s'
3675
% (key, actual, obj))
3677
def resolve_alias(self, id_start):
3678
"""Replace the alias by the prefix in the given string.
3680
Using an unknown prefix is an error to help catching typos.
3682
parts = id_start.split('.')
3684
parts[0] = self.get(parts[0])
3686
raise errors.BzrCommandError(
3687
'%s is not a known test prefix alias' % parts[0])
3688
return '.'.join(parts)
3691
test_prefix_alias_registry = TestPrefixAliasRegistry()
3692
"""Registry of test prefix aliases."""
3695
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3696
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3697
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3699
# Obvious highest levels prefixes, feel free to add your own via a plugin
3700
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3701
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3702
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3703
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3704
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3707
def _test_suite_testmod_names():
3708
"""Return the standard list of test module names to test."""
3711
'bzrlib.tests.blackbox',
3712
'bzrlib.tests.commands',
3713
'bzrlib.tests.doc_generate',
3714
'bzrlib.tests.per_branch',
3715
'bzrlib.tests.per_bzrdir',
3716
'bzrlib.tests.per_controldir',
3717
'bzrlib.tests.per_controldir_colo',
3718
'bzrlib.tests.per_foreign_vcs',
3719
'bzrlib.tests.per_interrepository',
3720
'bzrlib.tests.per_intertree',
3721
'bzrlib.tests.per_inventory',
3722
'bzrlib.tests.per_interbranch',
3723
'bzrlib.tests.per_lock',
3724
'bzrlib.tests.per_merger',
3725
'bzrlib.tests.per_transport',
3726
'bzrlib.tests.per_tree',
3727
'bzrlib.tests.per_pack_repository',
3728
'bzrlib.tests.per_repository',
3729
'bzrlib.tests.per_repository_chk',
3730
'bzrlib.tests.per_repository_reference',
3731
'bzrlib.tests.per_repository_vf',
3732
'bzrlib.tests.per_uifactory',
3733
'bzrlib.tests.per_versionedfile',
3734
'bzrlib.tests.per_workingtree',
3735
'bzrlib.tests.test__annotator',
3736
'bzrlib.tests.test__bencode',
3737
'bzrlib.tests.test__btree_serializer',
3738
'bzrlib.tests.test__chk_map',
3739
'bzrlib.tests.test__dirstate_helpers',
3740
'bzrlib.tests.test__groupcompress',
3741
'bzrlib.tests.test__known_graph',
3742
'bzrlib.tests.test__rio',
3743
'bzrlib.tests.test__simple_set',
3744
'bzrlib.tests.test__static_tuple',
3745
'bzrlib.tests.test__walkdirs_win32',
3746
'bzrlib.tests.test_ancestry',
3747
'bzrlib.tests.test_annotate',
3748
'bzrlib.tests.test_api',
3749
'bzrlib.tests.test_atomicfile',
3750
'bzrlib.tests.test_bad_files',
3751
'bzrlib.tests.test_bisect_multi',
3752
'bzrlib.tests.test_branch',
3753
'bzrlib.tests.test_branchbuilder',
3754
'bzrlib.tests.test_btree_index',
3755
'bzrlib.tests.test_bugtracker',
3756
'bzrlib.tests.test_bundle',
3757
'bzrlib.tests.test_bzrdir',
3758
'bzrlib.tests.test__chunks_to_lines',
3759
'bzrlib.tests.test_cache_utf8',
3760
'bzrlib.tests.test_chk_map',
3761
'bzrlib.tests.test_chk_serializer',
3762
'bzrlib.tests.test_chunk_writer',
3763
'bzrlib.tests.test_clean_tree',
3764
'bzrlib.tests.test_cleanup',
3765
'bzrlib.tests.test_cmdline',
3766
'bzrlib.tests.test_commands',
3767
'bzrlib.tests.test_commit',
3768
'bzrlib.tests.test_commit_merge',
3769
'bzrlib.tests.test_config',
3770
'bzrlib.tests.test_conflicts',
3771
'bzrlib.tests.test_controldir',
3772
'bzrlib.tests.test_counted_lock',
3773
'bzrlib.tests.test_crash',
3774
'bzrlib.tests.test_decorators',
3775
'bzrlib.tests.test_delta',
3776
'bzrlib.tests.test_debug',
3777
'bzrlib.tests.test_deprecated_graph',
3778
'bzrlib.tests.test_diff',
3779
'bzrlib.tests.test_directory_service',
3780
'bzrlib.tests.test_dirstate',
3781
'bzrlib.tests.test_email_message',
3782
'bzrlib.tests.test_eol_filters',
3783
'bzrlib.tests.test_errors',
3784
'bzrlib.tests.test_export',
3785
'bzrlib.tests.test_extract',
3786
'bzrlib.tests.test_fetch',
3787
'bzrlib.tests.test_fixtures',
3788
'bzrlib.tests.test_fifo_cache',
3789
'bzrlib.tests.test_filters',
3790
'bzrlib.tests.test_ftp_transport',
3791
'bzrlib.tests.test_foreign',
3792
'bzrlib.tests.test_generate_docs',
3793
'bzrlib.tests.test_generate_ids',
3794
'bzrlib.tests.test_globbing',
3795
'bzrlib.tests.test_gpg',
3796
'bzrlib.tests.test_graph',
3797
'bzrlib.tests.test_groupcompress',
3798
'bzrlib.tests.test_hashcache',
3799
'bzrlib.tests.test_help',
3800
'bzrlib.tests.test_hooks',
3801
'bzrlib.tests.test_http',
3802
'bzrlib.tests.test_http_response',
3803
'bzrlib.tests.test_https_ca_bundle',
3804
'bzrlib.tests.test_identitymap',
3805
'bzrlib.tests.test_ignores',
3806
'bzrlib.tests.test_index',
3807
'bzrlib.tests.test_import_tariff',
3808
'bzrlib.tests.test_info',
3809
'bzrlib.tests.test_inv',
3810
'bzrlib.tests.test_inventory_delta',
3811
'bzrlib.tests.test_knit',
3812
'bzrlib.tests.test_lazy_import',
3813
'bzrlib.tests.test_lazy_regex',
3814
'bzrlib.tests.test_library_state',
3815
'bzrlib.tests.test_lock',
3816
'bzrlib.tests.test_lockable_files',
3817
'bzrlib.tests.test_lockdir',
3818
'bzrlib.tests.test_log',
3819
'bzrlib.tests.test_lru_cache',
3820
'bzrlib.tests.test_lsprof',
3821
'bzrlib.tests.test_mail_client',
3822
'bzrlib.tests.test_matchers',
3823
'bzrlib.tests.test_memorytree',
3824
'bzrlib.tests.test_merge',
3825
'bzrlib.tests.test_merge3',
3826
'bzrlib.tests.test_merge_core',
3827
'bzrlib.tests.test_merge_directive',
3828
'bzrlib.tests.test_mergetools',
3829
'bzrlib.tests.test_missing',
3830
'bzrlib.tests.test_msgeditor',
3831
'bzrlib.tests.test_multiparent',
3832
'bzrlib.tests.test_mutabletree',
3833
'bzrlib.tests.test_nonascii',
3834
'bzrlib.tests.test_options',
3835
'bzrlib.tests.test_osutils',
3836
'bzrlib.tests.test_osutils_encodings',
3837
'bzrlib.tests.test_pack',
3838
'bzrlib.tests.test_patch',
3839
'bzrlib.tests.test_patches',
3840
'bzrlib.tests.test_permissions',
3841
'bzrlib.tests.test_plugins',
3842
'bzrlib.tests.test_progress',
3843
'bzrlib.tests.test_pyutils',
3844
'bzrlib.tests.test_read_bundle',
3845
'bzrlib.tests.test_reconcile',
3846
'bzrlib.tests.test_reconfigure',
3847
'bzrlib.tests.test_registry',
3848
'bzrlib.tests.test_remote',
3849
'bzrlib.tests.test_rename_map',
3850
'bzrlib.tests.test_repository',
3851
'bzrlib.tests.test_revert',
3852
'bzrlib.tests.test_revision',
3853
'bzrlib.tests.test_revisionspec',
3854
'bzrlib.tests.test_revisiontree',
3855
'bzrlib.tests.test_rio',
3856
'bzrlib.tests.test_rules',
3857
'bzrlib.tests.test_sampler',
3858
'bzrlib.tests.test_scenarios',
3859
'bzrlib.tests.test_script',
3860
'bzrlib.tests.test_selftest',
3861
'bzrlib.tests.test_serializer',
3862
'bzrlib.tests.test_setup',
3863
'bzrlib.tests.test_sftp_transport',
3864
'bzrlib.tests.test_shelf',
3865
'bzrlib.tests.test_shelf_ui',
3866
'bzrlib.tests.test_smart',
3867
'bzrlib.tests.test_smart_add',
3868
'bzrlib.tests.test_smart_request',
3869
'bzrlib.tests.test_smart_transport',
3870
'bzrlib.tests.test_smtp_connection',
3871
'bzrlib.tests.test_source',
3872
'bzrlib.tests.test_ssh_transport',
3873
'bzrlib.tests.test_status',
3874
'bzrlib.tests.test_store',
3875
'bzrlib.tests.test_strace',
3876
'bzrlib.tests.test_subsume',
3877
'bzrlib.tests.test_switch',
3878
'bzrlib.tests.test_symbol_versioning',
3879
'bzrlib.tests.test_tag',
3880
'bzrlib.tests.test_test_server',
3881
'bzrlib.tests.test_testament',
3882
'bzrlib.tests.test_textfile',
3883
'bzrlib.tests.test_textmerge',
3884
'bzrlib.tests.test_cethread',
3885
'bzrlib.tests.test_timestamp',
3886
'bzrlib.tests.test_trace',
3887
'bzrlib.tests.test_transactions',
3888
'bzrlib.tests.test_transform',
3889
'bzrlib.tests.test_transport',
3890
'bzrlib.tests.test_transport_log',
3891
'bzrlib.tests.test_tree',
3892
'bzrlib.tests.test_treebuilder',
3893
'bzrlib.tests.test_treeshape',
3894
'bzrlib.tests.test_tsort',
3895
'bzrlib.tests.test_tuned_gzip',
3896
'bzrlib.tests.test_ui',
3897
'bzrlib.tests.test_uncommit',
3898
'bzrlib.tests.test_upgrade',
3899
'bzrlib.tests.test_upgrade_stacked',
3900
'bzrlib.tests.test_urlutils',
3901
'bzrlib.tests.test_version',
3902
'bzrlib.tests.test_version_info',
3903
'bzrlib.tests.test_versionedfile',
3904
'bzrlib.tests.test_weave',
3905
'bzrlib.tests.test_whitebox',
3906
'bzrlib.tests.test_win32utils',
3907
'bzrlib.tests.test_workingtree',
3908
'bzrlib.tests.test_workingtree_4',
3909
'bzrlib.tests.test_wsgi',
3910
'bzrlib.tests.test_xml',
3914
def _test_suite_modules_to_doctest():
3915
"""Return the list of modules to doctest."""
3917
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3921
'bzrlib.branchbuilder',
3922
'bzrlib.decorators',
3924
'bzrlib.iterablefile',
3929
'bzrlib.symbol_versioning',
3931
'bzrlib.tests.fixtures',
3933
'bzrlib.transport.http',
3934
'bzrlib.version_info_formats.format_custom',
3938
def test_suite(keep_only=None, starting_with=None):
3939
"""Build and return TestSuite for the whole of bzrlib.
3941
:param keep_only: A list of test ids limiting the suite returned.
3943
:param starting_with: An id limiting the suite returned to the tests
3946
This function can be replaced if you need to change the default test
3947
suite on a global basis, but it is not encouraged.
3950
loader = TestUtil.TestLoader()
3952
if keep_only is not None:
3953
id_filter = TestIdList(keep_only)
3955
# We take precedence over keep_only because *at loading time* using
3956
# both options means we will load less tests for the same final result.
3957
def interesting_module(name):
3958
for start in starting_with:
3960
# Either the module name starts with the specified string
3961
name.startswith(start)
3962
# or it may contain tests starting with the specified string
3963
or start.startswith(name)
3967
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3969
elif keep_only is not None:
3970
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3971
def interesting_module(name):
3972
return id_filter.refers_to(name)
3975
loader = TestUtil.TestLoader()
3976
def interesting_module(name):
3977
# No filtering, all modules are interesting
3980
suite = loader.suiteClass()
3982
# modules building their suite with loadTestsFromModuleNames
3983
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3985
for mod in _test_suite_modules_to_doctest():
3986
if not interesting_module(mod):
3987
# No tests to keep here, move along
3990
# note that this really does mean "report only" -- doctest
3991
# still runs the rest of the examples
3992
doc_suite = IsolatedDocTestSuite(
3993
mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
3994
except ValueError, e:
3995
print '**failed to get doctest for: %s\n%s' % (mod, e)
3997
if len(doc_suite._tests) == 0:
3998
raise errors.BzrError("no doctests found in %s" % (mod,))
3999
suite.addTest(doc_suite)
4001
default_encoding = sys.getdefaultencoding()
4002
for name, plugin in _mod_plugin.plugins().items():
4003
if not interesting_module(plugin.module.__name__):
4005
plugin_suite = plugin.test_suite()
4006
# We used to catch ImportError here and turn it into just a warning,
4007
# but really if you don't have --no-plugins this should be a failure.
4008
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
4009
if plugin_suite is None:
4010
plugin_suite = plugin.load_plugin_tests(loader)
4011
if plugin_suite is not None:
4012
suite.addTest(plugin_suite)
4013
if default_encoding != sys.getdefaultencoding():
4015
'Plugin "%s" tried to reset default encoding to: %s', name,
4016
sys.getdefaultencoding())
4018
sys.setdefaultencoding(default_encoding)
4020
if keep_only is not None:
4021
# Now that the referred modules have loaded their tests, keep only the
4023
suite = filter_suite_by_id_list(suite, id_filter)
4024
# Do some sanity checks on the id_list filtering
4025
not_found, duplicates = suite_matches_id_list(suite, keep_only)
4027
# The tester has used both keep_only and starting_with, so he is
4028
# already aware that some tests are excluded from the list, there
4029
# is no need to tell him which.
4032
# Some tests mentioned in the list are not in the test suite. The
4033
# list may be out of date, report to the tester.
4034
for id in not_found:
4035
trace.warning('"%s" not found in the test suite', id)
4036
for id in duplicates:
4037
trace.warning('"%s" is used as an id by several tests', id)
4042
def multiply_scenarios(*scenarios):
4043
"""Multiply two or more iterables of scenarios.
4045
It is safe to pass scenario generators or iterators.
4047
:returns: A list of compound scenarios: the cross-product of all
4048
scenarios, with the names concatenated and the parameters
4051
return reduce(_multiply_two_scenarios, map(list, scenarios))
4054
def _multiply_two_scenarios(scenarios_left, scenarios_right):
4055
"""Multiply two sets of scenarios.
4057
:returns: the cartesian product of the two sets of scenarios, that is
4058
a scenario for every possible combination of a left scenario and a
4062
('%s,%s' % (left_name, right_name),
4063
dict(left_dict.items() + right_dict.items()))
4064
for left_name, left_dict in scenarios_left
4065
for right_name, right_dict in scenarios_right]
4068
def multiply_tests(tests, scenarios, result):
4069
"""Multiply tests_list by scenarios into result.
4071
This is the core workhorse for test parameterisation.
4073
Typically the load_tests() method for a per-implementation test suite will
4074
call multiply_tests and return the result.
4076
:param tests: The tests to parameterise.
4077
:param scenarios: The scenarios to apply: pairs of (scenario_name,
4078
scenario_param_dict).
4079
:param result: A TestSuite to add created tests to.
4081
This returns the passed in result TestSuite with the cross product of all
4082
the tests repeated once for each scenario. Each test is adapted by adding
4083
the scenario name at the end of its id(), and updating the test object's
4084
__dict__ with the scenario_param_dict.
4086
>>> import bzrlib.tests.test_sampler
4087
>>> r = multiply_tests(
4088
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4089
... [('one', dict(param=1)),
4090
... ('two', dict(param=2))],
4091
... TestUtil.TestSuite())
4092
>>> tests = list(iter_suite_tests(r))
4096
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4102
for test in iter_suite_tests(tests):
4103
apply_scenarios(test, scenarios, result)
4107
def apply_scenarios(test, scenarios, result):
4108
"""Apply the scenarios in scenarios to test and add to result.
4110
:param test: The test to apply scenarios to.
4111
:param scenarios: An iterable of scenarios to apply to test.
4113
:seealso: apply_scenario
4115
for scenario in scenarios:
4116
result.addTest(apply_scenario(test, scenario))
4120
def apply_scenario(test, scenario):
4121
"""Copy test and apply scenario to it.
4123
:param test: A test to adapt.
4124
:param scenario: A tuple describing the scenarion.
4125
The first element of the tuple is the new test id.
4126
The second element is a dict containing attributes to set on the
4128
:return: The adapted test.
4130
new_id = "%s(%s)" % (test.id(), scenario[0])
4131
new_test = clone_test(test, new_id)
4132
for name, value in scenario[1].items():
4133
setattr(new_test, name, value)
4137
def clone_test(test, new_id):
4138
"""Clone a test giving it a new id.
4140
:param test: The test to clone.
4141
:param new_id: The id to assign to it.
4142
:return: The new test.
4144
new_test = copy.copy(test)
4145
new_test.id = lambda: new_id
4146
# XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4147
# causes cloned tests to share the 'details' dict. This makes it hard to
4148
# read the test output for parameterized tests, because tracebacks will be
4149
# associated with irrelevant tests.
4151
details = new_test._TestCase__details
4152
except AttributeError:
4153
# must be a different version of testtools than expected. Do nothing.
4156
# Reset the '__details' dict.
4157
new_test._TestCase__details = {}
4161
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4163
"""Helper for permutating tests against an extension module.
4165
This is meant to be used inside a modules 'load_tests()' function. It will
4166
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4167
against both implementations. Setting 'test.module' to the appropriate
4168
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4170
:param standard_tests: A test suite to permute
4171
:param loader: A TestLoader
4172
:param py_module_name: The python path to a python module that can always
4173
be loaded, and will be considered the 'python' implementation. (eg
4174
'bzrlib._chk_map_py')
4175
:param ext_module_name: The python path to an extension module. If the
4176
module cannot be loaded, a single test will be added, which notes that
4177
the module is not available. If it can be loaded, all standard_tests
4178
will be run against that module.
4179
:return: (suite, feature) suite is a test-suite that has all the permuted
4180
tests. feature is the Feature object that can be used to determine if
4181
the module is available.
4184
py_module = pyutils.get_named_object(py_module_name)
4186
('python', {'module': py_module}),
4188
suite = loader.suiteClass()
4189
feature = ModuleAvailableFeature(ext_module_name)
4190
if feature.available():
4191
scenarios.append(('C', {'module': feature.module}))
4193
# the compiled module isn't available, so we add a failing test
4194
class FailWithoutFeature(TestCase):
4195
def test_fail(self):
4196
self.requireFeature(feature)
4197
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4198
result = multiply_tests(standard_tests, scenarios, suite)
4199
return result, feature
4202
def _rmtree_temp_dir(dirname, test_id=None):
4203
# If LANG=C we probably have created some bogus paths
4204
# which rmtree(unicode) will fail to delete
4205
# so make sure we are using rmtree(str) to delete everything
4206
# except on win32, where rmtree(str) will fail
4207
# since it doesn't have the property of byte-stream paths
4208
# (they are either ascii or mbcs)
4209
if sys.platform == 'win32':
4210
# make sure we are using the unicode win32 api
4211
dirname = unicode(dirname)
4213
dirname = dirname.encode(sys.getfilesystemencoding())
4215
osutils.rmtree(dirname)
4217
# We don't want to fail here because some useful display will be lost
4218
# otherwise. Polluting the tmp dir is bad, but not giving all the
4219
# possible info to the test runner is even worse.
4221
ui.ui_factory.clear_term()
4222
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4223
# Ugly, but the last thing we want here is fail, so bear with it.
4224
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4225
).encode('ascii', 'replace')
4226
sys.stderr.write('Unable to remove testing dir %s\n%s'
4227
% (os.path.basename(dirname), printable_e))
4230
class Feature(object):
4231
"""An operating system Feature."""
4234
self._available = None
4236
def available(self):
4237
"""Is the feature available?
4239
:return: True if the feature is available.
4241
if self._available is None:
4242
self._available = self._probe()
4243
return self._available
4246
"""Implement this method in concrete features.
4248
:return: True if the feature is available.
4250
raise NotImplementedError
4253
if getattr(self, 'feature_name', None):
4254
return self.feature_name()
4255
return self.__class__.__name__
4258
class _SymlinkFeature(Feature):
4261
return osutils.has_symlinks()
4263
def feature_name(self):
4266
SymlinkFeature = _SymlinkFeature()
4269
class _HardlinkFeature(Feature):
4272
return osutils.has_hardlinks()
4274
def feature_name(self):
4277
HardlinkFeature = _HardlinkFeature()
4280
class _OsFifoFeature(Feature):
4283
return getattr(os, 'mkfifo', None)
4285
def feature_name(self):
4286
return 'filesystem fifos'
4288
OsFifoFeature = _OsFifoFeature()
4291
class _UnicodeFilenameFeature(Feature):
4292
"""Does the filesystem support Unicode filenames?"""
4296
# Check for character combinations unlikely to be covered by any
4297
# single non-unicode encoding. We use the characters
4298
# - greek small letter alpha (U+03B1) and
4299
# - braille pattern dots-123456 (U+283F).
4300
os.stat(u'\u03b1\u283f')
4301
except UnicodeEncodeError:
4303
except (IOError, OSError):
4304
# The filesystem allows the Unicode filename but the file doesn't
4308
# The filesystem allows the Unicode filename and the file exists,
4312
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4315
class _CompatabilityThunkFeature(Feature):
4316
"""This feature is just a thunk to another feature.
4318
It issues a deprecation warning if it is accessed, to let you know that you
4319
should really use a different feature.
4322
def __init__(self, dep_version, module, name,
4323
replacement_name, replacement_module=None):
4324
super(_CompatabilityThunkFeature, self).__init__()
4325
self._module = module
4326
if replacement_module is None:
4327
replacement_module = module
4328
self._replacement_module = replacement_module
4330
self._replacement_name = replacement_name
4331
self._dep_version = dep_version
4332
self._feature = None
4335
if self._feature is None:
4336
depr_msg = self._dep_version % ('%s.%s'
4337
% (self._module, self._name))
4338
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4339
self._replacement_name)
4340
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4341
# Import the new feature and use it as a replacement for the
4343
self._feature = pyutils.get_named_object(
4344
self._replacement_module, self._replacement_name)
4348
return self._feature._probe()
4351
class ModuleAvailableFeature(Feature):
4352
"""This is a feature than describes a module we want to be available.
4354
Declare the name of the module in __init__(), and then after probing, the
4355
module will be available as 'self.module'.
4357
:ivar module: The module if it is available, else None.
4360
def __init__(self, module_name):
4361
super(ModuleAvailableFeature, self).__init__()
4362
self.module_name = module_name
4366
self._module = __import__(self.module_name, {}, {}, [''])
4373
if self.available(): # Make sure the probe has been done
4377
def feature_name(self):
4378
return self.module_name
4381
def probe_unicode_in_user_encoding():
4382
"""Try to encode several unicode strings to use in unicode-aware tests.
4383
Return first successfull match.
4385
:return: (unicode value, encoded plain string value) or (None, None)
4387
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4388
for uni_val in possible_vals:
4390
str_val = uni_val.encode(osutils.get_user_encoding())
4391
except UnicodeEncodeError:
4392
# Try a different character
4395
return uni_val, str_val
4399
def probe_bad_non_ascii(encoding):
4400
"""Try to find [bad] character with code [128..255]
4401
that cannot be decoded to unicode in some encoding.
4402
Return None if all non-ascii characters is valid
4405
for i in xrange(128, 256):
4408
char.decode(encoding)
4409
except UnicodeDecodeError:
4414
class _HTTPSServerFeature(Feature):
4415
"""Some tests want an https Server, check if one is available.
4417
Right now, the only way this is available is under python2.6 which provides
4428
def feature_name(self):
4429
return 'HTTPSServer'
4432
HTTPSServerFeature = _HTTPSServerFeature()
4435
class _UnicodeFilename(Feature):
4436
"""Does the filesystem support Unicode filenames?"""
4441
except UnicodeEncodeError:
4443
except (IOError, OSError):
4444
# The filesystem allows the Unicode filename but the file doesn't
4448
# The filesystem allows the Unicode filename and the file exists,
4452
UnicodeFilename = _UnicodeFilename()
4455
class _ByteStringNamedFilesystem(Feature):
4456
"""Is the filesystem based on bytes?"""
4459
if os.name == "posix":
4463
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4466
class _UTF8Filesystem(Feature):
4467
"""Is the filesystem UTF-8?"""
4470
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4474
UTF8Filesystem = _UTF8Filesystem()
4477
class _BreakinFeature(Feature):
4478
"""Does this platform support the breakin feature?"""
4481
from bzrlib import breakin
4482
if breakin.determine_signal() is None:
4484
if sys.platform == 'win32':
4485
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4486
# We trigger SIGBREAK via a Console api so we need ctypes to
4487
# access the function
4494
def feature_name(self):
4495
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4498
BreakinFeature = _BreakinFeature()
4501
class _CaseInsCasePresFilenameFeature(Feature):
4502
"""Is the file-system case insensitive, but case-preserving?"""
4505
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4507
# first check truly case-preserving for created files, then check
4508
# case insensitive when opening existing files.
4509
name = osutils.normpath(name)
4510
base, rel = osutils.split(name)
4511
found_rel = osutils.canonical_relpath(base, name)
4512
return (found_rel == rel
4513
and os.path.isfile(name.upper())
4514
and os.path.isfile(name.lower()))
4519
def feature_name(self):
4520
return "case-insensitive case-preserving filesystem"
4522
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4525
class _CaseInsensitiveFilesystemFeature(Feature):
4526
"""Check if underlying filesystem is case-insensitive but *not* case
4529
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4530
# more likely to be case preserving, so this case is rare.
4533
if CaseInsCasePresFilenameFeature.available():
4536
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4537
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4538
TestCaseWithMemoryTransport.TEST_ROOT = root
4540
root = TestCaseWithMemoryTransport.TEST_ROOT
4541
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4543
name_a = osutils.pathjoin(tdir, 'a')
4544
name_A = osutils.pathjoin(tdir, 'A')
4546
result = osutils.isdir(name_A)
4547
_rmtree_temp_dir(tdir)
4550
def feature_name(self):
4551
return 'case-insensitive filesystem'
4553
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4556
class _CaseSensitiveFilesystemFeature(Feature):
4559
if CaseInsCasePresFilenameFeature.available():
4561
elif CaseInsensitiveFilesystemFeature.available():
4566
def feature_name(self):
4567
return 'case-sensitive filesystem'
4569
# new coding style is for feature instances to be lowercase
4570
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4573
# Only define SubUnitBzrRunner if subunit is available.
4575
from subunit import TestProtocolClient
4576
from subunit.test_results import AutoTimingTestResultDecorator
4577
class SubUnitBzrProtocolClient(TestProtocolClient):
4579
def addSuccess(self, test, details=None):
4580
# The subunit client always includes the details in the subunit
4581
# stream, but we don't want to include it in ours.
4582
if details is not None and 'log' in details:
4584
return super(SubUnitBzrProtocolClient, self).addSuccess(
4587
class SubUnitBzrRunner(TextTestRunner):
4588
def run(self, test):
4589
result = AutoTimingTestResultDecorator(
4590
SubUnitBzrProtocolClient(self.stream))
4596
class _PosixPermissionsFeature(Feature):
4600
# create temporary file and check if specified perms are maintained.
4603
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4604
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4607
os.chmod(name, write_perms)
4609
read_perms = os.stat(name).st_mode & 0777
4611
return (write_perms == read_perms)
4613
return (os.name == 'posix') and has_perms()
4615
def feature_name(self):
4616
return 'POSIX permissions support'
4618
posix_permissions_feature = _PosixPermissionsFeature()