~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/__init__.py

[patch] show argv and pwd in exceptions (from ddaa)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
import logging
 
20
import unittest
 
21
import tempfile
 
22
import os
 
23
import sys
 
24
import errno
 
25
import subprocess
 
26
import shutil
 
27
import re
 
28
 
 
29
import bzrlib.commands
 
30
import bzrlib.trace
 
31
import bzrlib.fetch
 
32
from bzrlib.selftest import TestUtil
 
33
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
 
34
 
 
35
 
 
36
MODULES_TO_TEST = []
 
37
MODULES_TO_DOCTEST = []
 
38
 
 
39
from logging import debug, warning, error
 
40
 
 
41
 
 
42
 
 
43
class EarlyStoppingTestResultAdapter(object):
 
44
    """An adapter for TestResult to stop at the first first failure or error"""
 
45
 
 
46
    def __init__(self, result):
 
47
        self._result = result
 
48
 
 
49
    def addError(self, test, err):
 
50
        self._result.addError(test, err)
 
51
        self._result.stop()
 
52
 
 
53
    def addFailure(self, test, err):
 
54
        self._result.addFailure(test, err)
 
55
        self._result.stop()
 
56
 
 
57
    def __getattr__(self, name):
 
58
        return getattr(self._result, name)
 
59
 
 
60
    def __setattr__(self, name, value):
 
61
        if name == '_result':
 
62
            object.__setattr__(self, name, value)
 
63
        return setattr(self._result, name, value)
 
64
 
 
65
 
 
66
class _MyResult(unittest._TextTestResult):
 
67
    """
 
68
    Custom TestResult.
 
69
 
 
70
    No special behaviour for now.
 
71
    """
 
72
 
 
73
    def startTest(self, test):
 
74
        unittest.TestResult.startTest(self, test)
 
75
        # TODO: Maybe show test.shortDescription somewhere?
 
76
        what = test.shortDescription() or test.id()        
 
77
        if self.showAll:
 
78
            self.stream.write('%-70.70s' % what)
 
79
        self.stream.flush()
 
80
 
 
81
    def addError(self, test, err):
 
82
        super(_MyResult, self).addError(test, err)
 
83
        self.stream.flush()
 
84
 
 
85
    def addFailure(self, test, err):
 
86
        super(_MyResult, self).addFailure(test, err)
 
87
        self.stream.flush()
 
88
 
 
89
    def addSuccess(self, test):
 
90
        if self.showAll:
 
91
            self.stream.writeln('OK')
 
92
        elif self.dots:
 
93
            self.stream.write('~')
 
94
        self.stream.flush()
 
95
        unittest.TestResult.addSuccess(self, test)
 
96
 
 
97
    def printErrorList(self, flavour, errors):
 
98
        for test, err in errors:
 
99
            self.stream.writeln(self.separator1)
 
100
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
 
101
            if hasattr(test, '_get_log'):
 
102
                self.stream.writeln()
 
103
                self.stream.writeln('log from this test:')
 
104
                print >>self.stream, test._get_log()
 
105
            self.stream.writeln(self.separator2)
 
106
            self.stream.writeln("%s" % err)
 
107
 
 
108
 
 
109
class TextTestRunner(unittest.TextTestRunner):
 
110
 
 
111
    def _makeResult(self):
 
112
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
113
        return EarlyStoppingTestResultAdapter(result)
 
114
 
 
115
 
 
116
def iter_suite_tests(suite):
 
117
    """Return all tests in a suite, recursing through nested suites"""
 
118
    for item in suite._tests:
 
119
        if isinstance(item, unittest.TestCase):
 
120
            yield item
 
121
        elif isinstance(item, unittest.TestSuite):
 
122
            for r in iter_suite_tests(item):
 
123
                yield r
 
124
        else:
 
125
            raise Exception('unknown object %r inside test suite %r'
 
126
                            % (item, suite))
 
127
 
 
128
 
 
129
class TestSkipped(Exception):
 
130
    """Indicates that a test was intentionally skipped, rather than failing."""
 
131
    # XXX: Not used yet
 
132
 
 
133
 
 
134
class CommandFailed(Exception):
 
135
    pass
 
136
 
 
137
class TestCase(unittest.TestCase):
 
138
    """Base class for bzr unit tests.
 
139
    
 
140
    Tests that need access to disk resources should subclass 
 
141
    TestCaseInTempDir not TestCase.
 
142
 
 
143
    Error and debug log messages are redirected from their usual
 
144
    location into a temporary file, the contents of which can be
 
145
    retrieved by _get_log().
 
146
       
 
147
    There are also convenience functions to invoke bzr's command-line
 
148
    routine, and to build and check bzr trees."""
 
149
 
 
150
    BZRPATH = 'bzr'
 
151
 
 
152
    def setUp(self):
 
153
        unittest.TestCase.setUp(self)
 
154
        bzrlib.trace.disable_default_logging()
 
155
        self._enable_file_logging()
 
156
 
 
157
 
 
158
    def _enable_file_logging(self):
 
159
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
160
 
 
161
        self._log_file = os.fdopen(fileno, 'w+')
 
162
 
 
163
        hdlr = logging.StreamHandler(self._log_file)
 
164
        hdlr.setLevel(logging.DEBUG)
 
165
        hdlr.setFormatter(logging.Formatter('%(levelname)8s  %(message)s'))
 
166
        logging.getLogger('').addHandler(hdlr)
 
167
        logging.getLogger('').setLevel(logging.DEBUG)
 
168
        self._log_hdlr = hdlr
 
169
        debug('opened log file %s', name)
 
170
        
 
171
        self._log_file_name = name
 
172
 
 
173
    def tearDown(self):
 
174
        logging.getLogger('').removeHandler(self._log_hdlr)
 
175
        bzrlib.trace.enable_default_logging()
 
176
        logging.debug('%s teardown', self.id())
 
177
        self._log_file.close()
 
178
        unittest.TestCase.tearDown(self)
 
179
 
 
180
    def log(self, *args):
 
181
        logging.debug(*args)
 
182
 
 
183
    def _get_log(self):
 
184
        """Return as a string the log for this test"""
 
185
        return open(self._log_file_name).read()
 
186
 
 
187
 
 
188
    def capture(self, cmd):
 
189
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
190
        return self.run_bzr_captured(cmd.split())[0]
 
191
 
 
192
    def run_bzr_captured(self, argv, retcode=0):
 
193
        """Invoke bzr and return (result, stdout, stderr).
 
194
 
 
195
        Useful for code that wants to check the contents of the
 
196
        output, the way error messages are presented, etc.
 
197
 
 
198
        This should be the main method for tests that want to exercise the
 
199
        overall behavior of the bzr application (rather than a unit test
 
200
        or a functional test of the library.)
 
201
 
 
202
        Much of the old code runs bzr by forking a new copy of Python, but
 
203
        that is slower, harder to debug, and generally not necessary.
 
204
 
 
205
        This runs bzr through the interface that catches and reports
 
206
        errors, and with logging set to something approximating the
 
207
        default, so that error reporting can be checked.
 
208
 
 
209
        argv -- arguments to invoke bzr
 
210
        retcode -- expected return code, or None for don't-care.
 
211
        """
 
212
        stdout = StringIO()
 
213
        stderr = StringIO()
 
214
        self.log('run bzr: %s', ' '.join(argv))
 
215
        handler = logging.StreamHandler(stderr)
 
216
        handler.setFormatter(bzrlib.trace.QuietFormatter())
 
217
        handler.setLevel(logging.INFO)
 
218
        logger = logging.getLogger('')
 
219
        logger.addHandler(handler)
 
220
        try:
 
221
            result = self.apply_redirected(None, stdout, stderr,
 
222
                                           bzrlib.commands.run_bzr_catch_errors,
 
223
                                           argv)
 
224
        finally:
 
225
            logger.removeHandler(handler)
 
226
        out = stdout.getvalue()
 
227
        err = stderr.getvalue()
 
228
        if out:
 
229
            self.log('output:\n%s', out)
 
230
        if err:
 
231
            self.log('errors:\n%s', err)
 
232
        if retcode is not None:
 
233
            self.assertEquals(result, retcode)
 
234
        return out, err
 
235
 
 
236
    def run_bzr(self, *args, **kwargs):
 
237
        """Invoke bzr, as if it were run from the command line.
 
238
 
 
239
        This should be the main method for tests that want to exercise the
 
240
        overall behavior of the bzr application (rather than a unit test
 
241
        or a functional test of the library.)
 
242
 
 
243
        This sends the stdout/stderr results into the test's log,
 
244
        where it may be useful for debugging.  See also run_captured.
 
245
        """
 
246
        retcode = kwargs.pop('retcode', 0)
 
247
        return self.run_bzr_captured(args, retcode)
 
248
 
 
249
    def check_inventory_shape(self, inv, shape):
 
250
        """Compare an inventory to a list of expected names.
 
251
 
 
252
        Fail if they are not precisely equal.
 
253
        """
 
254
        extras = []
 
255
        shape = list(shape)             # copy
 
256
        for path, ie in inv.entries():
 
257
            name = path.replace('\\', '/')
 
258
            if ie.kind == 'dir':
 
259
                name = name + '/'
 
260
            if name in shape:
 
261
                shape.remove(name)
 
262
            else:
 
263
                extras.append(name)
 
264
        if shape:
 
265
            self.fail("expected paths not found in inventory: %r" % shape)
 
266
        if extras:
 
267
            self.fail("unexpected paths found in inventory: %r" % extras)
 
268
 
 
269
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
270
                         a_callable=None, *args, **kwargs):
 
271
        """Call callable with redirected std io pipes.
 
272
 
 
273
        Returns the return code."""
 
274
        if not callable(a_callable):
 
275
            raise ValueError("a_callable must be callable.")
 
276
        if stdin is None:
 
277
            stdin = StringIO("")
 
278
        if stdout is None:
 
279
            if hasattr(self, "_log_file"):
 
280
                stdout = self._log_file
 
281
            else:
 
282
                stdout = StringIO()
 
283
        if stderr is None:
 
284
            if hasattr(self, "_log_file"):
 
285
                stderr = self._log_file
 
286
            else:
 
287
                stderr = StringIO()
 
288
        real_stdin = sys.stdin
 
289
        real_stdout = sys.stdout
 
290
        real_stderr = sys.stderr
 
291
        try:
 
292
            sys.stdout = stdout
 
293
            sys.stderr = stderr
 
294
            sys.stdin = stdin
 
295
            return a_callable(*args, **kwargs)
 
296
        finally:
 
297
            sys.stdout = real_stdout
 
298
            sys.stderr = real_stderr
 
299
            sys.stdin = real_stdin
 
300
 
 
301
 
 
302
BzrTestBase = TestCase
 
303
 
 
304
     
 
305
class TestCaseInTempDir(TestCase):
 
306
    """Derived class that runs a test within a temporary directory.
 
307
 
 
308
    This is useful for tests that need to create a branch, etc.
 
309
 
 
310
    The directory is created in a slightly complex way: for each
 
311
    Python invocation, a new temporary top-level directory is created.
 
312
    All test cases create their own directory within that.  If the
 
313
    tests complete successfully, the directory is removed.
 
314
 
 
315
    InTempDir is an old alias for FunctionalTestCase.
 
316
    """
 
317
 
 
318
    TEST_ROOT = None
 
319
    _TEST_NAME = 'test'
 
320
    OVERRIDE_PYTHON = 'python'
 
321
 
 
322
    def check_file_contents(self, filename, expect):
 
323
        self.log("check contents of file %s" % filename)
 
324
        contents = file(filename, 'r').read()
 
325
        if contents != expect:
 
326
            self.log("expected: %r" % expect)
 
327
            self.log("actually: %r" % contents)
 
328
            self.fail("contents of %s not as expected" % filename)
 
329
 
 
330
    def _make_test_root(self):
 
331
        if TestCaseInTempDir.TEST_ROOT is not None:
 
332
            return
 
333
        i = 0
 
334
        while True:
 
335
            root = 'test%04d.tmp' % i
 
336
            try:
 
337
                os.mkdir(root)
 
338
            except OSError, e:
 
339
                if e.errno == errno.EEXIST:
 
340
                    i += 1
 
341
                    continue
 
342
                else:
 
343
                    raise
 
344
            # successfully created
 
345
            TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
 
346
            break
 
347
        # make a fake bzr directory there to prevent any tests propagating
 
348
        # up onto the source directory's real branch
 
349
        os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
 
350
 
 
351
    def setUp(self):
 
352
        super(TestCaseInTempDir, self).setUp()
 
353
        self._make_test_root()
 
354
        self._currentdir = os.getcwdu()
 
355
        short_id = self.id().replace('bzrlib.selftest.', '') \
 
356
                   .replace('__main__.', '')
 
357
        self.test_dir = os.path.join(self.TEST_ROOT, short_id)
 
358
        os.mkdir(self.test_dir)
 
359
        os.chdir(self.test_dir)
 
360
        
 
361
    def tearDown(self):
 
362
        os.chdir(self._currentdir)
 
363
        super(TestCaseInTempDir, self).tearDown()
 
364
 
 
365
    def build_tree(self, shape):
 
366
        """Build a test tree according to a pattern.
 
367
 
 
368
        shape is a sequence of file specifications.  If the final
 
369
        character is '/', a directory is created.
 
370
 
 
371
        This doesn't add anything to a branch.
 
372
        """
 
373
        # XXX: It's OK to just create them using forward slashes on windows?
 
374
        for name in shape:
 
375
            assert isinstance(name, basestring)
 
376
            if name[-1] == '/':
 
377
                os.mkdir(name[:-1])
 
378
            else:
 
379
                f = file(name, 'wt')
 
380
                print >>f, "contents of", name
 
381
                f.close()
 
382
 
 
383
    def failUnlessExists(self, path):
 
384
        """Fail unless path, which may be abs or relative, exists."""
 
385
        self.failUnless(os.path.exists(path))
 
386
        
 
387
 
 
388
class MetaTestLog(TestCase):
 
389
    def test_logging(self):
 
390
        """Test logs are captured when a test fails."""
 
391
        logging.info('an info message')
 
392
        warning('something looks dodgy...')
 
393
        logging.debug('hello, test is running')
 
394
        ##assert 0
 
395
 
 
396
 
 
397
def filter_suite_by_re(suite, pattern):
 
398
    result = TestUtil.TestSuite()
 
399
    filter_re = re.compile(pattern)
 
400
    for test in iter_suite_tests(suite):
 
401
        if filter_re.match(test.id()):
 
402
            result.addTest(test)
 
403
    return result
 
404
 
 
405
 
 
406
def filter_suite_by_names(suite, wanted_names):
 
407
    """Return a new suite containing only selected tests.
 
408
    
 
409
    Names are considered to match if any name is a substring of the 
 
410
    fully-qualified test id (i.e. the class ."""
 
411
    result = TestSuite()
 
412
    for test in iter_suite_tests(suite):
 
413
        this_id = test.id()
 
414
        for p in wanted_names:
 
415
            if this_id.find(p) != -1:
 
416
                result.addTest(test)
 
417
    return result
 
418
 
 
419
 
 
420
def run_suite(suite, name='test', verbose=False, pattern=".*", testnames=None):
 
421
    TestCaseInTempDir._TEST_NAME = name
 
422
    if verbose:
 
423
        verbosity = 2
 
424
    else:
 
425
        verbosity = 1
 
426
    runner = TextTestRunner(stream=sys.stdout,
 
427
                            descriptions=0,
 
428
                            verbosity=verbosity)
 
429
    if testnames:
 
430
        suite = filter_suite_by_names(suite, testnames)
 
431
    if pattern != '.*':
 
432
        suite = filter_suite_by_re(suite, pattern)
 
433
    result = runner.run(suite)
 
434
    # This is still a little bogus, 
 
435
    # but only a little. Folk not using our testrunner will
 
436
    # have to delete their temp directories themselves.
 
437
    if result.wasSuccessful():
 
438
        if TestCaseInTempDir.TEST_ROOT is not None:
 
439
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
 
440
    else:
 
441
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
 
442
    return result.wasSuccessful()
 
443
 
 
444
 
 
445
def selftest(verbose=False, pattern=".*", testnames=None):
 
446
    """Run the whole test suite under the enhanced runner"""
 
447
    return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
 
448
                     testnames=testnames)
 
449
 
 
450
 
 
451
def test_suite():
 
452
    """Build and return TestSuite for the whole program."""
 
453
    import bzrlib.store, bzrlib.inventory, bzrlib.branch
 
454
    import bzrlib.osutils, bzrlib.merge3, bzrlib.plugin
 
455
    from doctest import DocTestSuite
 
456
 
 
457
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
 
458
 
 
459
    testmod_names = \
 
460
                  ['bzrlib.selftest.MetaTestLog',
 
461
                   'bzrlib.selftest.testidentitymap',
 
462
                   'bzrlib.selftest.testinv',
 
463
                   'bzrlib.selftest.test_ancestry',
 
464
                   'bzrlib.selftest.test_commit',
 
465
                   'bzrlib.selftest.test_commit_merge',
 
466
                   'bzrlib.selftest.versioning',
 
467
                   'bzrlib.selftest.testmerge3',
 
468
                   'bzrlib.selftest.testmerge',
 
469
                   'bzrlib.selftest.testhashcache',
 
470
                   'bzrlib.selftest.teststatus',
 
471
                   'bzrlib.selftest.testlog',
 
472
                   'bzrlib.selftest.testrevisionnamespaces',
 
473
                   'bzrlib.selftest.testbranch',
 
474
                   'bzrlib.selftest.testrevision',
 
475
                   'bzrlib.selftest.test_revision_info',
 
476
                   'bzrlib.selftest.test_merge_core',
 
477
                   'bzrlib.selftest.test_smart_add',
 
478
                   'bzrlib.selftest.test_bad_files',
 
479
                   'bzrlib.selftest.testdiff',
 
480
                   'bzrlib.selftest.test_parent',
 
481
                   'bzrlib.selftest.test_xml',
 
482
                   'bzrlib.selftest.test_weave',
 
483
                   'bzrlib.selftest.testfetch',
 
484
                   'bzrlib.selftest.whitebox',
 
485
                   'bzrlib.selftest.teststore',
 
486
                   'bzrlib.selftest.blackbox',
 
487
                   'bzrlib.selftest.testsampler',
 
488
                   'bzrlib.selftest.testtransactions',
 
489
                   'bzrlib.selftest.testtransport',
 
490
                   'bzrlib.selftest.testgraph',
 
491
                   'bzrlib.selftest.testworkingtree',
 
492
                   'bzrlib.selftest.test_upgrade',
 
493
                   'bzrlib.selftest.test_conflicts',
 
494
                   ]
 
495
 
 
496
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
 
497
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
 
498
        if m not in MODULES_TO_DOCTEST:
 
499
            MODULES_TO_DOCTEST.append(m)
 
500
 
 
501
    TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
 
502
    print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
 
503
    print
 
504
    suite = TestSuite()
 
505
    suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
 
506
    for m in MODULES_TO_TEST:
 
507
         suite.addTest(TestLoader().loadTestsFromModule(m))
 
508
    for m in (MODULES_TO_DOCTEST):
 
509
        suite.addTest(DocTestSuite(m))
 
510
    for p in bzrlib.plugin.all_plugins:
 
511
        if hasattr(p, 'test_suite'):
 
512
            suite.addTest(p.test_suite())
 
513
    return suite
 
514