1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
31
from cStringIO import StringIO
37
from pprint import pformat
42
from subprocess import Popen, PIPE
65
import bzrlib.commands
66
import bzrlib.timestamp
68
import bzrlib.inventory
69
import bzrlib.iterablefile
74
# lsprof not available
76
from bzrlib.merge import merge_inner
80
from bzrlib import symbol_versioning
81
from bzrlib.symbol_versioning import (
88
from bzrlib.transport import get_transport
89
import bzrlib.transport
90
from bzrlib.transport.local import LocalURLServer
91
from bzrlib.transport.memory import MemoryServer
92
from bzrlib.transport.readonly import ReadonlyServer
93
from bzrlib.trace import mutter, note
94
from bzrlib.tests import TestUtil
95
from bzrlib.tests.http_server import HttpServer
96
from bzrlib.tests.TestUtil import (
100
from bzrlib.tests.treeshape import build_tree_contents
101
import bzrlib.version_info_formats.format_custom
102
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
104
# Mark this python module as being part of the implementation
105
# of unittest: this gives us better tracebacks where the last
106
# shown frame is the test code, not our assertXYZ.
109
default_transport = LocalURLServer
112
class ExtendedTestResult(unittest._TextTestResult):
113
"""Accepts, reports and accumulates the results of running tests.
115
Compared to the unittest version this class adds support for
116
profiling, benchmarking, stopping as soon as a test fails, and
117
skipping tests. There are further-specialized subclasses for
118
different types of display.
120
When a test finishes, in whatever way, it calls one of the addSuccess,
121
addFailure or addError classes. These in turn may redirect to a more
122
specific case for the special test results supported by our extended
125
Note that just one of these objects is fed the results from many tests.
130
def __init__(self, stream, descriptions, verbosity,
134
"""Construct new TestResult.
136
:param bench_history: Optionally, a writable file object to accumulate
139
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
140
if bench_history is not None:
141
from bzrlib.version import _get_bzr_source_tree
142
src_tree = _get_bzr_source_tree()
145
revision_id = src_tree.get_parent_ids()[0]
147
# XXX: if this is a brand new tree, do the same as if there
151
# XXX: If there's no branch, what should we do?
153
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
154
self._bench_history = bench_history
155
self.ui = ui.ui_factory
156
self.num_tests = num_tests
158
self.failure_count = 0
159
self.known_failure_count = 0
161
self.not_applicable_count = 0
162
self.unsupported = {}
164
self._overall_start_time = time.time()
166
def _extractBenchmarkTime(self, testCase):
    """Return the benchmark time recorded on testCase, if any.

    :param testCase: The test whose ``_benchtime`` attribute is consulted.
    :return: The value of ``testCase._benchtime``, or None when the test
        has no such attribute.
    """
    try:
        return testCase._benchtime
    except AttributeError:
        return None
170
def _elapsedTestTimeString(self):
    """Return the formatted time elapsed since the current test started.

    The elapsed time is measured from ``self._start_time`` to now and is
    rendered by ``self._formatTime``.
    """
    elapsed = time.time() - self._start_time
    return self._formatTime(elapsed)
174
def _testTimeString(self, testCase):
175
benchmark_time = self._extractBenchmarkTime(testCase)
176
if benchmark_time is not None:
178
self._formatTime(benchmark_time),
179
self._elapsedTestTimeString())
181
return " %s" % self._elapsedTestTimeString()
183
def _formatTime(self, seconds):
    """Render a duration given in seconds as right-aligned milliseconds.

    :param seconds: The duration, in seconds.
    :return: The whole number of milliseconds followed by ``ms``; the
        number is padded with leading spaces to at least 8 characters.
    """
    # Some benchmarks run for thousands of seconds, hence the wide field.
    milliseconds = 1000 * seconds
    return "%8dms" % milliseconds
189
def _shortened_test_description(self, test):
191
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
194
def startTest(self, test):
    """Note that test is starting: report it, number it, start its timer.

    :param test: The test about to run. Its ``number`` attribute is set
        to the value of ``self.count`` as it stands after the start of the
        test has been reported.
    """
    unittest.TestResult.startTest(self, test)
    # Reporting happens before numbering; keep this order.
    self.report_test_start(test)
    test.number = self.count
    # Timing begins last, once the reporting above is done.
    self._recordTestStartTime()
200
def _recordTestStartTime(self):
    """Remember the current time as the moment the test started."""
    started_at = time.time()
    self._start_time = started_at
204
def _cleanupLogFile(self, test):
205
# We can only do this if we have one of our TestCases, not if
207
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
208
if setKeepLogfile is not None:
211
def addError(self, test, err):
212
"""Tell result that test finished with an error.
214
Called from the TestCase run() method when the test
215
fails with an unexpected error.
217
self._testConcluded(test)
218
if isinstance(err[1], TestSkipped):
219
return self._addSkipped(test, err)
220
elif isinstance(err[1], UnavailableFeature):
221
return self.addNotSupported(test, err[1].args[0])
223
unittest.TestResult.addError(self, test, err)
224
self.error_count += 1
225
self.report_error(test, err)
228
self._cleanupLogFile(test)
230
def addFailure(self, test, err):
231
"""Tell result that test failed.
233
Called from the TestCase run() method when the test
234
fails because e.g. an assert() method failed.
236
self._testConcluded(test)
237
if isinstance(err[1], KnownFailure):
238
return self._addKnownFailure(test, err)
240
unittest.TestResult.addFailure(self, test, err)
241
self.failure_count += 1
242
self.report_failure(test, err)
245
self._cleanupLogFile(test)
247
def addSuccess(self, test):
248
"""Tell result that test completed successfully.
250
Called from the TestCase run()
252
self._testConcluded(test)
253
if self._bench_history is not None:
254
benchmark_time = self._extractBenchmarkTime(test)
255
if benchmark_time is not None:
256
self._bench_history.write("%s %s\n" % (
257
self._formatTime(benchmark_time),
259
self.report_success(test)
260
self._cleanupLogFile(test)
261
unittest.TestResult.addSuccess(self, test)
262
test._log_contents = ''
264
def _testConcluded(self, test):
265
"""Common code when a test has finished.
267
Called regardless of whether it succeeded, failed, etc.
271
def _addKnownFailure(self, test, err):
    """Count a known failure and pass it on to report_known_failure.

    :param test: The test that failed.
    :param err: The error information, handed on unchanged.
    """
    self.known_failure_count = self.known_failure_count + 1
    self.report_known_failure(test, err)
275
def addNotSupported(self, test, feature):
276
"""The test will not be run because of a missing feature.
278
# this can be called in two different ways: it may be that the
279
# test started running, and then raised (through addError)
280
# UnavailableFeature. Alternatively this method can be called
281
# while probing for features before running the tests; in that
282
# case we will see startTest and stopTest, but the test will never
284
self.unsupported.setdefault(str(feature), 0)
285
self.unsupported[str(feature)] += 1
286
self.report_unsupported(test, feature)
288
def _addSkipped(self, test, skip_excinfo):
289
if isinstance(skip_excinfo[1], TestNotApplicable):
290
self.not_applicable_count += 1
291
self.report_not_applicable(test, skip_excinfo)
294
self.report_skip(test, skip_excinfo)
297
except KeyboardInterrupt:
300
self.addError(test, test._exc_info())
302
# seems best to treat this as success from point-of-view of unittest
303
# -- it actually does nothing so it barely matters :)
304
unittest.TestResult.addSuccess(self, test)
305
test._log_contents = ''
307
def printErrorList(self, flavour, errors):
308
for test, err in errors:
309
self.stream.writeln(self.separator1)
310
self.stream.write("%s: " % flavour)
311
self.stream.writeln(self.getDescription(test))
312
if getattr(test, '_get_log', None) is not None:
313
self.stream.write('\n')
315
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
316
self.stream.write('\n')
317
self.stream.write(test._get_log())
318
self.stream.write('\n')
320
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
321
self.stream.write('\n')
322
self.stream.writeln(self.separator2)
323
self.stream.writeln("%s" % err)
328
def report_cleaning_up(self):
331
def report_success(self, test):
334
def wasStrictlySuccessful(self):
335
if self.unsupported or self.known_failure_count:
337
return self.wasSuccessful()
340
class TextTestResult(ExtendedTestResult):
341
"""Displays progress and results of tests in text form"""
343
def __init__(self, stream, descriptions, verbosity,
348
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
349
bench_history, num_tests)
351
self.pb = self.ui.nested_progress_bar()
352
self._supplied_pb = False
355
self._supplied_pb = True
356
self.pb.show_pct = False
357
self.pb.show_spinner = False
358
self.pb.show_eta = False,
359
self.pb.show_count = False
360
self.pb.show_bar = False
362
def report_starting(self):
    """Show the initial progress message before any test has run.

    The total is included only when ``self.num_tests`` is known, in the
    same way that _progress_prefix_text omits an unknown total.
    """
    if self.num_tests is None:
        # '%d' % None would raise TypeError, so leave the total out.
        message = '[test 0] starting...'
    else:
        message = '[test 0/%d] starting...' % (self.num_tests)
    self.pb.update(message)
365
def _progress_prefix_text(self):
366
# the longer this text, the less space we have to show the test
368
a = '[%d' % self.count # total that have been run
369
# tests skipped as known not to be relevant are not important enough
371
## if self.skip_count:
372
## a += ', %d skip' % self.skip_count
373
## if self.known_failure_count:
374
## a += '+%dX' % self.known_failure_count
375
if self.num_tests is not None:
376
a +='/%d' % self.num_tests
378
runtime = time.time() - self._overall_start_time
380
a += '%dm%ds' % (runtime / 60, runtime % 60)
384
a += ', %d err' % self.error_count
385
if self.failure_count:
386
a += ', %d fail' % self.failure_count
388
a += ', %d missing' % len(self.unsupported)
392
def report_test_start(self, test):
395
self._progress_prefix_text()
397
+ self._shortened_test_description(test))
399
def _test_description(self, test):
    """Return the description of test used in notes: its shortened form."""
    description = self._shortened_test_description(test)
    return description
402
def report_error(self, test, err):
403
self.pb.note('ERROR: %s\n %s\n',
404
self._test_description(test),
408
def report_failure(self, test, err):
409
self.pb.note('FAIL: %s\n %s\n',
410
self._test_description(test),
414
def report_known_failure(self, test, err):
    """Emit an XFAIL note giving the test description and err[1]."""
    description = self._test_description(test)
    failure = err[1]
    self.pb.note('XFAIL: %s\n%s\n', description, failure)
418
def report_skip(self, test, skip_excinfo):
421
def report_not_applicable(self, test, skip_excinfo):
424
def report_unsupported(self, test, feature):
    """test cannot be run because feature is missing.

    This implementation does nothing.
    """
    return None
427
def report_cleaning_up(self):
    """Show 'cleaning up...' on the progress bar."""
    progress = self.pb
    progress.update('cleaning up...')
431
if not self._supplied_pb:
435
class VerboseTestResult(ExtendedTestResult):
436
"""Produce long output, with one line per test run plus times"""
438
def _ellipsize_to_right(self, a_string, final_width):
439
"""Truncate and pad a string, keeping the right hand side"""
440
if len(a_string) > final_width:
441
result = '...' + a_string[3-final_width:]
444
return result.ljust(final_width)
446
def report_starting(self):
    """Announce on the stream that the test run is starting.

    The number of tests is included only when ``self.num_tests`` is
    known.
    """
    if self.num_tests is None:
        # '%d' % None would raise TypeError, so leave the count out.
        self.stream.write('running tests...\n')
    else:
        self.stream.write('running %d tests...\n' % self.num_tests)
449
def report_test_start(self, test):
451
name = self._shortened_test_description(test)
452
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
453
# numbers, plus a trailing blank
454
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
455
self.stream.write(self._ellipsize_to_right(name,
456
osutils.terminal_width()-30))
459
def _error_summary(self, err):
461
return '%s%s' % (indent, err[1])
463
def report_error(self, test, err):
    """Write an ERROR line with the test's timing, then the error summary."""
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('ERROR %s\n%s' % (timing, summary))
468
def report_failure(self, test, err):
    """Write a FAIL line with the test's timing, then the error summary."""
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln(' FAIL %s\n%s' % (timing, summary))
473
def report_known_failure(self, test, err):
    """Write an XFAIL line with the test's timing, then the error summary."""
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('XFAIL %s\n%s' % (timing, summary))
478
def report_success(self, test):
479
self.stream.writeln(' OK %s' % self._testTimeString(test))
480
for bench_called, stats in getattr(test, '_benchcalls', []):
481
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
482
stats.pprint(file=self.stream)
483
# flush the stream so that we get smooth output. This verbose mode is
484
# used to show the output in PQM.
487
def report_skip(self, test, skip_excinfo):
    """Write a SKIP line with the test's timing, then the skip summary."""
    timing = self._testTimeString(test)
    summary = self._error_summary(skip_excinfo)
    self.stream.writeln(' SKIP %s\n%s' % (timing, summary))
492
def report_not_applicable(self, test, skip_excinfo):
    """Write an N/A line with the test's timing, then the skip summary."""
    timing = self._testTimeString(test)
    summary = self._error_summary(skip_excinfo)
    self.stream.writeln(' N/A %s\n%s' % (timing, summary))
497
def report_unsupported(self, test, feature):
    """test cannot be run because feature is missing.

    Writes a NODEP line with the test's timing and the feature's name.
    """
    details = (self._testTimeString(test), feature)
    self.stream.writeln(
        "NODEP %s\n The feature '%s' is not available." % details)
503
class TextTestRunner(object):
504
stop_on_failure = False
513
self.stream = unittest._WritelnDecorator(stream)
514
self.descriptions = descriptions
515
self.verbosity = verbosity
516
self._bench_history = bench_history
517
self.list_only = list_only
520
"Run the given test case or test suite."
521
startTime = time.time()
522
if self.verbosity == 1:
523
result_class = TextTestResult
524
elif self.verbosity >= 2:
525
result_class = VerboseTestResult
526
result = result_class(self.stream,
529
bench_history=self._bench_history,
530
num_tests=test.countTestCases(),
532
result.stop_early = self.stop_on_failure
533
result.report_starting()
535
if self.verbosity >= 2:
536
self.stream.writeln("Listing tests only ...\n")
538
for t in iter_suite_tests(test):
539
self.stream.writeln("%s" % (t.id()))
541
actionTaken = "Listed"
544
run = result.testsRun
546
stopTime = time.time()
547
timeTaken = stopTime - startTime
549
self.stream.writeln(result.separator2)
550
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
551
run, run != 1 and "s" or "", timeTaken))
552
self.stream.writeln()
553
if not result.wasSuccessful():
554
self.stream.write("FAILED (")
555
failed, errored = map(len, (result.failures, result.errors))
557
self.stream.write("failures=%d" % failed)
559
if failed: self.stream.write(", ")
560
self.stream.write("errors=%d" % errored)
561
if result.known_failure_count:
562
if failed or errored: self.stream.write(", ")
563
self.stream.write("known_failure_count=%d" %
564
result.known_failure_count)
565
self.stream.writeln(")")
567
if result.known_failure_count:
568
self.stream.writeln("OK (known_failures=%d)" %
569
result.known_failure_count)
571
self.stream.writeln("OK")
572
if result.skip_count > 0:
573
skipped = result.skip_count
574
self.stream.writeln('%d test%s skipped' %
575
(skipped, skipped != 1 and "s" or ""))
576
if result.unsupported:
577
for feature, count in sorted(result.unsupported.items()):
578
self.stream.writeln("Missing feature '%s' skipped %d tests." %
584
def iter_suite_tests(suite):
585
"""Return all tests in a suite, recursing through nested suites"""
586
for item in suite._tests:
587
if isinstance(item, unittest.TestCase):
589
elif isinstance(item, unittest.TestSuite):
590
for r in iter_suite_tests(item):
593
raise Exception('unknown object %r inside test suite %r'
597
class TestSkipped(Exception):
    """Raised to signal that a test was deliberately skipped, not failed."""
601
class TestNotApplicable(TestSkipped):
602
"""A test is not applicable to the situation where it was run.
604
This is only normally raised by parameterized tests, if they find that
605
the instance they're constructed upon does not support one aspect
610
class KnownFailure(AssertionError):
611
"""Indicates that a test failed in a precisely expected manner.
613
Such failures don't block the whole test suite from passing because they are
614
indicators of partially completed code or of future work. We have an
615
explicit error for them so that we can ensure that they are always visible:
616
KnownFailures are always shown in the output of bzr selftest.
620
class UnavailableFeature(Exception):
621
"""A feature required for this test was not available.
623
The feature should be used to construct the exception.
627
class CommandFailed(Exception):
631
class StringIOWrapper(object):
632
"""A wrapper around cStringIO which just adds an encoding attribute.
634
Internally we can check sys.stdout to see what the output encoding
635
should be. However, cStringIO has no encoding attribute that we can
636
set. So we wrap it instead.
641
def __init__(self, s=None):
643
self.__dict__['_cstring'] = StringIO(s)
645
self.__dict__['_cstring'] = StringIO()
647
def __getattr__(self, name, getattr=getattr):
    """Look up name on the wrapped object stored under '_cstring'.

    The wrapped object is read straight from the instance ``__dict__``.
    The builtin ``getattr`` is bound as a default argument.
    """
    wrapped = self.__dict__['_cstring']
    return getattr(wrapped, name)
650
def __setattr__(self, name, val):
651
if name == 'encoding':
652
self.__dict__['encoding'] = val
654
return setattr(self._cstring, name, val)
657
class TestUIFactory(ui.CLIUIFactory):
658
"""A UI Factory for testing.
660
Hide the progress bar but emit note()s.
662
Allows get_password to be tested without real tty attached.
669
super(TestUIFactory, self).__init__()
670
if stdin is not None:
671
# We use a StringIOWrapper to be able to test various
672
# encodings, but the user is still responsible to
673
# encode the string and to set the encoding attribute
674
# of StringIOWrapper.
675
self.stdin = StringIOWrapper(stdin)
677
self.stdout = sys.stdout
681
self.stderr = sys.stderr
686
"""See progress.ProgressBar.clear()."""
688
def clear_term(self):
    """See progress.ProgressBar.clear_term().

    This implementation does nothing.
    """
695
"""See progress.ProgressBar.finished()."""
697
def note(self, fmt_string, *args, **kwargs):
    """See progress.ProgressBar.note().

    Writes fmt_string, with a newline appended and interpolated with args,
    to ``self.stdout``. kwargs are accepted but not used.
    """
    template = fmt_string + "\n"
    self.stdout.write(template % args)
701
def progress_bar(self):
704
def nested_progress_bar(self):
707
def update(self, message, count=None, total=None):
    """See progress.ProgressBar.update().

    This implementation does nothing.
    """
    return None
710
def get_non_echoed_password(self, prompt):
711
"""Get password from stdin without trying to handle the echo mode"""
713
self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
714
password = self.stdin.readline()
717
if password[-1] == '\n':
718
password = password[:-1]
722
def _report_leaked_threads():
    """Warn about thread leaks, naming the first leaking test and the count.

    Both values are read from the class-level attributes kept on TestCase.
    """
    first_leaker = TestCase._first_thread_leaker_id
    leaking_tests = TestCase._leaking_threads_tests
    bzrlib.trace.warning('%s is leaking threads among %d leaking tests',
                         first_leaker, leaking_tests)
728
class TestCase(unittest.TestCase):
729
"""Base class for bzr unit tests.
731
Tests that need access to disk resources should subclass
732
TestCaseInTempDir not TestCase.
734
Error and debug log messages are redirected from their usual
735
location into a temporary file, the contents of which can be
736
retrieved by _get_log(). We use a real OS file, not an in-memory object,
737
so that it can also capture file IO. When the test completes this file
738
is read into memory and removed from disk.
740
There are also convenience functions to invoke bzr's command-line
741
routine, and to build and check bzr trees.
743
In addition to the usual method of overriding tearDown(), this class also
744
allows subclasses to register functions into the _cleanups list, which is
745
run in order as the object is torn down. It's less likely this will be
746
accidentally overlooked.
749
_active_threads = None
750
_leaking_threads_tests = 0
751
_first_thread_leaker_id = None
752
_log_file_name = None
754
_keep_log_file = False
755
# record lsprof data when performing benchmark calls.
756
_gather_lsprof_in_benchmarks = False
757
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
758
'_log_contents', '_log_file_name', '_benchtime',
759
'_TestCase__testMethodName')
761
def __init__(self, methodName='testMethod'):
762
super(TestCase, self).__init__(methodName)
766
unittest.TestCase.setUp(self)
767
self._cleanEnvironment()
770
self._benchcalls = []
771
self._benchtime = None
773
self._clear_debug_flags()
774
TestCase._active_threads = threading.activeCount()
775
self.addCleanup(self._check_leaked_threads)
777
def _check_leaked_threads(self):
778
active = threading.activeCount()
779
leaked_threads = active - TestCase._active_threads
780
TestCase._active_threads = active
782
TestCase._leaking_threads_tests += 1
783
if TestCase._first_thread_leaker_id is None:
784
TestCase._first_thread_leaker_id = self.id()
785
# we're not specifically told when all tests are finished.
786
# This will do. We use a function to avoid keeping a reference
787
# to a TestCase object.
788
atexit.register(_report_leaked_threads)
790
def _clear_debug_flags(self):
791
"""Prevent externally set debug flags affecting tests.
793
Tests that want to use debug flags can just set them in the
794
debug_flags set during setup/teardown.
796
self._preserved_debug_flags = set(debug.debug_flags)
797
if 'allow_debug' not in selftest_debug_flags:
798
debug.debug_flags.clear()
799
self.addCleanup(self._restore_debug_flags)
801
def _clear_hooks(self):
802
# prevent hooks affecting tests
804
import bzrlib.smart.client
805
import bzrlib.smart.server
806
self._preserved_hooks = {
807
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
808
bzrlib.mutabletree.MutableTree: bzrlib.mutabletree.MutableTree.hooks,
809
bzrlib.smart.client._SmartClient: bzrlib.smart.client._SmartClient.hooks,
810
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
812
self.addCleanup(self._restoreHooks)
813
# reset all hooks to an empty instance of the appropriate type
814
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
815
bzrlib.smart.client._SmartClient.hooks = bzrlib.smart.client.SmartClientHooks()
816
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
818
def _silenceUI(self):
819
"""Turn off UI for duration of test"""
820
# by default the UI is off; tests can turn it on if they want it.
821
saved = ui.ui_factory
823
ui.ui_factory = saved
824
ui.ui_factory = ui.SilentUIFactory()
825
self.addCleanup(_restore)
827
def _ndiff_strings(self, a, b):
828
"""Return ndiff between two strings containing lines.
830
A trailing newline is added if missing to make the strings
832
if b and b[-1] != '\n':
834
if a and a[-1] != '\n':
836
difflines = difflib.ndiff(a.splitlines(True),
838
linejunk=lambda x: False,
839
charjunk=lambda x: False)
840
return ''.join(difflines)
842
def assertEqual(self, a, b, message=''):
846
except UnicodeError, e:
847
# If we can't compare without getting a UnicodeError, then
848
# obviously they are different
849
mutter('UnicodeError: %s', e)
852
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
854
pformat(a), pformat(b)))
856
assertEquals = assertEqual
858
def assertEqualDiff(self, a, b, message=None):
859
"""Assert two texts are equal, if not raise an exception.
861
This is intended for use with multi-line strings where it can
862
be hard to find the differences by eye.
864
# TODO: perhaps override assertEquals to call this for strings?
868
message = "texts not equal:\n"
870
message = 'first string is missing a final newline.\n'
872
message = 'second string is missing a final newline.\n'
873
raise AssertionError(message +
874
self._ndiff_strings(a, b))
876
def assertEqualMode(self, mode, mode_test):
    """Assert that two modes are equal, showing both in octal on mismatch."""
    detail = 'mode mismatch %o != %o' % (mode, mode_test)
    self.assertEqual(mode, mode_test, detail)
880
def assertEqualStat(self, expected, actual):
881
"""assert that expected and actual are the same stat result.
883
:param expected: A stat result.
884
:param actual: A stat result.
885
:raises AssertionError: If the expected and actual stat values differ
888
self.assertEqual(expected.st_size, actual.st_size)
889
self.assertEqual(expected.st_mtime, actual.st_mtime)
890
self.assertEqual(expected.st_ctime, actual.st_ctime)
891
self.assertEqual(expected.st_dev, actual.st_dev)
892
self.assertEqual(expected.st_ino, actual.st_ino)
893
self.assertEqual(expected.st_mode, actual.st_mode)
895
def assertPositive(self, val):
    """Assert that val is greater than 0."""
    is_positive = val > 0
    self.assertTrue(is_positive,
                    'expected a positive value, but got %s' % val)
899
def assertNegative(self, val):
    """Assert that val is less than 0."""
    is_negative = val < 0
    self.assertTrue(is_negative,
                    'expected a negative value, but got %s' % val)
903
def assertStartsWith(self, s, prefix):
    """Assert that s starts with prefix.

    :raises AssertionError: If s does not start with prefix.
    """
    if s.startswith(prefix):
        return
    raise AssertionError('string %r does not start with %r' % (s, prefix))
907
def assertEndsWith(self, s, suffix):
    """Assert that s ends with suffix.

    :raises AssertionError: If s does not end with suffix.
    """
    if s.endswith(suffix):
        return
    raise AssertionError('string %r does not end with %r' % (s, suffix))
912
def assertContainsRe(self, haystack, needle_re):
913
"""Assert that haystack contains something matching a regular expression."""
914
if not re.search(needle_re, haystack):
915
if '\n' in haystack or len(haystack) > 60:
916
# a long string, format it in a more readable way
917
raise AssertionError(
918
'pattern "%s" not found in\n"""\\\n%s"""\n'
919
% (needle_re, haystack))
921
raise AssertionError('pattern "%s" not found in "%s"'
922
% (needle_re, haystack))
924
def assertNotContainsRe(self, haystack, needle_re):
    """Assert that nothing in haystack matches the regular expression.

    :param haystack: The string to search.
    :param needle_re: The pattern that must not be found anywhere in it.
    :raises AssertionError: If the pattern is found.
    """
    if re.search(needle_re, haystack) is None:
        return
    raise AssertionError('pattern "%s" found in "%s"'
                         % (needle_re, haystack))
930
def assertSubset(self, sublist, superlist):
931
"""Assert that every entry in sublist is present in superlist."""
932
missing = set(sublist) - set(superlist)
934
raise AssertionError("value(s) %r not present in container %r" %
935
(missing, superlist))
937
def assertListRaises(self, excClass, func, *args, **kwargs):
938
"""Fail unless excClass is raised when the iterator from func is used.
940
Many functions can return generators this makes sure
941
to wrap them in a list() call to make sure the whole generator
942
is run, and that the proper exception is raised.
945
list(func(*args, **kwargs))
949
if getattr(excClass,'__name__', None) is not None:
950
excName = excClass.__name__
952
excName = str(excClass)
953
raise self.failureException, "%s not raised" % excName
955
def assertRaises(self, excClass, callableObj, *args, **kwargs):
956
"""Assert that a callable raises a particular exception.
958
:param excClass: As for the except statement, this may be either an
959
exception class, or a tuple of classes.
960
:param callableObj: A callable, will be passed ``*args`` and
963
Returns the exception so that you can examine it.
966
callableObj(*args, **kwargs)
970
if getattr(excClass,'__name__', None) is not None:
971
excName = excClass.__name__
974
excName = str(excClass)
975
raise self.failureException, "%s not raised" % excName
977
def assertIs(self, left, right, message=None):
978
if not (left is right):
979
if message is not None:
980
raise AssertionError(message)
982
raise AssertionError("%r is not %r." % (left, right))
984
def assertIsNot(self, left, right, message=None):
986
if message is not None:
987
raise AssertionError(message)
989
raise AssertionError("%r is %r." % (left, right))
991
def assertTransportMode(self, transport, path, mode):
992
"""Fail if a path does not have mode mode.
994
If modes are not supported on this transport, the assertion is ignored.
996
if not transport._can_roundtrip_unix_modebits():
998
path_stat = transport.stat(path)
999
actual_mode = stat.S_IMODE(path_stat.st_mode)
1000
self.assertEqual(mode, actual_mode,
1001
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
1003
def assertIsSameRealPath(self, path1, path2):
    """Fail if path1 and path2 point to different files.

    The paths are compared after passing each through osutils.realpath;
    the paths as given are included in the failure message.
    """
    real1 = osutils.realpath(path1)
    real2 = osutils.realpath(path2)
    apparent = "apparent paths:\na = %s\nb = %s\n," % (path1, path2)
    self.assertEqual(real1, real2, apparent)
1009
def assertIsInstance(self, obj, kls):
    """Fail unless obj is an instance of kls."""
    if isinstance(obj, kls):
        return
    actual = obj.__class__
    self.fail("%r is an instance of %s rather than %s" % (obj, actual, kls))
1015
def expectFailure(self, reason, assertion, *args, **kwargs):
1016
"""Invoke a test, expecting it to fail for the given reason.
1018
This is for assertions that ought to succeed, but currently fail.
1019
(The failure is *expected* but not *wanted*.) Please be very precise
1020
about the failure you're expecting. If a new bug is introduced,
1021
AssertionError should be raised, not KnownFailure.
1023
Frequently, expectFailure should be followed by an opposite assertion.
1026
Intended to be used with a callable that raises AssertionError as the
1027
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1029
Raises KnownFailure if the test fails. Raises AssertionError if the
1034
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1036
self.assertEqual(42, dynamic_val)
1038
This means that a dynamic_val of 54 will cause the test to raise
1039
a KnownFailure. Once math is fixed and the expectFailure is removed,
1040
only a dynamic_val of 42 will allow the test to pass. Anything other
1041
than 54 or 42 will cause an AssertionError.
1044
assertion(*args, **kwargs)
1045
except AssertionError:
1046
raise KnownFailure(reason)
1048
self.fail('Unexpected success. Should have failed: %s' % reason)
1050
def assertFileEqual(self, content, path):
1051
"""Fail if path does not contain 'content'."""
1052
self.failUnlessExists(path)
1053
f = file(path, 'rb')
1058
self.assertEqualDiff(content, s)
1060
def failUnlessExists(self, path):
1061
"""Fail unless path or paths, which may be abs or relative, exist."""
1062
if not isinstance(path, basestring):
1064
self.failUnlessExists(p)
1066
self.failUnless(osutils.lexists(path),path+" does not exist")
1068
def failIfExists(self, path):
1069
"""Fail if path or paths, which may be abs or relative, exist."""
1070
if not isinstance(path, basestring):
1072
self.failIfExists(p)
1074
self.failIf(osutils.lexists(path),path+" exists")
1076
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1077
"""A helper for callDeprecated and applyDeprecated.
1079
:param a_callable: A callable to call.
1080
:param args: The positional arguments for the callable
1081
:param kwargs: The keyword arguments for the callable
1082
:return: A tuple (warnings, result). result is the result of calling
1083
a_callable(``*args``, ``**kwargs``).
1086
def capture_warnings(msg, cls=None, stacklevel=None):
1087
# we've hooked into a deprecation specific callpath,
1088
# only deprecations should be getting sent via it.
1089
self.assertEqual(cls, DeprecationWarning)
1090
local_warnings.append(msg)
1091
original_warning_method = symbol_versioning.warn
1092
symbol_versioning.set_warning_method(capture_warnings)
1094
result = a_callable(*args, **kwargs)
1096
symbol_versioning.set_warning_method(original_warning_method)
1097
return (local_warnings, result)
1099
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
    """Call a deprecated callable without warning the user.

    Note that this only captures warnings raised by symbol_versioning.warn,
    not other callers that go direct to the warning module.

    To test that a deprecated method raises an error, do something like
    this::

        self.assertRaises(errors.ReservedId,
            self.applyDeprecated,
            deprecated_in((1, 5, 0)),
            a_deprecated_callable,
            *its_arguments)

    :param deprecation_format: The deprecation format that the callable
        should have been deprecated with. This is the same type as the
        parameter to deprecated_method/deprecated_function. If the
        callable is not deprecated with this format, an assertion error
        will be raised.
    :param a_callable: A callable to call. This may be a bound method or
        a regular function. It will be called with ``*args`` and
        ``**kwargs``.
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of a_callable(``*args``, ``**kwargs``)
    """
    call_warnings, result = self._capture_deprecation_warnings(a_callable,
        *args, **kwargs)
    expected_first_warning = symbol_versioning.deprecation_string(
        a_callable, deprecation_format)
    if len(call_warnings) == 0:
        self.fail("No deprecation warning generated by call to %s" %
            a_callable)
    # Only the first captured warning is compared against the expectation.
    self.assertEqual(expected_first_warning, call_warnings[0])
    return result
1136
def callCatchWarnings(self, fn, *args, **kw):
    """Call a callable that raises python warnings.

    The caller's responsible for examining the returned warnings.

    If the callable raises an exception, the exception is not
    caught and propagates up to the caller.  In that case, the list
    of warnings is not available.

    :param fn: The callable to invoke as ``fn(*args, **kw)``.
    :returns: ([warning_object, ...], fn_result)
    """
    # XXX: This is not perfect, because it completely overrides the
    # warnings filters, and some code may depend on suppressing particular
    # warnings.  It's the easiest way to insulate ourselves from -Werror,
    # though.  -- Andrew, 20071062
    wlist = []

    def _catcher(message, category, filename, lineno, file=None, line=None):
        # despite the name, 'message' is normally(?) a Warning subclass
        # instance
        wlist.append(message)

    saved_showwarning = warnings.showwarning
    saved_filters = warnings.filters
    try:
        warnings.showwarning = _catcher
        warnings.filters = []
        result = fn(*args, **kw)
    finally:
        # Restore the global warnings state whether or not fn raised.
        warnings.showwarning = saved_showwarning
        warnings.filters = saved_filters
    return wlist, result
1167
def callDeprecated(self, expected, callable, *args, **kwargs):
    """Assert that a callable is deprecated in a particular way.

    This is a very precise test for unusual requirements. The
    applyDeprecated helper function is probably more suited for most tests
    as it allows you to simply specify the deprecation format being used
    and will ensure that that is issued for the function being called.

    Note that this only captures warnings raised by symbol_versioning.warn,
    not other callers that go direct to the warning module.  To catch
    general warnings, use callCatchWarnings.

    :param expected: a list of the deprecation warnings expected, in order
    :param callable: The callable to call
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of ``callable(*args, **kwargs)``.
    """
    call_warnings, result = self._capture_deprecation_warnings(callable,
        *args, **kwargs)
    self.assertEqual(expected, call_warnings)
    return result
1189
def _startLogFile(self):
    """Send bzr and test log messages to a temporary file.

    The file is removed as the test is torn down.
    """
    fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
    self._log_file = os.fdopen(fileno, 'w+')
    # The memento is what _finishLogFile hands back to pop_log_file.
    self._log_memento = bzrlib.trace.push_log_file(self._log_file)
    self._log_file_name = name
    self.addCleanup(self._finishLogFile)
1200
def _finishLogFile(self):
1201
"""Finished with the log file.
1203
Close the file and delete it, unless setKeepLogfile was called.
1205
if self._log_file is None:
1207
bzrlib.trace.pop_log_file(self._log_memento)
1208
self._log_file.close()
1209
self._log_file = None
1210
if not self._keep_log_file:
1211
os.remove(self._log_file_name)
1212
self._log_file_name = None
1214
def setKeepLogfile(self):
    """Make the logfile not be deleted when _finishLogFile is called."""
    self._keep_log_file = True
1218
def addCleanup(self, callable, *args, **kwargs):
    """Arrange to run a callable when this case is torn down.

    Callables are run in the reverse of the order they are registered,
    ie last-in first-out.

    :param callable: The callable to run.
    :param args: Positional arguments stored for the callable.
    :param kwargs: Keyword arguments stored for the callable.
    """
    self._cleanups.append((callable, args, kwargs))
1226
def _cleanEnvironment(self):
1228
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1229
'HOME': os.getcwd(),
1230
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1231
# tests do check our impls match APPDATA
1232
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1234
'BZREMAIL': None, # may still be present in the environment
1236
'BZR_PROGRESS_BAR': None,
1238
'BZR_PLUGIN_PATH': None,
1240
'SSH_AUTH_SOCK': None,
1244
'https_proxy': None,
1245
'HTTPS_PROXY': None,
1250
# Nobody cares about these ones AFAIK. So far at
1251
# least. If you do (care), please update this comment
1255
'BZR_REMOTE_PATH': None,
1258
self.addCleanup(self._restoreEnvironment)
1259
for name, value in new_env.iteritems():
1260
self._captureVar(name, value)
1262
def _captureVar(self, name, newvalue):
    """Set an environment variable, and reset it when finished.

    The value returned by osutils.set_or_unset_env is remembered under
    ``name`` so that _restoreEnvironment can put it back.
    """
    self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1266
def _restore_debug_flags(self):
    """Reset debug.debug_flags to the flags preserved on this test."""
    debug.debug_flags.clear()
    debug.debug_flags.update(self._preserved_debug_flags)
1270
def _restoreEnvironment(self):
    """Put back every environment variable recorded by _captureVar."""
    for name, value in self.__old_env.iteritems():
        osutils.set_or_unset_env(name, value)
1274
def _restoreHooks(self):
1275
for klass, hooks in self._preserved_hooks.items():
1276
setattr(klass, 'hooks', hooks)
1278
def knownFailure(self, reason):
    """This test has failed for some known reason.

    :param reason: Explanation passed to the KnownFailure exception.
    :raises KnownFailure: always.
    """
    raise KnownFailure(reason)
1282
def run(self, result=None):
    """Run the test, short-circuiting when a needed feature is missing.

    For each feature in ``_test_needs_features`` that is unavailable the
    test is reported via ``result.addNotSupported`` when the result object
    has that method, and as a success otherwise; the test body is not run.

    After a real run, the instance ``__dict__`` is replaced by one holding
    only the attributes named in ``attrs_to_keep``.

    :param result: The result object; defaults to defaultTestResult().
    """
    if result is None:
        result = self.defaultTestResult()
    for feature in getattr(self, '_test_needs_features', []):
        if not feature.available():
            result.startTest(self)
            if getattr(result, 'addNotSupported', None):
                result.addNotSupported(self, feature)
            else:
                result.addSuccess(self)
            result.stopTest(self)
            return
    try:
        return unittest.TestCase.run(self, result)
    finally:
        saved_attrs = {}
        # Sentinel distinguishing "attribute missing" from a None value.
        absent_attr = object()
        for attr_name in self.attrs_to_keep:
            attr = getattr(self, attr_name, absent_attr)
            if attr is not absent_attr:
                saved_attrs[attr_name] = attr
        self.__dict__ = saved_attrs
1306
unittest.TestCase.tearDown(self)
1308
def time(self, callable, *args, **kwargs):
    """Run callable and accrue the time it takes to the benchmark time.

    If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
    this will cause lsprofile statistics to be gathered and stored in
    self._benchcalls.

    :return: Whatever ``callable(*args, **kwargs)`` returns.
    """
    if self._benchtime is None:
        self._benchtime = 0
    start = time.time()
    try:
        if not self._gather_lsprof_in_benchmarks:
            return callable(*args, **kwargs)
        else:
            # record this benchmark
            ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
            stats.sort()
            self._benchcalls.append(((callable, args, kwargs), stats))
            return ret
    finally:
        # Elapsed time is accrued even when the callable raises.
        self._benchtime += time.time() - start
1330
def _runCleanups(self):
1331
"""Run registered cleanup functions.
1333
This should only be called from TestCase.tearDown.
1335
# TODO: Perhaps this should keep running cleanups even if
1336
# one of them fails?
1338
# Actually pop the cleanups from the list so tearDown running
1339
# twice is safe (this happens for skipped tests).
1340
while self._cleanups:
1341
cleanup, args, kwargs = self._cleanups.pop()
1342
cleanup(*args, **kwargs)
1344
def log(self, *args):
1347
def _get_log(self, keep_log_file=False):
    """Get the log from bzrlib.trace calls from this test.

    :param keep_log_file: When True, if the log is still a file on disk
        leave it as a file on disk. When False, if the log is still a file
        on disk, the log file is deleted and the log preserved as
        self._log_contents.
    :return: A string containing the log.
    """
    # flush the log file, to get all content
    import bzrlib.trace
    if bzrlib.trace._trace_file:
        bzrlib.trace._trace_file.flush()
    if self._log_contents:
        # XXX: this can hardly contain the content flushed above --vila
        # 20080128
        return self._log_contents
    if self._log_file_name is not None:
        logfile = open(self._log_file_name)
        try:
            log_contents = logfile.read()
        finally:
            logfile.close()
        if not keep_log_file:
            self._log_contents = log_contents
            try:
                os.remove(self._log_file_name)
            except OSError:
                # sys.exc_info() is used rather than "except OSError, e"
                # so the block parses on every Python version.
                e = sys.exc_info()[1]
                if sys.platform == 'win32' and e.errno == errno.EACCES:
                    # Best effort on win32: report and carry on.
                    sys.stderr.write(('Unable to delete log file '
                                         ' %r\n' % self._log_file_name))
                else:
                    raise
        return log_contents
    else:
        return "DELETED log file to reduce memory footprint"
1384
def requireFeature(self, feature):
    """This test requires a specific feature is available.

    :param feature: An object whose ``available()`` method reports whether
        the feature can be used.
    :raises UnavailableFeature: When feature is not available.
    """
    if not feature.available():
        raise UnavailableFeature(feature)
1392
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
        working_dir):
    """Run bazaar command line, splitting up a string command line.

    :param args: Either a list of arguments, or a single string which is
        split with shlex before being passed on.
    :return: Whatever _run_bzr_core returns for the split arguments.
    """
    if isinstance(args, basestring):
        # shlex don't understand unicode strings,
        # so args should be plain string (bialix 20070906)
        args = list(shlex.split(str(args)))
    return self._run_bzr_core(args, retcode=retcode,
            encoding=encoding, stdin=stdin, working_dir=working_dir,
            )
1403
def _run_bzr_core(self, args, retcode, encoding, stdin,
        working_dir):
    """Run a bzr command in-process and capture its output.

    :param args: List of command-line arguments (without the program name).
    :param retcode: Expected return code, or None to skip the check.
    :param encoding: Encoding set on the captured stdout/stderr wrappers;
        defaults to osutils.get_user_encoding().
    :param stdin: Passed to the TestUIFactory used for the run.
    :param working_dir: If not None, chdir there for the duration of the
        command and chdir back afterwards.
    :return: (out, err) as captured from the command.
    """
    if encoding is None:
        encoding = osutils.get_user_encoding()
    stdout = StringIOWrapper()
    stderr = StringIOWrapper()
    stdout.encoding = encoding
    stderr.encoding = encoding

    self.log('run bzr: %r', args)
    # FIXME: don't call into logging here
    handler = logging.StreamHandler(stderr)
    handler.setLevel(logging.INFO)
    logger = logging.getLogger('')
    logger.addHandler(handler)
    old_ui_factory = ui.ui_factory
    ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)

    cwd = None
    if working_dir is not None:
        cwd = osutils.getcwd()
        os.chdir(working_dir)

    try:
        result = self.apply_redirected(ui.ui_factory.stdin,
            stdout, stderr,
            bzrlib.commands.run_bzr_catch_user_errors,
            args)
    finally:
        # Undo every piece of global state touched above.
        logger.removeHandler(handler)
        ui.ui_factory = old_ui_factory
        if cwd is not None:
            os.chdir(cwd)

    out = stdout.getvalue()
    err = stderr.getvalue()
    if out:
        self.log('output:\n%r', out)
    if err:
        self.log('errors:\n%r', err)
    if retcode is not None:
        self.assertEquals(retcode, result,
                          message='Unexpected return code')
    return out, err
1448
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
            working_dir=None, error_regexes=[], output_encoding=None):
    """Invoke bzr, as if it were run from the command line.

    The argument list should not include the bzr program name - the
    first argument is normally the bzr command.  Arguments may be
    passed in two ways:

    1- A list of strings, eg ["commit", "a"].  This is recommended
    when the command contains whitespace or metacharacters, or
    is built up at run time.

    2- A single string, eg "add a".  This is the most convenient
    for hardcoded commands.

    This runs bzr through the interface that catches and reports
    errors, and with logging set to something approximating the
    default, so that error reporting can be checked.

    This should be the main method for tests that want to exercise the
    overall behavior of the bzr application (rather than a unit test
    or a functional test of the library.)

    This sends the stdout/stderr results into the test's log,
    where it may be useful for debugging.  See also run_captured.

    :keyword stdin: A string to be used as stdin for the command.
    :keyword retcode: The status code the command should return;
        None disables the check.
    :keyword working_dir: The directory to run the command in
    :keyword error_regexes: A list of expected error messages.  If
        specified they must be seen in the error output of the command.
    :keyword output_encoding: Accepted but not used by this method.
    :return: (out, err)
    """
    # error_regexes is only iterated, never mutated, so the shared list
    # default is harmless here.
    out, err = self._run_bzr_autosplit(
        args=args,
        retcode=retcode,
        encoding=encoding,
        stdin=stdin,
        working_dir=working_dir,
        )
    for regex in error_regexes:
        self.assertContainsRe(err, regex)
    return out, err
1492
def run_bzr_error(self, error_regexes, *args, **kwargs):
    """Run bzr, and check that stderr contains the supplied regexes

    :param error_regexes: Sequence of regular expressions which
        must each be found in the error output.
    :param args: command-line arguments for bzr
    :param kwargs: Keyword arguments which are interpreted by run_bzr
        This function changes the default value of retcode to be 3,
        since in most cases this is run when you expect bzr to fail.

    :return: (out, err) The actual output of running the command (in case
        you want to do more inspection)

    Examples of use::

        # Make sure that commit is failing because there is nothing to do
        self.run_bzr_error(['no changes to commit'],
                           ['commit', '-m', 'my commit comment'])
        # Make sure --strict is handling an unknown file, rather than
        # giving us the 'nothing to do' error
        self.build_tree(['unknown'])
        self.run_bzr_error(['Commit refused because there are unknown files'],
                           ['commit', '--strict', '-m', 'my commit comment'])
    """
    # An explicit retcode from the caller wins over the default of 3.
    kwargs.setdefault('retcode', 3)
    kwargs['error_regexes'] = error_regexes
    out, err = self.run_bzr(*args, **kwargs)
    return out, err
1522
def run_bzr_subprocess(self, *args, **kwargs):
    """Run bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    This should only be used for tests that have a justifiable need for
    this isolation: e.g. they are testing startup time, or signal
    handling, or early startup code, etc.  Subprocess code can't be
    profiled or debugged so easily.

    Exactly one positional argument is accepted: either a list of
    arguments or a single command-line string (split with shlex).

    :keyword retcode: The status code that is expected.  Defaults to 0.  If
        None is supplied, the status code is not checked.
    :keyword env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :keyword universal_newlines: Convert CRLF => LF
    :keyword allow_plugins: By default the subprocess is run with
        --no-plugins to ensure test reproducibility. Also, it is possible
        for system-wide plugins to create unexpected output on stderr,
        which can cause unnecessary test failures.
    :keyword working_dir: Directory passed on to start_bzr_subprocess.
    :raises ValueError: if more or fewer than one positional argument is
        given.
    """
    env_changes = kwargs.get('env_changes', {})
    working_dir = kwargs.get('working_dir', None)
    allow_plugins = kwargs.get('allow_plugins', False)
    if len(args) == 1:
        if isinstance(args[0], list):
            args = args[0]
        elif isinstance(args[0], basestring):
            args = list(shlex.split(args[0]))
    else:
        raise ValueError("passing varargs to run_bzr_subprocess")
    process = self.start_bzr_subprocess(args, env_changes=env_changes,
                                        working_dir=working_dir,
                                        allow_plugins=allow_plugins)
    # We distinguish between retcode=None and retcode not passed.
    supplied_retcode = kwargs.get('retcode', 0)
    return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
        universal_newlines=kwargs.get('universal_newlines', False),
        process_args=args)
1562
def start_bzr_subprocess(self, process_args, env_changes=None,
                         skip_if_plan_to_signal=False,
                         working_dir=None,
                         allow_plugins=False):
    """Start bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    This should only be used for tests that have a justifiable need for
    this isolation: e.g. they are testing startup time, or signal
    handling, or early startup code, etc.  Subprocess code can't be
    profiled or debugged so easily.

    :param process_args: a list of arguments to pass to the bzr executable,
        for example ``['--version']``.
    :param env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
        is not available.
    :param working_dir: If not None, chdir there while the process is
        started, then chdir back.
    :param allow_plugins: If False (default) pass --no-plugins to bzr.

    :returns: Popen object for the started process.
    """
    if skip_if_plan_to_signal:
        if not getattr(os, 'kill', None):
            raise TestSkipped("os.kill not available.")

    if env_changes is None:
        env_changes = {}
    old_env = {}

    def cleanup_environment():
        for env_var, value in env_changes.iteritems():
            old_env[env_var] = osutils.set_or_unset_env(env_var, value)

    def restore_environment():
        for env_var, value in old_env.iteritems():
            osutils.set_or_unset_env(env_var, value)

    bzr_path = self.get_bzr_path()

    cwd = None
    if working_dir is not None:
        cwd = osutils.getcwd()
        os.chdir(working_dir)

    try:
        # win32 subprocess doesn't support preexec_fn
        # so we will avoid using it on all platforms, just to
        # make sure the code path is used, and we don't break on win32
        cleanup_environment()
        command = [sys.executable]
        # frozen executables don't need the path to bzr
        if getattr(sys, "frozen", None) is None:
            command.append(bzr_path)
        if not allow_plugins:
            command.append('--no-plugins')
        command.extend(process_args)
        process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    finally:
        # The parent's environment and cwd are restored once the child
        # has been spawned (or spawning failed).
        restore_environment()
        if cwd is not None:
            os.chdir(cwd)

    return process
1629
def _popen(self, *args, **kwargs):
1630
"""Place a call to Popen.
1632
Allows tests to override this method to intercept the calls made to
1633
Popen for introspection.
1635
return Popen(*args, **kwargs)
1637
def get_bzr_path(self):
    """Return the path of the 'bzr' executable for this test suite."""
    bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
    if not os.path.isfile(bzr_path):
        # We are probably installed. Assume sys.argv is the right file
        bzr_path = sys.argv[0]
    return bzr_path
1645
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
                          universal_newlines=False, process_args=None):
    """Finish the execution of process.

    :param process: the Popen object returned from start_bzr_subprocess.
    :param retcode: The status code that is expected.  Defaults to 0.  If
        None is supplied, the status code is not checked.
    :param send_signal: an optional signal to send to the process.
    :param universal_newlines: Convert CRLF => LF
    :param process_args: The arguments the process was started with; only
        used in the log and failure messages.
    :returns: (stdout, stderr)
    """
    if send_signal is not None:
        os.kill(process.pid, send_signal)
    out, err = process.communicate()

    if universal_newlines:
        out = out.replace('\r\n', '\n')
        err = err.replace('\r\n', '\n')

    if retcode is not None and retcode != process.returncode:
        if process_args is None:
            process_args = "(unknown args)"
        # Record the child's output in the log before failing.
        mutter('Output of bzr %s:\n%s', process_args, out)
        mutter('Error for bzr %s:\n%s', process_args, err)
        self.fail('Command bzr %s failed with retcode %s != %s'
                  % (process_args, retcode, process.returncode))
    return [out, err]
1673
def check_inventory_shape(self, inv, shape):
    """Compare an inventory to a list of expected names.

    Fail if they are not precisely equal.

    :param inv: Object whose ``entries()`` yields (path, entry) pairs.
    :param shape: Iterable of expected names; directories end in '/'.
    """
    extras = []
    shape = list(shape)             # copy
    for path, ie in inv.entries():
        # Normalise separators so expectations can always use '/'.
        name = path.replace('\\', '/')
        if ie.kind == 'directory':
            name = name + '/'
        if name in shape:
            shape.remove(name)
        else:
            extras.append(name)
    if shape:
        self.fail("expected paths not found in inventory: %r" % shape)
    if extras:
        self.fail("unexpected paths found in inventory: %r" % extras)
1693
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                     a_callable=None, *args, **kwargs):
    """Call callable with redirected std io pipes.

    :param stdin: Object installed as sys.stdin; defaults to an empty
        StringIO.
    :param stdout: Object installed as sys.stdout; defaults to the test
        log file when there is one, else a fresh StringIO.
    :param stderr: Object installed as sys.stderr; same default as stdout.
    :param a_callable: The callable to run as ``a_callable(*args,
        **kwargs)``.
    :raises ValueError: if a_callable is not callable.

    Returns the return code."""
    if not callable(a_callable):
        raise ValueError("a_callable must be callable.")
    if stdin is None:
        stdin = StringIO("")
    if stdout is None:
        if getattr(self, "_log_file", None) is not None:
            stdout = self._log_file
        else:
            stdout = StringIO()
    if stderr is None:
        # The closing parenthesis belongs before "is not None"; with it
        # after, getattr's default became True and a missing _log_file
        # attribute was then read unconditionally.
        if getattr(self, "_log_file", None) is not None:
            stderr = self._log_file
        else:
            stderr = StringIO()
    real_stdin = sys.stdin
    real_stdout = sys.stdout
    real_stderr = sys.stderr
    try:
        sys.stdout = stdout
        sys.stderr = stderr
        sys.stdin = stdin
        return a_callable(*args, **kwargs)
    finally:
        sys.stdout = real_stdout
        sys.stderr = real_stderr
        sys.stdin = real_stdin
1725
def reduceLockdirTimeout(self):
    """Reduce the default lock timeout for the duration of the test, so that
    if LockContention occurs during a test, it does so quickly.

    Tests that expect to provoke LockContention errors should call this.
    """
    orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS

    def resetTimeout():
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout

    # Register the restore before changing the value.
    self.addCleanup(resetTimeout)
    bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1737
def make_utf8_encoded_stringio(self, encoding_type=None):
    """Return a StringIOWrapper instance, that will encode Unicode
    input to UTF-8.

    :param encoding_type: The codec error-handling mode passed as
        ``errors``; defaults to 'strict'.
    :return: A codecs stream writer around a fresh StringIO, with an
        ``encoding`` attribute set to 'utf-8'.
    """
    if encoding_type is None:
        encoding_type = 'strict'
    sio = StringIO()
    output_encoding = 'utf-8'
    sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
    sio.encoding = output_encoding
    return sio
1750
class TestCaseWithMemoryTransport(TestCase):
1751
"""Common test class for tests that do not need disk resources.
1753
Tests that need disk resources should derive from TestCaseWithTransport.
1755
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1757
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1758
a directory which does not exist. This serves to help ensure test isolation
1759
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1760
must exist. However, TestCaseWithMemoryTransport does not offer local
1761
file defaults for the transport in tests, nor does it obey the command line
1762
override, so tests that accidentally write to the common directory should
1765
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1766
a .bzr directory that stops us ascending higher into the filesystem.
1772
def __init__(self, methodName='runTest'):
    """Set the transport-related defaults a parameterizer may override.

    :param methodName: Name of the test method, passed to the base class.
    """
    # allow test parameterization after test construction and before test
    # execution. Variables that the parameterizer sets need to be
    # ones that are not set by setUp, or setUp will trash them.
    super(TestCaseWithMemoryTransport, self).__init__(methodName)
    self.vfs_transport_factory = default_transport
    self.transport_server = None
    self.transport_readonly_server = None
    self.__vfs_server = None
1782
def get_transport(self, relpath=None):
    """Return a writeable transport.

    This transport is for the test scratch space relative to
    the URL returned by self.get_url.

    :param relpath: a path relative to the base url.
    """
    t = get_transport(self.get_url(relpath))
    self.assertFalse(t.is_readonly())
    return t
1794
def get_readonly_transport(self, relpath=None):
    """Return a readonly transport for the test scratch space

    This can be used to test that operations which should only need
    readonly access in fact do not try to write.

    :param relpath: a path relative to the base url.
    """
    t = get_transport(self.get_readonly_url(relpath))
    self.assertTrue(t.is_readonly())
    return t
1806
def create_transport_readonly_server(self):
    """Create a transport server from class defined at init.

    This is mostly a hook for daughter classes.

    :return: The result of calling ``self.transport_readonly_server()``.
    """
    return self.transport_readonly_server()
1813
def get_readonly_server(self):
    """Get the server instance for the readonly transport

    This is useful for some tests with specific servers to do diagnostics.

    The server is created and set up on first use, with its tearDown
    registered as a cleanup; later calls return the same instance.
    """
    if self.__readonly_server is None:
        if self.transport_readonly_server is None:
            # readonly decorator requested
            # bring up the server
            self.__readonly_server = ReadonlyServer()
            self.__readonly_server.setUp(self.get_vfs_only_server())
        else:
            self.__readonly_server = self.create_transport_readonly_server()
            self.__readonly_server.setUp(self.get_vfs_only_server())
        self.addCleanup(self.__readonly_server.tearDown)
    return self.__readonly_server
1830
def get_readonly_url(self, relpath=None):
    """Get a URL for the readonly transport.

    This will either be backed by '.' or a decorator to the transport
    used by self.get_url()
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    base = self.get_readonly_server().get_url()
    return self._adjust_url(base, relpath)
1841
def get_vfs_only_server(self):
    """Get the vfs only read/write server instance.

    This is useful for some tests with specific servers that need
    diagnostics.

    For TestCaseWithMemoryTransport this is always a MemoryServer, and there
    is no means to override it.
    """
    if self.__vfs_server is None:
        # Created lazily; torn down again via the registered cleanup.
        self.__vfs_server = MemoryServer()
        self.__vfs_server.setUp()
        self.addCleanup(self.__vfs_server.tearDown)
    return self.__vfs_server
1856
def get_server(self):
    """Get the read/write server instance.

    This is useful for some tests with specific servers that need
    diagnostics.

    This is built from the self.transport_server factory. If that is None,
    then the self.get_vfs_server is returned.
    """
    if self.__server is None:
        if (self.transport_server is None
            or self.transport_server is self.vfs_transport_factory):
            return self.get_vfs_only_server()
        else:
            # bring up a decorated means of access to the vfs only server.
            self.__server = self.transport_server()
            try:
                self.__server.setUp(self.get_vfs_only_server())
            except TypeError:
                # This should never happen; the try:Except here is to assist
                # developers having to update code rather than seeing an
                # uninformative TypeError.
                e = sys.exc_info()[1]
                raise Exception("Old server API in use: %s, %s"
                                % (self.__server, e))
            self.addCleanup(self.__server.tearDown)
    return self.__server
1881
def _adjust_url(self, base, relpath):
1882
"""Get a URL (or maybe a path) for the readwrite transport.
1884
This will either be backed by '.' or to an equivalent non-file based
1886
relpath provides for clients to get a path relative to the base url.
1887
These should only be downwards relative, not upwards.
1889
if relpath is not None and relpath != '.':
1890
if not base.endswith('/'):
1892
# XXX: Really base should be a url; we did after all call
1893
# get_url()! But sometimes it's just a path (from
1894
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1895
# to a non-escaped local path.
1896
if base.startswith('./') or base.startswith('/'):
1899
base += urlutils.escape(relpath)
1902
def get_url(self, relpath=None):
    """Get a URL (or maybe a path) for the readwrite transport.

    This will either be backed by '.' or to an equivalent non-file based
    facility.
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    base = self.get_server().get_url()
    return self._adjust_url(base, relpath)
1913
def get_vfs_only_url(self, relpath=None):
    """Get a URL (or maybe a path) for the plain old vfs transport.

    This will never be a smart protocol.  It always has all the
    capabilities of the local filesystem, but it might actually be a
    MemoryTransport or some other similar virtual filesystem.

    This is the backing transport (if any) of the server returned by
    get_url and get_readonly_url.

    :param relpath: provides for clients to get a path relative to the base
        url.  These should only be downwards relative, not upwards.
    :return: A URL
    """
    base = self.get_vfs_only_server().get_url()
    return self._adjust_url(base, relpath)
1930
def _create_safety_net(self):
    """Make a fake bzr directory.

    This prevents any tests propagating up onto the TEST_ROOT directory's
    real branch.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    bzrdir.BzrDir.create_standalone_workingtree(root)
1939
def _check_safety_net(self):
    """Check that the safety .bzr directory has not been touched.

    _make_test_root has created a .bzr directory to prevent tests from
    propagating. This method ensures that a test did not leak.

    :raises AssertionError: if the working tree at TEST_ROOT has a last
        revision other than 'null:'.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    wt = workingtree.WorkingTree.open(root)
    last_rev = wt.last_revision()
    if last_rev != 'null:':
        # The current test has modified the /bzr directory, we need to
        # recreate a new one or all the following tests will fail.
        # If you need to inspect its content uncomment the following line
        # import pdb; pdb.set_trace()
        _rmtree_temp_dir(root + '/.bzr')
        self._create_safety_net()
        raise AssertionError('%s/.bzr should not be modified' % root)
1957
def _make_test_root(self):
    """Create the shared TEST_ROOT on first use and arm the safety check."""
    if TestCaseWithMemoryTransport.TEST_ROOT is None:
        root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
        TestCaseWithMemoryTransport.TEST_ROOT = root

        self._create_safety_net()

        # The same directory is used by all tests, and we're not
        # specifically told when all tests are finished.  This will do.
        atexit.register(_rmtree_temp_dir, root)

    self.addCleanup(self._check_safety_net)
1970
def makeAndChdirToTestDir(self):
    """Create a temporary directories for this one test.

    This must set self.test_home_dir and self.test_dir and chdir to
    self.test_dir.

    For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
    """
    os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
    self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
    # This method never creates the home directory it names.
    self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1982
def make_branch(self, relpath, format=None):
    """Create a branch on the transport at relpath.

    :param relpath: Passed to make_repository.
    :param format: Passed to make_repository.
    :return: The branch created in the new repository's bzrdir.
    """
    repo = self.make_repository(relpath, format=format)
    return repo.bzrdir.create_branch()
1987
def make_bzrdir(self, relpath, format=None):
    """Initialize a bzrdir at relpath on the test's read/write transport.

    :param relpath: Location, resolved through self.get_url.
    :param format: A format object, a format name looked up in
        bzrdir.format_registry, or None for 'default'.
    :raises TestSkipped: if the format cannot be initialized.
    """
    try:
        # might be a relative or absolute path
        maybe_a_url = self.get_url(relpath)
        segments = maybe_a_url.rsplit('/', 1)
        t = get_transport(maybe_a_url)
        if len(segments) > 1 and segments[-1] not in ('', '.'):
            t.ensure_base()
        if format is None:
            format = 'default'
        if isinstance(format, basestring):
            format = bzrdir.format_registry.make_bzrdir(format)
        return format.initialize_on_transport(t)
    except errors.UninitializableFormat:
        raise TestSkipped("Format %s is not initializable." % format)
2003
def make_repository(self, relpath, shared=False, format=None):
    """Create a repository on our default transport at relpath.

    Note that relpath must be a relative path, not a full url.

    :param shared: Passed to create_repository.
    :param format: Passed to make_bzrdir.
    """
    # FIXME: If you create a remoterepository this returns the underlying
    # real format, which is incorrect.  Actually we should make sure that
    # RemoteBzrDir returns a RemoteRepository.
    # maybe  mbp 20070410
    made_control = self.make_bzrdir(relpath, format=format)
    return made_control.create_repository(shared=shared)
2015
def make_branch_and_memory_tree(self, relpath, format=None):
    """Create a branch on the default transport and a MemoryTree for it."""
    b = self.make_branch(relpath, format=format)
    return memorytree.MemoryTree.create_on_branch(b)
2020
def make_branch_builder(self, relpath, format=None):
    """Return a BranchBuilder on the test transport at relpath.

    :param relpath: Location, resolved through self.get_url.
    :param format: Passed to the BranchBuilder.
    """
    url = self.get_url(relpath)
    # The transport is built once and handed to the builder; previously it
    # was constructed a second time and the first instance discarded.
    tran = get_transport(url)
    return branchbuilder.BranchBuilder(tran, format=format)
2025
def overrideEnvironmentForTesting(self):
    """Point HOME and BZR_HOME at this test's home directory."""
    os.environ['HOME'] = self.test_home_dir
    os.environ['BZR_HOME'] = self.test_home_dir
2030
super(TestCaseWithMemoryTransport, self).setUp()
2031
self._make_test_root()
2032
_currentdir = os.getcwdu()
2033
def _leaveDirectory():
2034
os.chdir(_currentdir)
2035
self.addCleanup(_leaveDirectory)
2036
self.makeAndChdirToTestDir()
2037
self.overrideEnvironmentForTesting()
2038
self.__readonly_server = None
2039
self.__server = None
2040
self.reduceLockdirTimeout()
2043
class TestCaseInTempDir(TestCaseWithMemoryTransport):
    """Derived class that runs a test within a temporary directory.

    This is useful for tests that need to create a branch, etc.

    The directory is created in a slightly complex way: for each
    Python invocation, a new temporary top-level directory is created.
    All test cases create their own directory within that.  If the
    tests complete successfully, the directory is removed.

    :ivar test_base_dir: The path of the top-level directory for this
        test, which contains a home directory and a work directory.

    :ivar test_home_dir: An initially empty directory under test_base_dir
        which is used as $HOME for this test.

    :ivar test_dir: A directory under test_base_dir used as the current
        directory when the test proper is run.
    """

    OVERRIDE_PYTHON = 'python'

    def check_file_contents(self, filename, expect):
        """Fail the test unless the file holds exactly the expected text.

        :param filename: Path of the file to read.
        :param expect: The contents the file must have.

        On a mismatch both the expected and the actual contents are logged
        before the test is failed.
        """
        self.log("check contents of file %s" % filename)
        # Close the handle explicitly instead of leaving it to the garbage
        # collector.
        f = open(filename, 'r')
        try:
            contents = f.read()
        finally:
            f.close()
        if contents != expect:
            self.log("expected: %r" % expect)
            self.log("actually: %r" % contents)
            self.fail("contents of %s not as expected" % filename)

    def _getTestDirPrefix(self):
        """Return a directory-name prefix derived from the test id."""
        # create a directory within the top level test directory
        if sys.platform == 'win32':
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
            # windows is likely to have path-length limits so use a short name
            name_prefix = name_prefix[-30:]
        else:
            name_prefix = re.sub('[/]', '_', self.id())
        return name_prefix

    def makeAndChdirToTestDir(self):
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().

        For TestCaseInTempDir we create a temporary directory based on the test
        name and then create two subdirs - work and home under it.
        """
        name_prefix = osutils.pathjoin(self.TEST_ROOT, self._getTestDirPrefix())
        name = name_prefix
        # Try the plain name first, then numbered variants, and create the
        # first one that does not exist yet.
        for i in range(100):
            if os.path.exists(name):
                name = name_prefix + '_' + str(i)
            else:
                os.mkdir(name)
                break
        # now create test and home directories within this dir
        self.test_base_dir = name
        self.test_home_dir = self.test_base_dir + '/home'
        os.mkdir(self.test_home_dir)
        self.test_dir = self.test_base_dir + '/work'
        os.mkdir(self.test_dir)
        os.chdir(self.test_dir)
        # put name of test inside
        f = open(self.test_base_dir + '/name', 'w')
        try:
            f.write(self.id())
        finally:
            f.close()
        self.addCleanup(self.deleteTestDir)

    def deleteTestDir(self):
        """Remove this test's base directory."""
        # Step out of the directory first so that it is not the current
        # directory while it is being removed.
        os.chdir(self.TEST_ROOT)
        _rmtree_temp_dir(self.test_base_dir)

    def build_tree(self, shape, line_endings='binary', transport=None):
        """Build a test tree according to a pattern.

        shape is a sequence of file specifications.  If the final
        character is '/', a directory is created.

        This assumes that all the elements in the tree being built are new.

        This doesn't add anything to a branch.

        :type shape:    list or tuple.
        :param line_endings: Either 'binary' or 'native'
            in binary mode, exact contents are written in native mode, the
            line endings match the default platform endings.
        :param transport: A transport to write to, for building trees on VFS's.
            If the transport is readonly or None, "." is opened automatically.
        :raises AssertionError: if shape is not a list or a tuple.
        :raises errors.BzrError: if line_endings is neither 'binary' nor
            'native'.
        :return: None
        """
        if type(shape) not in (list, tuple):
            raise AssertionError("Parameter 'shape' should be "
                "a list or a tuple. Got %r instead" % (shape,))
        # It's OK to just create them using forward slashes on windows.
        if transport is None or transport.is_readonly():
            transport = get_transport(".")
        for name in shape:
            self.assert_(isinstance(name, basestring))
            if name[-1] == '/':
                transport.mkdir(urlutils.escape(name[:-1]))
            else:
                if line_endings == 'binary':
                    end = '\n'
                elif line_endings == 'native':
                    end = os.linesep
                else:
                    raise errors.BzrError(
                        'Invalid line ending request %r' % line_endings)
                content = "contents of %s%s" % (name.encode('utf-8'), end)
                transport.put_bytes_non_atomic(urlutils.escape(name), content)

    def build_tree_contents(self, shape):
        """Delegate to the module-level build_tree_contents()."""
        build_tree_contents(shape)

    def assertInWorkingTree(self, path, root_path='.', tree=None):
        """Assert that path, or every path in a sequence, is in the tree.

        :param path: A single path string or an iterable of paths.
        :param root_path: Where to open the WorkingTree when tree is None.
        :param tree: The tree to look in; opened from root_path if None.
        """
        if tree is None:
            tree = workingtree.WorkingTree.open(root_path)
        if not isinstance(path, basestring):
            for p in path:
                self.assertInWorkingTree(p, tree=tree)
        else:
            self.assertIsNot(tree.path2id(path), None,
                path+' not in working tree.')

    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
        """Assert that path, or every path in a sequence, is not in the tree.

        :param path: A single path string or an iterable of paths.
        :param root_path: Where to open the WorkingTree when tree is None.
        :param tree: The tree to look in; opened from root_path if None.
        """
        if tree is None:
            tree = workingtree.WorkingTree.open(root_path)
        if not isinstance(path, basestring):
            for p in path:
                self.assertNotInWorkingTree(p,tree=tree)
        else:
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
2180
class TestCaseWithTransport(TestCaseInTempDir):
    """A test case that provides get_url and get_readonly_url facilities.

    These back onto two transport servers, one for readonly access and one for
    read write access.

    If no explicit class is provided for readonly access, a
    ReadonlyTransportDecorator is used instead which allows the use of non disk
    based read write transports.

    If an explicit class is provided for readonly access, that server and the
    readwrite one must both define get_url() as resolving to os.getcwd().
    """

    def get_vfs_only_server(self):
        """See TestCaseWithMemoryTransport.

        The server is created from self.vfs_transport_factory on first use,
        started, and registered for tearDown as a cleanup; later calls
        return the same instance.
        """
        if self.__vfs_server is None:
            self.__vfs_server = self.vfs_transport_factory()
            self.__vfs_server.setUp()
            self.addCleanup(self.__vfs_server.tearDown)
        return self.__vfs_server

    def make_branch_and_tree(self, relpath, format=None):
        """Create a branch on the transport and a tree locally.

        If the transport is not a LocalTransport, the Tree can't be created on
        the transport.  In that case if the vfs_transport_factory is
        LocalURLServer the working tree is created in the local
        directory backing the transport, and the returned tree's branch and
        repository will also be accessed locally. Otherwise a lightweight
        checkout is created and returned.

        :param format: The BzrDirFormat.
        :returns: the WorkingTree.
        """
        # TODO: always use the local disk path for the working tree,
        # this obviously requires a format that supports branch references
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
        # RBC 20060208
        b = self.make_branch(relpath, format=format)
        try:
            return b.bzrdir.create_workingtree()
        except errors.NotLocalUrl:
            # We can only make working trees locally at the moment.  If the
            # transport can't support them, then we keep the non-disk-backed
            # branch and create a local checkout.
            if self.vfs_transport_factory is LocalURLServer:
                # the branch is colocated on disk, we cannot create a checkout.
                # hopefully callers will expect this.
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
                wt = local_controldir.create_workingtree()
                if wt.branch._format != b._format:
                    wt._branch = b
                    # Make sure that assigning to wt._branch fixes wt.branch,
                    # in case the implementation details of workingtree objects
                    # change.
                    self.assertIs(b, wt.branch)
                return wt
            else:
                return b.create_checkout(relpath, lightweight=True)

    def assertIsDirectory(self, relpath, transport):
        """Assert that relpath within transport is a directory.

        This may not be possible on all transports; in that case it propagates
        a TransportNotPossible.
        """
        try:
            mode = transport.stat(relpath).st_mode
        except errors.NoSuchFile:
            self.fail("path %s is not a directory; no such file"
                      % (relpath))
        if not stat.S_ISDIR(mode):
            self.fail("path %s is not a directory; has mode %#o"
                      % (relpath, mode))

    def assertTreesEqual(self, left, right):
        """Check that left and right have the same content and properties."""
        # we use a tree delta to check for equality of the content, and we
        # manually check for equality of other things such as the parents list.
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
        differences = left.changes_from(right)
        self.assertFalse(differences.has_changed(),
            "Trees %r and %r are different: %r" % (left, right, differences))

    def setUp(self):
        """Run the base setUp and start with no VFS-only server."""
        super(TestCaseWithTransport, self).setUp()
        self.__vfs_server = None
2274
class ChrootedTestCase(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.vfs_transport_factory is MemoryServer.
    If it is then we are chrooted already, if it is not then HttpServer is
    used for readonly urls.

    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
                       be used without needed to redo it when a different
                       subclass is in use ?
    """

    def setUp(self):
        """Run the base setUp, then select the readonly server class."""
        super(ChrootedTestCase, self).setUp()
        if not self.vfs_transport_factory == MemoryServer:
            self.transport_readonly_server = HttpServer
2292
def condition_id_re(pattern):
    """Create a condition filter which performs a re check on a test's id.

    :param pattern: A regular expression string.
    :return: A callable taking a test and returning the result of searching
        its id for the pattern: a match object (true) when the pattern is
        found, None otherwise.
    """
    filter_re = re.compile(pattern)
    def condition(test):
        test_id = test.id()
        return filter_re.search(test_id)
    return condition
2305
def condition_isinstance(klass_or_klass_list):
    """Create a condition filter which returns isinstance(param, klass).

    :param klass_or_klass_list: A class, or a tuple of classes, as accepted
        by the second argument of isinstance().
    :return: A callable which when called with one parameter obj return the
        result of isinstance(obj, klass_or_klass_list).
    """
    def condition(obj):
        return isinstance(obj, klass_or_klass_list)
    return condition
2316
def condition_id_in_list(id_list):
    """Create a condition filter which verify that test's id in a list.

    :param id_list: A TestIdList object.
    :return: A callable that returns True if the test's id appears in the list.
    """
    def condition(test):
        return id_list.includes(test.id())
    return condition
2327
def condition_id_startswith(starts):
    """Create a condition filter verifying that test's id starts with a string.

    :param starts: A list of string.
    :return: A callable that returns True if the test's id starts with one of
        the given strings, False otherwise.
    """
    def condition(test):
        for start in starts:
            if test.id().startswith(start):
                return True
        return False
    return condition
2342
def exclude_tests_by_condition(suite, condition):
    """Create a test suite which excludes some tests from suite.

    :param suite: The suite to get tests from.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be excluded from the result.
    :return: A suite which contains the tests found in suite that fail
        condition.
    """
    result = []
    for test in iter_suite_tests(suite):
        if not condition(test):
            result.append(test)
    return TestUtil.TestSuite(result)
2358
def filter_suite_by_condition(suite, condition):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be included in the result.
    :return: A suite which contains the tests found in suite that pass
        condition.
    """
    result = []
    for test in iter_suite_tests(suite):
        if condition(test):
            result.append(test)
    return TestUtil.TestSuite(result)
2374
def filter_suite_by_re(suite, pattern):
    """Create a test suite by filtering another one.

    :param suite: the source suite
    :param pattern: pattern that names must match
    :returns: the newly created suite
    """
    condition = condition_id_re(pattern)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
2386
def filter_suite_by_id_list(suite, test_id_list):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param test_id_list: An object with an includes(test_id) method naming
        the test ids to keep (it is handed to condition_id_in_list()).
    :returns: the newly created suite
    """
    condition = condition_id_in_list(test_id_list)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
2398
def filter_suite_by_id_startswith(suite, start):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param start: A list of string the test id must start with one of.
    :returns: the newly created suite
    """
    condition = condition_id_startswith(start)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
2410
def exclude_tests_by_re(suite, pattern):
    """Build a suite from suite without the tests whose id matches pattern.

    :param suite: The suite to get tests from.
    :param pattern: A regular expression string.  Tests whose id matches it
        are left out of the result.
    :return: Whatever exclude_tests_by_condition() returns for suite and
        the id-matching condition built from pattern.
    """
    matches_pattern = condition_id_re(pattern)
    return exclude_tests_by_condition(suite, matches_pattern)
2423
def preserve_input(something):
    """A helper for performing test suite transformation chains.

    :param something: Anything you want to preserve.
    :return: Something.
    """
    return something
2432
def randomize_suite(suite):
    """Return a new TestSuite holding suite's tests in shuffled order.

    Every test reachable through iter_suite_tests() is collected into one
    flat list, so nested TestSuites do not survive, and that list is
    shuffled with random.shuffle() before being wrapped in a new suite.
    """
    flattened = list(iter_suite_tests(suite))
    random.shuffle(flattened)
    return TestUtil.TestSuite(flattened)
2444
def split_suite_by_condition(suite, condition):
    """Split a test suite into two by a condition.

    :param suite: The suite to split.
    :param condition: The condition to match on. Tests that match this
        condition are returned in the first test suite, ones that do not match
        are in the second suite.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching the condition, and the second contains the remainder
        from suite. The order within each output suite is the same as it was in
        suite.
    """
    matched = []
    did_not_match = []
    for test in iter_suite_tests(suite):
        if condition(test):
            matched.append(test)
        else:
            did_not_match.append(test)
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2466
def split_suite_by_re(suite, pattern):
    """Partition a test suite by matching test ids against a pattern.

    :param suite: The suite to split.
    :param pattern: A regular expression string.  Tests whose id matches go
        to the first returned suite, all others to the second.
    :return: The pair of suites produced by split_suite_by_condition() for
        the id-matching condition built from pattern.
    """
    matches_pattern = condition_id_re(pattern)
    return split_suite_by_condition(suite, matches_pattern)
2481
def run_suite(suite, name='test', verbose=False, pattern=".*",
2482
stop_on_failure=False,
2483
transport=None, lsprof_timed=None, bench_history=None,
2484
matching_tests_first=None,
2487
exclude_pattern=None,
2489
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2494
runner = TextTestRunner(stream=sys.stdout,
2496
verbosity=verbosity,
2497
bench_history=bench_history,
2498
list_only=list_only,
2500
runner.stop_on_failure=stop_on_failure
2501
# Initialise the random number generator and display the seed used.
2502
# We convert the seed to a long to make it reuseable across invocations.
2503
random_order = False
2504
if random_seed is not None:
2506
if random_seed == "now":
2507
random_seed = long(time.time())
2509
# Convert the seed to a long if we can
2511
random_seed = long(random_seed)
2514
runner.stream.writeln("Randomizing test order using seed %s\n" %
2516
random.seed(random_seed)
2517
# Customise the list of tests if requested
2518
if exclude_pattern is not None:
2519
suite = exclude_tests_by_re(suite, exclude_pattern)
2521
order_changer = randomize_suite
2523
order_changer = preserve_input
2524
if pattern != '.*' or random_order:
2525
if matching_tests_first:
2526
suites = map(order_changer, split_suite_by_re(suite, pattern))
2527
suite = TestUtil.TestSuite(suites)
2529
suite = order_changer(filter_suite_by_re(suite, pattern))
2531
result = runner.run(suite)
2534
return result.wasStrictlySuccessful()
2536
return result.wasSuccessful()
2539
# Controlled by "bzr selftest -E=..." option
2540
selftest_debug_flags = set()
2543
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2545
test_suite_factory=None,
2548
matching_tests_first=None,
2551
exclude_pattern=None,
2557
"""Run the whole test suite under the enhanced runner"""
2558
# XXX: Very ugly way to do this...
2559
# Disable warning about old formats because we don't want it to disturb
2560
# any blackbox tests.
2561
from bzrlib import repository
2562
repository._deprecation_warning_done = True
2564
global default_transport
2565
if transport is None:
2566
transport = default_transport
2567
old_transport = default_transport
2568
default_transport = transport
2569
global selftest_debug_flags
2570
old_debug_flags = selftest_debug_flags
2571
if debug_flags is not None:
2572
selftest_debug_flags = set(debug_flags)
2574
if load_list is None:
2577
keep_only = load_test_id_list(load_list)
2578
if test_suite_factory is None:
2579
suite = test_suite(keep_only, starting_with)
2581
suite = test_suite_factory()
2582
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2583
stop_on_failure=stop_on_failure,
2584
transport=transport,
2585
lsprof_timed=lsprof_timed,
2586
bench_history=bench_history,
2587
matching_tests_first=matching_tests_first,
2588
list_only=list_only,
2589
random_seed=random_seed,
2590
exclude_pattern=exclude_pattern,
2593
default_transport = old_transport
2594
selftest_debug_flags = old_debug_flags
2597
def load_test_id_list(file_name):
    """Load a test id list from a text file.

    The format is one test id by line.  No special care is taken to impose
    strict rules, these test ids are used to filter the test suite so a test id
    that do not match an existing test will do no harm. This allows user to add
    comments, leave blank lines, etc.

    :param file_name: Path of the text file to read.
    :return: A list with one entry per line of the file, each stripped of
        surrounding whitespace (blank lines become empty strings).
    :raises errors.NoSuchFile: if the file does not exist; any other
        IOError from opening it propagates unchanged.
    """
    test_list = []
    try:
        ftest = open(file_name, 'rt')
    except IOError:
        e = sys.exc_info()[1]
        if e.errno != errno.ENOENT:
            raise
        else:
            raise errors.NoSuchFile(file_name)

    # Close the file even if reading fails part-way through.
    try:
        for test_name in ftest.readlines():
            test_list.append(test_name.strip())
    finally:
        ftest.close()
    return test_list
2620
def suite_matches_id_list(test_suite, id_list):
    """Warns about tests not appearing or appearing more than once.

    :param test_suite: A TestSuite object.
    :param id_list: The list of test ids that should be found in
         test_suite.

    :return: (absents, duplicates) absents is a list containing the test found
        in id_list but not in test_suite, duplicates is a list containing the
        test found multiple times in test_suite.

    When using a predefined test id list, it may occur that some tests do not
    exist anymore or that some tests use the same id. This function warns the
    tester about potential problems in his workflow (test lists are volatile)
    or in the test suite itself (using the same id for several tests does not
    help to localize defects).
    """
    # Build a dict counting id occurrences
    tests = dict()
    for test in iter_suite_tests(test_suite):
        test_id = test.id()
        tests[test_id] = tests.get(test_id, 0) + 1

    not_found = []
    duplicates = []
    for test_id in id_list:
        occurs = tests.get(test_id, 0)
        if not occurs:
            not_found.append(test_id)
        elif occurs > 1:
            duplicates.append(test_id)

    return not_found, duplicates
2655
class TestIdList(object):
    """Test id list to filter a test suite.

    Relying on the assumption that test ids are built as:
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
    notation, this class offers methods to :
    - avoid building a test suite for modules not referred to in the test list,
    - keep only the tests listed from the module test suite.
    """

    def __init__(self, test_id_list):
        """Index the given test ids for exact and per-prefix lookups.

        :param test_id_list: An iterable of test id strings.
        """
        # When a test suite needs to be filtered against us we compare test ids
        # for equality, so a simple dict offers a quick and simple solution.
        self.tests = dict.fromkeys(test_id_list, True)

        # Since we can't predict a test class from its name only, we settle on
        # a simple constraint: a test id always begins with its module name.
        # Every dotted prefix of every id is therefore recorded as a possible
        # module name.
        modules = {}
        for test_id in test_id_list:
            parts = test_id.split('.')
            mod_name = parts.pop(0)
            modules[mod_name] = True
            for part in parts:
                mod_name += '.' + part
                modules[mod_name] = True
        self.modules = modules

    def refers_to(self, module_name):
        """Is there tests for the module or one of its sub modules."""
        return module_name in self.modules

    def includes(self, test_id):
        """Return True if test_id is one of the listed test ids."""
        return test_id in self.tests
2699
class TestPrefixAliasRegistry(registry.Registry):
    """A registry for test prefix aliases.

    This helps implement shortcuts for the --starting-with selftest
    option. Overriding existing prefixes is not allowed but not fatal (a
    warning will be emitted).
    """

    def register(self, key, obj, help=None, info=None,
                 override_existing=False):
        """See Registry.register.

        Trying to override an existing alias causes a warning to be emitted,
        not a fatal exception.
        """
        try:
            # override_existing is deliberately forced to False whatever the
            # caller passed: an alias that is already taken is never replaced.
            super(TestPrefixAliasRegistry, self).register(
                key, obj, help=help, info=info, override_existing=False)
        except KeyError:
            actual = self.get(key)
            note('Test prefix alias %s is already used for %s, ignoring %s'
                 % (key, actual, obj))

    def resolve_alias(self, id_start):
        """Replace the alias by the prefix in the given string.

        Using an unknown prefix is an error to help catching typos.

        :param id_start: A dotted string whose first component is an alias.
        :return: id_start with its first component replaced by the prefix
            registered for it.
        :raises errors.BzrCommandError: if the first component is not a
            registered alias.
        """
        parts = id_start.split('.')
        try:
            parts[0] = self.get(parts[0])
        except KeyError:
            raise errors.BzrCommandError(
                '%s is not a known test prefix alias' % parts[0])
        return '.'.join(parts)
2736
test_prefix_alias_registry = TestPrefixAliasRegistry()
"""Registry of test prefix aliases."""


# Registering 'bzrlib' as an alias for itself makes every valid test id
# start with a known alias, so a typo such as 'bzrlin.' is detected.
test_prefix_alias_registry.register('bzrlib', 'bzrlib')

# Obvious highest level prefixes, feel free to add your own via a plugin
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
2752
def test_suite(keep_only=None, starting_with=None):
2753
"""Build and return TestSuite for the whole of bzrlib.
2755
:param keep_only: A list of test ids limiting the suite returned.
2757
:param starting_with: An id limiting the suite returned to the tests
2760
This function can be replaced if you need to change the default test
2761
suite on a global basis, but it is not encouraged.
2765
'bzrlib.tests.blackbox',
2766
'bzrlib.tests.branch_implementations',
2767
'bzrlib.tests.bzrdir_implementations',
2768
'bzrlib.tests.commands',
2769
'bzrlib.tests.interrepository_implementations',
2770
'bzrlib.tests.intertree_implementations',
2771
'bzrlib.tests.inventory_implementations',
2772
'bzrlib.tests.per_lock',
2773
'bzrlib.tests.per_repository',
2774
'bzrlib.tests.per_repository_reference',
2775
'bzrlib.tests.test__dirstate_helpers',
2776
'bzrlib.tests.test__walkdirs_win32',
2777
'bzrlib.tests.test_ancestry',
2778
'bzrlib.tests.test_annotate',
2779
'bzrlib.tests.test_api',
2780
'bzrlib.tests.test_atomicfile',
2781
'bzrlib.tests.test_bad_files',
2782
'bzrlib.tests.test_bisect_multi',
2783
'bzrlib.tests.test_branch',
2784
'bzrlib.tests.test_branchbuilder',
2785
'bzrlib.tests.test_btree_index',
2786
'bzrlib.tests.test_bugtracker',
2787
'bzrlib.tests.test_bundle',
2788
'bzrlib.tests.test_bzrdir',
2789
'bzrlib.tests.test_cache_utf8',
2790
'bzrlib.tests.test_chunk_writer',
2791
'bzrlib.tests.test__chunks_to_lines',
2792
'bzrlib.tests.test_commands',
2793
'bzrlib.tests.test_commit',
2794
'bzrlib.tests.test_commit_merge',
2795
'bzrlib.tests.test_config',
2796
'bzrlib.tests.test_conflicts',
2797
'bzrlib.tests.test_counted_lock',
2798
'bzrlib.tests.test_decorators',
2799
'bzrlib.tests.test_delta',
2800
'bzrlib.tests.test_deprecated_graph',
2801
'bzrlib.tests.test_diff',
2802
'bzrlib.tests.test_directory_service',
2803
'bzrlib.tests.test_dirstate',
2804
'bzrlib.tests.test_email_message',
2805
'bzrlib.tests.test_errors',
2806
'bzrlib.tests.test_extract',
2807
'bzrlib.tests.test_fetch',
2808
'bzrlib.tests.test_fifo_cache',
2809
'bzrlib.tests.test_ftp_transport',
2810
'bzrlib.tests.test_foreign',
2811
'bzrlib.tests.test_generate_docs',
2812
'bzrlib.tests.test_generate_ids',
2813
'bzrlib.tests.test_globbing',
2814
'bzrlib.tests.test_gpg',
2815
'bzrlib.tests.test_graph',
2816
'bzrlib.tests.test_hashcache',
2817
'bzrlib.tests.test_help',
2818
'bzrlib.tests.test_hooks',
2819
'bzrlib.tests.test_http',
2820
'bzrlib.tests.test_http_implementations',
2821
'bzrlib.tests.test_http_response',
2822
'bzrlib.tests.test_https_ca_bundle',
2823
'bzrlib.tests.test_identitymap',
2824
'bzrlib.tests.test_ignores',
2825
'bzrlib.tests.test_index',
2826
'bzrlib.tests.test_info',
2827
'bzrlib.tests.test_inv',
2828
'bzrlib.tests.test_knit',
2829
'bzrlib.tests.test_lazy_import',
2830
'bzrlib.tests.test_lazy_regex',
2831
'bzrlib.tests.test_lockable_files',
2832
'bzrlib.tests.test_lockdir',
2833
'bzrlib.tests.test_log',
2834
'bzrlib.tests.test_lru_cache',
2835
'bzrlib.tests.test_lsprof',
2836
'bzrlib.tests.test_mail_client',
2837
'bzrlib.tests.test_memorytree',
2838
'bzrlib.tests.test_merge',
2839
'bzrlib.tests.test_merge3',
2840
'bzrlib.tests.test_merge_core',
2841
'bzrlib.tests.test_merge_directive',
2842
'bzrlib.tests.test_missing',
2843
'bzrlib.tests.test_msgeditor',
2844
'bzrlib.tests.test_multiparent',
2845
'bzrlib.tests.test_mutabletree',
2846
'bzrlib.tests.test_nonascii',
2847
'bzrlib.tests.test_options',
2848
'bzrlib.tests.test_osutils',
2849
'bzrlib.tests.test_osutils_encodings',
2850
'bzrlib.tests.test_pack',
2851
'bzrlib.tests.test_pack_repository',
2852
'bzrlib.tests.test_patch',
2853
'bzrlib.tests.test_patches',
2854
'bzrlib.tests.test_permissions',
2855
'bzrlib.tests.test_plugins',
2856
'bzrlib.tests.test_progress',
2857
'bzrlib.tests.test_read_bundle',
2858
'bzrlib.tests.test_reconcile',
2859
'bzrlib.tests.test_reconfigure',
2860
'bzrlib.tests.test_registry',
2861
'bzrlib.tests.test_remote',
2862
'bzrlib.tests.test_repository',
2863
'bzrlib.tests.test_revert',
2864
'bzrlib.tests.test_revision',
2865
'bzrlib.tests.test_revisionspec',
2866
'bzrlib.tests.test_revisiontree',
2867
'bzrlib.tests.test_rio',
2868
'bzrlib.tests.test_rules',
2869
'bzrlib.tests.test_sampler',
2870
'bzrlib.tests.test_selftest',
2871
'bzrlib.tests.test_setup',
2872
'bzrlib.tests.test_sftp_transport',
2873
'bzrlib.tests.test_shelf',
2874
'bzrlib.tests.test_shelf_ui',
2875
'bzrlib.tests.test_smart',
2876
'bzrlib.tests.test_smart_add',
2877
'bzrlib.tests.test_smart_transport',
2878
'bzrlib.tests.test_smtp_connection',
2879
'bzrlib.tests.test_source',
2880
'bzrlib.tests.test_ssh_transport',
2881
'bzrlib.tests.test_status',
2882
'bzrlib.tests.test_store',
2883
'bzrlib.tests.test_strace',
2884
'bzrlib.tests.test_subsume',
2885
'bzrlib.tests.test_switch',
2886
'bzrlib.tests.test_symbol_versioning',
2887
'bzrlib.tests.test_tag',
2888
'bzrlib.tests.test_testament',
2889
'bzrlib.tests.test_textfile',
2890
'bzrlib.tests.test_textmerge',
2891
'bzrlib.tests.test_timestamp',
2892
'bzrlib.tests.test_trace',
2893
'bzrlib.tests.test_transactions',
2894
'bzrlib.tests.test_transform',
2895
'bzrlib.tests.test_transport',
2896
'bzrlib.tests.test_transport_implementations',
2897
'bzrlib.tests.test_transport_log',
2898
'bzrlib.tests.test_tree',
2899
'bzrlib.tests.test_treebuilder',
2900
'bzrlib.tests.test_tsort',
2901
'bzrlib.tests.test_tuned_gzip',
2902
'bzrlib.tests.test_ui',
2903
'bzrlib.tests.test_uncommit',
2904
'bzrlib.tests.test_upgrade',
2905
'bzrlib.tests.test_upgrade_stacked',
2906
'bzrlib.tests.test_urlutils',
2907
'bzrlib.tests.test_version',
2908
'bzrlib.tests.test_version_info',
2909
'bzrlib.tests.test_versionedfile',
2910
'bzrlib.tests.test_weave',
2911
'bzrlib.tests.test_whitebox',
2912
'bzrlib.tests.test_win32utils',
2913
'bzrlib.tests.test_workingtree',
2914
'bzrlib.tests.test_workingtree_4',
2915
'bzrlib.tests.test_wsgi',
2916
'bzrlib.tests.test_xml',
2917
'bzrlib.tests.tree_implementations',
2918
'bzrlib.tests.workingtree_implementations',
2919
'bzrlib.util.tests.test_bencode',
2922
loader = TestUtil.TestLoader()
2925
starting_with = [test_prefix_alias_registry.resolve_alias(start)
2926
for start in starting_with]
2927
# We take precedence over keep_only because *at loading time* using
2928
# both options means we will load less tests for the same final result.
2929
def interesting_module(name):
2930
for start in starting_with:
2932
# Either the module name starts with the specified string
2933
name.startswith(start)
2934
# or it may contain tests starting with the specified string
2935
or start.startswith(name)
2939
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
2941
elif keep_only is not None:
2942
id_filter = TestIdList(keep_only)
2943
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
2944
def interesting_module(name):
2945
return id_filter.refers_to(name)
2948
loader = TestUtil.TestLoader()
2949
def interesting_module(name):
2950
# No filtering, all modules are interesting
2953
suite = loader.suiteClass()
2955
# modules building their suite with loadTestsFromModuleNames
2956
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2958
modules_to_doctest = [
2960
'bzrlib.branchbuilder',
2963
'bzrlib.iterablefile',
2967
'bzrlib.symbol_versioning',
2970
'bzrlib.version_info_formats.format_custom',
2973
for mod in modules_to_doctest:
2974
if not interesting_module(mod):
2975
# No tests to keep here, move along
2978
# note that this really does mean "report only" -- doctest
2979
# still runs the rest of the examples
2980
doc_suite = doctest.DocTestSuite(mod,
2981
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
2982
except ValueError, e:
2983
print '**failed to get doctest for: %s\n%s' % (mod, e)
2985
if len(doc_suite._tests) == 0:
2986
raise errors.BzrError("no doctests found in %s" % (mod,))
2987
suite.addTest(doc_suite)
2989
default_encoding = sys.getdefaultencoding()
2990
for name, plugin in bzrlib.plugin.plugins().items():
2991
if not interesting_module(plugin.module.__name__):
2993
plugin_suite = plugin.test_suite()
2994
# We used to catch ImportError here and turn it into just a warning,
2995
# but really if you don't have --no-plugins this should be a failure.
2996
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
2997
if plugin_suite is None:
2998
plugin_suite = plugin.load_plugin_tests(loader)
2999
if plugin_suite is not None:
3000
suite.addTest(plugin_suite)
3001
if default_encoding != sys.getdefaultencoding():
3002
bzrlib.trace.warning(
3003
'Plugin "%s" tried to reset default encoding to: %s', name,
3004
sys.getdefaultencoding())
3006
sys.setdefaultencoding(default_encoding)
3009
suite = filter_suite_by_id_startswith(suite, starting_with)
3011
if keep_only is not None:
3012
# Now that the referred modules have loaded their tests, keep only the
3014
suite = filter_suite_by_id_list(suite, id_filter)
3015
# Do some sanity checks on the id_list filtering
3016
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3018
# The tester has used both keep_only and starting_with, so he is
3019
# already aware that some tests are excluded from the list, there
3020
# is no need to tell him which.
3023
# Some tests mentioned in the list are not in the test suite. The
3024
# list may be out of date, report to the tester.
3025
for id in not_found:
3026
bzrlib.trace.warning('"%s" not found in the test suite', id)
3027
for id in duplicates:
3028
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
3033
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
    """Adapt all tests in some given modules to given scenarios.

    This is the recommended public interface for test parameterization.
    Typically the test_suite() method for a per-implementation test
    suite will call multiply_tests_from_modules and return the
    result.

    :param module_name_list: List of fully-qualified names of test
        modules.
    :param scenario_iter: Iterable of pairs of (scenario_name,
        scenario_param_dict).
    :param loader: If provided, will be used instead of a new
        bzrlib.tests.TestLoader() instance.

    This returns a new TestSuite containing the cross product of
    all the tests in all the modules, each repeated for each scenario.
    Each test is adapted by adding the scenario name at the end
    of its name, and updating the test object's __dict__ with the
    scenario_param_dict.

    >>> r = multiply_tests_from_modules(
    ...     ['bzrlib.tests.test_sampler'],
    ...     [('one', dict(param=1)),
    ...      ('two', dict(param=2))])
    >>> tests = list(iter_suite_tests(r))
    >>> len(tests)
    2
    >>> tests[0].id()
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
    >>> tests[0].param
    1
    >>> tests[1].param
    2
    """
    # XXX: Isn't load_tests() a better way to provide the same functionality
    # without forcing a predefined TestScenarioApplier ? --vila 080215
    if loader is None:
        loader = TestUtil.TestLoader()

    suite = loader.suiteClass()

    adapter = TestScenarioApplier()
    # Materialise the scenarios so a one-shot iterable can be reused for
    # every test.
    adapter.scenarios = list(scenario_iter)
    adapt_modules(module_name_list, adapter, loader, suite)
    return suite
3081
def multiply_scenarios(scenarios_left, scenarios_right):
    """Multiply two sets of scenarios.

    :param scenarios_left: Iterable of (name, param_dict) pairs.
    :param scenarios_right: Iterable of (name, param_dict) pairs. When a
        key appears in both dicts the right-hand value wins.
    :returns: the cartesian product of the two sets of scenarios, that is
        a scenario for every possible combination of a left scenario and a
        right scenario.
    """
    # The right-hand side is walked once per left scenario, so snapshot it
    # first; a one-shot iterator would otherwise be exhausted after the
    # first left scenario.
    scenarios_right = list(scenarios_right)
    return [
        ('%s,%s' % (left_name, right_name),
         # list() keeps the concatenation valid whether items() returns a
         # list or a view.
         dict(list(left_dict.items()) + list(right_dict.items())))
        for left_name, left_dict in scenarios_left
        for right_name, right_dict in scenarios_right]
3096
def adapt_modules(mods_list, adapter, loader, suite):
    """Load the tests of every module named in mods_list and adapt them.

    :param mods_list: Module names handed to
        loader.loadTestsFromModuleNames().
    :param adapter: Object whose adapt(test) result is added to suite.
    :param loader: Loader used to find the tests in the named modules.
    :param suite: Suite that receives the adapted tests.
    """
    adapt_tests(loader.loadTestsFromModuleNames(mods_list), adapter, suite)
3102
def adapt_tests(tests_list, adapter, suite):
    """Adapt each test found in tests_list and collect the results in suite.

    :param tests_list: Test or suite, flattened with iter_suite_tests().
    :param adapter: Object whose adapt(test) returns the adapted tests.
    :param suite: Suite that receives the adapted tests via addTests().
    """
    for single_test in iter_suite_tests(tests_list):
        adapted = adapter.adapt(single_test)
        suite.addTests(adapted)
3108
def _rmtree_temp_dir(dirname):
    """Remove the temporary test directory dirname and everything below it.

    :param dirname: Path of the directory to delete.
    :raises OSError: if removal fails, except for a permission error on
        win32, which is only reported on stderr.
    """
    # If LANG=C we probably have created some bogus paths
    # which rmtree(unicode) will fail to delete
    # so make sure we are using rmtree(str) to delete everything
    # except on win32, where rmtree(str) will fail
    # since it doesn't have the property of byte-stream paths
    # (they are either ascii or mbcs)
    if sys.platform == 'win32':
        # make sure we are using the unicode win32 api
        dirname = unicode(dirname)
    else:
        dirname = dirname.encode(sys.getfilesystemencoding())
    try:
        osutils.rmtree(dirname)
    except OSError:
        # Fetch the exception through sys.exc_info() so the clause parses
        # on every Python version.
        e = sys.exc_info()[1]
        if sys.platform == 'win32' and e.errno == errno.EACCES:
            # Best effort on win32: warn instead of failing the run.
            sys.stderr.write(('Permission denied: '
                              'unable to remove testing dir '
                              '%s\n' % os.path.basename(dirname)))
        else:
            # NOTE(review): this branch was lost in extraction; re-raising
            # is the conservative reading. Confirm.
            raise
3131
class Feature(object):
    """An operating system Feature."""

    def __init__(self):
        # Cached probe result; None means "not probed yet".
        self._available = None

    def available(self):
        """Is the feature available?

        The probe runs at most once per instance unless it returns None.

        :return: True if the feature is available.
        """
        if self._available is None:
            self._available = self._probe()
        return self._available

    def _probe(self):
        """Implement this method in concrete features.

        :return: True if the feature is available.
        """
        raise NotImplementedError

    def __str__(self):
        # NOTE(review): the method header was lost in extraction; __str__
        # is assumed from the body (a display name). Confirm.
        # Prefer the label from feature_name() when a subclass defines it.
        if getattr(self, 'feature_name', None):
            return self.feature_name()
        return self.__class__.__name__
3159
class _SymlinkFeature(Feature):
    """Does the platform support symbolic links?"""

    def _probe(self):
        return osutils.has_symlinks()

    def feature_name(self):
        # NOTE(review): the original label was lost in extraction; confirm
        # this wording.
        return 'symbolic links'

SymlinkFeature = _SymlinkFeature()
3170
class _HardlinkFeature(Feature):
    """Does the platform support hard links?"""

    def _probe(self):
        return osutils.has_hardlinks()

    def feature_name(self):
        # NOTE(review): the original label was lost in extraction; confirm
        # this wording.
        return 'hardlinks'

HardlinkFeature = _HardlinkFeature()
3181
class _OsFifoFeature(Feature):
    """Does the os module provide mkfifo()?"""

    def _probe(self):
        # Truthy (the function itself) when os.mkfifo exists, else None.
        return getattr(os, 'mkfifo', None)

    def feature_name(self):
        return 'filesystem fifos'

OsFifoFeature = _OsFifoFeature()
3192
class _UnicodeFilenameFeature(Feature):
    """Does the filesystem support Unicode filenames?"""

    def _probe(self):
        try:
            # Check for character combinations unlikely to be covered by any
            # single non-unicode encoding. We use the characters
            # - greek small letter alpha (U+03B1) and
            # - braille pattern dots-123456 (U+283F).
            os.stat(u'\u03b1\u283f')
        except UnicodeEncodeError:
            # The name cannot even be encoded for this filesystem.
            return False
        except (IOError, OSError):
            # The filesystem allows the Unicode filename but the file doesn't
            # exist.
            return True
        else:
            # The filesystem allows the Unicode filename and the file exists,
            # for some reason.
            return True

UnicodeFilenameFeature = _UnicodeFilenameFeature()
3216
class TestScenarioApplier(object):
    """A tool to apply scenarios to tests.

    Callers assign ``scenarios`` (a sequence of (name, param_dict) pairs)
    on the instance before calling adapt().
    """

    def adapt(self, test):
        """Return a TestSuite containing a copy of test for each scenario."""
        result = unittest.TestSuite()
        for scenario in self.scenarios:
            result.addTest(self.adapt_test_to_scenario(test, scenario))
        return result

    def adapt_test_to_scenario(self, test, scenario):
        """Copy test and apply scenario to it.

        :param test: A test to adapt.
        :param scenario: A tuple describing the scenario.
            The first element of the tuple is the scenario name; it is
            appended in parentheses to the test id.
            The second element is a dict containing attributes to set on the
            test.
        :return: The adapted test.
        """
        from copy import deepcopy
        # Work on a deep copy so the original test is left untouched.
        new_test = deepcopy(test)
        for name, value in scenario[1].items():
            setattr(new_test, name, value)
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
        # Shadow id() on the instance so it reports the scenario-qualified id.
        new_test.id = lambda: new_id
        return new_test
3245
def probe_unicode_in_user_encoding():
    """Try to encode several unicode strings to use in unicode-aware tests.

    Return the first successful match.

    :return: (unicode value, encoded plain string value) or (None, None)
    """
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
    for uni_val in possible_vals:
        try:
            str_val = uni_val.encode(osutils.get_user_encoding())
        except UnicodeEncodeError:
            # Try a different character
            pass
        else:
            return uni_val, str_val
    # None of the candidates can be encoded in the user encoding.
    return None, None
3263
def probe_bad_non_ascii(encoding):
    """Try to find [bad] character with code [128..255]
    that cannot be decoded to unicode in some encoding.

    :param encoding: Name of the encoding to probe.
    :return: The first single-byte string that fails to decode, or None
        if all non-ascii characters are valid for the given encoding.
    """
    for i in xrange(128, 256):
        char = chr(i)
        try:
            char.decode(encoding)
        except UnicodeDecodeError:
            return char
    return None
3278
class _FTPServerFeature(Feature):
    """Some tests want an FTP Server, check if one is available.

    Right now, the only way this is available is if 'medusa' is installed.
    http://www.amk.ca/python/code/medusa.html
    """

    def _probe(self):
        # The feature is present exactly when the helper module imports.
        try:
            import bzrlib.tests.ftp_server
            return True
        except ImportError:
            return False

    def feature_name(self):
        # NOTE(review): the original label was lost in extraction; confirm
        # this wording.
        return 'FTP Server'

FTPServerFeature = _FTPServerFeature()
3298
class _UnicodeFilename(Feature):
    """Does the filesystem support Unicode filenames?"""

    def _probe(self):
        try:
            # NOTE(review): the probed filename was lost in extraction; a
            # single greek small letter alpha (U+03B1) is assumed. Confirm.
            os.stat(u'\u03b1')
        except UnicodeEncodeError:
            # The name cannot even be encoded for this filesystem.
            return False
        except (IOError, OSError):
            # The filesystem allows the Unicode filename but the file doesn't
            # exist.
            return True
        else:
            # The filesystem allows the Unicode filename and the file exists,
            # for some reason.
            return True

UnicodeFilename = _UnicodeFilename()
3318
class _UTF8Filesystem(Feature):
    """Is the filesystem UTF-8?"""

    def _probe(self):
        # Accept both spellings of the encoding name, case-insensitively.
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
            return True
        return False

UTF8Filesystem = _UTF8Filesystem()
3329
class _CaseInsensitiveFilesystemFeature(Feature):
    """Check if underlying filesystem is case-insensitive
    (e.g. on Windows, Cygwin, MacOS)
    """

    def _probe(self):
        # Reuse the shared test root, creating it on first use.
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
            TestCaseWithMemoryTransport.TEST_ROOT = root
        else:
            root = TestCaseWithMemoryTransport.TEST_ROOT
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
                               dir=root)
        name_a = osutils.pathjoin(tdir, 'a')
        name_A = osutils.pathjoin(tdir, 'A')
        # Create 'a', then see whether 'A' resolves to the same directory.
        os.mkdir(name_a)
        result = osutils.isdir(name_A)
        _rmtree_temp_dir(tdir)
        return result

    def feature_name(self):
        return 'case-insensitive filesystem'

CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()