1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
# TODO: Perhaps there should be an API to find out if bzr running under the
20
# test suite -- some plugins might want to avoid making intrusive changes if
21
# this is the case. However, we want behaviour under to test to diverge as
22
# little as possible, so this should be used rarely if it's added at all.
23
# (Suggestion from j-a-meinel, 2005-11-24)
25
# NOTE: Some classes in here use camelCaseNaming() rather than
26
# underscore_naming(). That's for consistency with unittest; it's not the
27
# general style of bzrlib. Please continue that consistency when adding e.g.
28
# new assertFoo() methods.
33
from cStringIO import StringIO
56
# nb: check this before importing anything else from within it
57
_testtools_version = getattr(testtools, '__version__', ())
58
if _testtools_version < (0, 9, 2):
59
raise ImportError("need at least testtools 0.9.2: %s is %r"
60
% (testtools.__file__, _testtools_version))
61
from testtools import content
77
transport as _mod_transport,
81
import bzrlib.commands
82
import bzrlib.timestamp
84
import bzrlib.inventory
85
import bzrlib.iterablefile
90
# lsprof not available
92
from bzrlib.merge import merge_inner
95
from bzrlib.smart import client, request, server
97
from bzrlib import symbol_versioning
98
from bzrlib.symbol_versioning import (
106
from bzrlib.transport import (
110
from bzrlib.trace import mutter, note
111
from bzrlib.tests import (
116
from bzrlib.ui import NullProgressView
117
from bzrlib.ui.text import TextUIFactory
118
import bzrlib.version_info_formats.format_custom
119
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
121
# Mark this python module as being part of the implementation
122
# of unittest: this gives us better tracebacks where the last
123
# shown frame is the test code, not our assertXYZ.
126
default_transport = test_server.LocalURLServer
129
_unitialized_attr = object()
130
"""A sentinel needed to act as a default value in a method signature."""
133
# Subunit result codes, defined here to prevent a hard dependency on subunit.
137
# These are intentionally brought into this namespace. That way plugins, etc
138
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
139
TestSuite = TestUtil.TestSuite
140
TestLoader = TestUtil.TestLoader
142
class ExtendedTestResult(testtools.TextTestResult):
143
"""Accepts, reports and accumulates the results of running tests.
145
Compared to the unittest version this class adds support for
146
profiling, benchmarking, stopping as soon as a test fails, and
147
skipping tests. There are further-specialized subclasses for
148
different types of display.
150
When a test finishes, in whatever way, it calls one of the addSuccess,
151
addFailure or addError classes. These in turn may redirect to a more
152
specific case for the special test results supported by our extended
155
Note that just one of these objects is fed the results from many tests.
160
def __init__(self, stream, descriptions, verbosity,
164
"""Construct new TestResult.
166
:param bench_history: Optionally, a writable file object to accumulate
169
testtools.TextTestResult.__init__(self, stream)
170
if bench_history is not None:
171
from bzrlib.version import _get_bzr_source_tree
172
src_tree = _get_bzr_source_tree()
175
revision_id = src_tree.get_parent_ids()[0]
177
# XXX: if this is a brand new tree, do the same as if there
181
# XXX: If there's no branch, what should we do?
183
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
184
self._bench_history = bench_history
185
self.ui = ui.ui_factory
188
self.failure_count = 0
189
self.known_failure_count = 0
191
self.not_applicable_count = 0
192
self.unsupported = {}
194
self._overall_start_time = time.time()
195
self._strict = strict
196
self._first_thread_leaker_id = None
197
self._tests_leaking_threads_count = 0
199
def stopTestRun(self):
202
stopTime = time.time()
203
timeTaken = stopTime - self.startTime
204
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
205
# the parent class method is similar have to duplicate
206
self._show_list('ERROR', self.errors)
207
self._show_list('FAIL', self.failures)
208
self.stream.write(self.sep2)
209
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
210
run, run != 1 and "s" or "", timeTaken))
211
if not self.wasSuccessful():
212
self.stream.write("FAILED (")
213
failed, errored = map(len, (self.failures, self.errors))
215
self.stream.write("failures=%d" % failed)
217
if failed: self.stream.write(", ")
218
self.stream.write("errors=%d" % errored)
219
if self.known_failure_count:
220
if failed or errored: self.stream.write(", ")
221
self.stream.write("known_failure_count=%d" %
222
self.known_failure_count)
223
self.stream.write(")\n")
225
if self.known_failure_count:
226
self.stream.write("OK (known_failures=%d)\n" %
227
self.known_failure_count)
229
self.stream.write("OK\n")
230
if self.skip_count > 0:
231
skipped = self.skip_count
232
self.stream.write('%d test%s skipped\n' %
233
(skipped, skipped != 1 and "s" or ""))
235
for feature, count in sorted(self.unsupported.items()):
236
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
239
ok = self.wasStrictlySuccessful()
241
ok = self.wasSuccessful()
242
if self._first_thread_leaker_id:
244
'%s is leaking threads among %d leaking tests.\n' % (
245
self._first_thread_leaker_id,
246
self._tests_leaking_threads_count))
247
# We don't report the main thread as an active one.
249
'%d non-main threads were left active in the end.\n'
250
% (len(self._active_threads) - 1))
252
def getDescription(self, test):
255
def _extractBenchmarkTime(self, testCase, details=None):
256
"""Add a benchmark time for the current test case."""
257
if details and 'benchtime' in details:
258
return float(''.join(details['benchtime'].iter_bytes()))
259
return getattr(testCase, "_benchtime", None)
261
def _elapsedTestTimeString(self):
262
"""Return a time string for the overall time the current test has taken."""
263
return self._formatTime(time.time() - self._start_time)
265
def _testTimeString(self, testCase):
266
benchmark_time = self._extractBenchmarkTime(testCase)
267
if benchmark_time is not None:
268
return self._formatTime(benchmark_time) + "*"
270
return self._elapsedTestTimeString()
272
def _formatTime(self, seconds):
273
"""Format seconds as milliseconds with leading spaces."""
274
# some benchmarks can take thousands of seconds to run, so we need 8
276
return "%8dms" % (1000 * seconds)
278
def _shortened_test_description(self, test):
280
what = re.sub(r'^bzrlib\.tests\.', '', what)
283
def startTest(self, test):
284
super(ExtendedTestResult, self).startTest(test)
288
self.report_test_start(test)
289
test.number = self.count
290
self._recordTestStartTime()
291
# Only check for thread leaks if the test case supports cleanups
292
addCleanup = getattr(test, "addCleanup", None)
293
if addCleanup is not None:
294
addCleanup(self._check_leaked_threads, test)
296
def startTests(self):
297
self.report_tests_starting()
298
self._active_threads = threading.enumerate()
300
def _check_leaked_threads(self, test):
301
"""See if any threads have leaked since last call
303
A sample of live threads is stored in the _active_threads attribute,
304
when this method runs it compares the current live threads and any not
305
in the previous sample are treated as having leaked.
307
now_active_threads = set(threading.enumerate())
308
threads_leaked = now_active_threads.difference(self._active_threads)
310
self._report_thread_leak(test, threads_leaked, now_active_threads)
311
self._tests_leaking_threads_count += 1
312
if self._first_thread_leaker_id is None:
313
self._first_thread_leaker_id = test.id()
314
self._active_threads = now_active_threads
316
def _recordTestStartTime(self):
317
"""Record that a test has started."""
318
self._start_time = time.time()
320
def addError(self, test, err):
321
"""Tell result that test finished with an error.
323
Called from the TestCase run() method when the test
324
fails with an unexpected error.
327
super(ExtendedTestResult, self).addError(test, err)
328
self.error_count += 1
329
self.report_error(test, err)
333
def addFailure(self, test, err):
334
"""Tell result that test failed.
336
Called from the TestCase run() method when the test
337
fails because e.g. an assert() method failed.
340
super(ExtendedTestResult, self).addFailure(test, err)
341
self.failure_count += 1
342
self.report_failure(test, err)
346
def addSuccess(self, test, details=None):
347
"""Tell result that test completed successfully.
349
Called from the TestCase run()
351
if self._bench_history is not None:
352
benchmark_time = self._extractBenchmarkTime(test, details)
353
if benchmark_time is not None:
354
self._bench_history.write("%s %s\n" % (
355
self._formatTime(benchmark_time),
357
self.report_success(test)
358
super(ExtendedTestResult, self).addSuccess(test)
359
test._log_contents = ''
361
def addExpectedFailure(self, test, err):
362
self.known_failure_count += 1
363
self.report_known_failure(test, err)
365
def addNotSupported(self, test, feature):
366
"""The test will not be run because of a missing feature.
368
# this can be called in two different ways: it may be that the
369
# test started running, and then raised (through requireFeature)
370
# UnavailableFeature. Alternatively this method can be called
371
# while probing for features before running the test code proper; in
372
# that case we will see startTest and stopTest, but the test will
373
# never actually run.
374
self.unsupported.setdefault(str(feature), 0)
375
self.unsupported[str(feature)] += 1
376
self.report_unsupported(test, feature)
378
def addSkip(self, test, reason):
379
"""A test has not run for 'reason'."""
381
self.report_skip(test, reason)
383
def addNotApplicable(self, test, reason):
384
self.not_applicable_count += 1
385
self.report_not_applicable(test, reason)
387
def _post_mortem(self):
388
"""Start a PDB post mortem session."""
389
if os.environ.get('BZR_TEST_PDB', None):
390
import pdb;pdb.post_mortem()
392
def progress(self, offset, whence):
393
"""The test is adjusting the count of tests to run."""
394
if whence == SUBUNIT_SEEK_SET:
395
self.num_tests = offset
396
elif whence == SUBUNIT_SEEK_CUR:
397
self.num_tests += offset
399
raise errors.BzrError("Unknown whence %r" % whence)
401
def report_tests_starting(self):
402
"""Display information before the test run begins"""
403
if getattr(sys, 'frozen', None) is None:
404
bzr_path = osutils.realpath(sys.argv[0])
406
bzr_path = sys.executable
408
'bzr selftest: %s\n' % (bzr_path,))
411
bzrlib.__path__[0],))
413
' bzr-%s python-%s %s\n' % (
414
bzrlib.version_string,
415
bzrlib._format_version_tuple(sys.version_info),
416
platform.platform(aliased=1),
418
self.stream.write('\n')
420
def report_test_start(self, test):
421
"""Display information on the test just about to be run"""
423
def _report_thread_leak(self, test, leaked_threads, active_threads):
424
"""Display information on a test that leaked one or more threads"""
425
# GZ 2010-09-09: A leak summary reported separately from the general
426
# thread debugging would be nice. Tests under subunit
427
# need something not using stream, perhaps adding a
428
# testtools details object would be fitting.
429
if 'threads' in selftest_debug_flags:
430
self.stream.write('%s is leaking, active is now %d\n' %
431
(test.id(), len(active_threads)))
433
def startTestRun(self):
434
self.startTime = time.time()
436
def report_success(self, test):
439
def wasStrictlySuccessful(self):
440
if self.unsupported or self.known_failure_count:
442
return self.wasSuccessful()
445
class TextTestResult(ExtendedTestResult):
446
"""Displays progress and results of tests in text form"""
448
def __init__(self, stream, descriptions, verbosity,
453
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
454
bench_history, strict)
455
# We no longer pass them around, but just rely on the UIFactory stack
458
warnings.warn("Passing pb to TextTestResult is deprecated")
459
self.pb = self.ui.nested_progress_bar()
460
self.pb.show_pct = False
461
self.pb.show_spinner = False
462
self.pb.show_eta = False,
463
self.pb.show_count = False
464
self.pb.show_bar = False
465
self.pb.update_latency = 0
466
self.pb.show_transport_activity = False
468
def stopTestRun(self):
469
# called when the tests that are going to run have run
472
super(TextTestResult, self).stopTestRun()
474
def report_tests_starting(self):
475
super(TextTestResult, self).report_tests_starting()
476
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
478
def _progress_prefix_text(self):
479
# the longer this text, the less space we have to show the test
481
a = '[%d' % self.count # total that have been run
482
# tests skipped as known not to be relevant are not important enough
484
## if self.skip_count:
485
## a += ', %d skip' % self.skip_count
486
## if self.known_failure_count:
487
## a += '+%dX' % self.known_failure_count
489
a +='/%d' % self.num_tests
491
runtime = time.time() - self._overall_start_time
493
a += '%dm%ds' % (runtime / 60, runtime % 60)
496
total_fail_count = self.error_count + self.failure_count
498
a += ', %d failed' % total_fail_count
499
# if self.unsupported:
500
# a += ', %d missing' % len(self.unsupported)
504
def report_test_start(self, test):
506
self._progress_prefix_text()
508
+ self._shortened_test_description(test))
510
def _test_description(self, test):
511
return self._shortened_test_description(test)
513
def report_error(self, test, err):
514
self.stream.write('ERROR: %s\n %s\n' % (
515
self._test_description(test),
519
def report_failure(self, test, err):
520
self.stream.write('FAIL: %s\n %s\n' % (
521
self._test_description(test),
525
def report_known_failure(self, test, err):
528
def report_skip(self, test, reason):
531
def report_not_applicable(self, test, reason):
534
def report_unsupported(self, test, feature):
535
"""test cannot be run because feature is missing."""
538
class VerboseTestResult(ExtendedTestResult):
539
"""Produce long output, with one line per test run plus times"""
541
def _ellipsize_to_right(self, a_string, final_width):
542
"""Truncate and pad a string, keeping the right hand side"""
543
if len(a_string) > final_width:
544
result = '...' + a_string[3-final_width:]
547
return result.ljust(final_width)
549
def report_tests_starting(self):
550
self.stream.write('running %d tests...\n' % self.num_tests)
551
super(VerboseTestResult, self).report_tests_starting()
553
def report_test_start(self, test):
554
name = self._shortened_test_description(test)
555
width = osutils.terminal_width()
556
if width is not None:
557
# width needs space for 6 char status, plus 1 for slash, plus an
558
# 11-char time string, plus a trailing blank
559
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
561
self.stream.write(self._ellipsize_to_right(name, width-18))
563
self.stream.write(name)
566
def _error_summary(self, err):
568
return '%s%s' % (indent, err[1])
570
def report_error(self, test, err):
571
self.stream.write('ERROR %s\n%s\n'
572
% (self._testTimeString(test),
573
self._error_summary(err)))
575
def report_failure(self, test, err):
576
self.stream.write(' FAIL %s\n%s\n'
577
% (self._testTimeString(test),
578
self._error_summary(err)))
580
def report_known_failure(self, test, err):
581
self.stream.write('XFAIL %s\n%s\n'
582
% (self._testTimeString(test),
583
self._error_summary(err)))
585
def report_success(self, test):
586
self.stream.write(' OK %s\n' % self._testTimeString(test))
587
for bench_called, stats in getattr(test, '_benchcalls', []):
588
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
589
stats.pprint(file=self.stream)
590
# flush the stream so that we get smooth output. This verbose mode is
591
# used to show the output in PQM.
594
def report_skip(self, test, reason):
595
self.stream.write(' SKIP %s\n%s\n'
596
% (self._testTimeString(test), reason))
598
def report_not_applicable(self, test, reason):
599
self.stream.write(' N/A %s\n %s\n'
600
% (self._testTimeString(test), reason))
602
def report_unsupported(self, test, feature):
603
"""test cannot be run because feature is missing."""
604
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
605
%(self._testTimeString(test), feature))
608
class TextTestRunner(object):
609
stop_on_failure = False
617
result_decorators=None,
619
"""Create a TextTestRunner.
621
:param result_decorators: An optional list of decorators to apply
622
to the result object being used by the runner. Decorators are
623
applied left to right - the first element in the list is the
626
# stream may know claim to know to write unicode strings, but in older
627
# pythons this goes sufficiently wrong that it is a bad idea. (
628
# specifically a built in file with encoding 'UTF-8' will still try
629
# to encode using ascii.
630
new_encoding = osutils.get_terminal_encoding()
631
codec = codecs.lookup(new_encoding)
632
if type(codec) is tuple:
636
encode = codec.encode
637
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
638
stream.encoding = new_encoding
640
self.descriptions = descriptions
641
self.verbosity = verbosity
642
self._bench_history = bench_history
643
self._strict = strict
644
self._result_decorators = result_decorators or []
647
"Run the given test case or test suite."
648
if self.verbosity == 1:
649
result_class = TextTestResult
650
elif self.verbosity >= 2:
651
result_class = VerboseTestResult
652
original_result = result_class(self.stream,
655
bench_history=self._bench_history,
658
# Signal to result objects that look at stop early policy to stop,
659
original_result.stop_early = self.stop_on_failure
660
result = original_result
661
for decorator in self._result_decorators:
662
result = decorator(result)
663
result.stop_early = self.stop_on_failure
664
result.startTestRun()
669
# higher level code uses our extended protocol to determine
670
# what exit code to give.
671
return original_result
674
def iter_suite_tests(suite):
675
"""Return all tests in a suite, recursing through nested suites"""
676
if isinstance(suite, unittest.TestCase):
678
elif isinstance(suite, unittest.TestSuite):
680
for r in iter_suite_tests(item):
683
raise Exception('unknown type %r for object %r'
684
% (type(suite), suite))
687
TestSkipped = testtools.testcase.TestSkipped
690
class TestNotApplicable(TestSkipped):
691
"""A test is not applicable to the situation where it was run.
693
This is only normally raised by parameterized tests, if they find that
694
the instance they're constructed upon does not support one aspect
699
# traceback._some_str fails to format exceptions that have the default
700
# __str__ which does an implicit ascii conversion. However, repr() on those
701
# objects works, for all that its not quite what the doctor may have ordered.
702
def _clever_some_str(value):
707
return repr(value).replace('\\n', '\n')
709
return '<unprintable %s object>' % type(value).__name__
711
traceback._some_str = _clever_some_str
714
# deprecated - use self.knownFailure(), or self.expectFailure.
715
KnownFailure = testtools.testcase._ExpectedFailure
718
class UnavailableFeature(Exception):
719
"""A feature required for this test was not available.
721
This can be considered a specialised form of SkippedTest.
723
The feature should be used to construct the exception.
727
class StringIOWrapper(object):
728
"""A wrapper around cStringIO which just adds an encoding attribute.
730
Internally we can check sys.stdout to see what the output encoding
731
should be. However, cStringIO has no encoding attribute that we can
732
set. So we wrap it instead.
737
def __init__(self, s=None):
739
self.__dict__['_cstring'] = StringIO(s)
741
self.__dict__['_cstring'] = StringIO()
743
def __getattr__(self, name, getattr=getattr):
744
return getattr(self.__dict__['_cstring'], name)
746
def __setattr__(self, name, val):
747
if name == 'encoding':
748
self.__dict__['encoding'] = val
750
return setattr(self._cstring, name, val)
753
class TestUIFactory(TextUIFactory):
754
"""A UI Factory for testing.
756
Hide the progress bar but emit note()s.
758
Allows get_password to be tested without real tty attached.
760
See also CannedInputUIFactory which lets you provide programmatic input in
763
# TODO: Capture progress events at the model level and allow them to be
764
# observed by tests that care.
766
# XXX: Should probably unify more with CannedInputUIFactory or a
767
# particular configuration of TextUIFactory, or otherwise have a clearer
768
# idea of how they're supposed to be different.
769
# See https://bugs.launchpad.net/bzr/+bug/408213
771
def __init__(self, stdout=None, stderr=None, stdin=None):
772
if stdin is not None:
773
# We use a StringIOWrapper to be able to test various
774
# encodings, but the user is still responsible to
775
# encode the string and to set the encoding attribute
776
# of StringIOWrapper.
777
stdin = StringIOWrapper(stdin)
778
super(TestUIFactory, self).__init__(stdin, stdout, stderr)
780
def get_non_echoed_password(self):
781
"""Get password from stdin without trying to handle the echo mode"""
782
password = self.stdin.readline()
785
if password[-1] == '\n':
786
password = password[:-1]
789
def make_progress_view(self):
790
return NullProgressView()
793
class TestCase(testtools.TestCase):
794
"""Base class for bzr unit tests.
796
Tests that need access to disk resources should subclass
797
TestCaseInTempDir not TestCase.
799
Error and debug log messages are redirected from their usual
800
location into a temporary file, the contents of which can be
801
retrieved by _get_log(). We use a real OS file, not an in-memory object,
802
so that it can also capture file IO. When the test completes this file
803
is read into memory and removed from disk.
805
There are also convenience functions to invoke bzr's command-line
806
routine, and to build and check bzr trees.
808
In addition to the usual method of overriding tearDown(), this class also
809
allows subclasses to register cleanup functions via addCleanup, which are
810
run in order as the object is torn down. It's less likely this will be
811
accidentally overlooked.
815
# record lsprof data when performing benchmark calls.
816
_gather_lsprof_in_benchmarks = False
818
def __init__(self, methodName='testMethod'):
819
super(TestCase, self).__init__(methodName)
820
self._directory_isolation = True
821
self.exception_handlers.insert(0,
822
(UnavailableFeature, self._do_unsupported_or_skip))
823
self.exception_handlers.insert(0,
824
(TestNotApplicable, self._do_not_applicable))
827
super(TestCase, self).setUp()
828
for feature in getattr(self, '_test_needs_features', []):
829
self.requireFeature(feature)
830
self._log_contents = None
831
self.addDetail("log", content.Content(content.ContentType("text",
832
"plain", {"charset": "utf8"}),
833
lambda:[self._get_log(keep_log_file=True)]))
834
self._cleanEnvironment()
837
self._benchcalls = []
838
self._benchtime = None
840
self._track_transports()
842
self._clear_debug_flags()
847
pdb.Pdb().set_trace(sys._getframe().f_back)
849
def discardDetail(self, name):
850
"""Extend the addDetail, getDetails api so we can remove a detail.
852
eg. bzr always adds the 'log' detail at startup, but we don't want to
853
include it for skipped, xfail, etc tests.
855
It is safe to call this for a detail that doesn't exist, in case this
856
gets called multiple times.
858
# We cheat. details is stored in __details which means we shouldn't
859
# touch it. but getDetails() returns the dict directly, so we can
861
details = self.getDetails()
865
def _clear_debug_flags(self):
866
"""Prevent externally set debug flags affecting tests.
868
Tests that want to use debug flags can just set them in the
869
debug_flags set during setup/teardown.
871
# Start with a copy of the current debug flags we can safely modify.
872
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
873
if 'allow_debug' not in selftest_debug_flags:
874
debug.debug_flags.clear()
875
if 'disable_lock_checks' not in selftest_debug_flags:
876
debug.debug_flags.add('strict_locks')
878
def _clear_hooks(self):
879
# prevent hooks affecting tests
880
self._preserved_hooks = {}
881
for key, factory in hooks.known_hooks.items():
882
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
883
current_hooks = hooks.known_hooks_key_to_object(key)
884
self._preserved_hooks[parent] = (name, current_hooks)
885
self.addCleanup(self._restoreHooks)
886
for key, factory in hooks.known_hooks.items():
887
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
888
setattr(parent, name, factory())
889
# this hook should always be installed
890
request._install_hook()
892
def disable_directory_isolation(self):
893
"""Turn off directory isolation checks."""
894
self._directory_isolation = False
896
def enable_directory_isolation(self):
897
"""Enable directory isolation checks."""
898
self._directory_isolation = True
900
def _silenceUI(self):
901
"""Turn off UI for duration of test"""
902
# by default the UI is off; tests can turn it on if they want it.
903
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
905
def _check_locks(self):
906
"""Check that all lock take/release actions have been paired."""
907
# We always check for mismatched locks. If a mismatch is found, we
908
# fail unless -Edisable_lock_checks is supplied to selftest, in which
909
# case we just print a warning.
911
acquired_locks = [lock for action, lock in self._lock_actions
912
if action == 'acquired']
913
released_locks = [lock for action, lock in self._lock_actions
914
if action == 'released']
915
broken_locks = [lock for action, lock in self._lock_actions
916
if action == 'broken']
917
# trivially, given the tests for lock acquistion and release, if we
918
# have as many in each list, it should be ok. Some lock tests also
919
# break some locks on purpose and should be taken into account by
920
# considering that breaking a lock is just a dirty way of releasing it.
921
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
922
message = ('Different number of acquired and '
923
'released or broken locks. (%s, %s + %s)' %
924
(acquired_locks, released_locks, broken_locks))
925
if not self._lock_check_thorough:
926
# Rather than fail, just warn
927
print "Broken test %s: %s" % (self, message)
931
def _track_locks(self):
932
"""Track lock activity during tests."""
933
self._lock_actions = []
934
if 'disable_lock_checks' in selftest_debug_flags:
935
self._lock_check_thorough = False
937
self._lock_check_thorough = True
939
self.addCleanup(self._check_locks)
940
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
941
self._lock_acquired, None)
942
_mod_lock.Lock.hooks.install_named_hook('lock_released',
943
self._lock_released, None)
944
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
945
self._lock_broken, None)
947
def _lock_acquired(self, result):
948
self._lock_actions.append(('acquired', result))
950
def _lock_released(self, result):
951
self._lock_actions.append(('released', result))
953
def _lock_broken(self, result):
954
self._lock_actions.append(('broken', result))
956
def permit_dir(self, name):
957
"""Permit a directory to be used by this test. See permit_url."""
958
name_transport = _mod_transport.get_transport(name)
959
self.permit_url(name)
960
self.permit_url(name_transport.base)
962
def permit_url(self, url):
963
"""Declare that url is an ok url to use in this test.
965
Do this for memory transports, temporary test directory etc.
967
Do not do this for the current working directory, /tmp, or any other
968
preexisting non isolated url.
970
if not url.endswith('/'):
972
self._bzr_selftest_roots.append(url)
974
def permit_source_tree_branch_repo(self):
975
"""Permit the source tree bzr is running from to be opened.
977
Some code such as bzrlib.version attempts to read from the bzr branch
978
that bzr is executing from (if any). This method permits that directory
979
to be used in the test suite.
981
path = self.get_source_path()
982
self.record_directory_isolation()
985
workingtree.WorkingTree.open(path)
986
except (errors.NotBranchError, errors.NoWorkingTree):
987
raise TestSkipped('Needs a working tree of bzr sources')
989
self.enable_directory_isolation()
991
def _preopen_isolate_transport(self, transport):
992
"""Check that all transport openings are done in the test work area."""
993
while isinstance(transport, pathfilter.PathFilteringTransport):
994
# Unwrap pathfiltered transports
995
transport = transport.server.backing_transport.clone(
996
transport._filter('.'))
998
# ReadonlySmartTCPServer_for_testing decorates the backing transport
999
# urls it is given by prepending readonly+. This is appropriate as the
1000
# client shouldn't know that the server is readonly (or not readonly).
1001
# We could register all servers twice, with readonly+ prepending, but
1002
# that makes for a long list; this is about the same but easier to
1004
if url.startswith('readonly+'):
1005
url = url[len('readonly+'):]
1006
self._preopen_isolate_url(url)
1008
def _preopen_isolate_url(self, url):
1009
if not self._directory_isolation:
1011
if self._directory_isolation == 'record':
1012
self._bzr_selftest_roots.append(url)
1014
# This prevents all transports, including e.g. sftp ones backed on disk
1015
# from working unless they are explicitly granted permission. We then
1016
# depend on the code that sets up test transports to check that they are
1017
# appropriately isolated and enable their use by calling
1018
# self.permit_transport()
1019
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1020
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1021
% (url, self._bzr_selftest_roots))
1023
def record_directory_isolation(self):
1024
"""Gather accessed directories to permit later access.
1026
This is used for tests that access the branch bzr is running from.
1028
self._directory_isolation = "record"
1030
def start_server(self, transport_server, backing_server=None):
1031
"""Start transport_server for this test.
1033
This starts the server, registers a cleanup for it and permits the
1034
server's urls to be used.
1036
if backing_server is None:
1037
transport_server.start_server()
1039
transport_server.start_server(backing_server)
1040
self.addCleanup(transport_server.stop_server)
1041
# Obtain a real transport because if the server supplies a password, it
1042
# will be hidden from the base on the client side.
1043
t = _mod_transport.get_transport(transport_server.get_url())
1044
# Some transport servers effectively chroot the backing transport;
1045
# others like SFTPServer don't - users of the transport can walk up the
1046
# transport to read the entire backing transport. This wouldn't matter
1047
# except that the workdir tests are given - and that they expect the
1048
# server's url to point at - is one directory under the safety net. So
1049
# Branch operations into the transport will attempt to walk up one
1050
# directory. Chrooting all servers would avoid this but also mean that
1051
# we wouldn't be testing directly against non-root urls. Alternatively
1052
# getting the test framework to start the server with a backing server
1053
# at the actual safety net directory would work too, but this then
1054
# means that the self.get_url/self.get_transport methods would need
1055
# to transform all their results. On balance its cleaner to handle it
1056
# here, and permit a higher url when we have one of these transports.
1057
if t.base.endswith('/work/'):
1058
# we have safety net/test root/work
1059
t = t.clone('../..')
1060
elif isinstance(transport_server,
1061
test_server.SmartTCPServer_for_testing):
1062
# The smart server adds a path similar to work, which is traversed
1063
# up from by the client. But the server is chrooted - the actual
1064
# backing transport is not escaped from, and VFS requests to the
1065
# root will error (because they try to escape the chroot).
1067
while t2.base != t.base:
1070
self.permit_url(t.base)
1072
def _track_transports(self):
1073
"""Install checks for transport usage."""
1074
# TestCase has no safe place it can write to.
1075
self._bzr_selftest_roots = []
1076
# Currently the easiest way to be sure that nothing is going on is to
1077
# hook into bzr dir opening. This leaves a small window of error for
1078
# transport tests, but they are well known, and we can improve on this
1080
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1081
self._preopen_isolate_transport, "Check bzr directories are safe.")
1083
def _ndiff_strings(self, a, b):
1084
"""Return ndiff between two strings containing lines.
1086
A trailing newline is added if missing to make the strings
1088
if b and b[-1] != '\n':
1090
if a and a[-1] != '\n':
1092
difflines = difflib.ndiff(a.splitlines(True),
1094
linejunk=lambda x: False,
1095
charjunk=lambda x: False)
1096
return ''.join(difflines)
1098
def assertEqual(self, a, b, message=''):
1102
except UnicodeError, e:
1103
# If we can't compare without getting a UnicodeError, then
1104
# obviously they are different
1105
mutter('UnicodeError: %s', e)
1108
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1110
pprint.pformat(a), pprint.pformat(b)))
1112
assertEquals = assertEqual
1114
def assertEqualDiff(self, a, b, message=None):
1115
"""Assert two texts are equal, if not raise an exception.
1117
This is intended for use with multi-line strings where it can
1118
be hard to find the differences by eye.
1120
# TODO: perhaps override assertEquals to call this for strings?
1124
message = "texts not equal:\n"
1126
message = 'first string is missing a final newline.\n'
1128
message = 'second string is missing a final newline.\n'
1129
raise AssertionError(message +
1130
self._ndiff_strings(a, b))
1132
def assertEqualMode(self, mode, mode_test):
1133
self.assertEqual(mode, mode_test,
1134
'mode mismatch %o != %o' % (mode, mode_test))
1136
def assertEqualStat(self, expected, actual):
1137
"""assert that expected and actual are the same stat result.
1139
:param expected: A stat result.
1140
:param actual: A stat result.
1141
:raises AssertionError: If the expected and actual stat values differ
1142
other than by atime.
1144
self.assertEqual(expected.st_size, actual.st_size,
1145
'st_size did not match')
1146
self.assertEqual(expected.st_mtime, actual.st_mtime,
1147
'st_mtime did not match')
1148
self.assertEqual(expected.st_ctime, actual.st_ctime,
1149
'st_ctime did not match')
1150
if sys.platform != 'win32':
1151
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1152
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1153
# odd. Regardless we shouldn't actually try to assert anything
1154
# about their values
1155
self.assertEqual(expected.st_dev, actual.st_dev,
1156
'st_dev did not match')
1157
self.assertEqual(expected.st_ino, actual.st_ino,
1158
'st_ino did not match')
1159
self.assertEqual(expected.st_mode, actual.st_mode,
1160
'st_mode did not match')
1162
def assertLength(self, length, obj_with_len):
1163
"""Assert that obj_with_len is of length length."""
1164
if len(obj_with_len) != length:
1165
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1166
length, len(obj_with_len), obj_with_len))
1168
def assertLogsError(self, exception_class, func, *args, **kwargs):
1169
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1171
from bzrlib import trace
1173
orig_log_exception_quietly = trace.log_exception_quietly
1176
orig_log_exception_quietly()
1177
captured.append(sys.exc_info())
1178
trace.log_exception_quietly = capture
1179
func(*args, **kwargs)
1181
trace.log_exception_quietly = orig_log_exception_quietly
1182
self.assertLength(1, captured)
1183
err = captured[0][1]
1184
self.assertIsInstance(err, exception_class)
1187
def assertPositive(self, val):
1188
"""Assert that val is greater than 0."""
1189
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1191
def assertNegative(self, val):
1192
"""Assert that val is less than 0."""
1193
self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
1195
def assertStartsWith(self, s, prefix):
1196
if not s.startswith(prefix):
1197
raise AssertionError('string %r does not start with %r' % (s, prefix))
1199
def assertEndsWith(self, s, suffix):
1200
"""Asserts that s ends with suffix."""
1201
if not s.endswith(suffix):
1202
raise AssertionError('string %r does not end with %r' % (s, suffix))
1204
def assertContainsRe(self, haystack, needle_re, flags=0):
1205
"""Assert that a contains something matching a regular expression."""
1206
if not re.search(needle_re, haystack, flags):
1207
if '\n' in haystack or len(haystack) > 60:
1208
# a long string, format it in a more readable way
1209
raise AssertionError(
1210
'pattern "%s" not found in\n"""\\\n%s"""\n'
1211
% (needle_re, haystack))
1213
raise AssertionError('pattern "%s" not found in "%s"'
1214
% (needle_re, haystack))
1216
def assertNotContainsRe(self, haystack, needle_re, flags=0):
1217
"""Assert that a does not match a regular expression"""
1218
if re.search(needle_re, haystack, flags):
1219
raise AssertionError('pattern "%s" found in "%s"'
1220
% (needle_re, haystack))
1222
def assertContainsString(self, haystack, needle):
1223
if haystack.find(needle) == -1:
1224
self.fail("string %r not found in '''%s'''" % (needle, haystack))
1226
def assertSubset(self, sublist, superlist):
1227
"""Assert that every entry in sublist is present in superlist."""
1228
missing = set(sublist) - set(superlist)
1229
if len(missing) > 0:
1230
raise AssertionError("value(s) %r not present in container %r" %
1231
(missing, superlist))
1233
def assertListRaises(self, excClass, func, *args, **kwargs):
1234
"""Fail unless excClass is raised when the iterator from func is used.
1236
Many functions can return generators this makes sure
1237
to wrap them in a list() call to make sure the whole generator
1238
is run, and that the proper exception is raised.
1241
list(func(*args, **kwargs))
1245
if getattr(excClass,'__name__', None) is not None:
1246
excName = excClass.__name__
1248
excName = str(excClass)
1249
raise self.failureException, "%s not raised" % excName
1251
def assertRaises(self, excClass, callableObj, *args, **kwargs):
1252
"""Assert that a callable raises a particular exception.
1254
:param excClass: As for the except statement, this may be either an
1255
exception class, or a tuple of classes.
1256
:param callableObj: A callable, will be passed ``*args`` and
1259
Returns the exception so that you can examine it.
1262
callableObj(*args, **kwargs)
1266
if getattr(excClass,'__name__', None) is not None:
1267
excName = excClass.__name__
1270
excName = str(excClass)
1271
raise self.failureException, "%s not raised" % excName
1273
def assertIs(self, left, right, message=None):
1274
if not (left is right):
1275
if message is not None:
1276
raise AssertionError(message)
1278
raise AssertionError("%r is not %r." % (left, right))
1280
def assertIsNot(self, left, right, message=None):
1282
if message is not None:
1283
raise AssertionError(message)
1285
raise AssertionError("%r is %r." % (left, right))
1287
def assertTransportMode(self, transport, path, mode):
1288
"""Fail if a path does not have mode "mode".
1290
If modes are not supported on this transport, the assertion is ignored.
1292
if not transport._can_roundtrip_unix_modebits():
1294
path_stat = transport.stat(path)
1295
actual_mode = stat.S_IMODE(path_stat.st_mode)
1296
self.assertEqual(mode, actual_mode,
1297
'mode of %r incorrect (%s != %s)'
1298
% (path, oct(mode), oct(actual_mode)))
1300
def assertIsSameRealPath(self, path1, path2):
1301
"""Fail if path1 and path2 points to different files"""
1302
self.assertEqual(osutils.realpath(path1),
1303
osutils.realpath(path2),
1304
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1306
def assertIsInstance(self, obj, kls, msg=None):
1307
"""Fail if obj is not an instance of kls
1309
:param msg: Supplementary message to show if the assertion fails.
1311
if not isinstance(obj, kls):
1312
m = "%r is an instance of %s rather than %s" % (
1313
obj, obj.__class__, kls)
1318
def assertFileEqual(self, content, path):
1319
"""Fail if path does not contain 'content'."""
1320
self.failUnlessExists(path)
1321
f = file(path, 'rb')
1326
self.assertEqualDiff(content, s)
1328
def assertDocstring(self, expected_docstring, obj):
1329
"""Fail if obj does not have expected_docstring"""
1331
# With -OO the docstring should be None instead
1332
self.assertIs(obj.__doc__, None)
1334
self.assertEqual(expected_docstring, obj.__doc__)
1336
def failUnlessExists(self, path):
1337
"""Fail unless path or paths, which may be abs or relative, exist."""
1338
if not isinstance(path, basestring):
1340
self.failUnlessExists(p)
1342
self.failUnless(osutils.lexists(path),path+" does not exist")
1344
def failIfExists(self, path):
1345
"""Fail if path or paths, which may be abs or relative, exist."""
1346
if not isinstance(path, basestring):
1348
self.failIfExists(p)
1350
self.failIf(osutils.lexists(path),path+" exists")
1352
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1353
"""A helper for callDeprecated and applyDeprecated.
1355
:param a_callable: A callable to call.
1356
:param args: The positional arguments for the callable
1357
:param kwargs: The keyword arguments for the callable
1358
:return: A tuple (warnings, result). result is the result of calling
1359
a_callable(``*args``, ``**kwargs``).
1362
def capture_warnings(msg, cls=None, stacklevel=None):
1363
# we've hooked into a deprecation specific callpath,
1364
# only deprecations should getting sent via it.
1365
self.assertEqual(cls, DeprecationWarning)
1366
local_warnings.append(msg)
1367
original_warning_method = symbol_versioning.warn
1368
symbol_versioning.set_warning_method(capture_warnings)
1370
result = a_callable(*args, **kwargs)
1372
symbol_versioning.set_warning_method(original_warning_method)
1373
return (local_warnings, result)
1375
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1376
"""Call a deprecated callable without warning the user.
1378
Note that this only captures warnings raised by symbol_versioning.warn,
1379
not other callers that go direct to the warning module.
1381
To test that a deprecated method raises an error, do something like
1384
self.assertRaises(errors.ReservedId,
1385
self.applyDeprecated,
1386
deprecated_in((1, 5, 0)),
1390
:param deprecation_format: The deprecation format that the callable
1391
should have been deprecated with. This is the same type as the
1392
parameter to deprecated_method/deprecated_function. If the
1393
callable is not deprecated with this format, an assertion error
1395
:param a_callable: A callable to call. This may be a bound method or
1396
a regular function. It will be called with ``*args`` and
1398
:param args: The positional arguments for the callable
1399
:param kwargs: The keyword arguments for the callable
1400
:return: The result of a_callable(``*args``, ``**kwargs``)
1402
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1404
expected_first_warning = symbol_versioning.deprecation_string(
1405
a_callable, deprecation_format)
1406
if len(call_warnings) == 0:
1407
self.fail("No deprecation warning generated by call to %s" %
1409
self.assertEqual(expected_first_warning, call_warnings[0])
1412
def callCatchWarnings(self, fn, *args, **kw):
1413
"""Call a callable that raises python warnings.
1415
The caller's responsible for examining the returned warnings.
1417
If the callable raises an exception, the exception is not
1418
caught and propagates up to the caller. In that case, the list
1419
of warnings is not available.
1421
:returns: ([warning_object, ...], fn_result)
1423
# XXX: This is not perfect, because it completely overrides the
1424
# warnings filters, and some code may depend on suppressing particular
1425
# warnings. It's the easiest way to insulate ourselves from -Werror,
1426
# though. -- Andrew, 20071062
1428
def _catcher(message, category, filename, lineno, file=None, line=None):
1429
# despite the name, 'message' is normally(?) a Warning subclass
1431
wlist.append(message)
1432
saved_showwarning = warnings.showwarning
1433
saved_filters = warnings.filters
1435
warnings.showwarning = _catcher
1436
warnings.filters = []
1437
result = fn(*args, **kw)
1439
warnings.showwarning = saved_showwarning
1440
warnings.filters = saved_filters
1441
return wlist, result
1443
def callDeprecated(self, expected, callable, *args, **kwargs):
1444
"""Assert that a callable is deprecated in a particular way.
1446
This is a very precise test for unusual requirements. The
1447
applyDeprecated helper function is probably more suited for most tests
1448
as it allows you to simply specify the deprecation format being used
1449
and will ensure that that is issued for the function being called.
1451
Note that this only captures warnings raised by symbol_versioning.warn,
1452
not other callers that go direct to the warning module. To catch
1453
general warnings, use callCatchWarnings.
1455
:param expected: a list of the deprecation warnings expected, in order
1456
:param callable: The callable to call
1457
:param args: The positional arguments for the callable
1458
:param kwargs: The keyword arguments for the callable
1460
call_warnings, result = self._capture_deprecation_warnings(callable,
1462
self.assertEqual(expected, call_warnings)
1465
def _startLogFile(self):
1466
"""Send bzr and test log messages to a temporary file.
1468
The file is removed as the test is torn down.
1470
self._log_file = StringIO()
1471
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1472
self.addCleanup(self._finishLogFile)
1474
def _finishLogFile(self):
1475
"""Finished with the log file.
1477
Close the file and delete it, unless setKeepLogfile was called.
1479
if bzrlib.trace._trace_file:
1480
# flush the log file, to get all content
1481
bzrlib.trace._trace_file.flush()
1482
bzrlib.trace.pop_log_file(self._log_memento)
1483
# Cache the log result and delete the file on disk
1484
self._get_log(False)
1486
def thisFailsStrictLockCheck(self):
1487
"""It is known that this test would fail with -Dstrict_locks.
1489
By default, all tests are run with strict lock checking unless
1490
-Edisable_lock_checks is supplied. However there are some tests which
1491
we know fail strict locks at this point that have not been fixed.
1492
They should call this function to disable the strict checking.
1494
This should be used sparingly, it is much better to fix the locking
1495
issues rather than papering over the problem by calling this function.
1497
debug.debug_flags.discard('strict_locks')
1499
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1500
"""Overrides an object attribute restoring it after the test.
1502
:param obj: The object that will be mutated.
1504
:param attr_name: The attribute name we want to preserve/override in
1507
:param new: The optional value we want to set the attribute to.
1509
:returns: The actual attr value.
1511
value = getattr(obj, attr_name)
1512
# The actual value is captured by the call below
1513
self.addCleanup(setattr, obj, attr_name, value)
1514
if new is not _unitialized_attr:
1515
setattr(obj, attr_name, new)
1518
def _cleanEnvironment(self):
1520
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1521
'HOME': os.getcwd(),
1522
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1523
# tests do check our impls match APPDATA
1524
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1528
'BZREMAIL': None, # may still be present in the environment
1529
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1530
'BZR_PROGRESS_BAR': None,
1532
'BZR_PLUGIN_PATH': None,
1533
'BZR_DISABLE_PLUGINS': None,
1534
'BZR_PLUGINS_AT': None,
1535
'BZR_CONCURRENCY': None,
1536
# Make sure that any text ui tests are consistent regardless of
1537
# the environment the test case is run in; you may want tests that
1538
# test other combinations. 'dumb' is a reasonable guess for tests
1539
# going to a pipe or a StringIO.
1543
'BZR_COLUMNS': '80',
1545
'SSH_AUTH_SOCK': None,
1549
'https_proxy': None,
1550
'HTTPS_PROXY': None,
1555
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1556
# least. If you do (care), please update this comment
1560
'BZR_REMOTE_PATH': None,
1561
# Generally speaking, we don't want apport reporting on crashes in
1562
# the test envirnoment unless we're specifically testing apport,
1563
# so that it doesn't leak into the real system environment. We
1564
# use an env var so it propagates to subprocesses.
1565
'APPORT_DISABLE': '1',
1568
self.addCleanup(self._restoreEnvironment)
1569
for name, value in new_env.iteritems():
1570
self._captureVar(name, value)
1572
def _captureVar(self, name, newvalue):
1573
"""Set an environment variable, and reset it when finished."""
1574
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1576
def _restoreEnvironment(self):
1577
for name, value in self._old_env.iteritems():
1578
osutils.set_or_unset_env(name, value)
1580
def _restoreHooks(self):
1581
for klass, (name, hooks) in self._preserved_hooks.items():
1582
setattr(klass, name, hooks)
1584
def knownFailure(self, reason):
1585
"""This test has failed for some known reason."""
1586
raise KnownFailure(reason)
1588
def _suppress_log(self):
1589
"""Remove the log info from details."""
1590
self.discardDetail('log')
1592
def _do_skip(self, result, reason):
1593
self._suppress_log()
1594
addSkip = getattr(result, 'addSkip', None)
1595
if not callable(addSkip):
1596
result.addSuccess(result)
1598
addSkip(self, reason)
1601
def _do_known_failure(self, result, e):
1602
self._suppress_log()
1603
err = sys.exc_info()
1604
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1605
if addExpectedFailure is not None:
1606
addExpectedFailure(self, err)
1608
result.addSuccess(self)
1611
def _do_not_applicable(self, result, e):
1613
reason = 'No reason given'
1616
self._suppress_log ()
1617
addNotApplicable = getattr(result, 'addNotApplicable', None)
1618
if addNotApplicable is not None:
1619
result.addNotApplicable(self, reason)
1621
self._do_skip(result, reason)
1624
def _report_skip(self, result, err):
1625
"""Override the default _report_skip.
1627
We want to strip the 'log' detail. If we waint until _do_skip, it has
1628
already been formatted into the 'reason' string, and we can't pull it
1631
self._suppress_log()
1632
super(TestCase, self)._report_skip(self, result, err)
1635
def _report_expected_failure(self, result, err):
1638
See _report_skip for motivation.
1640
self._suppress_log()
1641
super(TestCase, self)._report_expected_failure(self, result, err)
1644
def _do_unsupported_or_skip(self, result, e):
1646
self._suppress_log()
1647
addNotSupported = getattr(result, 'addNotSupported', None)
1648
if addNotSupported is not None:
1649
result.addNotSupported(self, reason)
1651
self._do_skip(result, reason)
1653
def time(self, callable, *args, **kwargs):
1654
"""Run callable and accrue the time it takes to the benchmark time.
1656
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1657
this will cause lsprofile statistics to be gathered and stored in
1660
if self._benchtime is None:
1661
self.addDetail('benchtime', content.Content(content.ContentType(
1662
"text", "plain"), lambda:[str(self._benchtime)]))
1666
if not self._gather_lsprof_in_benchmarks:
1667
return callable(*args, **kwargs)
1669
# record this benchmark
1670
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1672
self._benchcalls.append(((callable, args, kwargs), stats))
1675
self._benchtime += time.time() - start
1677
def log(self, *args):
1680
def _get_log(self, keep_log_file=False):
1681
"""Internal helper to get the log from bzrlib.trace for this test.
1683
Please use self.getDetails, or self.get_log to access this in test case
1686
:param keep_log_file: When True, if the log is still a file on disk
1687
leave it as a file on disk. When False, if the log is still a file
1688
on disk, the log file is deleted and the log preserved as
1690
:return: A string containing the log.
1692
if self._log_contents is not None:
1694
self._log_contents.decode('utf8')
1695
except UnicodeDecodeError:
1696
unicodestr = self._log_contents.decode('utf8', 'replace')
1697
self._log_contents = unicodestr.encode('utf8')
1698
return self._log_contents
1699
if self._log_file is not None:
1700
log_contents = self._log_file.getvalue()
1702
log_contents.decode('utf8')
1703
except UnicodeDecodeError:
1704
unicodestr = log_contents.decode('utf8', 'replace')
1705
log_contents = unicodestr.encode('utf8')
1706
if not keep_log_file:
1707
self._log_file = None
1708
# Permit multiple calls to get_log until we clean it up in
1710
self._log_contents = log_contents
1713
return "No log file content."
1716
"""Get a unicode string containing the log from bzrlib.trace.
1718
Undecodable characters are replaced.
1720
return u"".join(self.getDetails()['log'].iter_text())
1722
def requireFeature(self, feature):
1723
"""This test requires a specific feature is available.
1725
:raises UnavailableFeature: When feature is not available.
1727
if not feature.available():
1728
raise UnavailableFeature(feature)
1730
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1732
"""Run bazaar command line, splitting up a string command line."""
1733
if isinstance(args, basestring):
1734
# shlex don't understand unicode strings,
1735
# so args should be plain string (bialix 20070906)
1736
args = list(shlex.split(str(args)))
1737
return self._run_bzr_core(args, retcode=retcode,
1738
encoding=encoding, stdin=stdin, working_dir=working_dir,
1741
def _run_bzr_core(self, args, retcode, encoding, stdin,
1743
# Clear chk_map page cache, because the contents are likely to mask
1745
chk_map.clear_cache()
1746
if encoding is None:
1747
encoding = osutils.get_user_encoding()
1748
stdout = StringIOWrapper()
1749
stderr = StringIOWrapper()
1750
stdout.encoding = encoding
1751
stderr.encoding = encoding
1753
self.log('run bzr: %r', args)
1754
# FIXME: don't call into logging here
1755
handler = logging.StreamHandler(stderr)
1756
handler.setLevel(logging.INFO)
1757
logger = logging.getLogger('')
1758
logger.addHandler(handler)
1759
old_ui_factory = ui.ui_factory
1760
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1763
if working_dir is not None:
1764
cwd = osutils.getcwd()
1765
os.chdir(working_dir)
1769
result = self.apply_redirected(ui.ui_factory.stdin,
1771
bzrlib.commands.run_bzr_catch_user_errors,
1773
except KeyboardInterrupt:
1774
# Reraise KeyboardInterrupt with contents of redirected stdout
1775
# and stderr as arguments, for tests which are interested in
1776
# stdout and stderr and are expecting the exception.
1777
out = stdout.getvalue()
1778
err = stderr.getvalue()
1780
self.log('output:\n%r', out)
1782
self.log('errors:\n%r', err)
1783
raise KeyboardInterrupt(out, err)
1785
logger.removeHandler(handler)
1786
ui.ui_factory = old_ui_factory
1790
out = stdout.getvalue()
1791
err = stderr.getvalue()
1793
self.log('output:\n%r', out)
1795
self.log('errors:\n%r', err)
1796
if retcode is not None:
1797
self.assertEquals(retcode, result,
1798
message='Unexpected return code')
1799
return result, out, err
1801
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1802
working_dir=None, error_regexes=[], output_encoding=None):
1803
"""Invoke bzr, as if it were run from the command line.
1805
The argument list should not include the bzr program name - the
1806
first argument is normally the bzr command. Arguments may be
1807
passed in three ways:
1809
1- A list of strings, eg ["commit", "a"]. This is recommended
1810
when the command contains whitespace or metacharacters, or
1811
is built up at run time.
1813
2- A single string, eg "add a". This is the most convenient
1814
for hardcoded commands.
1816
This runs bzr through the interface that catches and reports
1817
errors, and with logging set to something approximating the
1818
default, so that error reporting can be checked.
1820
This should be the main method for tests that want to exercise the
1821
overall behavior of the bzr application (rather than a unit test
1822
or a functional test of the library.)
1824
This sends the stdout/stderr results into the test's log,
1825
where it may be useful for debugging. See also run_captured.
1827
:keyword stdin: A string to be used as stdin for the command.
1828
:keyword retcode: The status code the command should return;
1830
:keyword working_dir: The directory to run the command in
1831
:keyword error_regexes: A list of expected error messages. If
1832
specified they must be seen in the error output of the command.
1834
retcode, out, err = self._run_bzr_autosplit(
1839
working_dir=working_dir,
1841
self.assertIsInstance(error_regexes, (list, tuple))
1842
for regex in error_regexes:
1843
self.assertContainsRe(err, regex)
1846
def run_bzr_error(self, error_regexes, *args, **kwargs):
1847
"""Run bzr, and check that stderr contains the supplied regexes
1849
:param error_regexes: Sequence of regular expressions which
1850
must each be found in the error output. The relative ordering
1852
:param args: command-line arguments for bzr
1853
:param kwargs: Keyword arguments which are interpreted by run_bzr
1854
This function changes the default value of retcode to be 3,
1855
since in most cases this is run when you expect bzr to fail.
1857
:return: (out, err) The actual output of running the command (in case
1858
you want to do more inspection)
1862
# Make sure that commit is failing because there is nothing to do
1863
self.run_bzr_error(['no changes to commit'],
1864
['commit', '-m', 'my commit comment'])
1865
# Make sure --strict is handling an unknown file, rather than
1866
# giving us the 'nothing to do' error
1867
self.build_tree(['unknown'])
1868
self.run_bzr_error(['Commit refused because there are unknown files'],
1869
['commit', --strict', '-m', 'my commit comment'])
1871
kwargs.setdefault('retcode', 3)
1872
kwargs['error_regexes'] = error_regexes
1873
out, err = self.run_bzr(*args, **kwargs)
1876
def run_bzr_subprocess(self, *args, **kwargs):
1877
"""Run bzr in a subprocess for testing.
1879
This starts a new Python interpreter and runs bzr in there.
1880
This should only be used for tests that have a justifiable need for
1881
this isolation: e.g. they are testing startup time, or signal
1882
handling, or early startup code, etc. Subprocess code can't be
1883
profiled or debugged so easily.
1885
:keyword retcode: The status code that is expected. Defaults to 0. If
1886
None is supplied, the status code is not checked.
1887
:keyword env_changes: A dictionary which lists changes to environment
1888
variables. A value of None will unset the env variable.
1889
The values must be strings. The change will only occur in the
1890
child, so you don't need to fix the environment after running.
1891
:keyword universal_newlines: Convert CRLF => LF
1892
:keyword allow_plugins: By default the subprocess is run with
1893
--no-plugins to ensure test reproducibility. Also, it is possible
1894
for system-wide plugins to create unexpected output on stderr,
1895
which can cause unnecessary test failures.
1897
env_changes = kwargs.get('env_changes', {})
1898
working_dir = kwargs.get('working_dir', None)
1899
allow_plugins = kwargs.get('allow_plugins', False)
1901
if isinstance(args[0], list):
1903
elif isinstance(args[0], basestring):
1904
args = list(shlex.split(args[0]))
1906
raise ValueError("passing varargs to run_bzr_subprocess")
1907
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1908
working_dir=working_dir,
1909
allow_plugins=allow_plugins)
1910
# We distinguish between retcode=None and retcode not passed.
1911
supplied_retcode = kwargs.get('retcode', 0)
1912
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1913
universal_newlines=kwargs.get('universal_newlines', False),
1916
def start_bzr_subprocess(self, process_args, env_changes=None,
1917
skip_if_plan_to_signal=False,
1919
allow_plugins=False):
1920
"""Start bzr in a subprocess for testing.
1922
This starts a new Python interpreter and runs bzr in there.
1923
This should only be used for tests that have a justifiable need for
1924
this isolation: e.g. they are testing startup time, or signal
1925
handling, or early startup code, etc. Subprocess code can't be
1926
profiled or debugged so easily.
1928
:param process_args: a list of arguments to pass to the bzr executable,
1929
for example ``['--version']``.
1930
:param env_changes: A dictionary which lists changes to environment
1931
variables. A value of None will unset the env variable.
1932
The values must be strings. The change will only occur in the
1933
child, so you don't need to fix the environment after running.
1934
:param skip_if_plan_to_signal: raise TestSkipped when true and system
1935
doesn't support signalling subprocesses.
1936
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1938
:returns: Popen object for the started process.
1940
if skip_if_plan_to_signal:
1941
if os.name != "posix":
1942
raise TestSkipped("Sending signals not supported")
1944
if env_changes is None:
1948
def cleanup_environment():
1949
for env_var, value in env_changes.iteritems():
1950
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1952
def restore_environment():
1953
for env_var, value in old_env.iteritems():
1954
osutils.set_or_unset_env(env_var, value)
1956
bzr_path = self.get_bzr_path()
1959
if working_dir is not None:
1960
cwd = osutils.getcwd()
1961
os.chdir(working_dir)
1964
# win32 subprocess doesn't support preexec_fn
1965
# so we will avoid using it on all platforms, just to
1966
# make sure the code path is used, and we don't break on win32
1967
cleanup_environment()
1968
command = [sys.executable]
1969
# frozen executables don't need the path to bzr
1970
if getattr(sys, "frozen", None) is None:
1971
command.append(bzr_path)
1972
if not allow_plugins:
1973
command.append('--no-plugins')
1974
command.extend(process_args)
1975
process = self._popen(command, stdin=subprocess.PIPE,
1976
stdout=subprocess.PIPE,
1977
stderr=subprocess.PIPE)
1979
restore_environment()
1985
def _popen(self, *args, **kwargs):
1986
"""Place a call to Popen.
1988
Allows tests to override this method to intercept the calls made to
1989
Popen for introspection.
1991
return subprocess.Popen(*args, **kwargs)
1993
def get_source_path(self):
1994
"""Return the path of the directory containing bzrlib."""
1995
return os.path.dirname(os.path.dirname(bzrlib.__file__))
1997
def get_bzr_path(self):
1998
"""Return the path of the 'bzr' executable for this test suite."""
1999
bzr_path = os.path.join(self.get_source_path(), "bzr")
2000
if not os.path.isfile(bzr_path):
2001
# We are probably installed. Assume sys.argv is the right file
2002
bzr_path = sys.argv[0]
2005
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
2006
universal_newlines=False, process_args=None):
2007
"""Finish the execution of process.
2009
:param process: the Popen object returned from start_bzr_subprocess.
2010
:param retcode: The status code that is expected. Defaults to 0. If
2011
None is supplied, the status code is not checked.
2012
:param send_signal: an optional signal to send to the process.
2013
:param universal_newlines: Convert CRLF => LF
2014
:returns: (stdout, stderr)
2016
if send_signal is not None:
2017
os.kill(process.pid, send_signal)
2018
out, err = process.communicate()
2020
if universal_newlines:
2021
out = out.replace('\r\n', '\n')
2022
err = err.replace('\r\n', '\n')
2024
if retcode is not None and retcode != process.returncode:
2025
if process_args is None:
2026
process_args = "(unknown args)"
2027
mutter('Output of bzr %s:\n%s', process_args, out)
2028
mutter('Error for bzr %s:\n%s', process_args, err)
2029
self.fail('Command bzr %s failed with retcode %s != %s'
2030
% (process_args, retcode, process.returncode))
2033
def check_inventory_shape(self, inv, shape):
2034
"""Compare an inventory to a list of expected names.
2036
Fail if they are not precisely equal.
2039
shape = list(shape) # copy
2040
for path, ie in inv.entries():
2041
name = path.replace('\\', '/')
2042
if ie.kind == 'directory':
2049
self.fail("expected paths not found in inventory: %r" % shape)
2051
self.fail("unexpected paths found in inventory: %r" % extras)
2053
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2054
a_callable=None, *args, **kwargs):
2055
"""Call callable with redirected std io pipes.
2057
Returns the return code."""
2058
if not callable(a_callable):
2059
raise ValueError("a_callable must be callable.")
2061
stdin = StringIO("")
2063
if getattr(self, "_log_file", None) is not None:
2064
stdout = self._log_file
2068
if getattr(self, "_log_file", None is not None):
2069
stderr = self._log_file
2072
real_stdin = sys.stdin
2073
real_stdout = sys.stdout
2074
real_stderr = sys.stderr
2079
return a_callable(*args, **kwargs)
2081
sys.stdout = real_stdout
2082
sys.stderr = real_stderr
2083
sys.stdin = real_stdin
2085
def reduceLockdirTimeout(self):
2086
"""Reduce the default lock timeout for the duration of the test, so that
2087
if LockContention occurs during a test, it does so quickly.
2089
Tests that expect to provoke LockContention errors should call this.
2091
self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2093
def make_utf8_encoded_stringio(self, encoding_type=None):
2094
"""Return a StringIOWrapper instance, that will encode Unicode
2097
if encoding_type is None:
2098
encoding_type = 'strict'
2100
output_encoding = 'utf-8'
2101
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
2102
sio.encoding = output_encoding
2105
def disable_verb(self, verb):
2106
"""Disable a smart server verb for one test."""
2107
from bzrlib.smart import request
2108
request_handlers = request.request_handlers
2109
orig_method = request_handlers.get(verb)
2110
request_handlers.remove(verb)
2111
self.addCleanup(request_handlers.register, verb, orig_method)
2114
class CapturedCall(object):
2115
"""A helper for capturing smart server calls for easy debug analysis."""
2117
def __init__(self, params, prefix_length):
2118
"""Capture the call with params and skip prefix_length stack frames."""
2121
# The last 5 frames are the __init__, the hook frame, and 3 smart
2122
# client frames. Beyond this we could get more clever, but this is good
2124
stack = traceback.extract_stack()[prefix_length:-5]
2125
self.stack = ''.join(traceback.format_list(stack))
2128
return self.call.method
2131
return self.call.method
2137
class TestCaseWithMemoryTransport(TestCase):
2138
"""Common test class for tests that do not need disk resources.
2140
Tests that need disk resources should derive from TestCaseWithTransport.
2142
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2144
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2145
a directory which does not exist. This serves to help ensure test isolation
2146
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2147
must exist. However, TestCaseWithMemoryTransport does not offer local
2148
file defaults for the transport in tests, nor does it obey the command line
2149
override, so tests that accidentally write to the common directory should
2152
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2153
a .bzr directory that stops us ascending higher into the filesystem.
2159
def __init__(self, methodName='runTest'):
2160
# allow test parameterization after test construction and before test
2161
# execution. Variables that the parameterizer sets need to be
2162
# ones that are not set by setUp, or setUp will trash them.
2163
super(TestCaseWithMemoryTransport, self).__init__(methodName)
2164
self.vfs_transport_factory = default_transport
2165
self.transport_server = None
2166
self.transport_readonly_server = None
2167
self.__vfs_server = None
2169
def get_transport(self, relpath=None):
2170
"""Return a writeable transport.
2172
This transport is for the test scratch space relative to
2175
:param relpath: a path relative to the base url.
2177
t = _mod_transport.get_transport(self.get_url(relpath))
2178
self.assertFalse(t.is_readonly())
2181
def get_readonly_transport(self, relpath=None):
2182
"""Return a readonly transport for the test scratch space
2184
This can be used to test that operations which should only need
2185
readonly access in fact do not try to write.
2187
:param relpath: a path relative to the base url.
2189
t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2190
self.assertTrue(t.is_readonly())
2193
def create_transport_readonly_server(self):
2194
"""Create a transport server from class defined at init.
2196
This is mostly a hook for daughter classes.
2198
return self.transport_readonly_server()
2200
def get_readonly_server(self):
2201
"""Get the server instance for the readonly transport
2203
This is useful for some tests with specific servers to do diagnostics.
2205
if self.__readonly_server is None:
2206
if self.transport_readonly_server is None:
2207
# readonly decorator requested
2208
self.__readonly_server = test_server.ReadonlyServer()
2210
# explicit readonly transport.
2211
self.__readonly_server = self.create_transport_readonly_server()
2212
self.start_server(self.__readonly_server,
2213
self.get_vfs_only_server())
2214
return self.__readonly_server
2216
def get_readonly_url(self, relpath=None):
2217
"""Get a URL for the readonly transport.
2219
This will either be backed by '.' or a decorator to the transport
2220
used by self.get_url()
2221
relpath provides for clients to get a path relative to the base url.
2222
These should only be downwards relative, not upwards.
2224
base = self.get_readonly_server().get_url()
2225
return self._adjust_url(base, relpath)
2227
def get_vfs_only_server(self):
2228
"""Get the vfs only read/write server instance.
2230
This is useful for some tests with specific servers that need
2233
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
2234
is no means to override it.
2236
if self.__vfs_server is None:
2237
self.__vfs_server = memory.MemoryServer()
2238
self.start_server(self.__vfs_server)
2239
return self.__vfs_server
2241
def get_server(self):
2242
"""Get the read/write server instance.
2244
This is useful for some tests with specific servers that need
2247
This is built from the self.transport_server factory. If that is None,
2248
then the self.get_vfs_server is returned.
2250
if self.__server is None:
2251
if (self.transport_server is None or self.transport_server is
2252
self.vfs_transport_factory):
2253
self.__server = self.get_vfs_only_server()
2255
# bring up a decorated means of access to the vfs only server.
2256
self.__server = self.transport_server()
2257
self.start_server(self.__server, self.get_vfs_only_server())
2258
return self.__server
2260
def _adjust_url(self, base, relpath):
2261
"""Get a URL (or maybe a path) for the readwrite transport.
2263
This will either be backed by '.' or to an equivalent non-file based
2265
relpath provides for clients to get a path relative to the base url.
2266
These should only be downwards relative, not upwards.
2268
if relpath is not None and relpath != '.':
2269
if not base.endswith('/'):
2271
# XXX: Really base should be a url; we did after all call
2272
# get_url()! But sometimes it's just a path (from
2273
# LocalAbspathServer), and it'd be wrong to append urlescaped data
2274
# to a non-escaped local path.
2275
if base.startswith('./') or base.startswith('/'):
2278
base += urlutils.escape(relpath)
2281
def get_url(self, relpath=None):
2282
"""Get a URL (or maybe a path) for the readwrite transport.
2284
This will either be backed by '.' or to an equivalent non-file based
2286
relpath provides for clients to get a path relative to the base url.
2287
These should only be downwards relative, not upwards.
2289
base = self.get_server().get_url()
2290
return self._adjust_url(base, relpath)
2292
def get_vfs_only_url(self, relpath=None):
2293
"""Get a URL (or maybe a path for the plain old vfs transport.
2295
This will never be a smart protocol. It always has all the
2296
capabilities of the local filesystem, but it might actually be a
2297
MemoryTransport or some other similar virtual filesystem.
2299
This is the backing transport (if any) of the server returned by
2300
get_url and get_readonly_url.
2302
:param relpath: provides for clients to get a path relative to the base
2303
url. These should only be downwards relative, not upwards.
2306
base = self.get_vfs_only_server().get_url()
2307
return self._adjust_url(base, relpath)
2309
def _create_safety_net(self):
2310
"""Make a fake bzr directory.
2312
This prevents any tests propagating up onto the TEST_ROOT directory's
2315
root = TestCaseWithMemoryTransport.TEST_ROOT
2316
bzrdir.BzrDir.create_standalone_workingtree(root)
2318
def _check_safety_net(self):
2319
"""Check that the safety .bzr directory have not been touched.
2321
_make_test_root have created a .bzr directory to prevent tests from
2322
propagating. This method ensures than a test did not leaked.
2324
root = TestCaseWithMemoryTransport.TEST_ROOT
2325
self.permit_url(_mod_transport.get_transport(root).base)
2326
wt = workingtree.WorkingTree.open(root)
2327
last_rev = wt.last_revision()
2328
if last_rev != 'null:':
2329
# The current test have modified the /bzr directory, we need to
2330
# recreate a new one or all the followng tests will fail.
2331
# If you need to inspect its content uncomment the following line
2332
# import pdb; pdb.set_trace()
2333
_rmtree_temp_dir(root + '/.bzr', test_id=self.id())
2334
self._create_safety_net()
2335
raise AssertionError('%s/.bzr should not be modified' % root)
2337
def _make_test_root(self):
2338
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2339
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
2340
root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
2342
TestCaseWithMemoryTransport.TEST_ROOT = root
2344
self._create_safety_net()
2346
# The same directory is used by all tests, and we're not
2347
# specifically told when all tests are finished. This will do.
2348
atexit.register(_rmtree_temp_dir, root)
2350
self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2351
self.addCleanup(self._check_safety_net)
2353
def makeAndChdirToTestDir(self):
2354
"""Create a temporary directories for this one test.
2356
This must set self.test_home_dir and self.test_dir and chdir to
2359
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
2361
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2362
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2363
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2364
self.permit_dir(self.test_dir)
2366
def make_branch(self, relpath, format=None):
2367
"""Create a branch on the transport at relpath."""
2368
repo = self.make_repository(relpath, format=format)
2369
return repo.bzrdir.create_branch()
2371
def make_bzrdir(self, relpath, format=None):
2373
# might be a relative or absolute path
2374
maybe_a_url = self.get_url(relpath)
2375
segments = maybe_a_url.rsplit('/', 1)
2376
t = _mod_transport.get_transport(maybe_a_url)
2377
if len(segments) > 1 and segments[-1] not in ('', '.'):
2381
if isinstance(format, basestring):
2382
format = bzrdir.format_registry.make_bzrdir(format)
2383
return format.initialize_on_transport(t)
2384
except errors.UninitializableFormat:
2385
raise TestSkipped("Format %s is not initializable." % format)
2387
def make_repository(self, relpath, shared=False, format=None):
2388
"""Create a repository on our default transport at relpath.
2390
Note that relpath must be a relative path, not a full url.
2392
# FIXME: If you create a remoterepository this returns the underlying
2393
# real format, which is incorrect. Actually we should make sure that
2394
# RemoteBzrDir returns a RemoteRepository.
2395
# maybe mbp 20070410
2396
made_control = self.make_bzrdir(relpath, format=format)
2397
return made_control.create_repository(shared=shared)
2399
def make_smart_server(self, path, backing_server=None):
2400
if backing_server is None:
2401
backing_server = self.get_server()
2402
smart_server = test_server.SmartTCPServer_for_testing()
2403
self.start_server(smart_server, backing_server)
2404
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2406
return remote_transport
2408
def make_branch_and_memory_tree(self, relpath, format=None):
2409
"""Create a branch on the default transport and a MemoryTree for it."""
2410
b = self.make_branch(relpath, format=format)
2411
return memorytree.MemoryTree.create_on_branch(b)
2413
def make_branch_builder(self, relpath, format=None):
2414
branch = self.make_branch(relpath, format=format)
2415
return branchbuilder.BranchBuilder(branch=branch)
2417
def overrideEnvironmentForTesting(self):
2418
test_home_dir = self.test_home_dir
2419
if isinstance(test_home_dir, unicode):
2420
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2421
os.environ['HOME'] = test_home_dir
2422
os.environ['BZR_HOME'] = test_home_dir
2425
super(TestCaseWithMemoryTransport, self).setUp()
2426
# Ensure that ConnectedTransport doesn't leak sockets
2427
def get_transport_with_cleanup(*args, **kwargs):
2428
t = orig_get_transport(*args, **kwargs)
2429
if isinstance(t, _mod_transport.ConnectedTransport):
2430
self.addCleanup(t.disconnect)
2433
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2434
get_transport_with_cleanup)
2435
self._make_test_root()
2436
self.addCleanup(os.chdir, os.getcwdu())
2437
self.makeAndChdirToTestDir()
2438
self.overrideEnvironmentForTesting()
2439
self.__readonly_server = None
2440
self.__server = None
2441
self.reduceLockdirTimeout()
2443
def setup_smart_server_with_call_log(self):
2444
"""Sets up a smart server as the transport server with a call log."""
2445
self.transport_server = test_server.SmartTCPServer_for_testing
2446
self.hpss_calls = []
2448
# Skip the current stack down to the caller of
2449
# setup_smart_server_with_call_log
2450
prefix_length = len(traceback.extract_stack()) - 2
2451
def capture_hpss_call(params):
2452
self.hpss_calls.append(
2453
CapturedCall(params, prefix_length))
2454
client._SmartClient.hooks.install_named_hook(
2455
'call', capture_hpss_call, None)
2457
def reset_smart_call_log(self):
2458
self.hpss_calls = []
2461
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2462
"""Derived class that runs a test within a temporary directory.
2464
This is useful for tests that need to create a branch, etc.
2466
The directory is created in a slightly complex way: for each
2467
Python invocation, a new temporary top-level directory is created.
2468
All test cases create their own directory within that. If the
2469
tests complete successfully, the directory is removed.
2471
:ivar test_base_dir: The path of the top-level directory for this
2472
test, which contains a home directory and a work directory.
2474
:ivar test_home_dir: An initially empty directory under test_base_dir
2475
which is used as $HOME for this test.
2477
:ivar test_dir: A directory under test_base_dir used as the current
2478
directory when the test proper is run.
2481
OVERRIDE_PYTHON = 'python'
2483
def check_file_contents(self, filename, expect):
2484
self.log("check contents of file %s" % filename)
2490
if contents != expect:
2491
self.log("expected: %r" % expect)
2492
self.log("actually: %r" % contents)
2493
self.fail("contents of %s not as expected" % filename)
2495
def _getTestDirPrefix(self):
2496
# create a directory within the top level test directory
2497
if sys.platform in ('win32', 'cygwin'):
2498
name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
2499
# windows is likely to have path-length limits so use a short name
2500
name_prefix = name_prefix[-30:]
2502
name_prefix = re.sub('[/]', '_', self.id())
2505
def makeAndChdirToTestDir(self):
2506
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2508
For TestCaseInTempDir we create a temporary directory based on the test
2509
name and then create two subdirs - test and home under it.
2511
name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2512
self._getTestDirPrefix())
2514
for i in range(100):
2515
if os.path.exists(name):
2516
name = name_prefix + '_' + str(i)
2518
# now create test and home directories within this dir
2519
self.test_base_dir = name
2520
self.addCleanup(self.deleteTestDir)
2521
os.mkdir(self.test_base_dir)
2523
self.permit_dir(self.test_base_dir)
2524
# 'sprouting' and 'init' of a branch both walk up the tree to find
2525
# stacking policy to honour; create a bzr dir with an unshared
2526
# repository (but not a branch - our code would be trying to escape
2527
# then!) to stop them, and permit it to be read.
2528
# control = bzrdir.BzrDir.create(self.test_base_dir)
2529
# control.create_repository()
2530
self.test_home_dir = self.test_base_dir + '/home'
2531
os.mkdir(self.test_home_dir)
2532
self.test_dir = self.test_base_dir + '/work'
2533
os.mkdir(self.test_dir)
2534
os.chdir(self.test_dir)
2535
# put name of test inside
2536
f = file(self.test_base_dir + '/name', 'w')
2542
def deleteTestDir(self):
2543
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2544
_rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2546
def build_tree(self, shape, line_endings='binary', transport=None):
2547
"""Build a test tree according to a pattern.
2549
shape is a sequence of file specifications. If the final
2550
character is '/', a directory is created.
2552
This assumes that all the elements in the tree being built are new.
2554
This doesn't add anything to a branch.
2556
:type shape: list or tuple.
2557
:param line_endings: Either 'binary' or 'native'
2558
in binary mode, exact contents are written in native mode, the
2559
line endings match the default platform endings.
2560
:param transport: A transport to write to, for building trees on VFS's.
2561
If the transport is readonly or None, "." is opened automatically.
2564
if type(shape) not in (list, tuple):
2565
raise AssertionError("Parameter 'shape' should be "
2566
"a list or a tuple. Got %r instead" % (shape,))
2567
# It's OK to just create them using forward slashes on windows.
2568
if transport is None or transport.is_readonly():
2569
transport = _mod_transport.get_transport(".")
2571
self.assertIsInstance(name, basestring)
2573
transport.mkdir(urlutils.escape(name[:-1]))
2575
if line_endings == 'binary':
2577
elif line_endings == 'native':
2580
raise errors.BzrError(
2581
'Invalid line ending request %r' % line_endings)
2582
content = "contents of %s%s" % (name.encode('utf-8'), end)
2583
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2585
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2587
def assertInWorkingTree(self, path, root_path='.', tree=None):
2588
"""Assert whether path or paths are in the WorkingTree"""
2590
tree = workingtree.WorkingTree.open(root_path)
2591
if not isinstance(path, basestring):
2593
self.assertInWorkingTree(p, tree=tree)
2595
self.assertIsNot(tree.path2id(path), None,
2596
path+' not in working tree.')
2598
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2599
"""Assert whether path or paths are not in the WorkingTree"""
2601
tree = workingtree.WorkingTree.open(root_path)
2602
if not isinstance(path, basestring):
2604
self.assertNotInWorkingTree(p,tree=tree)
2606
self.assertIs(tree.path2id(path), None, path+' in working tree.')
2609
class TestCaseWithTransport(TestCaseInTempDir):
2610
"""A test case that provides get_url and get_readonly_url facilities.
2612
These back onto two transport servers, one for readonly access and one for
2615
If no explicit class is provided for readonly access, a
2616
ReadonlyTransportDecorator is used instead which allows the use of non disk
2617
based read write transports.
2619
If an explicit class is provided for readonly access, that server and the
2620
readwrite one must both define get_url() as resolving to os.getcwd().
2623
def get_vfs_only_server(self):
2624
"""See TestCaseWithMemoryTransport.
2626
This is useful for some tests with specific servers that need
2629
if self.__vfs_server is None:
2630
self.__vfs_server = self.vfs_transport_factory()
2631
self.start_server(self.__vfs_server)
2632
return self.__vfs_server
2634
def make_branch_and_tree(self, relpath, format=None):
2635
"""Create a branch on the transport and a tree locally.
2637
If the transport is not a LocalTransport, the Tree can't be created on
2638
the transport. In that case if the vfs_transport_factory is
2639
LocalURLServer the working tree is created in the local
2640
directory backing the transport, and the returned tree's branch and
2641
repository will also be accessed locally. Otherwise a lightweight
2642
checkout is created and returned.
2644
We do this because we can't physically create a tree in the local
2645
path, with a branch reference to the transport_factory url, and
2646
a branch + repository in the vfs_transport, unless the vfs_transport
2647
namespace is distinct from the local disk - the two branch objects
2648
would collide. While we could construct a tree with its branch object
2649
pointing at the transport_factory transport in memory, reopening it
2650
would behaving unexpectedly, and has in the past caused testing bugs
2651
when we tried to do it that way.
2653
:param format: The BzrDirFormat.
2654
:returns: the WorkingTree.
2656
# TODO: always use the local disk path for the working tree,
2657
# this obviously requires a format that supports branch references
2658
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2660
b = self.make_branch(relpath, format=format)
2662
return b.bzrdir.create_workingtree()
2663
except errors.NotLocalUrl:
2664
# We can only make working trees locally at the moment. If the
2665
# transport can't support them, then we keep the non-disk-backed
2666
# branch and create a local checkout.
2667
if self.vfs_transport_factory is test_server.LocalURLServer:
2668
# the branch is colocated on disk, we cannot create a checkout.
2669
# hopefully callers will expect this.
2670
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2671
wt = local_controldir.create_workingtree()
2672
if wt.branch._format != b._format:
2674
# Make sure that assigning to wt._branch fixes wt.branch,
2675
# in case the implementation details of workingtree objects
2677
self.assertIs(b, wt.branch)
2680
return b.create_checkout(relpath, lightweight=True)
2682
def assertIsDirectory(self, relpath, transport):
2683
"""Assert that relpath within transport is a directory.
2685
This may not be possible on all transports; in that case it propagates
2686
a TransportNotPossible.
2689
mode = transport.stat(relpath).st_mode
2690
except errors.NoSuchFile:
2691
self.fail("path %s is not a directory; no such file"
2693
if not stat.S_ISDIR(mode):
2694
self.fail("path %s is not a directory; has mode %#o"
2697
def assertTreesEqual(self, left, right):
2698
"""Check that left and right have the same content and properties."""
2699
# we use a tree delta to check for equality of the content, and we
2700
# manually check for equality of other things such as the parents list.
2701
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2702
differences = left.changes_from(right)
2703
self.assertFalse(differences.has_changed(),
2704
"Trees %r and %r are different: %r" % (left, right, differences))
2707
super(TestCaseWithTransport, self).setUp()
2708
self.__vfs_server = None
2710
def disable_missing_extensions_warning(self):
2711
"""Some tests expect a precise stderr content.
2713
There is no point in forcing them to duplicate the extension related
2716
config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
2719
class ChrootedTestCase(TestCaseWithTransport):
2720
"""A support class that provides readonly urls outside the local namespace.
2722
This is done by checking if self.transport_server is a MemoryServer. if it
2723
is then we are chrooted already, if it is not then an HttpServer is used
2726
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2727
be used without needed to redo it when a different
2728
subclass is in use ?
2732
from bzrlib.tests import http_server
2733
super(ChrootedTestCase, self).setUp()
2734
if not self.vfs_transport_factory == memory.MemoryServer:
2735
self.transport_readonly_server = http_server.HttpServer
2738
def condition_id_re(pattern):
2739
"""Create a condition filter which performs a re check on a test's id.
2741
:param pattern: A regular expression string.
2742
:return: A callable that returns True if the re matches.
2744
filter_re = re.compile(pattern, 0)
2745
def condition(test):
2747
return filter_re.search(test_id)
2751
def condition_isinstance(klass_or_klass_list):
2752
"""Create a condition filter which returns isinstance(param, klass).
2754
:return: A callable which when called with one parameter obj return the
2755
result of isinstance(obj, klass_or_klass_list).
2758
return isinstance(obj, klass_or_klass_list)
2762
def condition_id_in_list(id_list):
2763
"""Create a condition filter which verify that test's id in a list.
2765
:param id_list: A TestIdList object.
2766
:return: A callable that returns True if the test's id appears in the list.
2768
def condition(test):
2769
return id_list.includes(test.id())
2773
def condition_id_startswith(starts):
2774
"""Create a condition filter verifying that test's id starts with a string.
2776
:param starts: A list of string.
2777
:return: A callable that returns True if the test's id starts with one of
2780
def condition(test):
2781
for start in starts:
2782
if test.id().startswith(start):
2788
def exclude_tests_by_condition(suite, condition):
2789
"""Create a test suite which excludes some tests from suite.
2791
:param suite: The suite to get tests from.
2792
:param condition: A callable whose result evaluates True when called with a
2793
test case which should be excluded from the result.
2794
:return: A suite which contains the tests found in suite that fail
2798
for test in iter_suite_tests(suite):
2799
if not condition(test):
2801
return TestUtil.TestSuite(result)
2804
def filter_suite_by_condition(suite, condition):
2805
"""Create a test suite by filtering another one.
2807
:param suite: The source suite.
2808
:param condition: A callable whose result evaluates True when called with a
2809
test case which should be included in the result.
2810
:return: A suite which contains the tests found in suite that pass
2814
for test in iter_suite_tests(suite):
2817
return TestUtil.TestSuite(result)
2820
def filter_suite_by_re(suite, pattern):
2821
"""Create a test suite by filtering another one.
2823
:param suite: the source suite
2824
:param pattern: pattern that names must match
2825
:returns: the newly created suite
2827
condition = condition_id_re(pattern)
2828
result_suite = filter_suite_by_condition(suite, condition)
2832
def filter_suite_by_id_list(suite, test_id_list):
2833
"""Create a test suite by filtering another one.
2835
:param suite: The source suite.
2836
:param test_id_list: A list of the test ids to keep as strings.
2837
:returns: the newly created suite
2839
condition = condition_id_in_list(test_id_list)
2840
result_suite = filter_suite_by_condition(suite, condition)
2844
def filter_suite_by_id_startswith(suite, start):
2845
"""Create a test suite by filtering another one.
2847
:param suite: The source suite.
2848
:param start: A list of string the test id must start with one of.
2849
:returns: the newly created suite
2851
condition = condition_id_startswith(start)
2852
result_suite = filter_suite_by_condition(suite, condition)
2856
def exclude_tests_by_re(suite, pattern):
2857
"""Create a test suite which excludes some tests from suite.
2859
:param suite: The suite to get tests from.
2860
:param pattern: A regular expression string. Test ids that match this
2861
pattern will be excluded from the result.
2862
:return: A TestSuite that contains all the tests from suite without the
2863
tests that matched pattern. The order of tests is the same as it was in
2866
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2869
def preserve_input(something):
2870
"""A helper for performing test suite transformation chains.
2872
:param something: Anything you want to preserve.
2878
def randomize_suite(suite):
2879
"""Return a new TestSuite with suite's tests in random order.
2881
The tests in the input suite are flattened into a single suite in order to
2882
accomplish this. Any nested TestSuites are removed to provide global
2885
tests = list(iter_suite_tests(suite))
2886
random.shuffle(tests)
2887
return TestUtil.TestSuite(tests)
2890
def split_suite_by_condition(suite, condition):
2891
"""Split a test suite into two by a condition.
2893
:param suite: The suite to split.
2894
:param condition: The condition to match on. Tests that match this
2895
condition are returned in the first test suite, ones that do not match
2896
are in the second suite.
2897
:return: A tuple of two test suites, where the first contains tests from
2898
suite matching the condition, and the second contains the remainder
2899
from suite. The order within each output suite is the same as it was in
2904
for test in iter_suite_tests(suite):
2906
matched.append(test)
2908
did_not_match.append(test)
2909
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2912
def split_suite_by_re(suite, pattern):
2913
"""Split a test suite into two by a regular expression.
2915
:param suite: The suite to split.
2916
:param pattern: A regular expression string. Test ids that match this
2917
pattern will be in the first test suite returned, and the others in the
2918
second test suite returned.
2919
:return: A tuple of two test suites, where the first contains tests from
2920
suite matching pattern, and the second contains the remainder from
2921
suite. The order within each output suite is the same as it was in
2924
return split_suite_by_condition(suite, condition_id_re(pattern))
2927
def run_suite(suite, name='test', verbose=False, pattern=".*",
2928
stop_on_failure=False,
2929
transport=None, lsprof_timed=None, bench_history=None,
2930
matching_tests_first=None,
2933
exclude_pattern=None,
2936
suite_decorators=None,
2938
result_decorators=None,
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from testsweet import TestBase, run_suite, InTempDir
21
MODULES_TO_DOCTEST = []
24
from unittest import TestLoader, TestSuite
25
import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
26
import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
27
global MODULES_TO_TEST, MODULES_TO_DOCTEST
29
import bzrlib.selftest.whitebox
30
import bzrlib.selftest.blackbox
31
import bzrlib.selftest.versioning
32
import bzrlib.selftest.testmerge3
33
import bzrlib.selftest.testhashcache
34
import bzrlib.selftest.testrevisionnamespaces
35
import bzrlib.selftest.testbranch
36
import bzrlib.selftest.teststatus
37
import bzrlib.merge_core
38
from doctest import DocTestSuite
45
for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
46
bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
47
if m not in MODULES_TO_DOCTEST:
48
MODULES_TO_DOCTEST.append(m)
50
for m in (bzrlib.selftest.whitebox,
51
bzrlib.selftest.versioning,
52
bzrlib.selftest.testmerge3,
53
bzrlib.selftest.testhashcache,
54
bzrlib.selftest.teststatus,
55
bzrlib.selftest.blackbox,
56
bzrlib.selftest.testhashcache,
57
bzrlib.selftest.testrevisionnamespaces,
58
bzrlib.selftest.testbranch,
2940
"""Run a test suite for bzr selftest.
2942
:param runner_class: The class of runner to use. Must support the
2943
constructor arguments passed by run_suite which are more than standard
2945
:return: A boolean indicating success.
2947
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2952
if runner_class is None:
2953
runner_class = TextTestRunner
2956
runner = runner_class(stream=stream,
2958
verbosity=verbosity,
2959
bench_history=bench_history,
2961
result_decorators=result_decorators,
2963
runner.stop_on_failure=stop_on_failure
2964
# built in decorator factories:
2966
random_order(random_seed, runner),
2967
exclude_tests(exclude_pattern),
2969
if matching_tests_first:
2970
decorators.append(tests_first(pattern))
2972
decorators.append(filter_tests(pattern))
2973
if suite_decorators:
2974
decorators.extend(suite_decorators)
2975
# tell the result object how many tests will be running: (except if
2976
# --parallel=fork is being used. Robert said he will provide a better
2977
# progress design later -- vila 20090817)
2978
if fork_decorator not in decorators:
2979
decorators.append(CountingDecorator)
2980
for decorator in decorators:
2981
suite = decorator(suite)
2983
# Done after test suite decoration to allow randomisation etc
2984
# to take effect, though that is of marginal benefit.
2986
stream.write("Listing tests only ...\n")
2987
for t in iter_suite_tests(suite):
2988
stream.write("%s\n" % (t.id()))
2990
result = runner.run(suite)
2992
return result.wasStrictlySuccessful()
2994
return result.wasSuccessful()
2997
# A registry where get() returns a suite decorator.
2998
parallel_registry = registry.Registry()
3001
def fork_decorator(suite):
3002
if getattr(os, "fork", None) is None:
3003
raise errors.BzrCommandError("platform does not support fork,"
3004
" try --parallel=subprocess instead.")
3005
concurrency = osutils.local_concurrency()
3006
if concurrency == 1:
3008
from testtools import ConcurrentTestSuite
3009
return ConcurrentTestSuite(suite, fork_for_tests)
3010
parallel_registry.register('fork', fork_decorator)
3013
def subprocess_decorator(suite):
3014
concurrency = osutils.local_concurrency()
3015
if concurrency == 1:
3017
from testtools import ConcurrentTestSuite
3018
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3019
parallel_registry.register('subprocess', subprocess_decorator)
3022
def exclude_tests(exclude_pattern):
3023
"""Return a test suite decorator that excludes tests."""
3024
if exclude_pattern is None:
3025
return identity_decorator
3026
def decorator(suite):
3027
return ExcludeDecorator(suite, exclude_pattern)
3031
def filter_tests(pattern):
3033
return identity_decorator
3034
def decorator(suite):
3035
return FilterTestsDecorator(suite, pattern)
3039
def random_order(random_seed, runner):
3040
"""Return a test suite decorator factory for randomising tests order.
3042
:param random_seed: now, a string which casts to a long, or a long.
3043
:param runner: A test runner with a stream attribute to report on.
3045
if random_seed is None:
3046
return identity_decorator
3047
def decorator(suite):
3048
return RandomDecorator(suite, random_seed, runner.stream)
3052
def tests_first(pattern):
3054
return identity_decorator
3055
def decorator(suite):
3056
return TestFirstDecorator(suite, pattern)
3060
def identity_decorator(suite):
3065
class TestDecorator(TestUtil.TestSuite):
3066
"""A decorator for TestCase/TestSuite objects.
3068
Usually, subclasses should override __iter__(used when flattening test
3069
suites), which we do to filter, reorder, parallelise and so on, run() and
3073
def __init__(self, suite):
3074
TestUtil.TestSuite.__init__(self)
3077
def countTestCases(self):
3080
cases += test.countTestCases()
3087
def run(self, result):
3088
# Use iteration on self, not self._tests, to allow subclasses to hook
3091
if result.shouldStop:
3097
class CountingDecorator(TestDecorator):
3098
"""A decorator which calls result.progress(self.countTestCases)."""
3100
def run(self, result):
3101
progress_method = getattr(result, 'progress', None)
3102
if callable(progress_method):
3103
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3104
return super(CountingDecorator, self).run(result)
3107
class ExcludeDecorator(TestDecorator):
3108
"""A decorator which excludes test matching an exclude pattern."""
3110
def __init__(self, suite, exclude_pattern):
3111
TestDecorator.__init__(self, suite)
3112
self.exclude_pattern = exclude_pattern
3113
self.excluded = False
3117
return iter(self._tests)
3118
self.excluded = True
3119
suite = exclude_tests_by_re(self, self.exclude_pattern)
3121
self.addTests(suite)
3122
return iter(self._tests)
3125
class FilterTestsDecorator(TestDecorator):
3126
"""A decorator which filters tests to those matching a pattern."""
3128
def __init__(self, suite, pattern):
3129
TestDecorator.__init__(self, suite)
3130
self.pattern = pattern
3131
self.filtered = False
3135
return iter(self._tests)
3136
self.filtered = True
3137
suite = filter_suite_by_re(self, self.pattern)
3139
self.addTests(suite)
3140
return iter(self._tests)
3143
class RandomDecorator(TestDecorator):
3144
"""A decorator which randomises the order of its tests."""
3146
def __init__(self, suite, random_seed, stream):
3147
TestDecorator.__init__(self, suite)
3148
self.random_seed = random_seed
3149
self.randomised = False
3150
self.stream = stream
3154
return iter(self._tests)
3155
self.randomised = True
3156
self.stream.write("Randomizing test order using seed %s\n\n" %
3157
(self.actual_seed()))
3158
# Initialise the random number generator.
3159
random.seed(self.actual_seed())
3160
suite = randomize_suite(self)
3162
self.addTests(suite)
3163
return iter(self._tests)
3165
def actual_seed(self):
3166
if self.random_seed == "now":
3167
# We convert the seed to a long to make it reuseable across
3168
# invocations (because the user can reenter it).
3169
self.random_seed = long(time.time())
3171
# Convert the seed to a long if we can
3173
self.random_seed = long(self.random_seed)
3176
return self.random_seed
3179
class TestFirstDecorator(TestDecorator):
3180
"""A decorator which moves named tests to the front."""
3182
def __init__(self, suite, pattern):
3183
TestDecorator.__init__(self, suite)
3184
self.pattern = pattern
3185
self.filtered = False
3189
return iter(self._tests)
3190
self.filtered = True
3191
suites = split_suite_by_re(self, self.pattern)
3193
self.addTests(suites)
3194
return iter(self._tests)
3197
def partition_tests(suite, count):
3198
"""Partition suite into count lists of tests."""
3199
# This just assigns tests in a round-robin fashion. On one hand this
3200
# splits up blocks of related tests that might run faster if they shared
3201
# resources, but on the other it avoids assigning blocks of slow tests to
3202
# just one partition. So the slowest partition shouldn't be much slower
3204
partitions = [list() for i in range(count)]
3205
tests = iter_suite_tests(suite)
3206
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3207
partition.append(test)
3211
def workaround_zealous_crypto_random():
3212
"""Crypto.Random want to help us being secure, but we don't care here.
3214
This workaround some test failure related to the sftp server. Once paramiko
3215
stop using the controversial API in Crypto.Random, we may get rid of it.
3218
from Crypto.Random import atfork
3224
def fork_for_tests(suite):
3225
"""Take suite and start up one runner per CPU by forking()
3227
:return: An iterable of TestCase-like objects which can each have
3228
run(result) called on them to feed tests to result.
3230
concurrency = osutils.local_concurrency()
3232
from subunit import TestProtocolClient, ProtocolTestCase
3233
from subunit.test_results import AutoTimingTestResultDecorator
3234
class TestInOtherProcess(ProtocolTestCase):
3235
# Should be in subunit, I think. RBC.
3236
def __init__(self, stream, pid):
3237
ProtocolTestCase.__init__(self, stream)
3240
def run(self, result):
3242
ProtocolTestCase.run(self, result)
3244
os.waitpid(self.pid, 0)
3246
test_blocks = partition_tests(suite, concurrency)
3247
for process_tests in test_blocks:
3248
process_suite = TestUtil.TestSuite()
3249
process_suite.addTests(process_tests)
3250
c2pread, c2pwrite = os.pipe()
3253
workaround_zealous_crypto_random()
3256
# Leave stderr and stdout open so we can see test noise
3257
# Close stdin so that the child goes away if it decides to
3258
# read from stdin (otherwise its a roulette to see what
3259
# child actually gets keystrokes for pdb etc).
3262
stream = os.fdopen(c2pwrite, 'wb', 1)
3263
subunit_result = AutoTimingTestResultDecorator(
3264
TestProtocolClient(stream))
3265
process_suite.run(subunit_result)
3270
stream = os.fdopen(c2pread, 'rb', 1)
3271
test = TestInOtherProcess(stream, pid)
3276
def reinvoke_for_tests(suite):
3277
"""Take suite and start up one runner per CPU using subprocess().
3279
:return: An iterable of TestCase-like objects which can each have
3280
run(result) called on them to feed tests to result.
3282
concurrency = osutils.local_concurrency()
3284
from subunit import ProtocolTestCase
3285
class TestInSubprocess(ProtocolTestCase):
3286
def __init__(self, process, name):
3287
ProtocolTestCase.__init__(self, process.stdout)
3288
self.process = process
3289
self.process.stdin.close()
3292
def run(self, result):
3294
ProtocolTestCase.run(self, result)
3297
os.unlink(self.name)
3298
# print "pid %d finished" % finished_process
3299
test_blocks = partition_tests(suite, concurrency)
3300
for process_tests in test_blocks:
3301
# ugly; currently reimplement rather than reuses TestCase methods.
3302
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3303
if not os.path.isfile(bzr_path):
3304
# We are probably installed. Assume sys.argv is the right file
3305
bzr_path = sys.argv[0]
3306
bzr_path = [bzr_path]
3307
if sys.platform == "win32":
3308
# if we're on windows, we can't execute the bzr script directly
3309
bzr_path = [sys.executable] + bzr_path
3310
fd, test_list_file_name = tempfile.mkstemp()
3311
test_list_file = os.fdopen(fd, 'wb', 1)
3312
for test in process_tests:
3313
test_list_file.write(test.id() + '\n')
3314
test_list_file.close()
3316
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3318
if '--no-plugins' in sys.argv:
3319
argv.append('--no-plugins')
3320
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3321
# noise on stderr it can interrupt the subunit protocol.
3322
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3323
stdout=subprocess.PIPE,
3324
stderr=subprocess.PIPE,
3326
test = TestInSubprocess(process, test_list_file_name)
3329
os.unlink(test_list_file_name)
3334
class ForwardingResult(unittest.TestResult):
3336
def __init__(self, target):
3337
unittest.TestResult.__init__(self)
3338
self.result = target
3340
def startTest(self, test):
3341
self.result.startTest(test)
3343
def stopTest(self, test):
3344
self.result.stopTest(test)
3346
def startTestRun(self):
3347
self.result.startTestRun()
3349
def stopTestRun(self):
3350
self.result.stopTestRun()
3352
def addSkip(self, test, reason):
3353
self.result.addSkip(test, reason)
3355
def addSuccess(self, test):
3356
self.result.addSuccess(test)
3358
def addError(self, test, err):
3359
self.result.addError(test, err)
3361
def addFailure(self, test, err):
3362
self.result.addFailure(test, err)
3363
ForwardingResult = testtools.ExtendedToOriginalDecorator
3366
class ProfileResult(ForwardingResult):
3367
"""Generate profiling data for all activity between start and success.
3369
The profile data is appended to the test's _benchcalls attribute and can
3370
be accessed by the forwarded-to TestResult.
3372
While it might be cleaner do accumulate this in stopTest, addSuccess is
3373
where our existing output support for lsprof is, and this class aims to
3374
fit in with that: while it could be moved it's not necessary to accomplish
3375
test profiling, nor would it be dramatically cleaner.
3378
def startTest(self, test):
3379
self.profiler = bzrlib.lsprof.BzrProfiler()
3380
# Prevent deadlocks in tests that use lsprof: those tests will
3382
bzrlib.lsprof.BzrProfiler.profiler_block = 0
3383
self.profiler.start()
3384
ForwardingResult.startTest(self, test)
3386
def addSuccess(self, test):
3387
stats = self.profiler.stop()
3389
calls = test._benchcalls
3390
except AttributeError:
3391
test._benchcalls = []
3392
calls = test._benchcalls
3393
calls.append(((test.id(), "", ""), stats))
3394
ForwardingResult.addSuccess(self, test)
3396
def stopTest(self, test):
3397
ForwardingResult.stopTest(self, test)
3398
self.profiler = None
3401
# Controlled by "bzr selftest -E=..." option
3402
# Currently supported:
3403
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3404
# preserves any flags supplied at the command line.
3405
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3406
# rather than failing tests. And no longer raise
3407
# LockContention when fctnl locks are not being used
3408
# with proper exclusion rules.
3409
# -Ethreads Will display thread ident at creation/join time to
3410
# help track thread leaks
3411
selftest_debug_flags = set()
3414
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3416
test_suite_factory=None,
3419
matching_tests_first=None,
3422
exclude_pattern=None,
3428
suite_decorators=None,
3432
"""Run the whole test suite under the enhanced runner"""
3433
# XXX: Very ugly way to do this...
3434
# Disable warning about old formats because we don't want it to disturb
3435
# any blackbox tests.
3436
from bzrlib import repository
3437
repository._deprecation_warning_done = True
3439
global default_transport
3440
if transport is None:
3441
transport = default_transport
3442
old_transport = default_transport
3443
default_transport = transport
3444
global selftest_debug_flags
3445
old_debug_flags = selftest_debug_flags
3446
if debug_flags is not None:
3447
selftest_debug_flags = set(debug_flags)
3449
if load_list is None:
3452
keep_only = load_test_id_list(load_list)
3454
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3455
for start in starting_with]
3456
if test_suite_factory is None:
3457
# Reduce loading time by loading modules based on the starting_with
3459
suite = test_suite(keep_only, starting_with)
3461
suite = test_suite_factory()
3463
# But always filter as requested.
3464
suite = filter_suite_by_id_startswith(suite, starting_with)
3465
result_decorators = []
3467
result_decorators.append(ProfileResult)
3468
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3469
stop_on_failure=stop_on_failure,
3470
transport=transport,
3471
lsprof_timed=lsprof_timed,
3472
bench_history=bench_history,
3473
matching_tests_first=matching_tests_first,
3474
list_only=list_only,
3475
random_seed=random_seed,
3476
exclude_pattern=exclude_pattern,
3478
runner_class=runner_class,
3479
suite_decorators=suite_decorators,
3481
result_decorators=result_decorators,
3484
default_transport = old_transport
3485
selftest_debug_flags = old_debug_flags
3488
def load_test_id_list(file_name):
3489
"""Load a test id list from a text file.
3491
The format is one test id by line. No special care is taken to impose
3492
strict rules, these test ids are used to filter the test suite so a test id
3493
that do not match an existing test will do no harm. This allows user to add
3494
comments, leave blank lines, etc.
3498
ftest = open(file_name, 'rt')
3500
if e.errno != errno.ENOENT:
3503
raise errors.NoSuchFile(file_name)
3505
for test_name in ftest.readlines():
3506
test_list.append(test_name.strip())
3511
def suite_matches_id_list(test_suite, id_list):
3512
"""Warns about tests not appearing or appearing more than once.
3514
:param test_suite: A TestSuite object.
3515
:param test_id_list: The list of test ids that should be found in
3518
:return: (absents, duplicates) absents is a list containing the test found
3519
in id_list but not in test_suite, duplicates is a list containing the
3520
test found multiple times in test_suite.
3522
When using a prefined test id list, it may occurs that some tests do not
3523
exist anymore or that some tests use the same id. This function warns the
3524
tester about potential problems in his workflow (test lists are volatile)
3525
or in the test suite itself (using the same id for several tests does not
3526
help to localize defects).
3528
# Build a dict counting id occurrences
3530
for test in iter_suite_tests(test_suite):
3532
tests[id] = tests.get(id, 0) + 1
3537
occurs = tests.get(id, 0)
3539
not_found.append(id)
3541
duplicates.append(id)
3543
return not_found, duplicates
3546
class TestIdList(object):
3547
"""Test id list to filter a test suite.
3549
Relying on the assumption that test ids are built as:
3550
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3551
notation, this class offers methods to :
3552
- avoid building a test suite for modules not refered to in the test list,
3553
- keep only the tests listed from the module test suite.
3556
def __init__(self, test_id_list):
3557
# When a test suite needs to be filtered against us we compare test ids
3558
# for equality, so a simple dict offers a quick and simple solution.
3559
self.tests = dict().fromkeys(test_id_list, True)
3561
# While unittest.TestCase have ids like:
3562
# <module>.<class>.<method>[(<param+)],
3563
# doctest.DocTestCase can have ids like:
3566
# <module>.<function>
3567
# <module>.<class>.<method>
3569
# Since we can't predict a test class from its name only, we settle on
3570
# a simple constraint: a test id always begins with its module name.
3573
for test_id in test_id_list:
3574
parts = test_id.split('.')
3575
mod_name = parts.pop(0)
3576
modules[mod_name] = True
3578
mod_name += '.' + part
3579
modules[mod_name] = True
3580
self.modules = modules
3582
def refers_to(self, module_name):
3583
"""Is there tests for the module or one of its sub modules."""
3584
return self.modules.has_key(module_name)
3586
def includes(self, test_id):
3587
return self.tests.has_key(test_id)
3590
class TestPrefixAliasRegistry(registry.Registry):
3591
"""A registry for test prefix aliases.
3593
This helps implement shorcuts for the --starting-with selftest
3594
option. Overriding existing prefixes is not allowed but not fatal (a
3595
warning will be emitted).
3598
def register(self, key, obj, help=None, info=None,
3599
override_existing=False):
3600
"""See Registry.register.
3602
Trying to override an existing alias causes a warning to be emitted,
3603
not a fatal execption.
3606
super(TestPrefixAliasRegistry, self).register(
3607
key, obj, help=help, info=info, override_existing=False)
3609
actual = self.get(key)
3610
note('Test prefix alias %s is already used for %s, ignoring %s'
3611
% (key, actual, obj))
3613
def resolve_alias(self, id_start):
3614
"""Replace the alias by the prefix in the given string.
3616
Using an unknown prefix is an error to help catching typos.
3618
parts = id_start.split('.')
3620
parts[0] = self.get(parts[0])
3622
raise errors.BzrCommandError(
3623
'%s is not a known test prefix alias' % parts[0])
3624
return '.'.join(parts)
3627
test_prefix_alias_registry = TestPrefixAliasRegistry()
3628
"""Registry of test prefix aliases."""
3631
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3632
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3633
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3635
# Obvious highest levels prefixes, feel free to add your own via a plugin
3636
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3637
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3638
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3639
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3640
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3643
def _test_suite_testmod_names():
3644
"""Return the standard list of test module names to test."""
3647
'bzrlib.tests.blackbox',
3648
'bzrlib.tests.commands',
3649
'bzrlib.tests.doc_generate',
3650
'bzrlib.tests.per_branch',
3651
'bzrlib.tests.per_bzrdir',
3652
'bzrlib.tests.per_controldir',
3653
'bzrlib.tests.per_controldir_colo',
3654
'bzrlib.tests.per_foreign_vcs',
3655
'bzrlib.tests.per_interrepository',
3656
'bzrlib.tests.per_intertree',
3657
'bzrlib.tests.per_inventory',
3658
'bzrlib.tests.per_interbranch',
3659
'bzrlib.tests.per_lock',
3660
'bzrlib.tests.per_merger',
3661
'bzrlib.tests.per_transport',
3662
'bzrlib.tests.per_tree',
3663
'bzrlib.tests.per_pack_repository',
3664
'bzrlib.tests.per_repository',
3665
'bzrlib.tests.per_repository_chk',
3666
'bzrlib.tests.per_repository_reference',
3667
'bzrlib.tests.per_uifactory',
3668
'bzrlib.tests.per_versionedfile',
3669
'bzrlib.tests.per_workingtree',
3670
'bzrlib.tests.test__annotator',
3671
'bzrlib.tests.test__bencode',
3672
'bzrlib.tests.test__btree_serializer',
3673
'bzrlib.tests.test__chk_map',
3674
'bzrlib.tests.test__dirstate_helpers',
3675
'bzrlib.tests.test__groupcompress',
3676
'bzrlib.tests.test__known_graph',
3677
'bzrlib.tests.test__rio',
3678
'bzrlib.tests.test__simple_set',
3679
'bzrlib.tests.test__static_tuple',
3680
'bzrlib.tests.test__walkdirs_win32',
3681
'bzrlib.tests.test_ancestry',
3682
'bzrlib.tests.test_annotate',
3683
'bzrlib.tests.test_api',
3684
'bzrlib.tests.test_atomicfile',
3685
'bzrlib.tests.test_bad_files',
3686
'bzrlib.tests.test_bisect_multi',
3687
'bzrlib.tests.test_branch',
3688
'bzrlib.tests.test_branchbuilder',
3689
'bzrlib.tests.test_btree_index',
3690
'bzrlib.tests.test_bugtracker',
3691
'bzrlib.tests.test_bundle',
3692
'bzrlib.tests.test_bzrdir',
3693
'bzrlib.tests.test__chunks_to_lines',
3694
'bzrlib.tests.test_cache_utf8',
3695
'bzrlib.tests.test_chk_map',
3696
'bzrlib.tests.test_chk_serializer',
3697
'bzrlib.tests.test_chunk_writer',
3698
'bzrlib.tests.test_clean_tree',
3699
'bzrlib.tests.test_cleanup',
3700
'bzrlib.tests.test_cmdline',
3701
'bzrlib.tests.test_commands',
3702
'bzrlib.tests.test_commit',
3703
'bzrlib.tests.test_commit_merge',
3704
'bzrlib.tests.test_config',
3705
'bzrlib.tests.test_conflicts',
3706
'bzrlib.tests.test_counted_lock',
3707
'bzrlib.tests.test_crash',
3708
'bzrlib.tests.test_decorators',
3709
'bzrlib.tests.test_delta',
3710
'bzrlib.tests.test_debug',
3711
'bzrlib.tests.test_deprecated_graph',
3712
'bzrlib.tests.test_diff',
3713
'bzrlib.tests.test_directory_service',
3714
'bzrlib.tests.test_dirstate',
3715
'bzrlib.tests.test_email_message',
3716
'bzrlib.tests.test_eol_filters',
3717
'bzrlib.tests.test_errors',
3718
'bzrlib.tests.test_export',
3719
'bzrlib.tests.test_extract',
3720
'bzrlib.tests.test_fetch',
3721
'bzrlib.tests.test_fixtures',
3722
'bzrlib.tests.test_fifo_cache',
3723
'bzrlib.tests.test_filters',
3724
'bzrlib.tests.test_ftp_transport',
3725
'bzrlib.tests.test_foreign',
3726
'bzrlib.tests.test_generate_docs',
3727
'bzrlib.tests.test_generate_ids',
3728
'bzrlib.tests.test_globbing',
3729
'bzrlib.tests.test_gpg',
3730
'bzrlib.tests.test_graph',
3731
'bzrlib.tests.test_groupcompress',
3732
'bzrlib.tests.test_hashcache',
3733
'bzrlib.tests.test_help',
3734
'bzrlib.tests.test_hooks',
3735
'bzrlib.tests.test_http',
3736
'bzrlib.tests.test_http_response',
3737
'bzrlib.tests.test_https_ca_bundle',
3738
'bzrlib.tests.test_identitymap',
3739
'bzrlib.tests.test_ignores',
3740
'bzrlib.tests.test_index',
3741
'bzrlib.tests.test_import_tariff',
3742
'bzrlib.tests.test_info',
3743
'bzrlib.tests.test_inv',
3744
'bzrlib.tests.test_inventory_delta',
3745
'bzrlib.tests.test_knit',
3746
'bzrlib.tests.test_lazy_import',
3747
'bzrlib.tests.test_lazy_regex',
3748
'bzrlib.tests.test_library_state',
3749
'bzrlib.tests.test_lock',
3750
'bzrlib.tests.test_lockable_files',
3751
'bzrlib.tests.test_lockdir',
3752
'bzrlib.tests.test_log',
3753
'bzrlib.tests.test_lru_cache',
3754
'bzrlib.tests.test_lsprof',
3755
'bzrlib.tests.test_mail_client',
3756
'bzrlib.tests.test_matchers',
3757
'bzrlib.tests.test_memorytree',
3758
'bzrlib.tests.test_merge',
3759
'bzrlib.tests.test_merge3',
3760
'bzrlib.tests.test_merge_core',
3761
'bzrlib.tests.test_merge_directive',
3762
'bzrlib.tests.test_missing',
3763
'bzrlib.tests.test_msgeditor',
3764
'bzrlib.tests.test_multiparent',
3765
'bzrlib.tests.test_mutabletree',
3766
'bzrlib.tests.test_nonascii',
3767
'bzrlib.tests.test_options',
3768
'bzrlib.tests.test_osutils',
3769
'bzrlib.tests.test_osutils_encodings',
3770
'bzrlib.tests.test_pack',
3771
'bzrlib.tests.test_patch',
3772
'bzrlib.tests.test_patches',
3773
'bzrlib.tests.test_permissions',
3774
'bzrlib.tests.test_plugins',
3775
'bzrlib.tests.test_progress',
3776
'bzrlib.tests.test_read_bundle',
3777
'bzrlib.tests.test_reconcile',
3778
'bzrlib.tests.test_reconfigure',
3779
'bzrlib.tests.test_registry',
3780
'bzrlib.tests.test_remote',
3781
'bzrlib.tests.test_rename_map',
3782
'bzrlib.tests.test_repository',
3783
'bzrlib.tests.test_revert',
3784
'bzrlib.tests.test_revision',
3785
'bzrlib.tests.test_revisionspec',
3786
'bzrlib.tests.test_revisiontree',
3787
'bzrlib.tests.test_rio',
3788
'bzrlib.tests.test_rules',
3789
'bzrlib.tests.test_sampler',
3790
'bzrlib.tests.test_script',
3791
'bzrlib.tests.test_selftest',
3792
'bzrlib.tests.test_serializer',
3793
'bzrlib.tests.test_setup',
3794
'bzrlib.tests.test_sftp_transport',
3795
'bzrlib.tests.test_shelf',
3796
'bzrlib.tests.test_shelf_ui',
3797
'bzrlib.tests.test_smart',
3798
'bzrlib.tests.test_smart_add',
3799
'bzrlib.tests.test_smart_request',
3800
'bzrlib.tests.test_smart_transport',
3801
'bzrlib.tests.test_smtp_connection',
3802
'bzrlib.tests.test_source',
3803
'bzrlib.tests.test_ssh_transport',
3804
'bzrlib.tests.test_status',
3805
'bzrlib.tests.test_store',
3806
'bzrlib.tests.test_strace',
3807
'bzrlib.tests.test_subsume',
3808
'bzrlib.tests.test_switch',
3809
'bzrlib.tests.test_symbol_versioning',
3810
'bzrlib.tests.test_tag',
3811
'bzrlib.tests.test_test_server',
3812
'bzrlib.tests.test_testament',
3813
'bzrlib.tests.test_textfile',
3814
'bzrlib.tests.test_textmerge',
3815
'bzrlib.tests.test_timestamp',
3816
'bzrlib.tests.test_trace',
3817
'bzrlib.tests.test_transactions',
3818
'bzrlib.tests.test_transform',
3819
'bzrlib.tests.test_transport',
3820
'bzrlib.tests.test_transport_log',
3821
'bzrlib.tests.test_tree',
3822
'bzrlib.tests.test_treebuilder',
3823
'bzrlib.tests.test_treeshape',
3824
'bzrlib.tests.test_tsort',
3825
'bzrlib.tests.test_tuned_gzip',
3826
'bzrlib.tests.test_ui',
3827
'bzrlib.tests.test_uncommit',
3828
'bzrlib.tests.test_upgrade',
3829
'bzrlib.tests.test_upgrade_stacked',
3830
'bzrlib.tests.test_urlutils',
3831
'bzrlib.tests.test_version',
3832
'bzrlib.tests.test_version_info',
3833
'bzrlib.tests.test_versionedfile',
3834
'bzrlib.tests.test_weave',
3835
'bzrlib.tests.test_whitebox',
3836
'bzrlib.tests.test_win32utils',
3837
'bzrlib.tests.test_workingtree',
3838
'bzrlib.tests.test_workingtree_4',
3839
'bzrlib.tests.test_wsgi',
3840
'bzrlib.tests.test_xml',
3844
def _test_suite_modules_to_doctest():
3845
"""Return the list of modules to doctest."""
3847
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3851
'bzrlib.branchbuilder',
3852
'bzrlib.decorators',
3855
'bzrlib.iterablefile',
3859
'bzrlib.symbol_versioning',
3861
'bzrlib.tests.fixtures',
3863
'bzrlib.version_info_formats.format_custom',
3867
def test_suite(keep_only=None, starting_with=None):
3868
"""Build and return TestSuite for the whole of bzrlib.
3870
:param keep_only: A list of test ids limiting the suite returned.
3872
:param starting_with: An id limiting the suite returned to the tests
3875
This function can be replaced if you need to change the default test
3876
suite on a global basis, but it is not encouraged.
3879
loader = TestUtil.TestLoader()
3881
if keep_only is not None:
3882
id_filter = TestIdList(keep_only)
3884
# We take precedence over keep_only because *at loading time* using
3885
# both options means we will load less tests for the same final result.
3886
def interesting_module(name):
3887
for start in starting_with:
3889
# Either the module name starts with the specified string
3890
name.startswith(start)
3891
# or it may contain tests starting with the specified string
3892
or start.startswith(name)
3896
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3898
elif keep_only is not None:
3899
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3900
def interesting_module(name):
3901
return id_filter.refers_to(name)
3904
loader = TestUtil.TestLoader()
3905
def interesting_module(name):
3906
# No filtering, all modules are interesting
3909
suite = loader.suiteClass()
3911
# modules building their suite with loadTestsFromModuleNames
3912
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3914
for mod in _test_suite_modules_to_doctest():
3915
if not interesting_module(mod):
3916
# No tests to keep here, move along
3919
# note that this really does mean "report only" -- doctest
3920
# still runs the rest of the examples
3921
doc_suite = doctest.DocTestSuite(mod,
3922
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
3923
except ValueError, e:
3924
print '**failed to get doctest for: %s\n%s' % (mod, e)
3926
if len(doc_suite._tests) == 0:
3927
raise errors.BzrError("no doctests found in %s" % (mod,))
3928
suite.addTest(doc_suite)
3930
default_encoding = sys.getdefaultencoding()
3931
for name, plugin in bzrlib.plugin.plugins().items():
3932
if not interesting_module(plugin.module.__name__):
3934
plugin_suite = plugin.test_suite()
3935
# We used to catch ImportError here and turn it into just a warning,
3936
# but really if you don't have --no-plugins this should be a failure.
3937
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3938
if plugin_suite is None:
3939
plugin_suite = plugin.load_plugin_tests(loader)
3940
if plugin_suite is not None:
3941
suite.addTest(plugin_suite)
3942
if default_encoding != sys.getdefaultencoding():
3943
bzrlib.trace.warning(
3944
'Plugin "%s" tried to reset default encoding to: %s', name,
3945
sys.getdefaultencoding())
3947
sys.setdefaultencoding(default_encoding)
3949
if keep_only is not None:
3950
# Now that the referred modules have loaded their tests, keep only the
3952
suite = filter_suite_by_id_list(suite, id_filter)
3953
# Do some sanity checks on the id_list filtering
3954
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3956
# The tester has used both keep_only and starting_with, so he is
3957
# already aware that some tests are excluded from the list, there
3958
# is no need to tell him which.
3961
# Some tests mentioned in the list are not in the test suite. The
3962
# list may be out of date, report to the tester.
3963
for id in not_found:
3964
bzrlib.trace.warning('"%s" not found in the test suite', id)
3965
for id in duplicates:
3966
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
3971
def multiply_scenarios(scenarios_left, scenarios_right):
3972
"""Multiply two sets of scenarios.
3974
:returns: the cartesian product of the two sets of scenarios, that is
3975
a scenario for every possible combination of a left scenario and a
3979
('%s,%s' % (left_name, right_name),
3980
dict(left_dict.items() + right_dict.items()))
3981
for left_name, left_dict in scenarios_left
3982
for right_name, right_dict in scenarios_right]
3985
def multiply_tests(tests, scenarios, result):
3986
"""Multiply tests_list by scenarios into result.
3988
This is the core workhorse for test parameterisation.
3990
Typically the load_tests() method for a per-implementation test suite will
3991
call multiply_tests and return the result.
3993
:param tests: The tests to parameterise.
3994
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3995
scenario_param_dict).
3996
:param result: A TestSuite to add created tests to.
3998
This returns the passed in result TestSuite with the cross product of all
3999
the tests repeated once for each scenario. Each test is adapted by adding
4000
the scenario name at the end of its id(), and updating the test object's
4001
__dict__ with the scenario_param_dict.
4003
>>> import bzrlib.tests.test_sampler
4004
>>> r = multiply_tests(
4005
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4006
... [('one', dict(param=1)),
4007
... ('two', dict(param=2))],
4008
... TestUtil.TestSuite())
4009
>>> tests = list(iter_suite_tests(r))
4013
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4019
for test in iter_suite_tests(tests):
4020
apply_scenarios(test, scenarios, result)
4024
def apply_scenarios(test, scenarios, result):
4025
"""Apply the scenarios in scenarios to test and add to result.
4027
:param test: The test to apply scenarios to.
4028
:param scenarios: An iterable of scenarios to apply to test.
4030
:seealso: apply_scenario
4032
for scenario in scenarios:
4033
result.addTest(apply_scenario(test, scenario))
4037
def apply_scenario(test, scenario):
4038
"""Copy test and apply scenario to it.
4040
:param test: A test to adapt.
4041
:param scenario: A tuple describing the scenarion.
4042
The first element of the tuple is the new test id.
4043
The second element is a dict containing attributes to set on the
4045
:return: The adapted test.
4047
new_id = "%s(%s)" % (test.id(), scenario[0])
4048
new_test = clone_test(test, new_id)
4049
for name, value in scenario[1].items():
4050
setattr(new_test, name, value)
4054
def clone_test(test, new_id):
4055
"""Clone a test giving it a new id.
4057
:param test: The test to clone.
4058
:param new_id: The id to assign to it.
4059
:return: The new test.
4061
new_test = copy.copy(test)
4062
new_test.id = lambda: new_id
4063
# XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4064
# causes cloned tests to share the 'details' dict. This makes it hard to
4065
# read the test output for parameterized tests, because tracebacks will be
4066
# associated with irrelevant tests.
4068
details = new_test._TestCase__details
4069
except AttributeError:
4070
# must be a different version of testtools than expected. Do nothing.
4073
# Reset the '__details' dict.
4074
new_test._TestCase__details = {}
4078
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4080
"""Helper for permutating tests against an extension module.
4082
This is meant to be used inside a modules 'load_tests()' function. It will
4083
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4084
against both implementations. Setting 'test.module' to the appropriate
4085
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4087
:param standard_tests: A test suite to permute
4088
:param loader: A TestLoader
4089
:param py_module_name: The python path to a python module that can always
4090
be loaded, and will be considered the 'python' implementation. (eg
4091
'bzrlib._chk_map_py')
4092
:param ext_module_name: The python path to an extension module. If the
4093
module cannot be loaded, a single test will be added, which notes that
4094
the module is not available. If it can be loaded, all standard_tests
4095
will be run against that module.
4096
:return: (suite, feature) suite is a test-suite that has all the permuted
4097
tests. feature is the Feature object that can be used to determine if
4098
the module is available.
4101
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
4103
('python', {'module': py_module}),
4105
suite = loader.suiteClass()
4106
feature = ModuleAvailableFeature(ext_module_name)
4107
if feature.available():
4108
scenarios.append(('C', {'module': feature.module}))
4110
# the compiled module isn't available, so we add a failing test
4111
class FailWithoutFeature(TestCase):
4112
def test_fail(self):
4113
self.requireFeature(feature)
4114
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4115
result = multiply_tests(standard_tests, scenarios, suite)
4116
return result, feature
4119
def _rmtree_temp_dir(dirname, test_id=None):
4120
# If LANG=C we probably have created some bogus paths
4121
# which rmtree(unicode) will fail to delete
4122
# so make sure we are using rmtree(str) to delete everything
4123
# except on win32, where rmtree(str) will fail
4124
# since it doesn't have the property of byte-stream paths
4125
# (they are either ascii or mbcs)
4126
if sys.platform == 'win32':
4127
# make sure we are using the unicode win32 api
4128
dirname = unicode(dirname)
4130
dirname = dirname.encode(sys.getfilesystemencoding())
4132
osutils.rmtree(dirname)
4134
# We don't want to fail here because some useful display will be lost
4135
# otherwise. Polluting the tmp dir is bad, but not giving all the
4136
# possible info to the test runner is even worse.
4138
ui.ui_factory.clear_term()
4139
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4140
# Ugly, but the last thing we want here is fail, so bear with it.
4141
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4142
).encode('ascii', 'replace')
4143
sys.stderr.write('Unable to remove testing dir %s\n%s'
4144
% (os.path.basename(dirname), printable_e))
4147
class Feature(object):
4148
"""An operating system Feature."""
4151
self._available = None
4153
def available(self):
4154
"""Is the feature available?
4156
:return: True if the feature is available.
4158
if self._available is None:
4159
self._available = self._probe()
4160
return self._available
4163
"""Implement this method in concrete features.
4165
:return: True if the feature is available.
4167
raise NotImplementedError
4170
if getattr(self, 'feature_name', None):
4171
return self.feature_name()
4172
return self.__class__.__name__
4175
class _SymlinkFeature(Feature):
4178
return osutils.has_symlinks()
4180
def feature_name(self):
4183
SymlinkFeature = _SymlinkFeature()
4186
class _HardlinkFeature(Feature):
4189
return osutils.has_hardlinks()
4191
def feature_name(self):
4194
HardlinkFeature = _HardlinkFeature()
4197
class _OsFifoFeature(Feature):
4200
return getattr(os, 'mkfifo', None)
4202
def feature_name(self):
4203
return 'filesystem fifos'
4205
OsFifoFeature = _OsFifoFeature()
4208
class _UnicodeFilenameFeature(Feature):
4209
"""Does the filesystem support Unicode filenames?"""
4213
# Check for character combinations unlikely to be covered by any
4214
# single non-unicode encoding. We use the characters
4215
# - greek small letter alpha (U+03B1) and
4216
# - braille pattern dots-123456 (U+283F).
4217
os.stat(u'\u03b1\u283f')
4218
except UnicodeEncodeError:
4220
except (IOError, OSError):
4221
# The filesystem allows the Unicode filename but the file doesn't
4225
# The filesystem allows the Unicode filename and the file exists,
4229
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4232
class _CompatabilityThunkFeature(Feature):
4233
"""This feature is just a thunk to another feature.
4235
It issues a deprecation warning if it is accessed, to let you know that you
4236
should really use a different feature.
4239
def __init__(self, dep_version, module, name,
4240
replacement_name, replacement_module=None):
4241
super(_CompatabilityThunkFeature, self).__init__()
4242
self._module = module
4243
if replacement_module is None:
4244
replacement_module = module
4245
self._replacement_module = replacement_module
4247
self._replacement_name = replacement_name
4248
self._dep_version = dep_version
4249
self._feature = None
4252
if self._feature is None:
4253
depr_msg = self._dep_version % ('%s.%s'
4254
% (self._module, self._name))
4255
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4256
self._replacement_name)
4257
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4258
# Import the new feature and use it as a replacement for the
4260
mod = __import__(self._replacement_module, {}, {},
4261
[self._replacement_name])
4262
self._feature = getattr(mod, self._replacement_name)
4266
return self._feature._probe()
4269
class ModuleAvailableFeature(Feature):
4270
"""This is a feature than describes a module we want to be available.
4272
Declare the name of the module in __init__(), and then after probing, the
4273
module will be available as 'self.module'.
4275
:ivar module: The module if it is available, else None.
4278
def __init__(self, module_name):
4279
super(ModuleAvailableFeature, self).__init__()
4280
self.module_name = module_name
4284
self._module = __import__(self.module_name, {}, {}, [''])
4291
if self.available(): # Make sure the probe has been done
4295
def feature_name(self):
4296
return self.module_name
4299
# This is kept here for compatibility, it is recommended to use
4300
# 'bzrlib.tests.feature.paramiko' instead
4301
ParamikoFeature = _CompatabilityThunkFeature(
4302
deprecated_in((2,1,0)),
4303
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4306
def probe_unicode_in_user_encoding():
4307
"""Try to encode several unicode strings to use in unicode-aware tests.
4308
Return first successfull match.
4310
:return: (unicode value, encoded plain string value) or (None, None)
4312
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4313
for uni_val in possible_vals:
4315
str_val = uni_val.encode(osutils.get_user_encoding())
4316
except UnicodeEncodeError:
4317
# Try a different character
4320
return uni_val, str_val
4324
def probe_bad_non_ascii(encoding):
4325
"""Try to find [bad] character with code [128..255]
4326
that cannot be decoded to unicode in some encoding.
4327
Return None if all non-ascii characters is valid
4330
for i in xrange(128, 256):
4333
char.decode(encoding)
4334
except UnicodeDecodeError:
4339
class _HTTPSServerFeature(Feature):
4340
"""Some tests want an https Server, check if one is available.
4342
Right now, the only way this is available is under python2.6 which provides
4353
def feature_name(self):
4354
return 'HTTPSServer'
4357
HTTPSServerFeature = _HTTPSServerFeature()
4360
class _UnicodeFilename(Feature):
4361
"""Does the filesystem support Unicode filenames?"""
4366
except UnicodeEncodeError:
4368
except (IOError, OSError):
4369
# The filesystem allows the Unicode filename but the file doesn't
4373
# The filesystem allows the Unicode filename and the file exists,
4377
UnicodeFilename = _UnicodeFilename()
4380
class _ByteStringNamedFilesystem(Feature):
4381
"""Is the filesystem based on bytes?"""
4384
if os.name == "posix":
4388
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4391
class _UTF8Filesystem(Feature):
4392
"""Is the filesystem UTF-8?"""
4395
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4399
UTF8Filesystem = _UTF8Filesystem()
4402
class _BreakinFeature(Feature):
4403
"""Does this platform support the breakin feature?"""
4406
from bzrlib import breakin
4407
if breakin.determine_signal() is None:
4409
if sys.platform == 'win32':
4410
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4411
# We trigger SIGBREAK via a Console api so we need ctypes to
4412
# access the function
4419
def feature_name(self):
4420
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4423
BreakinFeature = _BreakinFeature()
4426
class _CaseInsCasePresFilenameFeature(Feature):
4427
"""Is the file-system case insensitive, but case-preserving?"""
4430
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4432
# first check truly case-preserving for created files, then check
4433
# case insensitive when opening existing files.
4434
name = osutils.normpath(name)
4435
base, rel = osutils.split(name)
4436
found_rel = osutils.canonical_relpath(base, name)
4437
return (found_rel == rel
4438
and os.path.isfile(name.upper())
4439
and os.path.isfile(name.lower()))
4444
def feature_name(self):
4445
return "case-insensitive case-preserving filesystem"
4447
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4450
class _CaseInsensitiveFilesystemFeature(Feature):
4451
"""Check if underlying filesystem is case-insensitive but *not* case
4454
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4455
# more likely to be case preserving, so this case is rare.
4458
if CaseInsCasePresFilenameFeature.available():
4461
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4462
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4463
TestCaseWithMemoryTransport.TEST_ROOT = root
4465
root = TestCaseWithMemoryTransport.TEST_ROOT
4466
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4468
name_a = osutils.pathjoin(tdir, 'a')
4469
name_A = osutils.pathjoin(tdir, 'A')
4471
result = osutils.isdir(name_A)
4472
_rmtree_temp_dir(tdir)
4475
def feature_name(self):
4476
return 'case-insensitive filesystem'
4478
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4481
class _CaseSensitiveFilesystemFeature(Feature):
4484
if CaseInsCasePresFilenameFeature.available():
4486
elif CaseInsensitiveFilesystemFeature.available():
4491
def feature_name(self):
4492
return 'case-sensitive filesystem'
4494
# new coding style is for feature instances to be lowercase
4495
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4498
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4499
SubUnitFeature = _CompatabilityThunkFeature(
4500
deprecated_in((2,1,0)),
4501
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4502
# Only define SubUnitBzrRunner if subunit is available.
4504
from subunit import TestProtocolClient
4505
from subunit.test_results import AutoTimingTestResultDecorator
4506
class SubUnitBzrProtocolClient(TestProtocolClient):
4508
def addSuccess(self, test, details=None):
4509
# The subunit client always includes the details in the subunit
4510
# stream, but we don't want to include it in ours.
4511
if details is not None and 'log' in details:
4513
return super(SubUnitBzrProtocolClient, self).addSuccess(
4516
class SubUnitBzrRunner(TextTestRunner):
4517
def run(self, test):
4518
result = AutoTimingTestResultDecorator(
4519
SubUnitBzrProtocolClient(self.stream))
4525
class _PosixPermissionsFeature(Feature):
4529
# create temporary file and check if specified perms are maintained.
4532
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4533
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4536
os.chmod(name, write_perms)
4538
read_perms = os.stat(name).st_mode & 0777
4540
return (write_perms == read_perms)
4542
return (os.name == 'posix') and has_perms()
4544
def feature_name(self):
4545
return 'POSIX permissions support'
4547
posix_permissions_feature = _PosixPermissionsFeature()
60
if m not in MODULES_TO_TEST:
61
MODULES_TO_TEST.append(m)
64
TestBase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
65
print '%-30s %s' % ('bzr binary', TestBase.BZRPATH)
71
# should also test bzrlib.merge_core, but they seem to be out of date with
75
# XXX: python2.3's TestLoader() doesn't seem to find all the
76
# tests; don't know why
77
for m in MODULES_TO_TEST:
78
suite.addTest(TestLoader().loadTestsFromModule(m))
80
for m in (MODULES_TO_DOCTEST):
81
suite.addTest(DocTestSuite(m))
83
for p in bzrlib.plugin.all_plugins:
84
if hasattr(p, 'test_suite'):
85
suite.addTest(p.test_suite())
87
suite.addTest(unittest.makeSuite(bzrlib.merge_core.MergeTest, 'test_'))
89
return run_suite(suite, 'testbzr')