1
# Copyright (C) 2005 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from cStringIO import StringIO
30
import bzrlib.commands
33
import bzrlib.osutils as osutils
34
from bzrlib.selftest import TestUtil
35
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
36
from bzrlib.selftest.treeshape import build_tree_contents
39
MODULES_TO_DOCTEST = []
41
from logging import debug, warning, error
45
class EarlyStoppingTestResultAdapter(object):
46
"""An adapter for TestResult to stop at the first first failure or error"""
48
def __init__(self, result):
51
def addError(self, test, err):
52
self._result.addError(test, err)
55
def addFailure(self, test, err):
56
self._result.addFailure(test, err)
59
def __getattr__(self, name):
60
return getattr(self._result, name)
62
def __setattr__(self, name, value):
64
object.__setattr__(self, name, value)
65
return setattr(self._result, name, value)
68
class _MyResult(unittest._TextTestResult):
72
No special behaviour for now.
75
def _elapsedTime(self):
76
return "(Took %.3fs)" % (time.time() - self._start_time)
78
def startTest(self, test):
79
unittest.TestResult.startTest(self, test)
80
# TODO: Maybe show test.shortDescription somewhere?
81
what = test.shortDescription() or test.id()
83
self.stream.write('%-70.70s' % what)
85
self._start_time = time.time()
87
def addError(self, test, err):
88
unittest.TestResult.addError(self, test, err)
90
self.stream.writeln("ERROR %s" % self._elapsedTime())
92
self.stream.write('E')
95
def addFailure(self, test, err):
96
unittest.TestResult.addFailure(self, test, err)
98
self.stream.writeln("FAIL %s" % self._elapsedTime())
100
self.stream.write('F')
103
def addSuccess(self, test):
105
self.stream.writeln('OK %s' % self._elapsedTime())
107
self.stream.write('~')
109
unittest.TestResult.addSuccess(self, test)
111
def printErrorList(self, flavour, errors):
112
for test, err in errors:
113
self.stream.writeln(self.separator1)
114
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
115
if hasattr(test, '_get_log'):
116
self.stream.writeln()
117
self.stream.writeln('log from this test:')
118
print >>self.stream, test._get_log()
119
self.stream.writeln(self.separator2)
120
self.stream.writeln("%s" % err)
123
class TextTestRunner(unittest.TextTestRunner):
124
stop_on_failure = False
126
def _makeResult(self):
127
result = _MyResult(self.stream, self.descriptions, self.verbosity)
128
if self.stop_on_failure:
129
result = EarlyStoppingTestResultAdapter(result)
133
def iter_suite_tests(suite):
134
"""Return all tests in a suite, recursing through nested suites"""
135
for item in suite._tests:
136
if isinstance(item, unittest.TestCase):
138
elif isinstance(item, unittest.TestSuite):
139
for r in iter_suite_tests(item):
142
raise Exception('unknown object %r inside test suite %r'
146
class TestSkipped(Exception):
147
"""Indicates that a test was intentionally skipped, rather than failing."""
151
class CommandFailed(Exception):
154
class TestCase(unittest.TestCase):
155
"""Base class for bzr unit tests.
157
Tests that need access to disk resources should subclass
158
TestCaseInTempDir not TestCase.
160
Error and debug log messages are redirected from their usual
161
location into a temporary file, the contents of which can be
162
retrieved by _get_log().
164
There are also convenience functions to invoke bzr's command-line
165
routine, and to build and check bzr trees."""
168
_log_file_name = None
171
unittest.TestCase.setUp(self)
172
self.oldenv = os.environ.get('HOME', None)
173
os.environ['HOME'] = os.getcwd()
174
self.bzr_email = os.environ.get('BZREMAIL')
175
if self.bzr_email is not None:
176
del os.environ['BZREMAIL']
177
self.email = os.environ.get('EMAIL')
178
if self.email is not None:
179
del os.environ['EMAIL']
180
bzrlib.trace.disable_default_logging()
181
self._enable_file_logging()
183
def _ndiff_strings(self, a, b):
184
"""Return ndiff between two strings containing lines.
186
A trailing newline is added if missing to make the strings
188
if b and b[-1] != '\n':
190
if a and a[-1] != '\n':
192
difflines = difflib.ndiff(a.splitlines(True),
194
linejunk=lambda x: False,
195
charjunk=lambda x: False)
196
return ''.join(difflines)
198
def assertEqualDiff(self, a, b):
199
"""Assert two texts are equal, if not raise an exception.
201
This is intended for use with multi-line strings where it can
202
be hard to find the differences by eye.
204
# TODO: perhaps override assertEquals to call this for strings?
207
raise AssertionError("texts not equal:\n" +
208
self._ndiff_strings(a, b))
210
def assertContainsRe(self, haystack, needle_re):
211
"""Assert that a contains something matching a regular expression."""
212
if not re.search(needle_re, haystack):
213
raise AssertionError('pattern "%s" not found in "%s"'
214
% (needle_re, haystack))
216
def _enable_file_logging(self):
217
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
219
self._log_file = os.fdopen(fileno, 'w+')
221
hdlr = logging.StreamHandler(self._log_file)
222
hdlr.setLevel(logging.DEBUG)
223
hdlr.setFormatter(logging.Formatter('%(levelname)8s %(message)s'))
224
logging.getLogger('').addHandler(hdlr)
225
logging.getLogger('').setLevel(logging.DEBUG)
226
self._log_hdlr = hdlr
227
debug('opened log file %s', name)
229
self._log_file_name = name
232
os.environ['HOME'] = self.oldenv
233
if os.environ.get('BZREMAIL') is not None:
234
del os.environ['BZREMAIL']
235
if self.bzr_email is not None:
236
os.environ['BZREMAIL'] = self.bzr_email
237
if os.environ.get('EMAIL') is not None:
238
del os.environ['EMAIL']
239
if self.email is not None:
240
os.environ['EMAIL'] = self.email
241
logging.getLogger('').removeHandler(self._log_hdlr)
242
bzrlib.trace.enable_default_logging()
243
logging.debug('%s teardown', self.id())
244
self._log_file.close()
245
unittest.TestCase.tearDown(self)
247
def log(self, *args):
251
"""Return as a string the log for this test"""
252
if self._log_file_name:
253
return open(self._log_file_name).read()
257
def capture(self, cmd):
258
"""Shortcut that splits cmd into words, runs, and returns stdout"""
259
return self.run_bzr_captured(cmd.split())[0]
261
def run_bzr_captured(self, argv, retcode=0):
262
"""Invoke bzr and return (result, stdout, stderr).
264
Useful for code that wants to check the contents of the
265
output, the way error messages are presented, etc.
267
This should be the main method for tests that want to exercise the
268
overall behavior of the bzr application (rather than a unit test
269
or a functional test of the library.)
271
Much of the old code runs bzr by forking a new copy of Python, but
272
that is slower, harder to debug, and generally not necessary.
274
This runs bzr through the interface that catches and reports
275
errors, and with logging set to something approximating the
276
default, so that error reporting can be checked.
278
argv -- arguments to invoke bzr
279
retcode -- expected return code, or None for don't-care.
283
self.log('run bzr: %s', ' '.join(argv))
284
handler = logging.StreamHandler(stderr)
285
handler.setFormatter(bzrlib.trace.QuietFormatter())
286
handler.setLevel(logging.INFO)
287
logger = logging.getLogger('')
288
logger.addHandler(handler)
290
result = self.apply_redirected(None, stdout, stderr,
291
bzrlib.commands.run_bzr_catch_errors,
294
logger.removeHandler(handler)
295
out = stdout.getvalue()
296
err = stderr.getvalue()
298
self.log('output:\n%s', out)
300
self.log('errors:\n%s', err)
301
if retcode is not None:
302
self.assertEquals(result, retcode)
305
def run_bzr(self, *args, **kwargs):
306
"""Invoke bzr, as if it were run from the command line.
308
This should be the main method for tests that want to exercise the
309
overall behavior of the bzr application (rather than a unit test
310
or a functional test of the library.)
312
This sends the stdout/stderr results into the test's log,
313
where it may be useful for debugging. See also run_captured.
315
retcode = kwargs.pop('retcode', 0)
316
return self.run_bzr_captured(args, retcode)
318
def check_inventory_shape(self, inv, shape):
319
"""Compare an inventory to a list of expected names.
321
Fail if they are not precisely equal.
324
shape = list(shape) # copy
325
for path, ie in inv.entries():
326
name = path.replace('\\', '/')
334
self.fail("expected paths not found in inventory: %r" % shape)
336
self.fail("unexpected paths found in inventory: %r" % extras)
338
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
339
a_callable=None, *args, **kwargs):
340
"""Call callable with redirected std io pipes.
342
Returns the return code."""
343
if not callable(a_callable):
344
raise ValueError("a_callable must be callable.")
348
if hasattr(self, "_log_file"):
349
stdout = self._log_file
353
if hasattr(self, "_log_file"):
354
stderr = self._log_file
357
real_stdin = sys.stdin
358
real_stdout = sys.stdout
359
real_stderr = sys.stderr
364
return a_callable(*args, **kwargs)
366
sys.stdout = real_stdout
367
sys.stderr = real_stderr
368
sys.stdin = real_stdin
371
BzrTestBase = TestCase
374
class TestCaseInTempDir(TestCase):
375
"""Derived class that runs a test within a temporary directory.
377
This is useful for tests that need to create a branch, etc.
379
The directory is created in a slightly complex way: for each
380
Python invocation, a new temporary top-level directory is created.
381
All test cases create their own directory within that. If the
382
tests complete successfully, the directory is removed.
384
InTempDir is an old alias for FunctionalTestCase.
389
OVERRIDE_PYTHON = 'python'
391
def check_file_contents(self, filename, expect):
392
self.log("check contents of file %s" % filename)
393
contents = file(filename, 'r').read()
394
if contents != expect:
395
self.log("expected: %r" % expect)
396
self.log("actually: %r" % contents)
397
self.fail("contents of %s not as expected" % filename)
399
def _make_test_root(self):
400
if TestCaseInTempDir.TEST_ROOT is not None:
404
root = 'test%04d.tmp' % i
408
if e.errno == errno.EEXIST:
413
# successfully created
414
TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
416
# make a fake bzr directory there to prevent any tests propagating
417
# up onto the source directory's real branch
418
os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
421
self._make_test_root()
422
self._currentdir = os.getcwdu()
423
short_id = self.id().replace('bzrlib.selftest.', '') \
424
.replace('__main__.', '')
425
self.test_dir = os.path.join(self.TEST_ROOT, short_id)
426
os.mkdir(self.test_dir)
427
os.chdir(self.test_dir)
428
super(TestCaseInTempDir, self).setUp()
431
os.chdir(self._currentdir)
432
super(TestCaseInTempDir, self).tearDown()
434
def build_tree(self, shape):
435
"""Build a test tree according to a pattern.
437
shape is a sequence of file specifications. If the final
438
character is '/', a directory is created.
440
This doesn't add anything to a branch.
442
# XXX: It's OK to just create them using forward slashes on windows?
444
assert isinstance(name, basestring)
449
print >>f, "contents of", name
452
def build_tree_contents(self, shape):
453
bzrlib.selftest.build_tree_contents(shape)
455
def failUnlessExists(self, path):
456
"""Fail unless path, which may be abs or relative, exists."""
457
self.failUnless(osutils.lexists(path))
459
def assertFileEqual(self, content, path):
460
"""Fail if path does not contain 'content'."""
461
self.failUnless(osutils.lexists(path))
462
self.assertEqualDiff(content, open(path, 'r').read())
465
class MetaTestLog(TestCase):
466
def test_logging(self):
467
"""Test logs are captured when a test fails."""
468
logging.info('an info message')
469
warning('something looks dodgy...')
470
logging.debug('hello, test is running')
474
def filter_suite_by_re(suite, pattern):
475
result = TestUtil.TestSuite()
476
filter_re = re.compile(pattern)
477
for test in iter_suite_tests(suite):
478
if filter_re.search(test.id()):
483
def run_suite(suite, name='test', verbose=False, pattern=".*",
484
stop_on_failure=False):
485
TestCaseInTempDir._TEST_NAME = name
490
runner = TextTestRunner(stream=sys.stdout,
493
runner.stop_on_failure=stop_on_failure
495
suite = filter_suite_by_re(suite, pattern)
496
result = runner.run(suite)
497
# This is still a little bogus,
498
# but only a little. Folk not using our testrunner will
499
# have to delete their temp directories themselves.
500
if result.wasSuccessful():
501
if TestCaseInTempDir.TEST_ROOT is not None:
502
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
504
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
505
return result.wasSuccessful()
508
def selftest(verbose=False, pattern=".*", stop_on_failure=True):
509
"""Run the whole test suite under the enhanced runner"""
510
return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
511
stop_on_failure=stop_on_failure)
515
"""Build and return TestSuite for the whole program."""
516
import bzrlib.store, bzrlib.inventory, bzrlib.branch
517
import bzrlib.osutils, bzrlib.merge3, bzrlib.plugin
518
from doctest import DocTestSuite
520
global MODULES_TO_TEST, MODULES_TO_DOCTEST
523
['bzrlib.selftest.MetaTestLog',
524
'bzrlib.selftest.testgpg',
525
'bzrlib.selftest.testidentitymap',
526
'bzrlib.selftest.testinv',
527
'bzrlib.selftest.test_ancestry',
528
'bzrlib.selftest.test_commit',
529
'bzrlib.selftest.test_commit_merge',
530
'bzrlib.selftest.testconfig',
531
'bzrlib.selftest.versioning',
532
'bzrlib.selftest.testmerge3',
533
'bzrlib.selftest.testmerge',
534
'bzrlib.selftest.testhashcache',
535
'bzrlib.selftest.teststatus',
536
'bzrlib.selftest.testlog',
537
'bzrlib.selftest.testrevisionnamespaces',
538
'bzrlib.selftest.testbranch',
539
'bzrlib.selftest.testrevision',
540
'bzrlib.selftest.test_revision_info',
541
'bzrlib.selftest.test_merge_core',
542
'bzrlib.selftest.test_smart_add',
543
'bzrlib.selftest.test_bad_files',
544
'bzrlib.selftest.testdiff',
545
'bzrlib.selftest.test_parent',
546
'bzrlib.selftest.test_xml',
547
'bzrlib.selftest.test_weave',
548
'bzrlib.selftest.testfetch',
549
'bzrlib.selftest.whitebox',
550
'bzrlib.selftest.teststore',
551
'bzrlib.selftest.blackbox',
552
'bzrlib.selftest.testsampler',
553
'bzrlib.selftest.testtransactions',
554
'bzrlib.selftest.testtransport',
555
'bzrlib.selftest.testgraph',
556
'bzrlib.selftest.testworkingtree',
557
'bzrlib.selftest.test_upgrade',
558
'bzrlib.selftest.test_conflicts',
559
'bzrlib.selftest.testtestament',
560
'bzrlib.selftest.testannotate',
561
'bzrlib.selftest.testrevprops',
562
'bzrlib.selftest.testoptions',
563
'bzrlib.selftest.testhttp',
564
'bzrlib.selftest.testnonascii',
567
for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
568
bzrlib.osutils, bzrlib.commands, bzrlib.merge3,
571
if m not in MODULES_TO_DOCTEST:
572
MODULES_TO_DOCTEST.append(m)
574
TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
575
print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
578
suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
579
for m in MODULES_TO_TEST:
580
suite.addTest(TestLoader().loadTestsFromModule(m))
581
for m in (MODULES_TO_DOCTEST):
582
suite.addTest(DocTestSuite(m))
583
for p in bzrlib.plugin.all_plugins:
584
if hasattr(p, 'test_suite'):
585
suite.addTest(p.test_suite())