15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from cStringIO import StringIO
24
from testsweet import run_suite
32
25
import bzrlib.commands
33
from bzrlib.errors import BzrError
34
import bzrlib.inventory
35
import bzrlib.iterablefile
38
import bzrlib.osutils as osutils
41
27
import bzrlib.trace
42
from bzrlib.trace import mutter
43
from bzrlib.tests.TestUtil import TestLoader, TestSuite
44
from bzrlib.tests.treeshape import build_tree_contents
46
30
MODULES_TO_TEST = []
47
MODULES_TO_DOCTEST = [
57
def packages_to_test():
58
import bzrlib.tests.blackbox
64
class EarlyStoppingTestResultAdapter(object):
65
"""An adapter for TestResult to stop at the first first failure or error"""
67
def __init__(self, result):
70
def addError(self, test, err):
71
self._result.addError(test, err)
74
def addFailure(self, test, err):
75
self._result.addFailure(test, err)
78
def __getattr__(self, name):
79
return getattr(self._result, name)
81
def __setattr__(self, name, value):
83
object.__setattr__(self, name, value)
84
return setattr(self._result, name, value)
87
class _MyResult(unittest._TextTestResult):
90
Shows output in a different format, including displaying runtime for tests.
93
def _elapsedTime(self):
94
return "%5dms" % (1000 * (time.time() - self._start_time))
96
def startTest(self, test):
97
unittest.TestResult.startTest(self, test)
98
# In a short description, the important words are in
99
# the beginning, but in an id, the important words are
101
SHOW_DESCRIPTIONS = False
103
width = osutils.terminal_width()
104
name_width = width - 15
106
if SHOW_DESCRIPTIONS:
107
what = test.shortDescription()
109
if len(what) > name_width:
110
what = what[:name_width-3] + '...'
113
if what.startswith('bzrlib.tests.'):
115
if len(what) > name_width:
116
what = '...' + what[3-name_width:]
117
what = what.ljust(name_width)
118
self.stream.write(what)
120
self._start_time = time.time()
122
def addError(self, test, err):
123
if isinstance(err[1], TestSkipped):
124
return self.addSkipped(test, err)
125
unittest.TestResult.addError(self, test, err)
127
self.stream.writeln("ERROR %s" % self._elapsedTime())
129
self.stream.write('E')
132
def addFailure(self, test, err):
133
unittest.TestResult.addFailure(self, test, err)
135
self.stream.writeln(" FAIL %s" % self._elapsedTime())
137
self.stream.write('F')
140
def addSuccess(self, test):
142
self.stream.writeln(' OK %s' % self._elapsedTime())
144
self.stream.write('~')
146
unittest.TestResult.addSuccess(self, test)
148
def addSkipped(self, test, skip_excinfo):
150
print >>self.stream, ' SKIP %s' % self._elapsedTime()
151
print >>self.stream, ' %s' % skip_excinfo[1]
153
self.stream.write('S')
155
# seems best to treat this as success from point-of-view of unittest
156
# -- it actually does nothing so it barely matters :)
157
unittest.TestResult.addSuccess(self, test)
159
def printErrorList(self, flavour, errors):
160
for test, err in errors:
161
self.stream.writeln(self.separator1)
162
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
163
if hasattr(test, '_get_log'):
165
print >>self.stream, \
166
('vvvv[log from %s]' % test).ljust(78,'-')
167
print >>self.stream, test._get_log()
168
print >>self.stream, \
169
('^^^^[log from %s]' % test).ljust(78,'-')
170
self.stream.writeln(self.separator2)
171
self.stream.writeln("%s" % err)
174
class TextTestRunner(unittest.TextTestRunner):
175
stop_on_failure = False
177
def _makeResult(self):
178
result = _MyResult(self.stream, self.descriptions, self.verbosity)
179
if self.stop_on_failure:
180
result = EarlyStoppingTestResultAdapter(result)
184
def iter_suite_tests(suite):
185
"""Return all tests in a suite, recursing through nested suites"""
186
for item in suite._tests:
187
if isinstance(item, unittest.TestCase):
189
elif isinstance(item, unittest.TestSuite):
190
for r in iter_suite_tests(item):
193
raise Exception('unknown object %r inside test suite %r'
197
class TestSkipped(Exception):
198
"""Indicates that a test was intentionally skipped, rather than failing."""
202
class CommandFailed(Exception):
31
MODULES_TO_DOCTEST = []
33
from logging import debug, warning, error
205
36
class TestCase(unittest.TestCase):
206
37
"""Base class for bzr unit tests.
208
39
Tests that need access to disk resources should subclass
209
TestCaseInTempDir not TestCase.
40
FunctionalTestCase not TestCase.
211
42
Error and debug log messages are redirected from their usual
212
43
location into a temporary file, the contents of which can be
213
retrieved by _get_log(). We use a real OS file, not an in-memory object,
214
so that it can also capture file IO. When the test completes this file
215
is read into memory and removed from disk.
44
retrieved by _get_log().
217
46
There are also convenience functions to invoke bzr's command-line
218
routine, and to build and check bzr trees.
220
In addition to the usual method of overriding tearDown(), this class also
221
allows subclasses to register functions into the _cleanups list, which is
222
run in order as the object is torn down. It's less likely this will be
223
accidentally overlooked.
47
routine, and to build and check bzr trees."""
227
_log_file_name = None
52
# this replaces the default testsweet.TestCase; we don't want logging changed
231
53
unittest.TestCase.setUp(self)
233
self._cleanEnvironment()
234
54
bzrlib.trace.disable_default_logging()
237
def _ndiff_strings(self, a, b):
238
"""Return ndiff between two strings containing lines.
240
A trailing newline is added if missing to make the strings
242
if b and b[-1] != '\n':
244
if a and a[-1] != '\n':
246
difflines = difflib.ndiff(a.splitlines(True),
248
linejunk=lambda x: False,
249
charjunk=lambda x: False)
250
return ''.join(difflines)
252
def assertEqualDiff(self, a, b):
253
"""Assert two texts are equal, if not raise an exception.
255
This is intended for use with multi-line strings where it can
256
be hard to find the differences by eye.
258
# TODO: perhaps override assertEquals to call this for strings?
261
raise AssertionError("texts not equal:\n" +
262
self._ndiff_strings(a, b))
264
def assertStartsWith(self, s, prefix):
265
if not s.startswith(prefix):
266
raise AssertionError('string %r does not start with %r' % (s, prefix))
268
def assertEndsWith(self, s, suffix):
269
if not s.endswith(prefix):
270
raise AssertionError('string %r does not end with %r' % (s, suffix))
272
def assertContainsRe(self, haystack, needle_re):
273
"""Assert that a contains something matching a regular expression."""
274
if not re.search(needle_re, haystack):
275
raise AssertionError('pattern "%s" not found in "%s"'
276
% (needle_re, haystack))
278
def AssertSubset(self, sublist, superlist):
279
"""Assert that every entry in sublist is present in superlist."""
281
for entry in sublist:
282
if entry not in superlist:
283
missing.append(entry)
285
raise AssertionError("value(s) %r not present in container %r" %
286
(missing, superlist))
288
def assertIs(self, left, right):
289
if not (left is right):
290
raise AssertionError("%r is not %r." % (left, right))
292
def _startLogFile(self):
293
"""Send bzr and test log messages to a temporary file.
295
The file is removed as the test is torn down.
55
self._enable_file_logging()
58
def _enable_file_logging(self):
297
59
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
298
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
299
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
300
bzrlib.trace.enable_test_log(self._log_file)
61
self._log_file = os.fdopen(fileno, 'w+')
63
hdlr = logging.StreamHandler(self._log_file)
64
hdlr.setLevel(logging.DEBUG)
65
hdlr.setFormatter(logging.Formatter('%(levelname)4.4s %(message)s'))
66
logging.getLogger('').addHandler(hdlr)
67
logging.getLogger('').setLevel(logging.DEBUG)
69
debug('opened log file %s', name)
301
71
self._log_file_name = name
302
self.addCleanup(self._finishLogFile)
304
def _finishLogFile(self):
305
"""Finished with the log file.
307
Read contents into memory, close, and delete.
309
bzrlib.trace.disable_test_log()
310
self._log_file.seek(0)
311
self._log_contents = self._log_file.read()
75
logging.getLogger('').removeHandler(self._log_hdlr)
76
bzrlib.trace.enable_default_logging()
77
logging.debug('%s teardown', self.id())
312
78
self._log_file.close()
313
os.remove(self._log_file_name)
314
self._log_file = self._log_file_name = None
316
def addCleanup(self, callable):
317
"""Arrange to run a callable when this case is torn down.
319
Callables are run in the reverse of the order they are registered,
320
ie last-in first-out.
322
if callable in self._cleanups:
323
raise ValueError("cleanup function %r already registered on %s"
325
self._cleanups.append(callable)
327
def _cleanEnvironment(self):
330
'APPDATA': os.getcwd(),
335
self.addCleanup(self._restoreEnvironment)
336
for name, value in new_env.iteritems():
337
self._captureVar(name, value)
340
def _captureVar(self, name, newvalue):
341
"""Set an environment variable, preparing it to be reset when finished."""
342
self.__old_env[name] = os.environ.get(name, None)
344
if name in os.environ:
347
os.environ[name] = newvalue
350
def _restoreVar(name, value):
352
if name in os.environ:
355
os.environ[name] = value
357
def _restoreEnvironment(self):
358
for name, value in self.__old_env.iteritems():
359
self._restoreVar(name, value)
363
79
unittest.TestCase.tearDown(self)
365
def _runCleanups(self):
366
"""Run registered cleanup functions.
368
This should only be called from TestCase.tearDown.
370
for cleanup_fn in reversed(self._cleanups):
373
82
def log(self, *args):
376
85
def _get_log(self):
377
86
"""Return as a string the log for this test"""
378
if self._log_file_name:
379
return open(self._log_file_name).read()
381
return self._log_contents
382
# TODO: Delete the log after it's been read in
384
def capture(self, cmd, retcode=0):
385
"""Shortcut that splits cmd into words, runs, and returns stdout"""
386
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
388
def run_bzr_captured(self, argv, retcode=0):
389
"""Invoke bzr and return (stdout, stderr).
391
Useful for code that wants to check the contents of the
392
output, the way error messages are presented, etc.
87
return open(self._log_file_name).read()
89
def run_bzr(self, *args, **kwargs):
90
"""Invoke bzr, as if it were run from the command line.
394
92
This should be the main method for tests that want to exercise the
395
93
overall behavior of the bzr application (rather than a unit test
522
142
if contents != expect:
523
143
self.log("expected: %r" % expect)
524
144
self.log("actually: %r" % contents)
525
self.fail("contents of %s not as expected" % filename)
145
self.fail("contents of %s not as expected")
527
147
def _make_test_root(self):
528
if TestCaseInTempDir.TEST_ROOT is not None:
152
if FunctionalTestCase.TEST_ROOT is not None:
532
root = u'test%04d.tmp' % i
536
if e.errno == errno.EEXIST:
541
# successfully created
542
TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
154
FunctionalTestCase.TEST_ROOT = os.path.abspath(
155
tempfile.mkdtemp(suffix='.tmp',
156
prefix=self._TEST_NAME + '-',
544
159
# make a fake bzr directory there to prevent any tests propagating
545
160
# up onto the source directory's real branch
546
os.mkdir(osutils.pathjoin(TestCaseInTempDir.TEST_ROOT, '.bzr'))
161
os.mkdir(os.path.join(FunctionalTestCase.TEST_ROOT, '.bzr'))
549
super(TestCaseInTempDir, self).setUp()
164
super(FunctionalTestCase, self).setUp()
550
166
self._make_test_root()
551
_currentdir = os.getcwdu()
552
short_id = self.id().replace('bzrlib.tests.', '') \
553
.replace('__main__.', '')
554
self.test_dir = osutils.pathjoin(self.TEST_ROOT, short_id)
167
self._currentdir = os.getcwdu()
168
self.test_dir = os.path.join(self.TEST_ROOT, self.id())
555
169
os.mkdir(self.test_dir)
556
170
os.chdir(self.test_dir)
557
os.environ['HOME'] = self.test_dir
558
os.environ['APPDATA'] = self.test_dir
559
def _leaveDirectory():
560
os.chdir(_currentdir)
561
self.addCleanup(_leaveDirectory)
563
def build_tree(self, shape, line_endings='native'):
174
os.chdir(self._currentdir)
175
super(FunctionalTestCase, self).tearDown()
177
def _formcmd(self, cmd):
178
if isinstance(cmd, basestring):
181
cmd[0] = self.BZRPATH
182
if self.OVERRIDE_PYTHON:
183
cmd.insert(0, self.OVERRIDE_PYTHON)
184
self.log('$ %r' % cmd)
187
def runcmd(self, cmd, retcode=0):
188
"""Run one command and check the return code.
190
Returns a tuple of (stdout,stderr) strings.
192
If a single string is based, it is split into words.
193
For commands that are not simple space-separated words, please
194
pass a list instead."""
197
from subprocess import call
198
except ImportError, e:
201
cmd = self._formcmd(cmd)
202
self.log('$ ' + ' '.join(cmd))
203
actual_retcode = call(cmd, stdout=self._log_file, stderr=self._log_file)
204
if retcode != actual_retcode:
205
raise CommandFailed("test failed: %r returned %d, expected %d"
206
% (cmd, actual_retcode, retcode))
208
def backtick(self, cmd, retcode=0):
209
"""Run a command and return its output"""
212
from subprocess import Popen, PIPE
213
except ImportError, e:
217
cmd = self._formcmd(cmd)
218
child = Popen(cmd, stdout=PIPE, stderr=self._log_file)
219
outd, errd = child.communicate()
221
actual_retcode = child.wait()
223
outd = outd.replace('\r', '')
225
if retcode != actual_retcode:
226
raise CommandFailed("test failed: %r returned %d, expected %d"
227
% (cmd, actual_retcode, retcode))
233
def build_tree(self, shape):
564
234
"""Build a test tree according to a pattern.
566
236
shape is a sequence of file specifications. If the final
567
237
character is '/', a directory is created.
569
239
This doesn't add anything to a branch.
570
:param line_endings: Either 'binary' or 'native'
571
in binary mode, exact contents are written
572
in native mode, the line endings match the
573
default platform endings.
575
241
# XXX: It's OK to just create them using forward slashes on windows?
576
243
for name in shape:
577
self.assert_(isinstance(name, basestring))
244
assert isinstance(name, basestring)
578
245
if name[-1] == '/':
579
246
os.mkdir(name[:-1])
581
if line_endings == 'binary':
583
elif line_endings == 'native':
586
raise BzrError('Invalid line ending request %r' % (line_endings,))
587
249
print >>f, "contents of", name
590
def build_tree_contents(self, shape):
591
build_tree_contents(shape)
593
def failUnlessExists(self, path):
594
"""Fail unless path, which may be abs or relative, exists."""
595
self.failUnless(osutils.lexists(path))
597
def failIfExists(self, path):
598
"""Fail if path, which may be abs or relative, exists."""
599
self.failIf(osutils.lexists(path))
601
def assertFileEqual(self, content, path):
602
"""Fail if path does not contain 'content'."""
603
self.failUnless(osutils.lexists(path))
604
self.assertEqualDiff(content, open(path, 'r').read())
607
def filter_suite_by_re(suite, pattern):
609
filter_re = re.compile(pattern)
610
for test in iter_suite_tests(suite):
611
if filter_re.search(test.id()):
616
def run_suite(suite, name='test', verbose=False, pattern=".*",
617
stop_on_failure=False, keep_output=False):
618
TestCaseInTempDir._TEST_NAME = name
623
runner = TextTestRunner(stream=sys.stdout,
626
runner.stop_on_failure=stop_on_failure
628
suite = filter_suite_by_re(suite, pattern)
629
result = runner.run(suite)
630
# This is still a little bogus,
631
# but only a little. Folk not using our testrunner will
632
# have to delete their temp directories themselves.
633
if result.wasSuccessful() or not keep_output:
634
if TestCaseInTempDir.TEST_ROOT is not None:
635
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
637
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
638
return result.wasSuccessful()
641
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
643
"""Run the whole test suite under the enhanced runner"""
644
return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
645
stop_on_failure=stop_on_failure, keep_output=keep_output)
253
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
254
a_callable=None, *args, **kwargs):
255
"""Call callable with redirected std io pipes.
257
Returns the return code."""
258
from StringIO import StringIO
259
if not callable(a_callable):
260
raise ValueError("a_callable must be callable.")
264
stdout = self._log_file
266
stderr = self._log_file
267
real_stdin = sys.stdin
268
real_stdout = sys.stdout
269
real_stderr = sys.stderr
275
result = a_callable(*args, **kwargs)
277
sys.stdout = real_stdout
278
sys.stderr = real_stderr
279
sys.stdin = real_stdin
283
InTempDir = FunctionalTestCase
286
class MetaTestLog(TestCase):
287
def test_logging(self):
288
"""Test logs are captured when a test fails."""
289
logging.info('an info message')
290
warning('something looks dodgy...')
291
logging.debug('hello, test is running')
295
def selftest(verbose=False, pattern=".*"):
296
return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern)
648
299
def test_suite():
649
"""Build and return TestSuite for the whole program."""
300
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
301
import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
302
import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
650
303
from doctest import DocTestSuite
652
global MODULES_TO_DOCTEST
655
'bzrlib.tests.test_ancestry',
656
'bzrlib.tests.test_annotate',
657
'bzrlib.tests.test_api',
658
'bzrlib.tests.test_bad_files',
659
'bzrlib.tests.test_basis_inventory',
660
'bzrlib.tests.test_branch',
661
'bzrlib.tests.test_command',
662
'bzrlib.tests.test_commit',
663
'bzrlib.tests.test_commit_merge',
664
'bzrlib.tests.test_config',
665
'bzrlib.tests.test_conflicts',
666
'bzrlib.tests.test_diff',
667
'bzrlib.tests.test_fetch',
668
'bzrlib.tests.test_gpg',
669
'bzrlib.tests.test_graph',
670
'bzrlib.tests.test_hashcache',
671
'bzrlib.tests.test_http',
672
'bzrlib.tests.test_identitymap',
673
'bzrlib.tests.test_inv',
674
'bzrlib.tests.test_lockable_files',
675
'bzrlib.tests.test_log',
676
'bzrlib.tests.test_merge',
677
'bzrlib.tests.test_merge3',
678
'bzrlib.tests.test_merge_core',
679
'bzrlib.tests.test_missing',
680
'bzrlib.tests.test_msgeditor',
681
'bzrlib.tests.test_nonascii',
682
'bzrlib.tests.test_options',
683
'bzrlib.tests.test_osutils',
684
'bzrlib.tests.test_parent',
685
'bzrlib.tests.test_permissions',
686
'bzrlib.tests.test_plugins',
687
'bzrlib.tests.test_remove',
688
'bzrlib.tests.test_revision',
689
'bzrlib.tests.test_revisionnamespaces',
690
'bzrlib.tests.test_revprops',
691
'bzrlib.tests.test_reweave',
692
'bzrlib.tests.test_rio',
693
'bzrlib.tests.test_sampler',
694
'bzrlib.tests.test_selftest',
695
'bzrlib.tests.test_setup',
696
'bzrlib.tests.test_sftp_transport',
697
'bzrlib.tests.test_smart_add',
698
'bzrlib.tests.test_source',
699
'bzrlib.tests.test_status',
700
'bzrlib.tests.test_store',
701
'bzrlib.tests.test_testament',
702
'bzrlib.tests.test_trace',
703
'bzrlib.tests.test_transactions',
704
'bzrlib.tests.test_transport',
705
'bzrlib.tests.test_tsort',
706
'bzrlib.tests.test_ui',
707
'bzrlib.tests.test_uncommit',
708
'bzrlib.tests.test_upgrade',
709
'bzrlib.tests.test_weave',
710
'bzrlib.tests.test_whitebox',
711
'bzrlib.tests.test_workingtree',
712
'bzrlib.tests.test_xml',
309
global MODULES_TO_TEST, MODULES_TO_DOCTEST
312
['bzrlib.selftest.MetaTestLog',
313
'bzrlib.selftest.testinv',
314
'bzrlib.selftest.testfetch',
315
'bzrlib.selftest.versioning',
316
'bzrlib.selftest.whitebox',
317
'bzrlib.selftest.testmerge3',
318
'bzrlib.selftest.testhashcache',
319
'bzrlib.selftest.teststatus',
320
'bzrlib.selftest.testlog',
321
'bzrlib.selftest.blackbox',
322
'bzrlib.selftest.testrevisionnamespaces',
323
'bzrlib.selftest.testbranch',
324
'bzrlib.selftest.testrevision',
325
'bzrlib.selftest.test_merge_core',
326
'bzrlib.selftest.test_smart_add',
327
'bzrlib.selftest.testdiff',
715
TestCase.BZRPATH = osutils.pathjoin(
716
osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
717
print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
718
print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
331
for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
332
bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
333
if m not in MODULES_TO_DOCTEST:
334
MODULES_TO_DOCTEST.append(m)
336
TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
337
print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
720
339
suite = TestSuite()
721
# python2.4's TestLoader.loadTestsFromNames gives very poor
722
# errors if it fails to load a named module - no indication of what's
723
# actually wrong, just "no such module". We should probably override that
724
# class, but for the moment just load them ourselves. (mbp 20051202)
725
loader = TestLoader()
726
for mod_name in testmod_names:
727
mod = _load_module_by_name(mod_name)
728
suite.addTest(loader.loadTestsFromModule(mod))
729
for package in packages_to_test():
730
suite.addTest(package.test_suite())
340
suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
731
341
for m in MODULES_TO_TEST:
732
suite.addTest(loader.loadTestsFromModule(m))
342
suite.addTest(TestLoader().loadTestsFromModule(m))
733
343
for m in (MODULES_TO_DOCTEST):
734
344
suite.addTest(DocTestSuite(m))
735
for name, plugin in bzrlib.plugin.all_plugins().items():
736
if hasattr(plugin, 'test_suite'):
737
suite.addTest(plugin.test_suite())
345
for p in bzrlib.plugin.all_plugins:
346
if hasattr(p, 'test_suite'):
347
suite.addTest(p.test_suite())
741
def _load_module_by_name(mod_name):
742
parts = mod_name.split('.')
743
module = __import__(mod_name)
745
# for historical reasons python returns the top-level module even though
746
# it loads the submodule; we need to walk down to get the one we want.
748
module = getattr(module, parts.pop(0))