1
# Copyright (C) 2005 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from cStringIO import StringIO
30
from logging import debug, warning, error
32
import bzrlib.commands
34
import bzrlib.osutils as osutils
35
from bzrlib.selftest import TestUtil
36
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
37
from bzrlib.selftest.treeshape import build_tree_contents
38
from bzrlib.errors import BzrError
41
MODULES_TO_DOCTEST = []
45
class EarlyStoppingTestResultAdapter(object):
46
"""An adapter for TestResult to stop at the first first failure or error"""
48
def __init__(self, result):
51
def addError(self, test, err):
52
self._result.addError(test, err)
55
def addFailure(self, test, err):
56
self._result.addFailure(test, err)
59
def __getattr__(self, name):
60
return getattr(self._result, name)
62
def __setattr__(self, name, value):
64
object.__setattr__(self, name, value)
65
return setattr(self._result, name, value)
68
class _MyResult(unittest._TextTestResult):
72
No special behaviour for now.
75
def _elapsedTime(self):
76
return "(Took %.3fs)" % (time.time() - self._start_time)
78
def startTest(self, test):
79
unittest.TestResult.startTest(self, test)
80
# TODO: Maybe show test.shortDescription somewhere?
81
what = test.shortDescription() or test.id()
83
self.stream.write('%-70.70s' % what)
85
self._start_time = time.time()
87
def addError(self, test, err):
88
unittest.TestResult.addError(self, test, err)
90
self.stream.writeln("ERROR %s" % self._elapsedTime())
92
self.stream.write('E')
95
def addFailure(self, test, err):
96
unittest.TestResult.addFailure(self, test, err)
98
self.stream.writeln("FAIL %s" % self._elapsedTime())
100
self.stream.write('F')
103
def addSuccess(self, test):
105
self.stream.writeln('OK %s' % self._elapsedTime())
107
self.stream.write('~')
109
unittest.TestResult.addSuccess(self, test)
111
def printErrorList(self, flavour, errors):
112
for test, err in errors:
113
self.stream.writeln(self.separator1)
114
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
115
if hasattr(test, '_get_log'):
116
self.stream.writeln()
117
self.stream.writeln('log from this test:')
118
print >>self.stream, test._get_log()
119
self.stream.writeln(self.separator2)
120
self.stream.writeln("%s" % err)
123
class TextTestRunner(unittest.TextTestRunner):
124
stop_on_failure = False
126
def _makeResult(self):
127
result = _MyResult(self.stream, self.descriptions, self.verbosity)
128
if self.stop_on_failure:
129
result = EarlyStoppingTestResultAdapter(result)
133
def iter_suite_tests(suite):
134
"""Return all tests in a suite, recursing through nested suites"""
135
for item in suite._tests:
136
if isinstance(item, unittest.TestCase):
138
elif isinstance(item, unittest.TestSuite):
139
for r in iter_suite_tests(item):
142
raise Exception('unknown object %r inside test suite %r'
146
class TestSkipped(Exception):
147
"""Indicates that a test was intentionally skipped, rather than failing."""
151
class CommandFailed(Exception):
154
class TestCase(unittest.TestCase):
155
"""Base class for bzr unit tests.
157
Tests that need access to disk resources should subclass
158
TestCaseInTempDir not TestCase.
160
Error and debug log messages are redirected from their usual
161
location into a temporary file, the contents of which can be
162
retrieved by _get_log(). We use a real OS file, not an in-memory object,
163
so that it can also capture file IO. When the test completes this file
164
is read into memory and removed from disk.
166
There are also convenience functions to invoke bzr's command-line
167
routine, and to build and check bzr trees.
169
In addition to the usual method of overriding tearDown(), this class also
170
allows subclasses to register functions into the _cleanups list, which is
171
run in order as the object is torn down. It's less likely this will be
172
accidentally overlooked.
176
_log_file_name = None
180
unittest.TestCase.setUp(self)
182
self._cleanEnvironment()
183
bzrlib.trace.disable_default_logging()
186
def _ndiff_strings(self, a, b):
187
"""Return ndiff between two strings containing lines.
189
A trailing newline is added if missing to make the strings
191
if b and b[-1] != '\n':
193
if a and a[-1] != '\n':
195
difflines = difflib.ndiff(a.splitlines(True),
197
linejunk=lambda x: False,
198
charjunk=lambda x: False)
199
return ''.join(difflines)
201
def assertEqualDiff(self, a, b):
202
"""Assert two texts are equal, if not raise an exception.
204
This is intended for use with multi-line strings where it can
205
be hard to find the differences by eye.
207
# TODO: perhaps override assertEquals to call this for strings?
210
raise AssertionError("texts not equal:\n" +
211
self._ndiff_strings(a, b))
213
def assertContainsRe(self, haystack, needle_re):
214
"""Assert that a contains something matching a regular expression."""
215
if not re.search(needle_re, haystack):
216
raise AssertionError('pattern "%s" not found in "%s"'
217
% (needle_re, haystack))
219
def _startLogFile(self):
220
"""Send bzr and test log messages to a temporary file.
222
The file is removed as the test is torn down.
224
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
225
self._log_file = os.fdopen(fileno, 'w+')
226
bzrlib.trace.enable_test_log(self._log_file)
227
debug('opened log file %s', name)
228
self._log_file_name = name
229
self.addCleanup(self._finishLogFile)
231
def _finishLogFile(self):
232
"""Finished with the log file.
234
Read contents into memory, close, and delete.
236
bzrlib.trace.disable_test_log()
237
self._log_file.seek(0)
238
self._log_contents = self._log_file.read()
239
self._log_file.close()
240
os.remove(self._log_file_name)
241
self._log_file = self._log_file_name = None
243
def addCleanup(self, callable):
244
"""Arrange to run a callable when this case is torn down.
246
Callables are run in the reverse of the order they are registered,
247
ie last-in first-out.
249
if callable in self._cleanups:
250
raise ValueError("cleanup function %r already registered on %s"
252
self._cleanups.append(callable)
254
def _cleanEnvironment(self):
257
'APPDATA': os.getcwd(),
262
self.addCleanup(self._restoreEnvironment)
263
for name, value in new_env.iteritems():
264
self._captureVar(name, value)
267
def _captureVar(self, name, newvalue):
268
"""Set an environment variable, preparing it to be reset when finished."""
269
self.__old_env[name] = os.environ.get(name, None)
271
if name in os.environ:
274
os.environ[name] = newvalue
277
def _restoreVar(name, value):
279
if name in os.environ:
282
os.environ[name] = value
284
def _restoreEnvironment(self):
285
for name, value in self.__old_env.iteritems():
286
self._restoreVar(name, value)
290
unittest.TestCase.tearDown(self)
292
def _runCleanups(self):
293
"""Run registered cleanup functions.
295
This should only be called from TestCase.tearDown.
297
for callable in reversed(self._cleanups):
300
def log(self, *args):
304
"""Return as a string the log for this test"""
305
if self._log_file_name:
306
return open(self._log_file_name).read()
308
return self._log_contents
310
def capture(self, cmd, retcode=0):
311
"""Shortcut that splits cmd into words, runs, and returns stdout"""
312
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
314
def run_bzr_captured(self, argv, retcode=0):
315
"""Invoke bzr and return (stdout, stderr).
317
Useful for code that wants to check the contents of the
318
output, the way error messages are presented, etc.
320
This should be the main method for tests that want to exercise the
321
overall behavior of the bzr application (rather than a unit test
322
or a functional test of the library.)
324
Much of the old code runs bzr by forking a new copy of Python, but
325
that is slower, harder to debug, and generally not necessary.
327
This runs bzr through the interface that catches and reports
328
errors, and with logging set to something approximating the
329
default, so that error reporting can be checked.
331
argv -- arguments to invoke bzr
332
retcode -- expected return code, or None for don't-care.
336
self.log('run bzr: %s', ' '.join(argv))
337
handler = logging.StreamHandler(stderr)
338
handler.setFormatter(bzrlib.trace.QuietFormatter())
339
handler.setLevel(logging.INFO)
340
logger = logging.getLogger('')
341
logger.addHandler(handler)
343
result = self.apply_redirected(None, stdout, stderr,
344
bzrlib.commands.run_bzr_catch_errors,
347
logger.removeHandler(handler)
348
out = stdout.getvalue()
349
err = stderr.getvalue()
351
self.log('output:\n%s', out)
353
self.log('errors:\n%s', err)
354
if retcode is not None:
355
self.assertEquals(result, retcode)
358
def run_bzr(self, *args, **kwargs):
359
"""Invoke bzr, as if it were run from the command line.
361
This should be the main method for tests that want to exercise the
362
overall behavior of the bzr application (rather than a unit test
363
or a functional test of the library.)
365
This sends the stdout/stderr results into the test's log,
366
where it may be useful for debugging. See also run_captured.
368
retcode = kwargs.pop('retcode', 0)
369
return self.run_bzr_captured(args, retcode)
371
def check_inventory_shape(self, inv, shape):
372
"""Compare an inventory to a list of expected names.
374
Fail if they are not precisely equal.
377
shape = list(shape) # copy
378
for path, ie in inv.entries():
379
name = path.replace('\\', '/')
387
self.fail("expected paths not found in inventory: %r" % shape)
389
self.fail("unexpected paths found in inventory: %r" % extras)
391
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
392
a_callable=None, *args, **kwargs):
393
"""Call callable with redirected std io pipes.
395
Returns the return code."""
396
if not callable(a_callable):
397
raise ValueError("a_callable must be callable.")
401
if hasattr(self, "_log_file"):
402
stdout = self._log_file
406
if hasattr(self, "_log_file"):
407
stderr = self._log_file
410
real_stdin = sys.stdin
411
real_stdout = sys.stdout
412
real_stderr = sys.stderr
417
return a_callable(*args, **kwargs)
419
sys.stdout = real_stdout
420
sys.stderr = real_stderr
421
sys.stdin = real_stdin
424
BzrTestBase = TestCase
427
class TestCaseInTempDir(TestCase):
428
"""Derived class that runs a test within a temporary directory.
430
This is useful for tests that need to create a branch, etc.
432
The directory is created in a slightly complex way: for each
433
Python invocation, a new temporary top-level directory is created.
434
All test cases create their own directory within that. If the
435
tests complete successfully, the directory is removed.
437
InTempDir is an old alias for FunctionalTestCase.
442
OVERRIDE_PYTHON = 'python'
444
def check_file_contents(self, filename, expect):
445
self.log("check contents of file %s" % filename)
446
contents = file(filename, 'r').read()
447
if contents != expect:
448
self.log("expected: %r" % expect)
449
self.log("actually: %r" % contents)
450
self.fail("contents of %s not as expected" % filename)
452
def _make_test_root(self):
453
if TestCaseInTempDir.TEST_ROOT is not None:
457
root = u'test%04d.tmp' % i
461
if e.errno == errno.EEXIST:
466
# successfully created
467
TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
469
# make a fake bzr directory there to prevent any tests propagating
470
# up onto the source directory's real branch
471
os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
474
super(TestCaseInTempDir, self).setUp()
475
self._make_test_root()
476
_currentdir = os.getcwdu()
477
short_id = self.id().replace('bzrlib.selftest.', '') \
478
.replace('__main__.', '')
479
self.test_dir = os.path.join(self.TEST_ROOT, short_id)
480
os.mkdir(self.test_dir)
481
os.chdir(self.test_dir)
482
os.environ['HOME'] = self.test_dir
483
def _leaveDirectory():
484
os.chdir(_currentdir)
485
self.addCleanup(_leaveDirectory)
487
def build_tree(self, shape, line_endings='native'):
488
"""Build a test tree according to a pattern.
490
shape is a sequence of file specifications. If the final
491
character is '/', a directory is created.
493
This doesn't add anything to a branch.
494
:param line_endings: Either 'binary' or 'native'
495
in binary mode, exact contents are written
496
in native mode, the line endings match the
497
default platform endings.
499
# XXX: It's OK to just create them using forward slashes on windows?
501
self.assert_(isinstance(name, basestring))
505
if line_endings == 'binary':
507
elif line_endings == 'native':
510
raise BzrError('Invalid line ending request %r' % (line_endings,))
511
print >>f, "contents of", name
514
def build_tree_contents(self, shape):
515
bzrlib.selftest.build_tree_contents(shape)
517
def failUnlessExists(self, path):
518
"""Fail unless path, which may be abs or relative, exists."""
519
self.failUnless(osutils.lexists(path))
521
def assertFileEqual(self, content, path):
522
"""Fail if path does not contain 'content'."""
523
self.failUnless(osutils.lexists(path))
524
self.assertEqualDiff(content, open(path, 'r').read())
527
class MetaTestLog(TestCase):
528
def test_logging(self):
529
"""Test logs are captured when a test fails."""
530
self.log('a test message')
531
self.assertContainsRe(self._get_log(), 'a test message\n')
534
def filter_suite_by_re(suite, pattern):
535
result = TestUtil.TestSuite()
536
filter_re = re.compile(pattern)
537
for test in iter_suite_tests(suite):
538
if filter_re.search(test.id()):
543
def run_suite(suite, name='test', verbose=False, pattern=".*",
544
stop_on_failure=False, keep_output=False):
545
TestCaseInTempDir._TEST_NAME = name
550
runner = TextTestRunner(stream=sys.stdout,
553
runner.stop_on_failure=stop_on_failure
555
suite = filter_suite_by_re(suite, pattern)
556
result = runner.run(suite)
557
# This is still a little bogus,
558
# but only a little. Folk not using our testrunner will
559
# have to delete their temp directories themselves.
560
if result.wasSuccessful() or not keep_output:
561
if TestCaseInTempDir.TEST_ROOT is not None:
562
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
564
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
565
return result.wasSuccessful()
568
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
570
"""Run the whole test suite under the enhanced runner"""
571
return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
572
stop_on_failure=stop_on_failure, keep_output=keep_output)
576
"""Build and return TestSuite for the whole program."""
577
import bzrlib.store, bzrlib.inventory, bzrlib.branch
578
import bzrlib.osutils, bzrlib.merge3, bzrlib.plugin
579
from doctest import DocTestSuite
581
global MODULES_TO_TEST, MODULES_TO_DOCTEST
583
# FIXME: If these fail to load, e.g. because of a syntax error, the
584
# exception is hidden by unittest. Sucks. Should either fix that or
585
# perhaps import them and pass them to unittest as modules.
587
['bzrlib.selftest.MetaTestLog',
588
'bzrlib.selftest.testapi',
589
'bzrlib.selftest.testgpg',
590
'bzrlib.selftest.testidentitymap',
591
'bzrlib.selftest.testinv',
592
'bzrlib.selftest.test_ancestry',
593
'bzrlib.selftest.test_commit',
594
'bzrlib.selftest.test_command',
595
'bzrlib.selftest.test_commit_merge',
596
'bzrlib.selftest.testconfig',
597
'bzrlib.selftest.versioning',
598
'bzrlib.selftest.testmerge3',
599
'bzrlib.selftest.testmerge',
600
'bzrlib.selftest.testhashcache',
601
'bzrlib.selftest.teststatus',
602
'bzrlib.selftest.testlog',
603
'bzrlib.selftest.testrevisionnamespaces',
604
'bzrlib.selftest.testbranch',
605
'bzrlib.selftest.testrevision',
606
'bzrlib.selftest.test_revision_info',
607
'bzrlib.selftest.test_merge_core',
608
'bzrlib.selftest.test_smart_add',
609
'bzrlib.selftest.test_bad_files',
610
'bzrlib.selftest.testdiff',
611
'bzrlib.selftest.test_parent',
612
'bzrlib.selftest.test_xml',
613
'bzrlib.selftest.test_weave',
614
'bzrlib.selftest.testfetch',
615
'bzrlib.selftest.whitebox',
616
'bzrlib.selftest.teststore',
617
'bzrlib.selftest.blackbox',
618
'bzrlib.selftest.testsampler',
619
'bzrlib.selftest.testtransactions',
620
'bzrlib.selftest.testtransport',
621
'bzrlib.selftest.testsftp',
622
'bzrlib.selftest.testgraph',
623
'bzrlib.selftest.testworkingtree',
624
'bzrlib.selftest.test_upgrade',
625
'bzrlib.selftest.test_conflicts',
626
'bzrlib.selftest.testtestament',
627
'bzrlib.selftest.testannotate',
628
'bzrlib.selftest.testrevprops',
629
'bzrlib.selftest.testoptions',
630
'bzrlib.selftest.testhttp',
631
'bzrlib.selftest.testnonascii',
632
'bzrlib.selftest.testreweave',
633
'bzrlib.selftest.testtsort',
634
'bzrlib.selftest.testtrace',
637
for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
638
bzrlib.osutils, bzrlib.commands, bzrlib.merge3,
641
if m not in MODULES_TO_DOCTEST:
642
MODULES_TO_DOCTEST.append(m)
644
TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
645
print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
648
suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
649
for m in MODULES_TO_TEST:
650
suite.addTest(TestLoader().loadTestsFromModule(m))
651
for m in (MODULES_TO_DOCTEST):
652
suite.addTest(DocTestSuite(m))
653
for p in bzrlib.plugin.all_plugins:
654
if hasattr(p, 'test_suite'):
655
suite.addTest(p.test_suite())