1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
# TODO: Perhaps there should be an API to find out if bzr running under the
20
# test suite -- some plugins might want to avoid making intrusive changes if
21
# this is the case. However, we want behaviour under to test to diverge as
22
# little as possible, so this should be used rarely if it's added at all.
23
# (Suggestion from j-a-meinel, 2005-11-24)
25
# NOTE: Some classes in here use camelCaseNaming() rather than
26
# underscore_naming(). That's for consistency with unittest; it's not the
27
# general style of bzrlib. Please continue that consistency when adding e.g.
28
# new assertFoo() methods.
33
from cStringIO import StringIO
56
# nb: check this before importing anything else from within it
57
_testtools_version = getattr(testtools, '__version__', ())
58
if _testtools_version < (0, 9, 5):
59
raise ImportError("need at least testtools 0.9.5: %s is %r"
60
% (testtools.__file__, _testtools_version))
61
from testtools import content
68
commands as _mod_commands,
77
plugin as _mod_plugin,
84
transport as _mod_transport,
90
# lsprof not available
92
from bzrlib.smart import client, request
93
from bzrlib.transport import (
97
from bzrlib.tests import (
102
from bzrlib.ui import NullProgressView
103
from bzrlib.ui.text import TextUIFactory
105
# Mark this python module as being part of the implementation
106
# of unittest: this gives us better tracebacks where the last
107
# shown frame is the test code, not our assertXYZ.
110
default_transport = test_server.LocalURLServer
113
_unitialized_attr = object()
114
"""A sentinel needed to act as a default value in a method signature."""
117
# Subunit result codes, defined here to prevent a hard dependency on subunit.
121
# These are intentionally brought into this namespace. That way plugins, etc
122
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
123
TestSuite = TestUtil.TestSuite
124
TestLoader = TestUtil.TestLoader
126
# Tests should run in a clean and clearly defined environment. The goal is to
127
# keep them isolated from the running environment as mush as possible. The test
128
# framework ensures the variables defined below are set (or deleted if the
129
# value is None) before a test is run and reset to their original value after
130
# the test is run. Generally if some code depends on an environment variable,
131
# the tests should start without this variable in the environment. There are a
132
# few exceptions but you shouldn't violate this rule lightly.
136
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
137
# tests do check our impls match APPDATA
138
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
142
'BZREMAIL': None, # may still be present in the environment
143
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
144
'BZR_PROGRESS_BAR': None,
146
'BZR_PLUGIN_PATH': None,
147
'BZR_DISABLE_PLUGINS': None,
148
'BZR_PLUGINS_AT': None,
149
'BZR_CONCURRENCY': None,
150
# Make sure that any text ui tests are consistent regardless of
151
# the environment the test case is run in; you may want tests that
152
# test other combinations. 'dumb' is a reasonable guess for tests
153
# going to a pipe or a StringIO.
159
'SSH_AUTH_SOCK': None,
169
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
170
# least. If you do (care), please update this comment
174
'BZR_REMOTE_PATH': None,
175
# Generally speaking, we don't want apport reporting on crashes in
176
# the test envirnoment unless we're specifically testing apport,
177
# so that it doesn't leak into the real system environment. We
178
# use an env var so it propagates to subprocesses.
179
'APPORT_DISABLE': '1',
183
def override_os_environ(test, env=None):
184
"""Modify os.environ keeping a copy.
186
:param test: A test instance
188
:param env: A dict containing variable definitions to be installed
191
env = isolated_environ
192
test._original_os_environ = dict([(var, value)
193
for var, value in os.environ.iteritems()])
194
for var, value in env.iteritems():
195
osutils.set_or_unset_env(var, value)
196
if var not in test._original_os_environ:
197
# The var is new, add it with a value of None, so
198
# restore_os_environ will delete it
199
test._original_os_environ[var] = None
202
def restore_os_environ(test):
203
"""Restore os.environ to its original state.
205
:param test: A test instance previously passed to override_os_environ.
207
for var, value in test._original_os_environ.iteritems():
208
# Restore the original value (or delete it if the value has been set to
209
# None in override_os_environ).
210
osutils.set_or_unset_env(var, value)
213
class ExtendedTestResult(testtools.TextTestResult):
214
"""Accepts, reports and accumulates the results of running tests.
216
Compared to the unittest version this class adds support for
217
profiling, benchmarking, stopping as soon as a test fails, and
218
skipping tests. There are further-specialized subclasses for
219
different types of display.
221
When a test finishes, in whatever way, it calls one of the addSuccess,
222
addFailure or addError classes. These in turn may redirect to a more
223
specific case for the special test results supported by our extended
226
Note that just one of these objects is fed the results from many tests.
231
def __init__(self, stream, descriptions, verbosity,
235
"""Construct new TestResult.
237
:param bench_history: Optionally, a writable file object to accumulate
240
testtools.TextTestResult.__init__(self, stream)
241
if bench_history is not None:
242
from bzrlib.version import _get_bzr_source_tree
243
src_tree = _get_bzr_source_tree()
246
revision_id = src_tree.get_parent_ids()[0]
248
# XXX: if this is a brand new tree, do the same as if there
252
# XXX: If there's no branch, what should we do?
254
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
255
self._bench_history = bench_history
256
self.ui = ui.ui_factory
259
self.failure_count = 0
260
self.known_failure_count = 0
262
self.not_applicable_count = 0
263
self.unsupported = {}
265
self._overall_start_time = time.time()
266
self._strict = strict
267
self._first_thread_leaker_id = None
268
self._tests_leaking_threads_count = 0
269
self._traceback_from_test = None
271
def stopTestRun(self):
274
stopTime = time.time()
275
timeTaken = stopTime - self.startTime
276
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
277
# the parent class method is similar have to duplicate
278
self._show_list('ERROR', self.errors)
279
self._show_list('FAIL', self.failures)
280
self.stream.write(self.sep2)
281
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
282
run, run != 1 and "s" or "", timeTaken))
283
if not self.wasSuccessful():
284
self.stream.write("FAILED (")
285
failed, errored = map(len, (self.failures, self.errors))
287
self.stream.write("failures=%d" % failed)
289
if failed: self.stream.write(", ")
290
self.stream.write("errors=%d" % errored)
291
if self.known_failure_count:
292
if failed or errored: self.stream.write(", ")
293
self.stream.write("known_failure_count=%d" %
294
self.known_failure_count)
295
self.stream.write(")\n")
297
if self.known_failure_count:
298
self.stream.write("OK (known_failures=%d)\n" %
299
self.known_failure_count)
301
self.stream.write("OK\n")
302
if self.skip_count > 0:
303
skipped = self.skip_count
304
self.stream.write('%d test%s skipped\n' %
305
(skipped, skipped != 1 and "s" or ""))
307
for feature, count in sorted(self.unsupported.items()):
308
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
311
ok = self.wasStrictlySuccessful()
313
ok = self.wasSuccessful()
314
if self._first_thread_leaker_id:
316
'%s is leaking threads among %d leaking tests.\n' % (
317
self._first_thread_leaker_id,
318
self._tests_leaking_threads_count))
319
# We don't report the main thread as an active one.
321
'%d non-main threads were left active in the end.\n'
322
% (len(self._active_threads) - 1))
324
def getDescription(self, test):
327
def _extractBenchmarkTime(self, testCase, details=None):
328
"""Add a benchmark time for the current test case."""
329
if details and 'benchtime' in details:
330
return float(''.join(details['benchtime'].iter_bytes()))
331
return getattr(testCase, "_benchtime", None)
333
def _elapsedTestTimeString(self):
334
"""Return a time string for the overall time the current test has taken."""
335
return self._formatTime(self._delta_to_float(
336
self._now() - self._start_datetime))
338
def _testTimeString(self, testCase):
339
benchmark_time = self._extractBenchmarkTime(testCase)
340
if benchmark_time is not None:
341
return self._formatTime(benchmark_time) + "*"
343
return self._elapsedTestTimeString()
345
def _formatTime(self, seconds):
346
"""Format seconds as milliseconds with leading spaces."""
347
# some benchmarks can take thousands of seconds to run, so we need 8
349
return "%8dms" % (1000 * seconds)
351
def _shortened_test_description(self, test):
353
what = re.sub(r'^bzrlib\.tests\.', '', what)
356
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
357
# multiple times in a row, because the handler is added for
358
# each test but the container list is shared between cases.
359
# See lp:498869 lp:625574 and lp:637725 for background.
360
def _record_traceback_from_test(self, exc_info):
361
"""Store the traceback from passed exc_info tuple till"""
362
self._traceback_from_test = exc_info[2]
364
def startTest(self, test):
365
super(ExtendedTestResult, self).startTest(test)
369
self.report_test_start(test)
370
test.number = self.count
371
self._recordTestStartTime()
372
# Make testtools cases give us the real traceback on failure
373
addOnException = getattr(test, "addOnException", None)
374
if addOnException is not None:
375
addOnException(self._record_traceback_from_test)
376
# Only check for thread leaks on bzrlib derived test cases
377
if isinstance(test, TestCase):
378
test.addCleanup(self._check_leaked_threads, test)
380
def startTests(self):
381
self.report_tests_starting()
382
self._active_threads = threading.enumerate()
384
def stopTest(self, test):
385
self._traceback_from_test = None
387
def _check_leaked_threads(self, test):
388
"""See if any threads have leaked since last call
390
A sample of live threads is stored in the _active_threads attribute,
391
when this method runs it compares the current live threads and any not
392
in the previous sample are treated as having leaked.
394
now_active_threads = set(threading.enumerate())
395
threads_leaked = now_active_threads.difference(self._active_threads)
397
self._report_thread_leak(test, threads_leaked, now_active_threads)
398
self._tests_leaking_threads_count += 1
399
if self._first_thread_leaker_id is None:
400
self._first_thread_leaker_id = test.id()
401
self._active_threads = now_active_threads
403
def _recordTestStartTime(self):
404
"""Record that a test has started."""
405
self._start_datetime = self._now()
407
def addError(self, test, err):
408
"""Tell result that test finished with an error.
410
Called from the TestCase run() method when the test
411
fails with an unexpected error.
413
self._post_mortem(self._traceback_from_test)
414
super(ExtendedTestResult, self).addError(test, err)
415
self.error_count += 1
416
self.report_error(test, err)
420
def addFailure(self, test, err):
421
"""Tell result that test failed.
423
Called from the TestCase run() method when the test
424
fails because e.g. an assert() method failed.
426
self._post_mortem(self._traceback_from_test)
427
super(ExtendedTestResult, self).addFailure(test, err)
428
self.failure_count += 1
429
self.report_failure(test, err)
433
def addSuccess(self, test, details=None):
434
"""Tell result that test completed successfully.
436
Called from the TestCase run()
438
if self._bench_history is not None:
439
benchmark_time = self._extractBenchmarkTime(test, details)
440
if benchmark_time is not None:
441
self._bench_history.write("%s %s\n" % (
442
self._formatTime(benchmark_time),
444
self.report_success(test)
445
super(ExtendedTestResult, self).addSuccess(test)
446
test._log_contents = ''
448
def addExpectedFailure(self, test, err):
449
self.known_failure_count += 1
450
self.report_known_failure(test, err)
452
def addNotSupported(self, test, feature):
453
"""The test will not be run because of a missing feature.
455
# this can be called in two different ways: it may be that the
456
# test started running, and then raised (through requireFeature)
457
# UnavailableFeature. Alternatively this method can be called
458
# while probing for features before running the test code proper; in
459
# that case we will see startTest and stopTest, but the test will
460
# never actually run.
461
self.unsupported.setdefault(str(feature), 0)
462
self.unsupported[str(feature)] += 1
463
self.report_unsupported(test, feature)
465
def addSkip(self, test, reason):
466
"""A test has not run for 'reason'."""
468
self.report_skip(test, reason)
470
def addNotApplicable(self, test, reason):
471
self.not_applicable_count += 1
472
self.report_not_applicable(test, reason)
474
def _post_mortem(self, tb=None):
475
"""Start a PDB post mortem session."""
476
if os.environ.get('BZR_TEST_PDB', None):
480
def progress(self, offset, whence):
481
"""The test is adjusting the count of tests to run."""
482
if whence == SUBUNIT_SEEK_SET:
483
self.num_tests = offset
484
elif whence == SUBUNIT_SEEK_CUR:
485
self.num_tests += offset
487
raise errors.BzrError("Unknown whence %r" % whence)
489
def report_tests_starting(self):
490
"""Display information before the test run begins"""
491
if getattr(sys, 'frozen', None) is None:
492
bzr_path = osutils.realpath(sys.argv[0])
494
bzr_path = sys.executable
496
'bzr selftest: %s\n' % (bzr_path,))
499
bzrlib.__path__[0],))
501
' bzr-%s python-%s %s\n' % (
502
bzrlib.version_string,
503
bzrlib._format_version_tuple(sys.version_info),
504
platform.platform(aliased=1),
506
self.stream.write('\n')
508
def report_test_start(self, test):
509
"""Display information on the test just about to be run"""
511
def _report_thread_leak(self, test, leaked_threads, active_threads):
512
"""Display information on a test that leaked one or more threads"""
513
# GZ 2010-09-09: A leak summary reported separately from the general
514
# thread debugging would be nice. Tests under subunit
515
# need something not using stream, perhaps adding a
516
# testtools details object would be fitting.
517
if 'threads' in selftest_debug_flags:
518
self.stream.write('%s is leaking, active is now %d\n' %
519
(test.id(), len(active_threads)))
521
def startTestRun(self):
522
self.startTime = time.time()
524
def report_success(self, test):
527
def wasStrictlySuccessful(self):
528
if self.unsupported or self.known_failure_count:
530
return self.wasSuccessful()
533
class TextTestResult(ExtendedTestResult):
534
"""Displays progress and results of tests in text form"""
536
def __init__(self, stream, descriptions, verbosity,
541
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
542
bench_history, strict)
543
# We no longer pass them around, but just rely on the UIFactory stack
546
warnings.warn("Passing pb to TextTestResult is deprecated")
547
self.pb = self.ui.nested_progress_bar()
548
self.pb.show_pct = False
549
self.pb.show_spinner = False
550
self.pb.show_eta = False,
551
self.pb.show_count = False
552
self.pb.show_bar = False
553
self.pb.update_latency = 0
554
self.pb.show_transport_activity = False
556
def stopTestRun(self):
557
# called when the tests that are going to run have run
560
super(TextTestResult, self).stopTestRun()
562
def report_tests_starting(self):
563
super(TextTestResult, self).report_tests_starting()
564
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
566
def _progress_prefix_text(self):
567
# the longer this text, the less space we have to show the test
569
a = '[%d' % self.count # total that have been run
570
# tests skipped as known not to be relevant are not important enough
572
## if self.skip_count:
573
## a += ', %d skip' % self.skip_count
574
## if self.known_failure_count:
575
## a += '+%dX' % self.known_failure_count
577
a +='/%d' % self.num_tests
579
runtime = time.time() - self._overall_start_time
581
a += '%dm%ds' % (runtime / 60, runtime % 60)
584
total_fail_count = self.error_count + self.failure_count
586
a += ', %d failed' % total_fail_count
587
# if self.unsupported:
588
# a += ', %d missing' % len(self.unsupported)
592
def report_test_start(self, test):
594
self._progress_prefix_text()
596
+ self._shortened_test_description(test))
598
def _test_description(self, test):
599
return self._shortened_test_description(test)
601
def report_error(self, test, err):
602
self.stream.write('ERROR: %s\n %s\n' % (
603
self._test_description(test),
607
def report_failure(self, test, err):
608
self.stream.write('FAIL: %s\n %s\n' % (
609
self._test_description(test),
613
def report_known_failure(self, test, err):
616
def report_skip(self, test, reason):
619
def report_not_applicable(self, test, reason):
622
def report_unsupported(self, test, feature):
623
"""test cannot be run because feature is missing."""
626
class VerboseTestResult(ExtendedTestResult):
627
"""Produce long output, with one line per test run plus times"""
629
def _ellipsize_to_right(self, a_string, final_width):
630
"""Truncate and pad a string, keeping the right hand side"""
631
if len(a_string) > final_width:
632
result = '...' + a_string[3-final_width:]
635
return result.ljust(final_width)
637
def report_tests_starting(self):
638
self.stream.write('running %d tests...\n' % self.num_tests)
639
super(VerboseTestResult, self).report_tests_starting()
641
def report_test_start(self, test):
642
name = self._shortened_test_description(test)
643
width = osutils.terminal_width()
644
if width is not None:
645
# width needs space for 6 char status, plus 1 for slash, plus an
646
# 11-char time string, plus a trailing blank
647
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
649
self.stream.write(self._ellipsize_to_right(name, width-18))
651
self.stream.write(name)
654
def _error_summary(self, err):
656
return '%s%s' % (indent, err[1])
658
def report_error(self, test, err):
659
self.stream.write('ERROR %s\n%s\n'
660
% (self._testTimeString(test),
661
self._error_summary(err)))
663
def report_failure(self, test, err):
664
self.stream.write(' FAIL %s\n%s\n'
665
% (self._testTimeString(test),
666
self._error_summary(err)))
668
def report_known_failure(self, test, err):
669
self.stream.write('XFAIL %s\n%s\n'
670
% (self._testTimeString(test),
671
self._error_summary(err)))
673
def report_success(self, test):
674
self.stream.write(' OK %s\n' % self._testTimeString(test))
675
for bench_called, stats in getattr(test, '_benchcalls', []):
676
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
677
stats.pprint(file=self.stream)
678
# flush the stream so that we get smooth output. This verbose mode is
679
# used to show the output in PQM.
682
def report_skip(self, test, reason):
683
self.stream.write(' SKIP %s\n%s\n'
684
% (self._testTimeString(test), reason))
686
def report_not_applicable(self, test, reason):
687
self.stream.write(' N/A %s\n %s\n'
688
% (self._testTimeString(test), reason))
690
def report_unsupported(self, test, feature):
691
"""test cannot be run because feature is missing."""
692
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
693
%(self._testTimeString(test), feature))
696
class TextTestRunner(object):
697
stop_on_failure = False
705
result_decorators=None,
707
"""Create a TextTestRunner.
709
:param result_decorators: An optional list of decorators to apply
710
to the result object being used by the runner. Decorators are
711
applied left to right - the first element in the list is the
714
# stream may know claim to know to write unicode strings, but in older
715
# pythons this goes sufficiently wrong that it is a bad idea. (
716
# specifically a built in file with encoding 'UTF-8' will still try
717
# to encode using ascii.
718
new_encoding = osutils.get_terminal_encoding()
719
codec = codecs.lookup(new_encoding)
720
if type(codec) is tuple:
724
encode = codec.encode
725
# GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
726
# so should swap to the plain codecs.StreamWriter
727
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
729
stream.encoding = new_encoding
731
self.descriptions = descriptions
732
self.verbosity = verbosity
733
self._bench_history = bench_history
734
self._strict = strict
735
self._result_decorators = result_decorators or []
738
"Run the given test case or test suite."
739
if self.verbosity == 1:
740
result_class = TextTestResult
741
elif self.verbosity >= 2:
742
result_class = VerboseTestResult
743
original_result = result_class(self.stream,
746
bench_history=self._bench_history,
749
# Signal to result objects that look at stop early policy to stop,
750
original_result.stop_early = self.stop_on_failure
751
result = original_result
752
for decorator in self._result_decorators:
753
result = decorator(result)
754
result.stop_early = self.stop_on_failure
755
result.startTestRun()
760
# higher level code uses our extended protocol to determine
761
# what exit code to give.
762
return original_result
765
def iter_suite_tests(suite):
766
"""Return all tests in a suite, recursing through nested suites"""
767
if isinstance(suite, unittest.TestCase):
769
elif isinstance(suite, unittest.TestSuite):
771
for r in iter_suite_tests(item):
774
raise Exception('unknown type %r for object %r'
775
% (type(suite), suite))
778
TestSkipped = testtools.testcase.TestSkipped
781
class TestNotApplicable(TestSkipped):
782
"""A test is not applicable to the situation where it was run.
784
This is only normally raised by parameterized tests, if they find that
785
the instance they're constructed upon does not support one aspect
790
# traceback._some_str fails to format exceptions that have the default
791
# __str__ which does an implicit ascii conversion. However, repr() on those
792
# objects works, for all that its not quite what the doctor may have ordered.
793
def _clever_some_str(value):
798
return repr(value).replace('\\n', '\n')
800
return '<unprintable %s object>' % type(value).__name__
802
traceback._some_str = _clever_some_str
805
# deprecated - use self.knownFailure(), or self.expectFailure.
806
KnownFailure = testtools.testcase._ExpectedFailure
809
class UnavailableFeature(Exception):
810
"""A feature required for this test was not available.
812
This can be considered a specialised form of SkippedTest.
814
The feature should be used to construct the exception.
818
class StringIOWrapper(object):
819
"""A wrapper around cStringIO which just adds an encoding attribute.
821
Internally we can check sys.stdout to see what the output encoding
822
should be. However, cStringIO has no encoding attribute that we can
823
set. So we wrap it instead.
828
def __init__(self, s=None):
830
self.__dict__['_cstring'] = StringIO(s)
832
self.__dict__['_cstring'] = StringIO()
834
def __getattr__(self, name, getattr=getattr):
835
return getattr(self.__dict__['_cstring'], name)
837
def __setattr__(self, name, val):
838
if name == 'encoding':
839
self.__dict__['encoding'] = val
841
return setattr(self._cstring, name, val)
844
class TestUIFactory(TextUIFactory):
845
"""A UI Factory for testing.
847
Hide the progress bar but emit note()s.
849
Allows get_password to be tested without real tty attached.
851
See also CannedInputUIFactory which lets you provide programmatic input in
854
# TODO: Capture progress events at the model level and allow them to be
855
# observed by tests that care.
857
# XXX: Should probably unify more with CannedInputUIFactory or a
858
# particular configuration of TextUIFactory, or otherwise have a clearer
859
# idea of how they're supposed to be different.
860
# See https://bugs.launchpad.net/bzr/+bug/408213
862
def __init__(self, stdout=None, stderr=None, stdin=None):
863
if stdin is not None:
864
# We use a StringIOWrapper to be able to test various
865
# encodings, but the user is still responsible to
866
# encode the string and to set the encoding attribute
867
# of StringIOWrapper.
868
stdin = StringIOWrapper(stdin)
869
super(TestUIFactory, self).__init__(stdin, stdout, stderr)
871
def get_non_echoed_password(self):
872
"""Get password from stdin without trying to handle the echo mode"""
873
password = self.stdin.readline()
876
if password[-1] == '\n':
877
password = password[:-1]
880
def make_progress_view(self):
881
return NullProgressView()
884
def isolated_doctest_setUp(test):
885
override_os_environ(test)
888
def isolated_doctest_tearDown(test):
889
restore_os_environ(test)
892
def IsolatedDocTestSuite(*args, **kwargs):
893
"""Overrides doctest.DocTestSuite to handle isolation.
895
The method is really a factory and users are expected to use it as such.
898
kwargs['setUp'] = isolated_doctest_setUp
899
kwargs['tearDown'] = isolated_doctest_tearDown
900
return doctest.DocTestSuite(*args, **kwargs)
903
class TestCase(testtools.TestCase):
904
"""Base class for bzr unit tests.
906
Tests that need access to disk resources should subclass
907
TestCaseInTempDir not TestCase.
909
Error and debug log messages are redirected from their usual
910
location into a temporary file, the contents of which can be
911
retrieved by _get_log(). We use a real OS file, not an in-memory object,
912
so that it can also capture file IO. When the test completes this file
913
is read into memory and removed from disk.
915
There are also convenience functions to invoke bzr's command-line
916
routine, and to build and check bzr trees.
918
In addition to the usual method of overriding tearDown(), this class also
919
allows subclasses to register cleanup functions via addCleanup, which are
920
run in order as the object is torn down. It's less likely this will be
921
accidentally overlooked.
925
# record lsprof data when performing benchmark calls.
926
_gather_lsprof_in_benchmarks = False
928
def __init__(self, methodName='testMethod'):
929
super(TestCase, self).__init__(methodName)
930
self._directory_isolation = True
931
self.exception_handlers.insert(0,
932
(UnavailableFeature, self._do_unsupported_or_skip))
933
self.exception_handlers.insert(0,
934
(TestNotApplicable, self._do_not_applicable))
937
super(TestCase, self).setUp()
938
for feature in getattr(self, '_test_needs_features', []):
939
self.requireFeature(feature)
940
self._log_contents = None
941
self.addDetail("log", content.Content(content.ContentType("text",
942
"plain", {"charset": "utf8"}),
943
lambda:[self._get_log(keep_log_file=True)]))
944
self._cleanEnvironment()
947
self._benchcalls = []
948
self._benchtime = None
950
self._track_transports()
952
self._clear_debug_flags()
953
# Isolate global verbosity level, to make sure it's reproducible
954
# between tests. We should get rid of this altogether: bug 656694. --
956
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
957
# Isolate config option expansion until its default value for bzrlib is
958
# settled on or a the FIXME associated with _get_expand_default_value
959
# is addressed -- vila 20110219
960
self.overrideAttr(config, '_expand_default_value', None)
965
pdb.Pdb().set_trace(sys._getframe().f_back)
967
def discardDetail(self, name):
968
"""Extend the addDetail, getDetails api so we can remove a detail.
970
eg. bzr always adds the 'log' detail at startup, but we don't want to
971
include it for skipped, xfail, etc tests.
973
It is safe to call this for a detail that doesn't exist, in case this
974
gets called multiple times.
976
# We cheat. details is stored in __details which means we shouldn't
977
# touch it. but getDetails() returns the dict directly, so we can
979
details = self.getDetails()
983
def _clear_debug_flags(self):
984
"""Prevent externally set debug flags affecting tests.
986
Tests that want to use debug flags can just set them in the
987
debug_flags set during setup/teardown.
989
# Start with a copy of the current debug flags we can safely modify.
990
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
991
if 'allow_debug' not in selftest_debug_flags:
992
debug.debug_flags.clear()
993
if 'disable_lock_checks' not in selftest_debug_flags:
994
debug.debug_flags.add('strict_locks')
996
def _clear_hooks(self):
997
# prevent hooks affecting tests
998
known_hooks = hooks.known_hooks
999
self._preserved_hooks = {}
1000
for key, (parent, name) in known_hooks.iter_parent_objects():
1001
current_hooks = getattr(parent, name)
1002
self._preserved_hooks[parent] = (name, current_hooks)
1003
self._preserved_lazy_hooks = hooks._lazy_hooks
1004
hooks._lazy_hooks = {}
1005
self.addCleanup(self._restoreHooks)
1006
for key, (parent, name) in known_hooks.iter_parent_objects():
1007
factory = known_hooks.get(key)
1008
setattr(parent, name, factory())
1009
# this hook should always be installed
1010
request._install_hook()
1012
def disable_directory_isolation(self):
1013
"""Turn off directory isolation checks."""
1014
self._directory_isolation = False
1016
def enable_directory_isolation(self):
1017
"""Enable directory isolation checks."""
1018
self._directory_isolation = True
1020
def _silenceUI(self):
1021
"""Turn off UI for duration of test"""
1022
# by default the UI is off; tests can turn it on if they want it.
1023
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
1025
def _check_locks(self):
1026
"""Check that all lock take/release actions have been paired."""
1027
# We always check for mismatched locks. If a mismatch is found, we
1028
# fail unless -Edisable_lock_checks is supplied to selftest, in which
1029
# case we just print a warning.
1031
acquired_locks = [lock for action, lock in self._lock_actions
1032
if action == 'acquired']
1033
released_locks = [lock for action, lock in self._lock_actions
1034
if action == 'released']
1035
broken_locks = [lock for action, lock in self._lock_actions
1036
if action == 'broken']
1037
# trivially, given the tests for lock acquistion and release, if we
1038
# have as many in each list, it should be ok. Some lock tests also
1039
# break some locks on purpose and should be taken into account by
1040
# considering that breaking a lock is just a dirty way of releasing it.
1041
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1042
message = ('Different number of acquired and '
1043
'released or broken locks. (%s, %s + %s)' %
1044
(acquired_locks, released_locks, broken_locks))
1045
if not self._lock_check_thorough:
1046
# Rather than fail, just warn
1047
print "Broken test %s: %s" % (self, message)
1051
def _track_locks(self):
1052
"""Track lock activity during tests."""
1053
self._lock_actions = []
1054
if 'disable_lock_checks' in selftest_debug_flags:
1055
self._lock_check_thorough = False
1057
self._lock_check_thorough = True
1059
self.addCleanup(self._check_locks)
1060
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
1061
self._lock_acquired, None)
1062
_mod_lock.Lock.hooks.install_named_hook('lock_released',
1063
self._lock_released, None)
1064
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
1065
self._lock_broken, None)
1067
def _lock_acquired(self, result):
1068
self._lock_actions.append(('acquired', result))
1070
def _lock_released(self, result):
1071
self._lock_actions.append(('released', result))
1073
def _lock_broken(self, result):
1074
self._lock_actions.append(('broken', result))
1076
def permit_dir(self, name):
1077
"""Permit a directory to be used by this test. See permit_url."""
1078
name_transport = _mod_transport.get_transport(name)
1079
self.permit_url(name)
1080
self.permit_url(name_transport.base)
1082
def permit_url(self, url):
1083
"""Declare that url is an ok url to use in this test.
1085
Do this for memory transports, temporary test directory etc.
1087
Do not do this for the current working directory, /tmp, or any other
1088
preexisting non isolated url.
1090
if not url.endswith('/'):
1092
self._bzr_selftest_roots.append(url)
1094
def permit_source_tree_branch_repo(self):
1095
"""Permit the source tree bzr is running from to be opened.
1097
Some code such as bzrlib.version attempts to read from the bzr branch
1098
that bzr is executing from (if any). This method permits that directory
1099
to be used in the test suite.
1101
path = self.get_source_path()
1102
self.record_directory_isolation()
1105
workingtree.WorkingTree.open(path)
1106
except (errors.NotBranchError, errors.NoWorkingTree):
1107
raise TestSkipped('Needs a working tree of bzr sources')
1109
self.enable_directory_isolation()
1111
def _preopen_isolate_transport(self, transport):
1112
"""Check that all transport openings are done in the test work area."""
1113
while isinstance(transport, pathfilter.PathFilteringTransport):
1114
# Unwrap pathfiltered transports
1115
transport = transport.server.backing_transport.clone(
1116
transport._filter('.'))
1117
url = transport.base
1118
# ReadonlySmartTCPServer_for_testing decorates the backing transport
1119
# urls it is given by prepending readonly+. This is appropriate as the
1120
# client shouldn't know that the server is readonly (or not readonly).
1121
# We could register all servers twice, with readonly+ prepending, but
1122
# that makes for a long list; this is about the same but easier to
1124
if url.startswith('readonly+'):
1125
url = url[len('readonly+'):]
1126
self._preopen_isolate_url(url)
1128
def _preopen_isolate_url(self, url):
1129
if not self._directory_isolation:
1131
if self._directory_isolation == 'record':
1132
self._bzr_selftest_roots.append(url)
1134
# This prevents all transports, including e.g. sftp ones backed on disk
1135
# from working unless they are explicitly granted permission. We then
1136
# depend on the code that sets up test transports to check that they are
1137
# appropriately isolated and enable their use by calling
1138
# self.permit_transport()
1139
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1140
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1141
% (url, self._bzr_selftest_roots))
1143
def record_directory_isolation(self):
1144
"""Gather accessed directories to permit later access.
1146
This is used for tests that access the branch bzr is running from.
1148
self._directory_isolation = "record"
1150
def start_server(self, transport_server, backing_server=None):
1151
"""Start transport_server for this test.
1153
This starts the server, registers a cleanup for it and permits the
1154
server's urls to be used.
1156
if backing_server is None:
1157
transport_server.start_server()
1159
transport_server.start_server(backing_server)
1160
self.addCleanup(transport_server.stop_server)
1161
# Obtain a real transport because if the server supplies a password, it
1162
# will be hidden from the base on the client side.
1163
t = _mod_transport.get_transport(transport_server.get_url())
1164
# Some transport servers effectively chroot the backing transport;
1165
# others like SFTPServer don't - users of the transport can walk up the
1166
# transport to read the entire backing transport. This wouldn't matter
1167
# except that the workdir tests are given - and that they expect the
1168
# server's url to point at - is one directory under the safety net. So
1169
# Branch operations into the transport will attempt to walk up one
1170
# directory. Chrooting all servers would avoid this but also mean that
1171
# we wouldn't be testing directly against non-root urls. Alternatively
1172
# getting the test framework to start the server with a backing server
1173
# at the actual safety net directory would work too, but this then
1174
# means that the self.get_url/self.get_transport methods would need
1175
# to transform all their results. On balance its cleaner to handle it
1176
# here, and permit a higher url when we have one of these transports.
1177
if t.base.endswith('/work/'):
1178
# we have safety net/test root/work
1179
t = t.clone('../..')
1180
elif isinstance(transport_server,
1181
test_server.SmartTCPServer_for_testing):
1182
# The smart server adds a path similar to work, which is traversed
1183
# up from by the client. But the server is chrooted - the actual
1184
# backing transport is not escaped from, and VFS requests to the
1185
# root will error (because they try to escape the chroot).
1187
while t2.base != t.base:
1190
self.permit_url(t.base)
1192
def _track_transports(self):
1193
"""Install checks for transport usage."""
1194
# TestCase has no safe place it can write to.
1195
self._bzr_selftest_roots = []
1196
# Currently the easiest way to be sure that nothing is going on is to
1197
# hook into bzr dir opening. This leaves a small window of error for
1198
# transport tests, but they are well known, and we can improve on this
1200
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1201
self._preopen_isolate_transport, "Check bzr directories are safe.")
1203
def _ndiff_strings(self, a, b):
1204
"""Return ndiff between two strings containing lines.
1206
A trailing newline is added if missing to make the strings
1208
if b and b[-1] != '\n':
1210
if a and a[-1] != '\n':
1212
difflines = difflib.ndiff(a.splitlines(True),
1214
linejunk=lambda x: False,
1215
charjunk=lambda x: False)
1216
return ''.join(difflines)
1218
def assertEqual(self, a, b, message=''):
1222
except UnicodeError, e:
1223
# If we can't compare without getting a UnicodeError, then
1224
# obviously they are different
1225
trace.mutter('UnicodeError: %s', e)
1228
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1230
pprint.pformat(a), pprint.pformat(b)))
1232
assertEquals = assertEqual
1234
def assertEqualDiff(self, a, b, message=None):
1235
"""Assert two texts are equal, if not raise an exception.
1237
This is intended for use with multi-line strings where it can
1238
be hard to find the differences by eye.
1240
# TODO: perhaps override assertEquals to call this for strings?
1244
message = "texts not equal:\n"
1246
message = 'first string is missing a final newline.\n'
1248
message = 'second string is missing a final newline.\n'
1249
raise AssertionError(message +
1250
self._ndiff_strings(a, b))
1252
def assertEqualMode(self, mode, mode_test):
1253
self.assertEqual(mode, mode_test,
1254
'mode mismatch %o != %o' % (mode, mode_test))
1256
def assertEqualStat(self, expected, actual):
1257
"""assert that expected and actual are the same stat result.
1259
:param expected: A stat result.
1260
:param actual: A stat result.
1261
:raises AssertionError: If the expected and actual stat values differ
1262
other than by atime.
1264
self.assertEqual(expected.st_size, actual.st_size,
1265
'st_size did not match')
1266
self.assertEqual(expected.st_mtime, actual.st_mtime,
1267
'st_mtime did not match')
1268
self.assertEqual(expected.st_ctime, actual.st_ctime,
1269
'st_ctime did not match')
1270
if sys.platform == 'win32':
1271
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1272
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1273
# odd. We just force it to always be 0 to avoid any problems.
1274
self.assertEqual(0, expected.st_dev)
1275
self.assertEqual(0, actual.st_dev)
1276
self.assertEqual(0, expected.st_ino)
1277
self.assertEqual(0, actual.st_ino)
1279
self.assertEqual(expected.st_dev, actual.st_dev,
1280
'st_dev did not match')
1281
self.assertEqual(expected.st_ino, actual.st_ino,
1282
'st_ino did not match')
1283
self.assertEqual(expected.st_mode, actual.st_mode,
1284
'st_mode did not match')
1286
def assertLength(self, length, obj_with_len):
1287
"""Assert that obj_with_len is of length length."""
1288
if len(obj_with_len) != length:
1289
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1290
length, len(obj_with_len), obj_with_len))
1292
def assertLogsError(self, exception_class, func, *args, **kwargs):
1293
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1296
orig_log_exception_quietly = trace.log_exception_quietly
1299
orig_log_exception_quietly()
1300
captured.append(sys.exc_info())
1301
trace.log_exception_quietly = capture
1302
func(*args, **kwargs)
1304
trace.log_exception_quietly = orig_log_exception_quietly
1305
self.assertLength(1, captured)
1306
err = captured[0][1]
1307
self.assertIsInstance(err, exception_class)
1310
def assertPositive(self, val):
1311
"""Assert that val is greater than 0."""
1312
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1314
def assertNegative(self, val):
1315
"""Assert that val is less than 0."""
1316
self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
1318
def assertStartsWith(self, s, prefix):
1319
if not s.startswith(prefix):
1320
raise AssertionError('string %r does not start with %r' % (s, prefix))
1322
def assertEndsWith(self, s, suffix):
1323
"""Asserts that s ends with suffix."""
1324
if not s.endswith(suffix):
1325
raise AssertionError('string %r does not end with %r' % (s, suffix))
1327
def assertContainsRe(self, haystack, needle_re, flags=0):
1328
"""Assert that a contains something matching a regular expression."""
1329
if not re.search(needle_re, haystack, flags):
1330
if '\n' in haystack or len(haystack) > 60:
1331
# a long string, format it in a more readable way
1332
raise AssertionError(
1333
'pattern "%s" not found in\n"""\\\n%s"""\n'
1334
% (needle_re, haystack))
1336
raise AssertionError('pattern "%s" not found in "%s"'
1337
% (needle_re, haystack))
1339
def assertNotContainsRe(self, haystack, needle_re, flags=0):
1340
"""Assert that a does not match a regular expression"""
1341
if re.search(needle_re, haystack, flags):
1342
raise AssertionError('pattern "%s" found in "%s"'
1343
% (needle_re, haystack))
1345
def assertContainsString(self, haystack, needle):
1346
if haystack.find(needle) == -1:
1347
self.fail("string %r not found in '''%s'''" % (needle, haystack))
1349
def assertNotContainsString(self, haystack, needle):
1350
if haystack.find(needle) != -1:
1351
self.fail("string %r found in '''%s'''" % (needle, haystack))
1353
def assertSubset(self, sublist, superlist):
1354
"""Assert that every entry in sublist is present in superlist."""
1355
missing = set(sublist) - set(superlist)
1356
if len(missing) > 0:
1357
raise AssertionError("value(s) %r not present in container %r" %
1358
(missing, superlist))
1360
def assertListRaises(self, excClass, func, *args, **kwargs):
1361
"""Fail unless excClass is raised when the iterator from func is used.
1363
Many functions can return generators this makes sure
1364
to wrap them in a list() call to make sure the whole generator
1365
is run, and that the proper exception is raised.
1368
list(func(*args, **kwargs))
1372
if getattr(excClass,'__name__', None) is not None:
1373
excName = excClass.__name__
1375
excName = str(excClass)
1376
raise self.failureException, "%s not raised" % excName
1378
def assertRaises(self, excClass, callableObj, *args, **kwargs):
1379
"""Assert that a callable raises a particular exception.
1381
:param excClass: As for the except statement, this may be either an
1382
exception class, or a tuple of classes.
1383
:param callableObj: A callable, will be passed ``*args`` and
1386
Returns the exception so that you can examine it.
1389
callableObj(*args, **kwargs)
1393
if getattr(excClass,'__name__', None) is not None:
1394
excName = excClass.__name__
1397
excName = str(excClass)
1398
raise self.failureException, "%s not raised" % excName
1400
def assertIs(self, left, right, message=None):
1401
if not (left is right):
1402
if message is not None:
1403
raise AssertionError(message)
1405
raise AssertionError("%r is not %r." % (left, right))
1407
def assertIsNot(self, left, right, message=None):
1409
if message is not None:
1410
raise AssertionError(message)
1412
raise AssertionError("%r is %r." % (left, right))
1414
def assertTransportMode(self, transport, path, mode):
1415
"""Fail if a path does not have mode "mode".
1417
If modes are not supported on this transport, the assertion is ignored.
1419
if not transport._can_roundtrip_unix_modebits():
1421
path_stat = transport.stat(path)
1422
actual_mode = stat.S_IMODE(path_stat.st_mode)
1423
self.assertEqual(mode, actual_mode,
1424
'mode of %r incorrect (%s != %s)'
1425
% (path, oct(mode), oct(actual_mode)))
1427
def assertIsSameRealPath(self, path1, path2):
1428
"""Fail if path1 and path2 points to different files"""
1429
self.assertEqual(osutils.realpath(path1),
1430
osutils.realpath(path2),
1431
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1433
def assertIsInstance(self, obj, kls, msg=None):
1434
"""Fail if obj is not an instance of kls
1436
:param msg: Supplementary message to show if the assertion fails.
1438
if not isinstance(obj, kls):
1439
m = "%r is an instance of %s rather than %s" % (
1440
obj, obj.__class__, kls)
1445
def assertFileEqual(self, content, path):
1446
"""Fail if path does not contain 'content'."""
1447
self.assertPathExists(path)
1448
f = file(path, 'rb')
1453
self.assertEqualDiff(content, s)
1455
def assertDocstring(self, expected_docstring, obj):
1456
"""Fail if obj does not have expected_docstring"""
1458
# With -OO the docstring should be None instead
1459
self.assertIs(obj.__doc__, None)
1461
self.assertEqual(expected_docstring, obj.__doc__)
1463
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1464
def failUnlessExists(self, path):
1465
return self.assertPathExists(path)
1467
def assertPathExists(self, path):
1468
"""Fail unless path or paths, which may be abs or relative, exist."""
1469
if not isinstance(path, basestring):
1471
self.assertPathExists(p)
1473
self.assertTrue(osutils.lexists(path),
1474
path + " does not exist")
1476
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1477
def failIfExists(self, path):
1478
return self.assertPathDoesNotExist(path)
1480
def assertPathDoesNotExist(self, path):
1481
"""Fail if path or paths, which may be abs or relative, exist."""
1482
if not isinstance(path, basestring):
1484
self.assertPathDoesNotExist(p)
1486
self.assertFalse(osutils.lexists(path),
1489
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1490
"""A helper for callDeprecated and applyDeprecated.
1492
:param a_callable: A callable to call.
1493
:param args: The positional arguments for the callable
1494
:param kwargs: The keyword arguments for the callable
1495
:return: A tuple (warnings, result). result is the result of calling
1496
a_callable(``*args``, ``**kwargs``).
1499
def capture_warnings(msg, cls=None, stacklevel=None):
1500
# we've hooked into a deprecation specific callpath,
1501
# only deprecations should getting sent via it.
1502
self.assertEqual(cls, DeprecationWarning)
1503
local_warnings.append(msg)
1504
original_warning_method = symbol_versioning.warn
1505
symbol_versioning.set_warning_method(capture_warnings)
1507
result = a_callable(*args, **kwargs)
1509
symbol_versioning.set_warning_method(original_warning_method)
1510
return (local_warnings, result)
1512
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1513
"""Call a deprecated callable without warning the user.
1515
Note that this only captures warnings raised by symbol_versioning.warn,
1516
not other callers that go direct to the warning module.
1518
To test that a deprecated method raises an error, do something like
1521
self.assertRaises(errors.ReservedId,
1522
self.applyDeprecated,
1523
deprecated_in((1, 5, 0)),
1527
:param deprecation_format: The deprecation format that the callable
1528
should have been deprecated with. This is the same type as the
1529
parameter to deprecated_method/deprecated_function. If the
1530
callable is not deprecated with this format, an assertion error
1532
:param a_callable: A callable to call. This may be a bound method or
1533
a regular function. It will be called with ``*args`` and
1535
:param args: The positional arguments for the callable
1536
:param kwargs: The keyword arguments for the callable
1537
:return: The result of a_callable(``*args``, ``**kwargs``)
1539
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1541
expected_first_warning = symbol_versioning.deprecation_string(
1542
a_callable, deprecation_format)
1543
if len(call_warnings) == 0:
1544
self.fail("No deprecation warning generated by call to %s" %
1546
self.assertEqual(expected_first_warning, call_warnings[0])
1549
def callCatchWarnings(self, fn, *args, **kw):
1550
"""Call a callable that raises python warnings.
1552
The caller's responsible for examining the returned warnings.
1554
If the callable raises an exception, the exception is not
1555
caught and propagates up to the caller. In that case, the list
1556
of warnings is not available.
1558
:returns: ([warning_object, ...], fn_result)
1560
# XXX: This is not perfect, because it completely overrides the
1561
# warnings filters, and some code may depend on suppressing particular
1562
# warnings. It's the easiest way to insulate ourselves from -Werror,
1563
# though. -- Andrew, 20071062
1565
def _catcher(message, category, filename, lineno, file=None, line=None):
1566
# despite the name, 'message' is normally(?) a Warning subclass
1568
wlist.append(message)
1569
saved_showwarning = warnings.showwarning
1570
saved_filters = warnings.filters
1572
warnings.showwarning = _catcher
1573
warnings.filters = []
1574
result = fn(*args, **kw)
1576
warnings.showwarning = saved_showwarning
1577
warnings.filters = saved_filters
1578
return wlist, result
1580
def callDeprecated(self, expected, callable, *args, **kwargs):
1581
"""Assert that a callable is deprecated in a particular way.
1583
This is a very precise test for unusual requirements. The
1584
applyDeprecated helper function is probably more suited for most tests
1585
as it allows you to simply specify the deprecation format being used
1586
and will ensure that that is issued for the function being called.
1588
Note that this only captures warnings raised by symbol_versioning.warn,
1589
not other callers that go direct to the warning module. To catch
1590
general warnings, use callCatchWarnings.
1592
:param expected: a list of the deprecation warnings expected, in order
1593
:param callable: The callable to call
1594
:param args: The positional arguments for the callable
1595
:param kwargs: The keyword arguments for the callable
1597
call_warnings, result = self._capture_deprecation_warnings(callable,
1599
self.assertEqual(expected, call_warnings)
1602
def _startLogFile(self):
1603
"""Send bzr and test log messages to a temporary file.
1605
The file is removed as the test is torn down.
1607
self._log_file = StringIO()
1608
self._log_memento = trace.push_log_file(self._log_file)
1609
self.addCleanup(self._finishLogFile)
1611
def _finishLogFile(self):
1612
"""Finished with the log file.
1614
Close the file and delete it, unless setKeepLogfile was called.
1616
if trace._trace_file:
1617
# flush the log file, to get all content
1618
trace._trace_file.flush()
1619
trace.pop_log_file(self._log_memento)
1620
# Cache the log result and delete the file on disk
1621
self._get_log(False)
1623
def thisFailsStrictLockCheck(self):
1624
"""It is known that this test would fail with -Dstrict_locks.
1626
By default, all tests are run with strict lock checking unless
1627
-Edisable_lock_checks is supplied. However there are some tests which
1628
we know fail strict locks at this point that have not been fixed.
1629
They should call this function to disable the strict checking.
1631
This should be used sparingly, it is much better to fix the locking
1632
issues rather than papering over the problem by calling this function.
1634
debug.debug_flags.discard('strict_locks')
1636
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1637
"""Overrides an object attribute restoring it after the test.
1639
:param obj: The object that will be mutated.
1641
:param attr_name: The attribute name we want to preserve/override in
1644
:param new: The optional value we want to set the attribute to.
1646
:returns: The actual attr value.
1648
value = getattr(obj, attr_name)
1649
# The actual value is captured by the call below
1650
self.addCleanup(setattr, obj, attr_name, value)
1651
if new is not _unitialized_attr:
1652
setattr(obj, attr_name, new)
1655
def overrideEnv(self, name, new):
1656
"""Set an environment variable, and reset it after the test.
1658
:param name: The environment variable name.
1660
:param new: The value to set the variable to. If None, the
1661
variable is deleted from the environment.
1663
:returns: The actual variable value.
1665
value = osutils.set_or_unset_env(name, new)
1666
self.addCleanup(osutils.set_or_unset_env, name, value)
1669
def _cleanEnvironment(self):
1670
for name, value in isolated_environ.iteritems():
1671
self.overrideEnv(name, value)
1673
def _restoreHooks(self):
1674
for klass, (name, hooks) in self._preserved_hooks.items():
1675
setattr(klass, name, hooks)
1676
hooks._lazy_hooks = self._preserved_lazy_hooks
1678
def knownFailure(self, reason):
1679
"""This test has failed for some known reason."""
1680
raise KnownFailure(reason)
1682
def _suppress_log(self):
1683
"""Remove the log info from details."""
1684
self.discardDetail('log')
1686
def _do_skip(self, result, reason):
1687
self._suppress_log()
1688
addSkip = getattr(result, 'addSkip', None)
1689
if not callable(addSkip):
1690
result.addSuccess(result)
1692
addSkip(self, reason)
1695
def _do_known_failure(self, result, e):
1696
self._suppress_log()
1697
err = sys.exc_info()
1698
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1699
if addExpectedFailure is not None:
1700
addExpectedFailure(self, err)
1702
result.addSuccess(self)
1705
def _do_not_applicable(self, result, e):
1707
reason = 'No reason given'
1710
self._suppress_log ()
1711
addNotApplicable = getattr(result, 'addNotApplicable', None)
1712
if addNotApplicable is not None:
1713
result.addNotApplicable(self, reason)
1715
self._do_skip(result, reason)
1718
def _report_skip(self, result, err):
1719
"""Override the default _report_skip.
1721
We want to strip the 'log' detail. If we waint until _do_skip, it has
1722
already been formatted into the 'reason' string, and we can't pull it
1725
self._suppress_log()
1726
super(TestCase, self)._report_skip(self, result, err)
1729
def _report_expected_failure(self, result, err):
1732
See _report_skip for motivation.
1734
self._suppress_log()
1735
super(TestCase, self)._report_expected_failure(self, result, err)
1738
def _do_unsupported_or_skip(self, result, e):
1740
self._suppress_log()
1741
addNotSupported = getattr(result, 'addNotSupported', None)
1742
if addNotSupported is not None:
1743
result.addNotSupported(self, reason)
1745
self._do_skip(result, reason)
1747
def time(self, callable, *args, **kwargs):
1748
"""Run callable and accrue the time it takes to the benchmark time.
1750
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1751
this will cause lsprofile statistics to be gathered and stored in
1754
if self._benchtime is None:
1755
self.addDetail('benchtime', content.Content(content.ContentType(
1756
"text", "plain"), lambda:[str(self._benchtime)]))
1760
if not self._gather_lsprof_in_benchmarks:
1761
return callable(*args, **kwargs)
1763
# record this benchmark
1764
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1766
self._benchcalls.append(((callable, args, kwargs), stats))
1769
self._benchtime += time.time() - start
1771
def log(self, *args):
1774
def _get_log(self, keep_log_file=False):
1775
"""Internal helper to get the log from bzrlib.trace for this test.
1777
Please use self.getDetails, or self.get_log to access this in test case
1780
:param keep_log_file: When True, if the log is still a file on disk
1781
leave it as a file on disk. When False, if the log is still a file
1782
on disk, the log file is deleted and the log preserved as
1784
:return: A string containing the log.
1786
if self._log_contents is not None:
1788
self._log_contents.decode('utf8')
1789
except UnicodeDecodeError:
1790
unicodestr = self._log_contents.decode('utf8', 'replace')
1791
self._log_contents = unicodestr.encode('utf8')
1792
return self._log_contents
1793
if self._log_file is not None:
1794
log_contents = self._log_file.getvalue()
1796
log_contents.decode('utf8')
1797
except UnicodeDecodeError:
1798
unicodestr = log_contents.decode('utf8', 'replace')
1799
log_contents = unicodestr.encode('utf8')
1800
if not keep_log_file:
1801
self._log_file = None
1802
# Permit multiple calls to get_log until we clean it up in
1804
self._log_contents = log_contents
1807
return "No log file content."
1810
"""Get a unicode string containing the log from bzrlib.trace.
1812
Undecodable characters are replaced.
1814
return u"".join(self.getDetails()['log'].iter_text())
1816
def requireFeature(self, feature):
1817
"""This test requires a specific feature is available.
1819
:raises UnavailableFeature: When feature is not available.
1821
if not feature.available():
1822
raise UnavailableFeature(feature)
1824
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1826
"""Run bazaar command line, splitting up a string command line."""
1827
if isinstance(args, basestring):
1828
# shlex don't understand unicode strings,
1829
# so args should be plain string (bialix 20070906)
1830
args = list(shlex.split(str(args)))
1831
return self._run_bzr_core(args, retcode=retcode,
1832
encoding=encoding, stdin=stdin, working_dir=working_dir,
1835
def _run_bzr_core(self, args, retcode, encoding, stdin,
1837
# Clear chk_map page cache, because the contents are likely to mask
1839
chk_map.clear_cache()
1840
if encoding is None:
1841
encoding = osutils.get_user_encoding()
1842
stdout = StringIOWrapper()
1843
stderr = StringIOWrapper()
1844
stdout.encoding = encoding
1845
stderr.encoding = encoding
1847
self.log('run bzr: %r', args)
1848
# FIXME: don't call into logging here
1849
handler = logging.StreamHandler(stderr)
1850
handler.setLevel(logging.INFO)
1851
logger = logging.getLogger('')
1852
logger.addHandler(handler)
1853
old_ui_factory = ui.ui_factory
1854
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1857
if working_dir is not None:
1858
cwd = osutils.getcwd()
1859
os.chdir(working_dir)
1863
result = self.apply_redirected(
1864
ui.ui_factory.stdin,
1866
_mod_commands.run_bzr_catch_user_errors,
1868
except KeyboardInterrupt:
1869
# Reraise KeyboardInterrupt with contents of redirected stdout
1870
# and stderr as arguments, for tests which are interested in
1871
# stdout and stderr and are expecting the exception.
1872
out = stdout.getvalue()
1873
err = stderr.getvalue()
1875
self.log('output:\n%r', out)
1877
self.log('errors:\n%r', err)
1878
raise KeyboardInterrupt(out, err)
1880
logger.removeHandler(handler)
1881
ui.ui_factory = old_ui_factory
1885
out = stdout.getvalue()
1886
err = stderr.getvalue()
1888
self.log('output:\n%r', out)
1890
self.log('errors:\n%r', err)
1891
if retcode is not None:
1892
self.assertEquals(retcode, result,
1893
message='Unexpected return code')
1894
return result, out, err
1896
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1897
working_dir=None, error_regexes=[], output_encoding=None):
1898
"""Invoke bzr, as if it were run from the command line.
1900
The argument list should not include the bzr program name - the
1901
first argument is normally the bzr command. Arguments may be
1902
passed in three ways:
1904
1- A list of strings, eg ["commit", "a"]. This is recommended
1905
when the command contains whitespace or metacharacters, or
1906
is built up at run time.
1908
2- A single string, eg "add a". This is the most convenient
1909
for hardcoded commands.
1911
This runs bzr through the interface that catches and reports
1912
errors, and with logging set to something approximating the
1913
default, so that error reporting can be checked.
1915
This should be the main method for tests that want to exercise the
1916
overall behavior of the bzr application (rather than a unit test
1917
or a functional test of the library.)
1919
This sends the stdout/stderr results into the test's log,
1920
where it may be useful for debugging. See also run_captured.
1922
:keyword stdin: A string to be used as stdin for the command.
1923
:keyword retcode: The status code the command should return;
1925
:keyword working_dir: The directory to run the command in
1926
:keyword error_regexes: A list of expected error messages. If
1927
specified they must be seen in the error output of the command.
1929
retcode, out, err = self._run_bzr_autosplit(
1934
working_dir=working_dir,
1936
self.assertIsInstance(error_regexes, (list, tuple))
1937
for regex in error_regexes:
1938
self.assertContainsRe(err, regex)
1941
def run_bzr_error(self, error_regexes, *args, **kwargs):
1942
"""Run bzr, and check that stderr contains the supplied regexes
1944
:param error_regexes: Sequence of regular expressions which
1945
must each be found in the error output. The relative ordering
1947
:param args: command-line arguments for bzr
1948
:param kwargs: Keyword arguments which are interpreted by run_bzr
1949
This function changes the default value of retcode to be 3,
1950
since in most cases this is run when you expect bzr to fail.
1952
:return: (out, err) The actual output of running the command (in case
1953
you want to do more inspection)
1957
# Make sure that commit is failing because there is nothing to do
1958
self.run_bzr_error(['no changes to commit'],
1959
['commit', '-m', 'my commit comment'])
1960
# Make sure --strict is handling an unknown file, rather than
1961
# giving us the 'nothing to do' error
1962
self.build_tree(['unknown'])
1963
self.run_bzr_error(['Commit refused because there are unknown files'],
1964
['commit', --strict', '-m', 'my commit comment'])
1966
kwargs.setdefault('retcode', 3)
1967
kwargs['error_regexes'] = error_regexes
1968
out, err = self.run_bzr(*args, **kwargs)
1971
def run_bzr_subprocess(self, *args, **kwargs):
1972
"""Run bzr in a subprocess for testing.
1974
This starts a new Python interpreter and runs bzr in there.
1975
This should only be used for tests that have a justifiable need for
1976
this isolation: e.g. they are testing startup time, or signal
1977
handling, or early startup code, etc. Subprocess code can't be
1978
profiled or debugged so easily.
1980
:keyword retcode: The status code that is expected. Defaults to 0. If
1981
None is supplied, the status code is not checked.
1982
:keyword env_changes: A dictionary which lists changes to environment
1983
variables. A value of None will unset the env variable.
1984
The values must be strings. The change will only occur in the
1985
child, so you don't need to fix the environment after running.
1986
:keyword universal_newlines: Convert CRLF => LF
1987
:keyword allow_plugins: By default the subprocess is run with
1988
--no-plugins to ensure test reproducibility. Also, it is possible
1989
for system-wide plugins to create unexpected output on stderr,
1990
which can cause unnecessary test failures.
1992
env_changes = kwargs.get('env_changes', {})
1993
working_dir = kwargs.get('working_dir', None)
1994
allow_plugins = kwargs.get('allow_plugins', False)
1996
if isinstance(args[0], list):
1998
elif isinstance(args[0], basestring):
1999
args = list(shlex.split(args[0]))
2001
raise ValueError("passing varargs to run_bzr_subprocess")
2002
process = self.start_bzr_subprocess(args, env_changes=env_changes,
2003
working_dir=working_dir,
2004
allow_plugins=allow_plugins)
2005
# We distinguish between retcode=None and retcode not passed.
2006
supplied_retcode = kwargs.get('retcode', 0)
2007
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
2008
universal_newlines=kwargs.get('universal_newlines', False),
2011
def start_bzr_subprocess(self, process_args, env_changes=None,
2012
skip_if_plan_to_signal=False,
2014
allow_plugins=False):
2015
"""Start bzr in a subprocess for testing.
2017
This starts a new Python interpreter and runs bzr in there.
2018
This should only be used for tests that have a justifiable need for
2019
this isolation: e.g. they are testing startup time, or signal
2020
handling, or early startup code, etc. Subprocess code can't be
2021
profiled or debugged so easily.
2023
:param process_args: a list of arguments to pass to the bzr executable,
2024
for example ``['--version']``.
2025
:param env_changes: A dictionary which lists changes to environment
2026
variables. A value of None will unset the env variable.
2027
The values must be strings. The change will only occur in the
2028
child, so you don't need to fix the environment after running.
2029
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2030
doesn't support signalling subprocesses.
2031
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2033
:returns: Popen object for the started process.
2035
if skip_if_plan_to_signal:
2036
if os.name != "posix":
2037
raise TestSkipped("Sending signals not supported")
2039
if env_changes is None:
2043
def cleanup_environment():
2044
for env_var, value in env_changes.iteritems():
2045
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
2047
def restore_environment():
2048
for env_var, value in old_env.iteritems():
2049
osutils.set_or_unset_env(env_var, value)
2051
bzr_path = self.get_bzr_path()
2054
if working_dir is not None:
2055
cwd = osutils.getcwd()
2056
os.chdir(working_dir)
2059
# win32 subprocess doesn't support preexec_fn
2060
# so we will avoid using it on all platforms, just to
2061
# make sure the code path is used, and we don't break on win32
2062
cleanup_environment()
2063
command = [sys.executable]
2064
# frozen executables don't need the path to bzr
2065
if getattr(sys, "frozen", None) is None:
2066
command.append(bzr_path)
2067
if not allow_plugins:
2068
command.append('--no-plugins')
2069
command.extend(process_args)
2070
process = self._popen(command, stdin=subprocess.PIPE,
2071
stdout=subprocess.PIPE,
2072
stderr=subprocess.PIPE)
2074
restore_environment()
2080
def _popen(self, *args, **kwargs):
2081
"""Place a call to Popen.
2083
Allows tests to override this method to intercept the calls made to
2084
Popen for introspection.
2086
return subprocess.Popen(*args, **kwargs)
2088
def get_source_path(self):
2089
"""Return the path of the directory containing bzrlib."""
2090
return os.path.dirname(os.path.dirname(bzrlib.__file__))
2092
def get_bzr_path(self):
2093
"""Return the path of the 'bzr' executable for this test suite."""
2094
bzr_path = os.path.join(self.get_source_path(), "bzr")
2095
if not os.path.isfile(bzr_path):
2096
# We are probably installed. Assume sys.argv is the right file
2097
bzr_path = sys.argv[0]
2100
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
2101
universal_newlines=False, process_args=None):
2102
"""Finish the execution of process.
2104
:param process: the Popen object returned from start_bzr_subprocess.
2105
:param retcode: The status code that is expected. Defaults to 0. If
2106
None is supplied, the status code is not checked.
2107
:param send_signal: an optional signal to send to the process.
2108
:param universal_newlines: Convert CRLF => LF
2109
:returns: (stdout, stderr)
2111
if send_signal is not None:
2112
os.kill(process.pid, send_signal)
2113
out, err = process.communicate()
2115
if universal_newlines:
2116
out = out.replace('\r\n', '\n')
2117
err = err.replace('\r\n', '\n')
2119
if retcode is not None and retcode != process.returncode:
2120
if process_args is None:
2121
process_args = "(unknown args)"
2122
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2123
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2124
self.fail('Command bzr %s failed with retcode %s != %s'
2125
% (process_args, retcode, process.returncode))
2128
def check_tree_shape(self, tree, shape):
2129
"""Compare a tree to a list of expected names.
2131
Fail if they are not precisely equal.
2134
shape = list(shape) # copy
2135
for path, ie in tree.iter_entries_by_dir():
2136
name = path.replace('\\', '/')
2137
if ie.kind == 'directory':
2140
pass # ignore root entry
2146
self.fail("expected paths not found in inventory: %r" % shape)
2148
self.fail("unexpected paths found in inventory: %r" % extras)
2150
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2151
a_callable=None, *args, **kwargs):
2152
"""Call callable with redirected std io pipes.
2154
Returns the return code."""
2155
if not callable(a_callable):
2156
raise ValueError("a_callable must be callable.")
2158
stdin = StringIO("")
2160
if getattr(self, "_log_file", None) is not None:
2161
stdout = self._log_file
2165
if getattr(self, "_log_file", None is not None):
2166
stderr = self._log_file
2169
real_stdin = sys.stdin
2170
real_stdout = sys.stdout
2171
real_stderr = sys.stderr
2176
return a_callable(*args, **kwargs)
2178
sys.stdout = real_stdout
2179
sys.stderr = real_stderr
2180
sys.stdin = real_stdin
2182
def reduceLockdirTimeout(self):
2183
"""Reduce the default lock timeout for the duration of the test, so that
2184
if LockContention occurs during a test, it does so quickly.
2186
Tests that expect to provoke LockContention errors should call this.
2188
self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2190
def make_utf8_encoded_stringio(self, encoding_type=None):
2191
"""Return a StringIOWrapper instance, that will encode Unicode
2194
if encoding_type is None:
2195
encoding_type = 'strict'
2197
output_encoding = 'utf-8'
2198
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
2199
sio.encoding = output_encoding
2202
def disable_verb(self, verb):
2203
"""Disable a smart server verb for one test."""
2204
from bzrlib.smart import request
2205
request_handlers = request.request_handlers
2206
orig_method = request_handlers.get(verb)
2207
request_handlers.remove(verb)
2208
self.addCleanup(request_handlers.register, verb, orig_method)
2211
class CapturedCall(object):
2212
"""A helper for capturing smart server calls for easy debug analysis."""
2214
def __init__(self, params, prefix_length):
2215
"""Capture the call with params and skip prefix_length stack frames."""
2218
# The last 5 frames are the __init__, the hook frame, and 3 smart
2219
# client frames. Beyond this we could get more clever, but this is good
2221
stack = traceback.extract_stack()[prefix_length:-5]
2222
self.stack = ''.join(traceback.format_list(stack))
2225
return self.call.method
2228
return self.call.method
2234
class TestCaseWithMemoryTransport(TestCase):
2235
"""Common test class for tests that do not need disk resources.
2237
Tests that need disk resources should derive from TestCaseWithTransport.
2239
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2241
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2242
a directory which does not exist. This serves to help ensure test isolation
2243
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2244
must exist. However, TestCaseWithMemoryTransport does not offer local
2245
file defaults for the transport in tests, nor does it obey the command line
2246
override, so tests that accidentally write to the common directory should
2249
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2250
a .bzr directory that stops us ascending higher into the filesystem.
2256
def __init__(self, methodName='runTest'):
2257
# allow test parameterization after test construction and before test
2258
# execution. Variables that the parameterizer sets need to be
2259
# ones that are not set by setUp, or setUp will trash them.
2260
super(TestCaseWithMemoryTransport, self).__init__(methodName)
2261
self.vfs_transport_factory = default_transport
2262
self.transport_server = None
2263
self.transport_readonly_server = None
2264
self.__vfs_server = None
2266
def get_transport(self, relpath=None):
2267
"""Return a writeable transport.
2269
This transport is for the test scratch space relative to
2272
:param relpath: a path relative to the base url.
2274
t = _mod_transport.get_transport(self.get_url(relpath))
2275
self.assertFalse(t.is_readonly())
2278
def get_readonly_transport(self, relpath=None):
2279
"""Return a readonly transport for the test scratch space
2281
This can be used to test that operations which should only need
2282
readonly access in fact do not try to write.
2284
:param relpath: a path relative to the base url.
2286
t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2287
self.assertTrue(t.is_readonly())
2290
def create_transport_readonly_server(self):
2291
"""Create a transport server from class defined at init.
2293
This is mostly a hook for daughter classes.
2295
return self.transport_readonly_server()
2297
def get_readonly_server(self):
2298
"""Get the server instance for the readonly transport
2300
This is useful for some tests with specific servers to do diagnostics.
2302
if self.__readonly_server is None:
2303
if self.transport_readonly_server is None:
2304
# readonly decorator requested
2305
self.__readonly_server = test_server.ReadonlyServer()
2307
# explicit readonly transport.
2308
self.__readonly_server = self.create_transport_readonly_server()
2309
self.start_server(self.__readonly_server,
2310
self.get_vfs_only_server())
2311
return self.__readonly_server
2313
def get_readonly_url(self, relpath=None):
2314
"""Get a URL for the readonly transport.
2316
This will either be backed by '.' or a decorator to the transport
2317
used by self.get_url()
2318
relpath provides for clients to get a path relative to the base url.
2319
These should only be downwards relative, not upwards.
2321
base = self.get_readonly_server().get_url()
2322
return self._adjust_url(base, relpath)
2324
def get_vfs_only_server(self):
2325
"""Get the vfs only read/write server instance.
2327
This is useful for some tests with specific servers that need
2330
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
2331
is no means to override it.
2333
if self.__vfs_server is None:
2334
self.__vfs_server = memory.MemoryServer()
2335
self.start_server(self.__vfs_server)
2336
return self.__vfs_server
2338
def get_server(self):
2339
"""Get the read/write server instance.
2341
This is useful for some tests with specific servers that need
2344
This is built from the self.transport_server factory. If that is None,
2345
then the self.get_vfs_server is returned.
2347
if self.__server is None:
2348
if (self.transport_server is None or self.transport_server is
2349
self.vfs_transport_factory):
2350
self.__server = self.get_vfs_only_server()
2352
# bring up a decorated means of access to the vfs only server.
2353
self.__server = self.transport_server()
2354
self.start_server(self.__server, self.get_vfs_only_server())
2355
return self.__server
2357
def _adjust_url(self, base, relpath):
2358
"""Get a URL (or maybe a path) for the readwrite transport.
2360
This will either be backed by '.' or to an equivalent non-file based
2362
relpath provides for clients to get a path relative to the base url.
2363
These should only be downwards relative, not upwards.
2365
if relpath is not None and relpath != '.':
2366
if not base.endswith('/'):
2368
# XXX: Really base should be a url; we did after all call
2369
# get_url()! But sometimes it's just a path (from
2370
# LocalAbspathServer), and it'd be wrong to append urlescaped data
2371
# to a non-escaped local path.
2372
if base.startswith('./') or base.startswith('/'):
2375
base += urlutils.escape(relpath)
2378
def get_url(self, relpath=None):
2379
"""Get a URL (or maybe a path) for the readwrite transport.
2381
This will either be backed by '.' or to an equivalent non-file based
2383
relpath provides for clients to get a path relative to the base url.
2384
These should only be downwards relative, not upwards.
2386
base = self.get_server().get_url()
2387
return self._adjust_url(base, relpath)
2389
def get_vfs_only_url(self, relpath=None):
2390
"""Get a URL (or maybe a path for the plain old vfs transport.
2392
This will never be a smart protocol. It always has all the
2393
capabilities of the local filesystem, but it might actually be a
2394
MemoryTransport or some other similar virtual filesystem.
2396
This is the backing transport (if any) of the server returned by
2397
get_url and get_readonly_url.
2399
:param relpath: provides for clients to get a path relative to the base
2400
url. These should only be downwards relative, not upwards.
2403
base = self.get_vfs_only_server().get_url()
2404
return self._adjust_url(base, relpath)
2406
def _create_safety_net(self):
2407
"""Make a fake bzr directory.
2409
This prevents any tests propagating up onto the TEST_ROOT directory's
2412
root = TestCaseWithMemoryTransport.TEST_ROOT
2413
bzrdir.BzrDir.create_standalone_workingtree(root)
2415
def _check_safety_net(self):
2416
"""Check that the safety .bzr directory have not been touched.
2418
_make_test_root have created a .bzr directory to prevent tests from
2419
propagating. This method ensures than a test did not leaked.
2421
root = TestCaseWithMemoryTransport.TEST_ROOT
2422
self.permit_url(_mod_transport.get_transport(root).base)
2423
wt = workingtree.WorkingTree.open(root)
2424
last_rev = wt.last_revision()
2425
if last_rev != 'null:':
2426
# The current test have modified the /bzr directory, we need to
2427
# recreate a new one or all the followng tests will fail.
2428
# If you need to inspect its content uncomment the following line
2429
# import pdb; pdb.set_trace()
2430
_rmtree_temp_dir(root + '/.bzr', test_id=self.id())
2431
self._create_safety_net()
2432
raise AssertionError('%s/.bzr should not be modified' % root)
2434
def _make_test_root(self):
2435
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2436
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
2437
root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
2439
TestCaseWithMemoryTransport.TEST_ROOT = root
2441
self._create_safety_net()
2443
# The same directory is used by all tests, and we're not
2444
# specifically told when all tests are finished. This will do.
2445
atexit.register(_rmtree_temp_dir, root)
2447
self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2448
self.addCleanup(self._check_safety_net)
2450
def makeAndChdirToTestDir(self):
2451
"""Create a temporary directories for this one test.
2453
This must set self.test_home_dir and self.test_dir and chdir to
2456
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
2458
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2459
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2460
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2461
self.permit_dir(self.test_dir)
2463
def make_branch(self, relpath, format=None):
2464
"""Create a branch on the transport at relpath."""
2465
repo = self.make_repository(relpath, format=format)
2466
return repo.bzrdir.create_branch()
2468
def make_bzrdir(self, relpath, format=None):
2470
# might be a relative or absolute path
2471
maybe_a_url = self.get_url(relpath)
2472
segments = maybe_a_url.rsplit('/', 1)
2473
t = _mod_transport.get_transport(maybe_a_url)
2474
if len(segments) > 1 and segments[-1] not in ('', '.'):
2478
if isinstance(format, basestring):
2479
format = bzrdir.format_registry.make_bzrdir(format)
2480
return format.initialize_on_transport(t)
2481
except errors.UninitializableFormat:
2482
raise TestSkipped("Format %s is not initializable." % format)
2484
def make_repository(self, relpath, shared=False, format=None):
2485
"""Create a repository on our default transport at relpath.
2487
Note that relpath must be a relative path, not a full url.
2489
# FIXME: If you create a remoterepository this returns the underlying
2490
# real format, which is incorrect. Actually we should make sure that
2491
# RemoteBzrDir returns a RemoteRepository.
2492
# maybe mbp 20070410
2493
made_control = self.make_bzrdir(relpath, format=format)
2494
return made_control.create_repository(shared=shared)
2496
def make_smart_server(self, path, backing_server=None):
2497
if backing_server is None:
2498
backing_server = self.get_server()
2499
smart_server = test_server.SmartTCPServer_for_testing()
2500
self.start_server(smart_server, backing_server)
2501
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2503
return remote_transport
2505
def make_branch_and_memory_tree(self, relpath, format=None):
2506
"""Create a branch on the default transport and a MemoryTree for it."""
2507
b = self.make_branch(relpath, format=format)
2508
return memorytree.MemoryTree.create_on_branch(b)
2510
def make_branch_builder(self, relpath, format=None):
2511
branch = self.make_branch(relpath, format=format)
2512
return branchbuilder.BranchBuilder(branch=branch)
2514
def overrideEnvironmentForTesting(self):
2515
test_home_dir = self.test_home_dir
2516
if isinstance(test_home_dir, unicode):
2517
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2518
self.overrideEnv('HOME', test_home_dir)
2519
self.overrideEnv('BZR_HOME', test_home_dir)
2522
super(TestCaseWithMemoryTransport, self).setUp()
2523
# Ensure that ConnectedTransport doesn't leak sockets
2524
def get_transport_with_cleanup(*args, **kwargs):
2525
t = orig_get_transport(*args, **kwargs)
2526
if isinstance(t, _mod_transport.ConnectedTransport):
2527
self.addCleanup(t.disconnect)
2530
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2531
get_transport_with_cleanup)
2532
self._make_test_root()
2533
self.addCleanup(os.chdir, os.getcwdu())
2534
self.makeAndChdirToTestDir()
2535
self.overrideEnvironmentForTesting()
2536
self.__readonly_server = None
2537
self.__server = None
2538
self.reduceLockdirTimeout()
2540
def setup_smart_server_with_call_log(self):
2541
"""Sets up a smart server as the transport server with a call log."""
2542
self.transport_server = test_server.SmartTCPServer_for_testing
2543
self.hpss_calls = []
2545
# Skip the current stack down to the caller of
2546
# setup_smart_server_with_call_log
2547
prefix_length = len(traceback.extract_stack()) - 2
2548
def capture_hpss_call(params):
2549
self.hpss_calls.append(
2550
CapturedCall(params, prefix_length))
2551
client._SmartClient.hooks.install_named_hook(
2552
'call', capture_hpss_call, None)
2554
def reset_smart_call_log(self):
2555
self.hpss_calls = []
2558
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2559
"""Derived class that runs a test within a temporary directory.
2561
This is useful for tests that need to create a branch, etc.
2563
The directory is created in a slightly complex way: for each
2564
Python invocation, a new temporary top-level directory is created.
2565
All test cases create their own directory within that. If the
2566
tests complete successfully, the directory is removed.
2568
:ivar test_base_dir: The path of the top-level directory for this
2569
test, which contains a home directory and a work directory.
2571
:ivar test_home_dir: An initially empty directory under test_base_dir
2572
which is used as $HOME for this test.
2574
:ivar test_dir: A directory under test_base_dir used as the current
2575
directory when the test proper is run.
2578
OVERRIDE_PYTHON = 'python'
2580
def check_file_contents(self, filename, expect):
2581
self.log("check contents of file %s" % filename)
2587
if contents != expect:
2588
self.log("expected: %r" % expect)
2589
self.log("actually: %r" % contents)
2590
self.fail("contents of %s not as expected" % filename)
2592
def _getTestDirPrefix(self):
2593
# create a directory within the top level test directory
2594
if sys.platform in ('win32', 'cygwin'):
2595
name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
2596
# windows is likely to have path-length limits so use a short name
2597
name_prefix = name_prefix[-30:]
2599
name_prefix = re.sub('[/]', '_', self.id())
2602
def makeAndChdirToTestDir(self):
2603
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2605
For TestCaseInTempDir we create a temporary directory based on the test
2606
name and then create two subdirs - test and home under it.
2608
name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2609
self._getTestDirPrefix())
2611
for i in range(100):
2612
if os.path.exists(name):
2613
name = name_prefix + '_' + str(i)
2615
# now create test and home directories within this dir
2616
self.test_base_dir = name
2617
self.addCleanup(self.deleteTestDir)
2618
os.mkdir(self.test_base_dir)
2620
self.permit_dir(self.test_base_dir)
2621
# 'sprouting' and 'init' of a branch both walk up the tree to find
2622
# stacking policy to honour; create a bzr dir with an unshared
2623
# repository (but not a branch - our code would be trying to escape
2624
# then!) to stop them, and permit it to be read.
2625
# control = bzrdir.BzrDir.create(self.test_base_dir)
2626
# control.create_repository()
2627
self.test_home_dir = self.test_base_dir + '/home'
2628
os.mkdir(self.test_home_dir)
2629
self.test_dir = self.test_base_dir + '/work'
2630
os.mkdir(self.test_dir)
2631
os.chdir(self.test_dir)
2632
# put name of test inside
2633
f = file(self.test_base_dir + '/name', 'w')
2639
def deleteTestDir(self):
2640
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2641
_rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2643
def build_tree(self, shape, line_endings='binary', transport=None):
2644
"""Build a test tree according to a pattern.
2646
shape is a sequence of file specifications. If the final
2647
character is '/', a directory is created.
2649
This assumes that all the elements in the tree being built are new.
2651
This doesn't add anything to a branch.
2653
:type shape: list or tuple.
2654
:param line_endings: Either 'binary' or 'native'
2655
in binary mode, exact contents are written in native mode, the
2656
line endings match the default platform endings.
2657
:param transport: A transport to write to, for building trees on VFS's.
2658
If the transport is readonly or None, "." is opened automatically.
2661
if type(shape) not in (list, tuple):
2662
raise AssertionError("Parameter 'shape' should be "
2663
"a list or a tuple. Got %r instead" % (shape,))
2664
# It's OK to just create them using forward slashes on windows.
2665
if transport is None or transport.is_readonly():
2666
transport = _mod_transport.get_transport(".")
2668
self.assertIsInstance(name, basestring)
2670
transport.mkdir(urlutils.escape(name[:-1]))
2672
if line_endings == 'binary':
2674
elif line_endings == 'native':
2677
raise errors.BzrError(
2678
'Invalid line ending request %r' % line_endings)
2679
content = "contents of %s%s" % (name.encode('utf-8'), end)
2680
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2682
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2684
def assertInWorkingTree(self, path, root_path='.', tree=None):
2685
"""Assert whether path or paths are in the WorkingTree"""
2687
tree = workingtree.WorkingTree.open(root_path)
2688
if not isinstance(path, basestring):
2690
self.assertInWorkingTree(p, tree=tree)
2692
self.assertIsNot(tree.path2id(path), None,
2693
path+' not in working tree.')
2695
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2696
"""Assert whether path or paths are not in the WorkingTree"""
2698
tree = workingtree.WorkingTree.open(root_path)
2699
if not isinstance(path, basestring):
2701
self.assertNotInWorkingTree(p,tree=tree)
2703
self.assertIs(tree.path2id(path), None, path+' in working tree.')
2706
class TestCaseWithTransport(TestCaseInTempDir):
2707
"""A test case that provides get_url and get_readonly_url facilities.
2709
These back onto two transport servers, one for readonly access and one for
2712
If no explicit class is provided for readonly access, a
2713
ReadonlyTransportDecorator is used instead which allows the use of non disk
2714
based read write transports.
2716
If an explicit class is provided for readonly access, that server and the
2717
readwrite one must both define get_url() as resolving to os.getcwd().
2720
def get_vfs_only_server(self):
2721
"""See TestCaseWithMemoryTransport.
2723
This is useful for some tests with specific servers that need
2726
if self.__vfs_server is None:
2727
self.__vfs_server = self.vfs_transport_factory()
2728
self.start_server(self.__vfs_server)
2729
return self.__vfs_server
2731
def make_branch_and_tree(self, relpath, format=None):
2732
"""Create a branch on the transport and a tree locally.
2734
If the transport is not a LocalTransport, the Tree can't be created on
2735
the transport. In that case if the vfs_transport_factory is
2736
LocalURLServer the working tree is created in the local
2737
directory backing the transport, and the returned tree's branch and
2738
repository will also be accessed locally. Otherwise a lightweight
2739
checkout is created and returned.
2741
We do this because we can't physically create a tree in the local
2742
path, with a branch reference to the transport_factory url, and
2743
a branch + repository in the vfs_transport, unless the vfs_transport
2744
namespace is distinct from the local disk - the two branch objects
2745
would collide. While we could construct a tree with its branch object
2746
pointing at the transport_factory transport in memory, reopening it
2747
would behaving unexpectedly, and has in the past caused testing bugs
2748
when we tried to do it that way.
2750
:param format: The BzrDirFormat.
2751
:returns: the WorkingTree.
2753
# TODO: always use the local disk path for the working tree,
2754
# this obviously requires a format that supports branch references
2755
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2757
b = self.make_branch(relpath, format=format)
2759
return b.bzrdir.create_workingtree()
2760
except errors.NotLocalUrl:
2761
# We can only make working trees locally at the moment. If the
2762
# transport can't support them, then we keep the non-disk-backed
2763
# branch and create a local checkout.
2764
if self.vfs_transport_factory is test_server.LocalURLServer:
2765
# the branch is colocated on disk, we cannot create a checkout.
2766
# hopefully callers will expect this.
2767
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2768
wt = local_controldir.create_workingtree()
2769
if wt.branch._format != b._format:
2771
# Make sure that assigning to wt._branch fixes wt.branch,
2772
# in case the implementation details of workingtree objects
2774
self.assertIs(b, wt.branch)
2777
return b.create_checkout(relpath, lightweight=True)
2779
def assertIsDirectory(self, relpath, transport):
2780
"""Assert that relpath within transport is a directory.
2782
This may not be possible on all transports; in that case it propagates
2783
a TransportNotPossible.
2786
mode = transport.stat(relpath).st_mode
2787
except errors.NoSuchFile:
2788
self.fail("path %s is not a directory; no such file"
2790
if not stat.S_ISDIR(mode):
2791
self.fail("path %s is not a directory; has mode %#o"
2794
def assertTreesEqual(self, left, right):
2795
"""Check that left and right have the same content and properties."""
2796
# we use a tree delta to check for equality of the content, and we
2797
# manually check for equality of other things such as the parents list.
2798
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2799
differences = left.changes_from(right)
2800
self.assertFalse(differences.has_changed(),
2801
"Trees %r and %r are different: %r" % (left, right, differences))
2804
super(TestCaseWithTransport, self).setUp()
2805
self.__vfs_server = None
2807
def disable_missing_extensions_warning(self):
2808
"""Some tests expect a precise stderr content.
2810
There is no point in forcing them to duplicate the extension related
2813
config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
2816
class ChrootedTestCase(TestCaseWithTransport):
2817
"""A support class that provides readonly urls outside the local namespace.
2819
This is done by checking if self.transport_server is a MemoryServer. if it
2820
is then we are chrooted already, if it is not then an HttpServer is used
2823
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2824
be used without needed to redo it when a different
2825
subclass is in use ?
2829
from bzrlib.tests import http_server
2830
super(ChrootedTestCase, self).setUp()
2831
if not self.vfs_transport_factory == memory.MemoryServer:
2832
self.transport_readonly_server = http_server.HttpServer
2835
def condition_id_re(pattern):
2836
"""Create a condition filter which performs a re check on a test's id.
2838
:param pattern: A regular expression string.
2839
:return: A callable that returns True if the re matches.
2841
filter_re = re.compile(pattern, 0)
2842
def condition(test):
2844
return filter_re.search(test_id)
2848
def condition_isinstance(klass_or_klass_list):
2849
"""Create a condition filter which returns isinstance(param, klass).
2851
:return: A callable which when called with one parameter obj return the
2852
result of isinstance(obj, klass_or_klass_list).
2855
return isinstance(obj, klass_or_klass_list)
2859
def condition_id_in_list(id_list):
2860
"""Create a condition filter which verify that test's id in a list.
2862
:param id_list: A TestIdList object.
2863
:return: A callable that returns True if the test's id appears in the list.
2865
def condition(test):
2866
return id_list.includes(test.id())
2870
def condition_id_startswith(starts):
2871
"""Create a condition filter verifying that test's id starts with a string.
2873
:param starts: A list of string.
2874
:return: A callable that returns True if the test's id starts with one of
2877
def condition(test):
2878
for start in starts:
2879
if test.id().startswith(start):
2885
def exclude_tests_by_condition(suite, condition):
2886
"""Create a test suite which excludes some tests from suite.
2888
:param suite: The suite to get tests from.
2889
:param condition: A callable whose result evaluates True when called with a
2890
test case which should be excluded from the result.
2891
:return: A suite which contains the tests found in suite that fail
2895
for test in iter_suite_tests(suite):
2896
if not condition(test):
2898
return TestUtil.TestSuite(result)
2901
def filter_suite_by_condition(suite, condition):
2902
"""Create a test suite by filtering another one.
2904
:param suite: The source suite.
2905
:param condition: A callable whose result evaluates True when called with a
2906
test case which should be included in the result.
2907
:return: A suite which contains the tests found in suite that pass
2911
for test in iter_suite_tests(suite):
2914
return TestUtil.TestSuite(result)
2917
def filter_suite_by_re(suite, pattern):
2918
"""Create a test suite by filtering another one.
2920
:param suite: the source suite
2921
:param pattern: pattern that names must match
2922
:returns: the newly created suite
2924
condition = condition_id_re(pattern)
2925
result_suite = filter_suite_by_condition(suite, condition)
2929
def filter_suite_by_id_list(suite, test_id_list):
2930
"""Create a test suite by filtering another one.
2932
:param suite: The source suite.
2933
:param test_id_list: A list of the test ids to keep as strings.
2934
:returns: the newly created suite
2936
condition = condition_id_in_list(test_id_list)
2937
result_suite = filter_suite_by_condition(suite, condition)
2941
def filter_suite_by_id_startswith(suite, start):
2942
"""Create a test suite by filtering another one.
2944
:param suite: The source suite.
2945
:param start: A list of string the test id must start with one of.
2946
:returns: the newly created suite
2948
condition = condition_id_startswith(start)
2949
result_suite = filter_suite_by_condition(suite, condition)
2953
def exclude_tests_by_re(suite, pattern):
2954
"""Create a test suite which excludes some tests from suite.
2956
:param suite: The suite to get tests from.
2957
:param pattern: A regular expression string. Test ids that match this
2958
pattern will be excluded from the result.
2959
:return: A TestSuite that contains all the tests from suite without the
2960
tests that matched pattern. The order of tests is the same as it was in
2963
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2966
def preserve_input(something):
2967
"""A helper for performing test suite transformation chains.
2969
:param something: Anything you want to preserve.
2975
def randomize_suite(suite):
2976
"""Return a new TestSuite with suite's tests in random order.
2978
The tests in the input suite are flattened into a single suite in order to
2979
accomplish this. Any nested TestSuites are removed to provide global
2982
tests = list(iter_suite_tests(suite))
2983
random.shuffle(tests)
2984
return TestUtil.TestSuite(tests)
2987
def split_suite_by_condition(suite, condition):
2988
"""Split a test suite into two by a condition.
2990
:param suite: The suite to split.
2991
:param condition: The condition to match on. Tests that match this
2992
condition are returned in the first test suite, ones that do not match
2993
are in the second suite.
2994
:return: A tuple of two test suites, where the first contains tests from
2995
suite matching the condition, and the second contains the remainder
2996
from suite. The order within each output suite is the same as it was in
3001
for test in iter_suite_tests(suite):
3003
matched.append(test)
3005
did_not_match.append(test)
3006
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
3009
def split_suite_by_re(suite, pattern):
3010
"""Split a test suite into two by a regular expression.
3012
:param suite: The suite to split.
3013
:param pattern: A regular expression string. Test ids that match this
3014
pattern will be in the first test suite returned, and the others in the
3015
second test suite returned.
3016
:return: A tuple of two test suites, where the first contains tests from
3017
suite matching pattern, and the second contains the remainder from
3018
suite. The order within each output suite is the same as it was in
3021
return split_suite_by_condition(suite, condition_id_re(pattern))
3024
def run_suite(suite, name='test', verbose=False, pattern=".*",
3025
stop_on_failure=False,
3026
transport=None, lsprof_timed=None, bench_history=None,
3027
matching_tests_first=None,
3030
exclude_pattern=None,
3033
suite_decorators=None,
3035
result_decorators=None,
3037
"""Run a test suite for bzr selftest.
3039
:param runner_class: The class of runner to use. Must support the
3040
constructor arguments passed by run_suite which are more than standard
3042
:return: A boolean indicating success.
3044
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
3049
if runner_class is None:
3050
runner_class = TextTestRunner
3053
runner = runner_class(stream=stream,
3055
verbosity=verbosity,
3056
bench_history=bench_history,
3058
result_decorators=result_decorators,
3060
runner.stop_on_failure=stop_on_failure
3061
# built in decorator factories:
3063
random_order(random_seed, runner),
3064
exclude_tests(exclude_pattern),
3066
if matching_tests_first:
3067
decorators.append(tests_first(pattern))
3069
decorators.append(filter_tests(pattern))
3070
if suite_decorators:
3071
decorators.extend(suite_decorators)
3072
# tell the result object how many tests will be running: (except if
3073
# --parallel=fork is being used. Robert said he will provide a better
3074
# progress design later -- vila 20090817)
3075
if fork_decorator not in decorators:
3076
decorators.append(CountingDecorator)
3077
for decorator in decorators:
3078
suite = decorator(suite)
3080
# Done after test suite decoration to allow randomisation etc
3081
# to take effect, though that is of marginal benefit.
3083
stream.write("Listing tests only ...\n")
3084
for t in iter_suite_tests(suite):
3085
stream.write("%s\n" % (t.id()))
3087
result = runner.run(suite)
3089
return result.wasStrictlySuccessful()
3091
return result.wasSuccessful()
3094
# A registry where get() returns a suite decorator.
3095
parallel_registry = registry.Registry()
3098
def fork_decorator(suite):
3099
if getattr(os, "fork", None) is None:
3100
raise errors.BzrCommandError("platform does not support fork,"
3101
" try --parallel=subprocess instead.")
3102
concurrency = osutils.local_concurrency()
3103
if concurrency == 1:
3105
from testtools import ConcurrentTestSuite
3106
return ConcurrentTestSuite(suite, fork_for_tests)
3107
parallel_registry.register('fork', fork_decorator)
3110
def subprocess_decorator(suite):
3111
concurrency = osutils.local_concurrency()
3112
if concurrency == 1:
3114
from testtools import ConcurrentTestSuite
3115
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3116
parallel_registry.register('subprocess', subprocess_decorator)
3119
def exclude_tests(exclude_pattern):
3120
"""Return a test suite decorator that excludes tests."""
3121
if exclude_pattern is None:
3122
return identity_decorator
3123
def decorator(suite):
3124
return ExcludeDecorator(suite, exclude_pattern)
3128
def filter_tests(pattern):
3130
return identity_decorator
3131
def decorator(suite):
3132
return FilterTestsDecorator(suite, pattern)
3136
def random_order(random_seed, runner):
3137
"""Return a test suite decorator factory for randomising tests order.
3139
:param random_seed: now, a string which casts to a long, or a long.
3140
:param runner: A test runner with a stream attribute to report on.
3142
if random_seed is None:
3143
return identity_decorator
3144
def decorator(suite):
3145
return RandomDecorator(suite, random_seed, runner.stream)
3149
def tests_first(pattern):
3151
return identity_decorator
3152
def decorator(suite):
3153
return TestFirstDecorator(suite, pattern)
3157
def identity_decorator(suite):
3162
class TestDecorator(TestUtil.TestSuite):
3163
"""A decorator for TestCase/TestSuite objects.
3165
Usually, subclasses should override __iter__(used when flattening test
3166
suites), which we do to filter, reorder, parallelise and so on, run() and
3170
def __init__(self, suite):
3171
TestUtil.TestSuite.__init__(self)
3174
def countTestCases(self):
3177
cases += test.countTestCases()
3184
def run(self, result):
3185
# Use iteration on self, not self._tests, to allow subclasses to hook
3188
if result.shouldStop:
3194
class CountingDecorator(TestDecorator):
3195
"""A decorator which calls result.progress(self.countTestCases)."""
3197
def run(self, result):
3198
progress_method = getattr(result, 'progress', None)
3199
if callable(progress_method):
3200
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3201
return super(CountingDecorator, self).run(result)
3204
class ExcludeDecorator(TestDecorator):
3205
"""A decorator which excludes test matching an exclude pattern."""
3207
def __init__(self, suite, exclude_pattern):
3208
TestDecorator.__init__(self, suite)
3209
self.exclude_pattern = exclude_pattern
3210
self.excluded = False
3214
return iter(self._tests)
3215
self.excluded = True
3216
suite = exclude_tests_by_re(self, self.exclude_pattern)
3218
self.addTests(suite)
3219
return iter(self._tests)
3222
class FilterTestsDecorator(TestDecorator):
3223
"""A decorator which filters tests to those matching a pattern."""
3225
def __init__(self, suite, pattern):
3226
TestDecorator.__init__(self, suite)
3227
self.pattern = pattern
3228
self.filtered = False
3232
return iter(self._tests)
3233
self.filtered = True
3234
suite = filter_suite_by_re(self, self.pattern)
3236
self.addTests(suite)
3237
return iter(self._tests)
3240
class RandomDecorator(TestDecorator):
3241
"""A decorator which randomises the order of its tests."""
3243
def __init__(self, suite, random_seed, stream):
3244
TestDecorator.__init__(self, suite)
3245
self.random_seed = random_seed
3246
self.randomised = False
3247
self.stream = stream
3251
return iter(self._tests)
3252
self.randomised = True
3253
self.stream.write("Randomizing test order using seed %s\n\n" %
3254
(self.actual_seed()))
3255
# Initialise the random number generator.
3256
random.seed(self.actual_seed())
3257
suite = randomize_suite(self)
3259
self.addTests(suite)
3260
return iter(self._tests)
3262
def actual_seed(self):
3263
if self.random_seed == "now":
3264
# We convert the seed to a long to make it reuseable across
3265
# invocations (because the user can reenter it).
3266
self.random_seed = long(time.time())
3268
# Convert the seed to a long if we can
3270
self.random_seed = long(self.random_seed)
3273
return self.random_seed
3276
class TestFirstDecorator(TestDecorator):
3277
"""A decorator which moves named tests to the front."""
3279
def __init__(self, suite, pattern):
3280
TestDecorator.__init__(self, suite)
3281
self.pattern = pattern
3282
self.filtered = False
3286
return iter(self._tests)
3287
self.filtered = True
3288
suites = split_suite_by_re(self, self.pattern)
3290
self.addTests(suites)
3291
return iter(self._tests)
3294
def partition_tests(suite, count):
3295
"""Partition suite into count lists of tests."""
3296
# This just assigns tests in a round-robin fashion. On one hand this
3297
# splits up blocks of related tests that might run faster if they shared
3298
# resources, but on the other it avoids assigning blocks of slow tests to
3299
# just one partition. So the slowest partition shouldn't be much slower
3301
partitions = [list() for i in range(count)]
3302
tests = iter_suite_tests(suite)
3303
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3304
partition.append(test)
3308
def workaround_zealous_crypto_random():
3309
"""Crypto.Random want to help us being secure, but we don't care here.
3311
This workaround some test failure related to the sftp server. Once paramiko
3312
stop using the controversial API in Crypto.Random, we may get rid of it.
3315
from Crypto.Random import atfork
3321
def fork_for_tests(suite):
3322
"""Take suite and start up one runner per CPU by forking()
3324
:return: An iterable of TestCase-like objects which can each have
3325
run(result) called on them to feed tests to result.
3327
concurrency = osutils.local_concurrency()
3329
from subunit import TestProtocolClient, ProtocolTestCase
3330
from subunit.test_results import AutoTimingTestResultDecorator
3331
class TestInOtherProcess(ProtocolTestCase):
3332
# Should be in subunit, I think. RBC.
3333
def __init__(self, stream, pid):
3334
ProtocolTestCase.__init__(self, stream)
3337
def run(self, result):
3339
ProtocolTestCase.run(self, result)
3341
os.waitpid(self.pid, 0)
3343
test_blocks = partition_tests(suite, concurrency)
3344
for process_tests in test_blocks:
3345
process_suite = TestUtil.TestSuite()
3346
process_suite.addTests(process_tests)
3347
c2pread, c2pwrite = os.pipe()
3350
workaround_zealous_crypto_random()
3353
# Leave stderr and stdout open so we can see test noise
3354
# Close stdin so that the child goes away if it decides to
3355
# read from stdin (otherwise its a roulette to see what
3356
# child actually gets keystrokes for pdb etc).
3359
stream = os.fdopen(c2pwrite, 'wb', 1)
3360
subunit_result = AutoTimingTestResultDecorator(
3361
TestProtocolClient(stream))
3362
process_suite.run(subunit_result)
3367
stream = os.fdopen(c2pread, 'rb', 1)
3368
test = TestInOtherProcess(stream, pid)
3373
def reinvoke_for_tests(suite):
3374
"""Take suite and start up one runner per CPU using subprocess().
3376
:return: An iterable of TestCase-like objects which can each have
3377
run(result) called on them to feed tests to result.
3379
concurrency = osutils.local_concurrency()
3381
from subunit import ProtocolTestCase
3382
class TestInSubprocess(ProtocolTestCase):
3383
def __init__(self, process, name):
3384
ProtocolTestCase.__init__(self, process.stdout)
3385
self.process = process
3386
self.process.stdin.close()
3389
def run(self, result):
3391
ProtocolTestCase.run(self, result)
3394
os.unlink(self.name)
3395
# print "pid %d finished" % finished_process
3396
test_blocks = partition_tests(suite, concurrency)
3397
for process_tests in test_blocks:
3398
# ugly; currently reimplement rather than reuses TestCase methods.
3399
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3400
if not os.path.isfile(bzr_path):
3401
# We are probably installed. Assume sys.argv is the right file
3402
bzr_path = sys.argv[0]
3403
bzr_path = [bzr_path]
3404
if sys.platform == "win32":
3405
# if we're on windows, we can't execute the bzr script directly
3406
bzr_path = [sys.executable] + bzr_path
3407
fd, test_list_file_name = tempfile.mkstemp()
3408
test_list_file = os.fdopen(fd, 'wb', 1)
3409
for test in process_tests:
3410
test_list_file.write(test.id() + '\n')
3411
test_list_file.close()
3413
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3415
if '--no-plugins' in sys.argv:
3416
argv.append('--no-plugins')
3417
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3418
# noise on stderr it can interrupt the subunit protocol.
3419
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3420
stdout=subprocess.PIPE,
3421
stderr=subprocess.PIPE,
3423
test = TestInSubprocess(process, test_list_file_name)
3426
os.unlink(test_list_file_name)
3431
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3432
"""Generate profiling data for all activity between start and success.
3434
The profile data is appended to the test's _benchcalls attribute and can
3435
be accessed by the forwarded-to TestResult.
3437
While it might be cleaner do accumulate this in stopTest, addSuccess is
3438
where our existing output support for lsprof is, and this class aims to
3439
fit in with that: while it could be moved it's not necessary to accomplish
3440
test profiling, nor would it be dramatically cleaner.
3443
def startTest(self, test):
3444
self.profiler = bzrlib.lsprof.BzrProfiler()
3445
# Prevent deadlocks in tests that use lsprof: those tests will
3447
bzrlib.lsprof.BzrProfiler.profiler_block = 0
3448
self.profiler.start()
3449
testtools.ExtendedToOriginalDecorator.startTest(self, test)
3451
def addSuccess(self, test):
3452
stats = self.profiler.stop()
3454
calls = test._benchcalls
3455
except AttributeError:
3456
test._benchcalls = []
3457
calls = test._benchcalls
3458
calls.append(((test.id(), "", ""), stats))
3459
testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
3461
def stopTest(self, test):
3462
testtools.ExtendedToOriginalDecorator.stopTest(self, test)
3463
self.profiler = None
3466
# Controlled by "bzr selftest -E=..." option
3467
# Currently supported:
3468
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3469
# preserves any flags supplied at the command line.
3470
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3471
# rather than failing tests. And no longer raise
3472
# LockContention when fctnl locks are not being used
3473
# with proper exclusion rules.
3474
# -Ethreads Will display thread ident at creation/join time to
3475
# help track thread leaks
3476
selftest_debug_flags = set()
3479
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3481
test_suite_factory=None,
3484
matching_tests_first=None,
3487
exclude_pattern=None,
3493
suite_decorators=None,
3497
"""Run the whole test suite under the enhanced runner"""
3498
# XXX: Very ugly way to do this...
3499
# Disable warning about old formats because we don't want it to disturb
3500
# any blackbox tests.
3501
from bzrlib import repository
3502
repository._deprecation_warning_done = True
3504
global default_transport
3505
if transport is None:
3506
transport = default_transport
3507
old_transport = default_transport
3508
default_transport = transport
3509
global selftest_debug_flags
3510
old_debug_flags = selftest_debug_flags
3511
if debug_flags is not None:
3512
selftest_debug_flags = set(debug_flags)
3514
if load_list is None:
3517
keep_only = load_test_id_list(load_list)
3519
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3520
for start in starting_with]
3521
if test_suite_factory is None:
3522
# Reduce loading time by loading modules based on the starting_with
3524
suite = test_suite(keep_only, starting_with)
3526
suite = test_suite_factory()
3528
# But always filter as requested.
3529
suite = filter_suite_by_id_startswith(suite, starting_with)
3530
result_decorators = []
3532
result_decorators.append(ProfileResult)
3533
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3534
stop_on_failure=stop_on_failure,
3535
transport=transport,
3536
lsprof_timed=lsprof_timed,
3537
bench_history=bench_history,
3538
matching_tests_first=matching_tests_first,
3539
list_only=list_only,
3540
random_seed=random_seed,
3541
exclude_pattern=exclude_pattern,
3543
runner_class=runner_class,
3544
suite_decorators=suite_decorators,
3546
result_decorators=result_decorators,
3549
default_transport = old_transport
3550
selftest_debug_flags = old_debug_flags
3553
def load_test_id_list(file_name):
3554
"""Load a test id list from a text file.
3556
The format is one test id by line. No special care is taken to impose
3557
strict rules, these test ids are used to filter the test suite so a test id
3558
that do not match an existing test will do no harm. This allows user to add
3559
comments, leave blank lines, etc.
3563
ftest = open(file_name, 'rt')
3565
if e.errno != errno.ENOENT:
3568
raise errors.NoSuchFile(file_name)
3570
for test_name in ftest.readlines():
3571
test_list.append(test_name.strip())
3576
def suite_matches_id_list(test_suite, id_list):
3577
"""Warns about tests not appearing or appearing more than once.
3579
:param test_suite: A TestSuite object.
3580
:param test_id_list: The list of test ids that should be found in
3583
:return: (absents, duplicates) absents is a list containing the test found
3584
in id_list but not in test_suite, duplicates is a list containing the
3585
test found multiple times in test_suite.
3587
When using a prefined test id list, it may occurs that some tests do not
3588
exist anymore or that some tests use the same id. This function warns the
3589
tester about potential problems in his workflow (test lists are volatile)
3590
or in the test suite itself (using the same id for several tests does not
3591
help to localize defects).
3593
# Build a dict counting id occurrences
3595
for test in iter_suite_tests(test_suite):
3597
tests[id] = tests.get(id, 0) + 1
3602
occurs = tests.get(id, 0)
3604
not_found.append(id)
3606
duplicates.append(id)
3608
return not_found, duplicates
3611
class TestIdList(object):
3612
"""Test id list to filter a test suite.
3614
Relying on the assumption that test ids are built as:
3615
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3616
notation, this class offers methods to :
3617
- avoid building a test suite for modules not refered to in the test list,
3618
- keep only the tests listed from the module test suite.
3621
def __init__(self, test_id_list):
3622
# When a test suite needs to be filtered against us we compare test ids
3623
# for equality, so a simple dict offers a quick and simple solution.
3624
self.tests = dict().fromkeys(test_id_list, True)
3626
# While unittest.TestCase have ids like:
3627
# <module>.<class>.<method>[(<param+)],
3628
# doctest.DocTestCase can have ids like:
3631
# <module>.<function>
3632
# <module>.<class>.<method>
3634
# Since we can't predict a test class from its name only, we settle on
3635
# a simple constraint: a test id always begins with its module name.
3638
for test_id in test_id_list:
3639
parts = test_id.split('.')
3640
mod_name = parts.pop(0)
3641
modules[mod_name] = True
3643
mod_name += '.' + part
3644
modules[mod_name] = True
3645
self.modules = modules
3647
def refers_to(self, module_name):
3648
"""Is there tests for the module or one of its sub modules."""
3649
return self.modules.has_key(module_name)
3651
def includes(self, test_id):
3652
return self.tests.has_key(test_id)
3655
class TestPrefixAliasRegistry(registry.Registry):
3656
"""A registry for test prefix aliases.
3658
This helps implement shorcuts for the --starting-with selftest
3659
option. Overriding existing prefixes is not allowed but not fatal (a
3660
warning will be emitted).
3663
def register(self, key, obj, help=None, info=None,
3664
override_existing=False):
3665
"""See Registry.register.
3667
Trying to override an existing alias causes a warning to be emitted,
3668
not a fatal execption.
3671
super(TestPrefixAliasRegistry, self).register(
3672
key, obj, help=help, info=info, override_existing=False)
3674
actual = self.get(key)
3676
'Test prefix alias %s is already used for %s, ignoring %s'
3677
% (key, actual, obj))
3679
def resolve_alias(self, id_start):
3680
"""Replace the alias by the prefix in the given string.
3682
Using an unknown prefix is an error to help catching typos.
3684
parts = id_start.split('.')
3686
parts[0] = self.get(parts[0])
3688
raise errors.BzrCommandError(
3689
'%s is not a known test prefix alias' % parts[0])
3690
return '.'.join(parts)
3693
test_prefix_alias_registry = TestPrefixAliasRegistry()
3694
"""Registry of test prefix aliases."""
3697
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3698
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3699
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3701
# Obvious highest levels prefixes, feel free to add your own via a plugin
3702
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3703
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3704
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3705
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3706
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3709
def _test_suite_testmod_names():
3710
"""Return the standard list of test module names to test."""
3713
'bzrlib.tests.blackbox',
3714
'bzrlib.tests.commands',
3715
'bzrlib.tests.doc_generate',
3716
'bzrlib.tests.per_branch',
3717
'bzrlib.tests.per_bzrdir',
3718
'bzrlib.tests.per_controldir',
3719
'bzrlib.tests.per_controldir_colo',
3720
'bzrlib.tests.per_foreign_vcs',
3721
'bzrlib.tests.per_interrepository',
3722
'bzrlib.tests.per_intertree',
3723
'bzrlib.tests.per_inventory',
3724
'bzrlib.tests.per_interbranch',
3725
'bzrlib.tests.per_lock',
3726
'bzrlib.tests.per_merger',
3727
'bzrlib.tests.per_transport',
3728
'bzrlib.tests.per_tree',
3729
'bzrlib.tests.per_pack_repository',
3730
'bzrlib.tests.per_repository',
3731
'bzrlib.tests.per_repository_chk',
3732
'bzrlib.tests.per_repository_reference',
3733
'bzrlib.tests.per_repository_vf',
3734
'bzrlib.tests.per_uifactory',
3735
'bzrlib.tests.per_versionedfile',
3736
'bzrlib.tests.per_workingtree',
3737
'bzrlib.tests.test__annotator',
3738
'bzrlib.tests.test__bencode',
3739
'bzrlib.tests.test__btree_serializer',
3740
'bzrlib.tests.test__chk_map',
3741
'bzrlib.tests.test__dirstate_helpers',
3742
'bzrlib.tests.test__groupcompress',
3743
'bzrlib.tests.test__known_graph',
3744
'bzrlib.tests.test__rio',
3745
'bzrlib.tests.test__simple_set',
3746
'bzrlib.tests.test__static_tuple',
3747
'bzrlib.tests.test__walkdirs_win32',
3748
'bzrlib.tests.test_ancestry',
3749
'bzrlib.tests.test_annotate',
3750
'bzrlib.tests.test_api',
3751
'bzrlib.tests.test_atomicfile',
3752
'bzrlib.tests.test_bad_files',
3753
'bzrlib.tests.test_bisect_multi',
3754
'bzrlib.tests.test_branch',
3755
'bzrlib.tests.test_branchbuilder',
3756
'bzrlib.tests.test_btree_index',
3757
'bzrlib.tests.test_bugtracker',
3758
'bzrlib.tests.test_bundle',
3759
'bzrlib.tests.test_bzrdir',
3760
'bzrlib.tests.test__chunks_to_lines',
3761
'bzrlib.tests.test_cache_utf8',
3762
'bzrlib.tests.test_chk_map',
3763
'bzrlib.tests.test_chk_serializer',
3764
'bzrlib.tests.test_chunk_writer',
3765
'bzrlib.tests.test_clean_tree',
3766
'bzrlib.tests.test_cleanup',
3767
'bzrlib.tests.test_cmdline',
3768
'bzrlib.tests.test_commands',
3769
'bzrlib.tests.test_commit',
3770
'bzrlib.tests.test_commit_merge',
3771
'bzrlib.tests.test_config',
3772
'bzrlib.tests.test_conflicts',
3773
'bzrlib.tests.test_controldir',
3774
'bzrlib.tests.test_counted_lock',
3775
'bzrlib.tests.test_crash',
3776
'bzrlib.tests.test_decorators',
3777
'bzrlib.tests.test_delta',
3778
'bzrlib.tests.test_debug',
3779
'bzrlib.tests.test_deprecated_graph',
3780
'bzrlib.tests.test_diff',
3781
'bzrlib.tests.test_directory_service',
3782
'bzrlib.tests.test_dirstate',
3783
'bzrlib.tests.test_email_message',
3784
'bzrlib.tests.test_eol_filters',
3785
'bzrlib.tests.test_errors',
3786
'bzrlib.tests.test_export',
3787
'bzrlib.tests.test_extract',
3788
'bzrlib.tests.test_fetch',
3789
'bzrlib.tests.test_fixtures',
3790
'bzrlib.tests.test_fifo_cache',
3791
'bzrlib.tests.test_filters',
3792
'bzrlib.tests.test_ftp_transport',
3793
'bzrlib.tests.test_foreign',
3794
'bzrlib.tests.test_generate_docs',
3795
'bzrlib.tests.test_generate_ids',
3796
'bzrlib.tests.test_globbing',
3797
'bzrlib.tests.test_gpg',
3798
'bzrlib.tests.test_graph',
3799
'bzrlib.tests.test_groupcompress',
3800
'bzrlib.tests.test_hashcache',
3801
'bzrlib.tests.test_help',
3802
'bzrlib.tests.test_hooks',
3803
'bzrlib.tests.test_http',
3804
'bzrlib.tests.test_http_response',
3805
'bzrlib.tests.test_https_ca_bundle',
3806
'bzrlib.tests.test_identitymap',
3807
'bzrlib.tests.test_ignores',
3808
'bzrlib.tests.test_index',
3809
'bzrlib.tests.test_import_tariff',
3810
'bzrlib.tests.test_info',
3811
'bzrlib.tests.test_inv',
3812
'bzrlib.tests.test_inventory_delta',
3813
'bzrlib.tests.test_knit',
3814
'bzrlib.tests.test_lazy_import',
3815
'bzrlib.tests.test_lazy_regex',
3816
'bzrlib.tests.test_library_state',
3817
'bzrlib.tests.test_lock',
3818
'bzrlib.tests.test_lockable_files',
3819
'bzrlib.tests.test_lockdir',
3820
'bzrlib.tests.test_log',
3821
'bzrlib.tests.test_lru_cache',
3822
'bzrlib.tests.test_lsprof',
3823
'bzrlib.tests.test_mail_client',
3824
'bzrlib.tests.test_matchers',
3825
'bzrlib.tests.test_memorytree',
3826
'bzrlib.tests.test_merge',
3827
'bzrlib.tests.test_merge3',
3828
'bzrlib.tests.test_merge_core',
3829
'bzrlib.tests.test_merge_directive',
3830
'bzrlib.tests.test_mergetools',
3831
'bzrlib.tests.test_missing',
3832
'bzrlib.tests.test_msgeditor',
3833
'bzrlib.tests.test_multiparent',
3834
'bzrlib.tests.test_mutabletree',
3835
'bzrlib.tests.test_nonascii',
3836
'bzrlib.tests.test_options',
3837
'bzrlib.tests.test_osutils',
3838
'bzrlib.tests.test_osutils_encodings',
3839
'bzrlib.tests.test_pack',
3840
'bzrlib.tests.test_patch',
3841
'bzrlib.tests.test_patches',
3842
'bzrlib.tests.test_permissions',
3843
'bzrlib.tests.test_plugins',
3844
'bzrlib.tests.test_progress',
3845
'bzrlib.tests.test_pyutils',
3846
'bzrlib.tests.test_read_bundle',
3847
'bzrlib.tests.test_reconcile',
3848
'bzrlib.tests.test_reconfigure',
3849
'bzrlib.tests.test_registry',
3850
'bzrlib.tests.test_remote',
3851
'bzrlib.tests.test_rename_map',
3852
'bzrlib.tests.test_repository',
3853
'bzrlib.tests.test_revert',
3854
'bzrlib.tests.test_revision',
3855
'bzrlib.tests.test_revisionspec',
3856
'bzrlib.tests.test_revisiontree',
3857
'bzrlib.tests.test_rio',
3858
'bzrlib.tests.test_rules',
3859
'bzrlib.tests.test_sampler',
3860
'bzrlib.tests.test_scenarios',
3861
'bzrlib.tests.test_script',
3862
'bzrlib.tests.test_selftest',
3863
'bzrlib.tests.test_serializer',
3864
'bzrlib.tests.test_setup',
3865
'bzrlib.tests.test_sftp_transport',
3866
'bzrlib.tests.test_shelf',
3867
'bzrlib.tests.test_shelf_ui',
3868
'bzrlib.tests.test_smart',
3869
'bzrlib.tests.test_smart_add',
3870
'bzrlib.tests.test_smart_request',
3871
'bzrlib.tests.test_smart_transport',
3872
'bzrlib.tests.test_smtp_connection',
3873
'bzrlib.tests.test_source',
3874
'bzrlib.tests.test_ssh_transport',
3875
'bzrlib.tests.test_status',
3876
'bzrlib.tests.test_store',
3877
'bzrlib.tests.test_strace',
3878
'bzrlib.tests.test_subsume',
3879
'bzrlib.tests.test_switch',
3880
'bzrlib.tests.test_symbol_versioning',
3881
'bzrlib.tests.test_tag',
3882
'bzrlib.tests.test_test_server',
3883
'bzrlib.tests.test_testament',
3884
'bzrlib.tests.test_textfile',
3885
'bzrlib.tests.test_textmerge',
3886
'bzrlib.tests.test_cethread',
3887
'bzrlib.tests.test_timestamp',
3888
'bzrlib.tests.test_trace',
3889
'bzrlib.tests.test_transactions',
3890
'bzrlib.tests.test_transform',
3891
'bzrlib.tests.test_transport',
3892
'bzrlib.tests.test_transport_log',
3893
'bzrlib.tests.test_tree',
3894
'bzrlib.tests.test_treebuilder',
3895
'bzrlib.tests.test_treeshape',
3896
'bzrlib.tests.test_tsort',
3897
'bzrlib.tests.test_tuned_gzip',
3898
'bzrlib.tests.test_ui',
3899
'bzrlib.tests.test_uncommit',
3900
'bzrlib.tests.test_upgrade',
3901
'bzrlib.tests.test_upgrade_stacked',
3902
'bzrlib.tests.test_urlutils',
3903
'bzrlib.tests.test_version',
3904
'bzrlib.tests.test_version_info',
3905
'bzrlib.tests.test_versionedfile',
3906
'bzrlib.tests.test_weave',
3907
'bzrlib.tests.test_whitebox',
3908
'bzrlib.tests.test_win32utils',
3909
'bzrlib.tests.test_workingtree',
3910
'bzrlib.tests.test_workingtree_4',
3911
'bzrlib.tests.test_wsgi',
3912
'bzrlib.tests.test_xml',
3916
def _test_suite_modules_to_doctest():
3917
"""Return the list of modules to doctest."""
3919
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3923
'bzrlib.branchbuilder',
3924
'bzrlib.decorators',
3926
'bzrlib.iterablefile',
3931
'bzrlib.symbol_versioning',
3933
'bzrlib.tests.fixtures',
3935
'bzrlib.transport.http',
3936
'bzrlib.version_info_formats.format_custom',
3940
def test_suite(keep_only=None, starting_with=None):
3941
"""Build and return TestSuite for the whole of bzrlib.
3943
:param keep_only: A list of test ids limiting the suite returned.
3945
:param starting_with: An id limiting the suite returned to the tests
3948
This function can be replaced if you need to change the default test
3949
suite on a global basis, but it is not encouraged.
3952
loader = TestUtil.TestLoader()
3954
if keep_only is not None:
3955
id_filter = TestIdList(keep_only)
3957
# We take precedence over keep_only because *at loading time* using
3958
# both options means we will load less tests for the same final result.
3959
def interesting_module(name):
3960
for start in starting_with:
3962
# Either the module name starts with the specified string
3963
name.startswith(start)
3964
# or it may contain tests starting with the specified string
3965
or start.startswith(name)
3969
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3971
elif keep_only is not None:
3972
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3973
def interesting_module(name):
3974
return id_filter.refers_to(name)
3977
loader = TestUtil.TestLoader()
3978
def interesting_module(name):
3979
# No filtering, all modules are interesting
3982
suite = loader.suiteClass()
3984
# modules building their suite with loadTestsFromModuleNames
3985
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3987
for mod in _test_suite_modules_to_doctest():
3988
if not interesting_module(mod):
3989
# No tests to keep here, move along
3992
# note that this really does mean "report only" -- doctest
3993
# still runs the rest of the examples
3994
doc_suite = IsolatedDocTestSuite(
3995
mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
3996
except ValueError, e:
3997
print '**failed to get doctest for: %s\n%s' % (mod, e)
3999
if len(doc_suite._tests) == 0:
4000
raise errors.BzrError("no doctests found in %s" % (mod,))
4001
suite.addTest(doc_suite)
4003
default_encoding = sys.getdefaultencoding()
4004
for name, plugin in _mod_plugin.plugins().items():
4005
if not interesting_module(plugin.module.__name__):
4007
plugin_suite = plugin.test_suite()
4008
# We used to catch ImportError here and turn it into just a warning,
4009
# but really if you don't have --no-plugins this should be a failure.
4010
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
4011
if plugin_suite is None:
4012
plugin_suite = plugin.load_plugin_tests(loader)
4013
if plugin_suite is not None:
4014
suite.addTest(plugin_suite)
4015
if default_encoding != sys.getdefaultencoding():
4017
'Plugin "%s" tried to reset default encoding to: %s', name,
4018
sys.getdefaultencoding())
4020
sys.setdefaultencoding(default_encoding)
4022
if keep_only is not None:
4023
# Now that the referred modules have loaded their tests, keep only the
4025
suite = filter_suite_by_id_list(suite, id_filter)
4026
# Do some sanity checks on the id_list filtering
4027
not_found, duplicates = suite_matches_id_list(suite, keep_only)
4029
# The tester has used both keep_only and starting_with, so he is
4030
# already aware that some tests are excluded from the list, there
4031
# is no need to tell him which.
4034
# Some tests mentioned in the list are not in the test suite. The
4035
# list may be out of date, report to the tester.
4036
for id in not_found:
4037
trace.warning('"%s" not found in the test suite', id)
4038
for id in duplicates:
4039
trace.warning('"%s" is used as an id by several tests', id)
4044
def multiply_scenarios(*scenarios):
4045
"""Multiply two or more iterables of scenarios.
4047
It is safe to pass scenario generators or iterators.
4049
:returns: A list of compound scenarios: the cross-product of all
4050
scenarios, with the names concatenated and the parameters
4053
return reduce(_multiply_two_scenarios, map(list, scenarios))
4056
def _multiply_two_scenarios(scenarios_left, scenarios_right):
4057
"""Multiply two sets of scenarios.
4059
:returns: the cartesian product of the two sets of scenarios, that is
4060
a scenario for every possible combination of a left scenario and a
4064
('%s,%s' % (left_name, right_name),
4065
dict(left_dict.items() + right_dict.items()))
4066
for left_name, left_dict in scenarios_left
4067
for right_name, right_dict in scenarios_right]
4070
def multiply_tests(tests, scenarios, result):
4071
"""Multiply tests_list by scenarios into result.
4073
This is the core workhorse for test parameterisation.
4075
Typically the load_tests() method for a per-implementation test suite will
4076
call multiply_tests and return the result.
4078
:param tests: The tests to parameterise.
4079
:param scenarios: The scenarios to apply: pairs of (scenario_name,
4080
scenario_param_dict).
4081
:param result: A TestSuite to add created tests to.
4083
This returns the passed in result TestSuite with the cross product of all
4084
the tests repeated once for each scenario. Each test is adapted by adding
4085
the scenario name at the end of its id(), and updating the test object's
4086
__dict__ with the scenario_param_dict.
4088
>>> import bzrlib.tests.test_sampler
4089
>>> r = multiply_tests(
4090
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4091
... [('one', dict(param=1)),
4092
... ('two', dict(param=2))],
4093
... TestUtil.TestSuite())
4094
>>> tests = list(iter_suite_tests(r))
4098
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4104
for test in iter_suite_tests(tests):
4105
apply_scenarios(test, scenarios, result)
4109
def apply_scenarios(test, scenarios, result):
4110
"""Apply the scenarios in scenarios to test and add to result.
4112
:param test: The test to apply scenarios to.
4113
:param scenarios: An iterable of scenarios to apply to test.
4115
:seealso: apply_scenario
4117
for scenario in scenarios:
4118
result.addTest(apply_scenario(test, scenario))
4122
def apply_scenario(test, scenario):
4123
"""Copy test and apply scenario to it.
4125
:param test: A test to adapt.
4126
:param scenario: A tuple describing the scenarion.
4127
The first element of the tuple is the new test id.
4128
The second element is a dict containing attributes to set on the
4130
:return: The adapted test.
4132
new_id = "%s(%s)" % (test.id(), scenario[0])
4133
new_test = clone_test(test, new_id)
4134
for name, value in scenario[1].items():
4135
setattr(new_test, name, value)
4139
def clone_test(test, new_id):
4140
"""Clone a test giving it a new id.
4142
:param test: The test to clone.
4143
:param new_id: The id to assign to it.
4144
:return: The new test.
4146
new_test = copy.copy(test)
4147
new_test.id = lambda: new_id
4148
# XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4149
# causes cloned tests to share the 'details' dict. This makes it hard to
4150
# read the test output for parameterized tests, because tracebacks will be
4151
# associated with irrelevant tests.
4153
details = new_test._TestCase__details
4154
except AttributeError:
4155
# must be a different version of testtools than expected. Do nothing.
4158
# Reset the '__details' dict.
4159
new_test._TestCase__details = {}
4163
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4165
"""Helper for permutating tests against an extension module.
4167
This is meant to be used inside a modules 'load_tests()' function. It will
4168
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4169
against both implementations. Setting 'test.module' to the appropriate
4170
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4172
:param standard_tests: A test suite to permute
4173
:param loader: A TestLoader
4174
:param py_module_name: The python path to a python module that can always
4175
be loaded, and will be considered the 'python' implementation. (eg
4176
'bzrlib._chk_map_py')
4177
:param ext_module_name: The python path to an extension module. If the
4178
module cannot be loaded, a single test will be added, which notes that
4179
the module is not available. If it can be loaded, all standard_tests
4180
will be run against that module.
4181
:return: (suite, feature) suite is a test-suite that has all the permuted
4182
tests. feature is the Feature object that can be used to determine if
4183
the module is available.
4186
py_module = pyutils.get_named_object(py_module_name)
4188
('python', {'module': py_module}),
4190
suite = loader.suiteClass()
4191
feature = ModuleAvailableFeature(ext_module_name)
4192
if feature.available():
4193
scenarios.append(('C', {'module': feature.module}))
4195
# the compiled module isn't available, so we add a failing test
4196
class FailWithoutFeature(TestCase):
4197
def test_fail(self):
4198
self.requireFeature(feature)
4199
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4200
result = multiply_tests(standard_tests, scenarios, suite)
4201
return result, feature
4204
def _rmtree_temp_dir(dirname, test_id=None):
4205
# If LANG=C we probably have created some bogus paths
4206
# which rmtree(unicode) will fail to delete
4207
# so make sure we are using rmtree(str) to delete everything
4208
# except on win32, where rmtree(str) will fail
4209
# since it doesn't have the property of byte-stream paths
4210
# (they are either ascii or mbcs)
4211
if sys.platform == 'win32':
4212
# make sure we are using the unicode win32 api
4213
dirname = unicode(dirname)
4215
dirname = dirname.encode(sys.getfilesystemencoding())
4217
osutils.rmtree(dirname)
4219
# We don't want to fail here because some useful display will be lost
4220
# otherwise. Polluting the tmp dir is bad, but not giving all the
4221
# possible info to the test runner is even worse.
4223
ui.ui_factory.clear_term()
4224
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4225
# Ugly, but the last thing we want here is fail, so bear with it.
4226
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4227
).encode('ascii', 'replace')
4228
sys.stderr.write('Unable to remove testing dir %s\n%s'
4229
% (os.path.basename(dirname), printable_e))
4232
class Feature(object):
4233
"""An operating system Feature."""
4236
self._available = None
4238
def available(self):
4239
"""Is the feature available?
4241
:return: True if the feature is available.
4243
if self._available is None:
4244
self._available = self._probe()
4245
return self._available
4248
"""Implement this method in concrete features.
4250
:return: True if the feature is available.
4252
raise NotImplementedError
4255
if getattr(self, 'feature_name', None):
4256
return self.feature_name()
4257
return self.__class__.__name__
4260
class _SymlinkFeature(Feature):
4263
return osutils.has_symlinks()
4265
def feature_name(self):
4268
SymlinkFeature = _SymlinkFeature()
4271
class _HardlinkFeature(Feature):
4274
return osutils.has_hardlinks()
4276
def feature_name(self):
4279
HardlinkFeature = _HardlinkFeature()
4282
class _OsFifoFeature(Feature):
4285
return getattr(os, 'mkfifo', None)
4287
def feature_name(self):
4288
return 'filesystem fifos'
4290
OsFifoFeature = _OsFifoFeature()
4293
class _UnicodeFilenameFeature(Feature):
4294
"""Does the filesystem support Unicode filenames?"""
4298
# Check for character combinations unlikely to be covered by any
4299
# single non-unicode encoding. We use the characters
4300
# - greek small letter alpha (U+03B1) and
4301
# - braille pattern dots-123456 (U+283F).
4302
os.stat(u'\u03b1\u283f')
4303
except UnicodeEncodeError:
4305
except (IOError, OSError):
4306
# The filesystem allows the Unicode filename but the file doesn't
4310
# The filesystem allows the Unicode filename and the file exists,
4314
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4317
class _CompatabilityThunkFeature(Feature):
4318
"""This feature is just a thunk to another feature.
4320
It issues a deprecation warning if it is accessed, to let you know that you
4321
should really use a different feature.
4324
def __init__(self, dep_version, module, name,
4325
replacement_name, replacement_module=None):
4326
super(_CompatabilityThunkFeature, self).__init__()
4327
self._module = module
4328
if replacement_module is None:
4329
replacement_module = module
4330
self._replacement_module = replacement_module
4332
self._replacement_name = replacement_name
4333
self._dep_version = dep_version
4334
self._feature = None
4337
if self._feature is None:
4338
depr_msg = self._dep_version % ('%s.%s'
4339
% (self._module, self._name))
4340
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4341
self._replacement_name)
4342
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4343
# Import the new feature and use it as a replacement for the
4345
self._feature = pyutils.get_named_object(
4346
self._replacement_module, self._replacement_name)
4350
return self._feature._probe()
4353
class ModuleAvailableFeature(Feature):
4354
"""This is a feature than describes a module we want to be available.
4356
Declare the name of the module in __init__(), and then after probing, the
4357
module will be available as 'self.module'.
4359
:ivar module: The module if it is available, else None.
4362
def __init__(self, module_name):
4363
super(ModuleAvailableFeature, self).__init__()
4364
self.module_name = module_name
4368
self._module = __import__(self.module_name, {}, {}, [''])
4375
if self.available(): # Make sure the probe has been done
4379
def feature_name(self):
4380
return self.module_name
4383
def probe_unicode_in_user_encoding():
4384
"""Try to encode several unicode strings to use in unicode-aware tests.
4385
Return first successfull match.
4387
:return: (unicode value, encoded plain string value) or (None, None)
4389
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4390
for uni_val in possible_vals:
4392
str_val = uni_val.encode(osutils.get_user_encoding())
4393
except UnicodeEncodeError:
4394
# Try a different character
4397
return uni_val, str_val
4401
def probe_bad_non_ascii(encoding):
4402
"""Try to find [bad] character with code [128..255]
4403
that cannot be decoded to unicode in some encoding.
4404
Return None if all non-ascii characters is valid
4407
for i in xrange(128, 256):
4410
char.decode(encoding)
4411
except UnicodeDecodeError:
4416
class _HTTPSServerFeature(Feature):
4417
"""Some tests want an https Server, check if one is available.
4419
Right now, the only way this is available is under python2.6 which provides
4430
def feature_name(self):
4431
return 'HTTPSServer'
4434
HTTPSServerFeature = _HTTPSServerFeature()
4437
class _UnicodeFilename(Feature):
4438
"""Does the filesystem support Unicode filenames?"""
4443
except UnicodeEncodeError:
4445
except (IOError, OSError):
4446
# The filesystem allows the Unicode filename but the file doesn't
4450
# The filesystem allows the Unicode filename and the file exists,
4454
UnicodeFilename = _UnicodeFilename()
4457
class _ByteStringNamedFilesystem(Feature):
4458
"""Is the filesystem based on bytes?"""
4461
if os.name == "posix":
4465
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4468
class _UTF8Filesystem(Feature):
4469
"""Is the filesystem UTF-8?"""
4472
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4476
UTF8Filesystem = _UTF8Filesystem()
4479
class _BreakinFeature(Feature):
4480
"""Does this platform support the breakin feature?"""
4483
from bzrlib import breakin
4484
if breakin.determine_signal() is None:
4486
if sys.platform == 'win32':
4487
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4488
# We trigger SIGBREAK via a Console api so we need ctypes to
4489
# access the function
4496
def feature_name(self):
4497
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4500
BreakinFeature = _BreakinFeature()
4503
class _CaseInsCasePresFilenameFeature(Feature):
4504
"""Is the file-system case insensitive, but case-preserving?"""
4507
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4509
# first check truly case-preserving for created files, then check
4510
# case insensitive when opening existing files.
4511
name = osutils.normpath(name)
4512
base, rel = osutils.split(name)
4513
found_rel = osutils.canonical_relpath(base, name)
4514
return (found_rel == rel
4515
and os.path.isfile(name.upper())
4516
and os.path.isfile(name.lower()))
4521
def feature_name(self):
4522
return "case-insensitive case-preserving filesystem"
4524
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4527
class _CaseInsensitiveFilesystemFeature(Feature):
4528
"""Check if underlying filesystem is case-insensitive but *not* case
4531
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4532
# more likely to be case preserving, so this case is rare.
4535
if CaseInsCasePresFilenameFeature.available():
4538
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4539
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4540
TestCaseWithMemoryTransport.TEST_ROOT = root
4542
root = TestCaseWithMemoryTransport.TEST_ROOT
4543
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4545
name_a = osutils.pathjoin(tdir, 'a')
4546
name_A = osutils.pathjoin(tdir, 'A')
4548
result = osutils.isdir(name_A)
4549
_rmtree_temp_dir(tdir)
4552
def feature_name(self):
4553
return 'case-insensitive filesystem'
4555
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4558
class _CaseSensitiveFilesystemFeature(Feature):
4561
if CaseInsCasePresFilenameFeature.available():
4563
elif CaseInsensitiveFilesystemFeature.available():
4568
def feature_name(self):
4569
return 'case-sensitive filesystem'
4571
# new coding style is for feature instances to be lowercase
4572
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4575
# Only define SubUnitBzrRunner if subunit is available.
4577
from subunit import TestProtocolClient
4578
from subunit.test_results import AutoTimingTestResultDecorator
4579
class SubUnitBzrProtocolClient(TestProtocolClient):
4581
def addSuccess(self, test, details=None):
4582
# The subunit client always includes the details in the subunit
4583
# stream, but we don't want to include it in ours.
4584
if details is not None and 'log' in details:
4586
return super(SubUnitBzrProtocolClient, self).addSuccess(
4589
class SubUnitBzrRunner(TextTestRunner):
4590
def run(self, test):
4591
result = AutoTimingTestResultDecorator(
4592
SubUnitBzrProtocolClient(self.stream))
4598
class _PosixPermissionsFeature(Feature):
4602
# create temporary file and check if specified perms are maintained.
4605
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4606
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4609
os.chmod(name, write_perms)
4611
read_perms = os.stat(name).st_mode & 0777
4613
return (write_perms == read_perms)
4615
return (os.name == 'posix') and has_perms()
4617
def feature_name(self):
4618
return 'POSIX permissions support'
4620
posix_permissions_feature = _PosixPermissionsFeature()