~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: Robert Collins
  • Date: 2005-10-06 22:15:52 UTC
  • mfrom: (1185.13.2)
  • mto: This revision was merged to the branch mainline in revision 1420.
  • Revision ID: robertc@robertcollins.net-20051006221552-9b15c96fa504e0ad
mergeĀ fromĀ upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
 
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
 
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
"""Enhanced layer on unittest.
19
 
 
20
 
This does several things:
21
 
 
22
 
* nicer reporting as tests run
23
 
 
24
 
* test code can log messages into a buffer that is recorded to disk
25
 
  and displayed if the test fails
26
 
 
27
 
* tests can be run in a separate directory, which is useful for code that
28
 
  wants to create files
29
 
 
30
 
* utilities to run external commands and check their return code
31
 
  and/or output
32
 
 
33
 
Test cases should normally subclass testsweet.TestCase.  The test runner should
34
 
call runsuite().
35
 
 
36
 
This is meant to become independent of bzr, though that's not quite
37
 
true yet.
38
 
"""  
39
 
 
40
 
import unittest
41
 
import sys
42
 
from bzrlib.selftest import TestUtil
43
 
 
44
 
# XXX: Don't need this anymore now we depend on python2.4
45
 
def _need_subprocess():
46
 
    sys.stderr.write("sorry, this test suite requires the subprocess module\n"
47
 
                     "this is shipped with python2.4 and available separately for 2.3\n")
48
 
    
49
 
 
50
 
class CommandFailed(Exception):
51
 
    pass
52
 
 
53
 
 
54
 
class TestSkipped(Exception):
55
 
    """Indicates that a test was intentionally skipped, rather than failing."""
56
 
    # XXX: Not used yet
57
 
 
58
 
 
59
 
class TestCase(unittest.TestCase):
60
 
    """Base class for bzr unit tests.
61
 
    
62
 
    Tests that need access to disk resources should subclass 
63
 
    FunctionalTestCase not TestCase.
64
 
    """
65
 
    
66
 
    # TODO: Special methods to invoke bzr, so that we can run it
67
 
    # through a specified Python intepreter
68
 
 
69
 
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
70
 
    BZRPATH = 'bzr'
71
 
 
72
 
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
73
 
                         a_callable=None, *args, **kwargs):
74
 
        """Call callable with redirected std io pipes.
75
 
 
76
 
        Returns the return code."""
77
 
        from StringIO import StringIO
78
 
        if not callable(a_callable):
79
 
            raise ValueError("a_callable must be callable.")
80
 
        if stdin is None:
81
 
            stdin = StringIO("")
82
 
        if stdout is None:
83
 
            stdout = self.TEST_LOG
84
 
        if stderr is None:
85
 
            stderr = self.TEST_LOG
86
 
        real_stdin = sys.stdin
87
 
        real_stdout = sys.stdout
88
 
        real_stderr = sys.stderr
89
 
        result = None
90
 
        try:
91
 
            sys.stdout = stdout
92
 
            sys.stderr = stderr
93
 
            sys.stdin = stdin
94
 
            result = a_callable(*args, **kwargs)
95
 
        finally:
96
 
            sys.stdout = real_stdout
97
 
            sys.stderr = real_stderr
98
 
            sys.stdin = real_stdin
99
 
        return result
100
 
 
101
 
    def setUp(self):
102
 
        super(TestCase, self).setUp()
103
 
        # setup a temporary log for the test 
104
 
        import tempfile
105
 
        self.TEST_LOG = tempfile.NamedTemporaryFile(mode='wt', bufsize=0)
106
 
        self.log("%s setup" % self.id())
107
 
 
108
 
    def tearDown(self):
109
 
        self.log("%s teardown" % self.id())
110
 
        self.log('')
111
 
        super(TestCase, self).tearDown()
112
 
 
113
 
    def log(self, msg):
114
 
        """Log a message to a progress file"""
115
 
        print >>self.TEST_LOG, msg
116
 
 
117
 
    def check_inventory_shape(self, inv, shape):
118
 
        """
119
 
        Compare an inventory to a list of expected names.
120
 
 
121
 
        Fail if they are not precisely equal.
122
 
        """
123
 
        extras = []
124
 
        shape = list(shape)             # copy
125
 
        for path, ie in inv.entries():
126
 
            name = path.replace('\\', '/')
127
 
            if ie.kind == 'dir':
128
 
                name = name + '/'
129
 
            if name in shape:
130
 
                shape.remove(name)
131
 
            else:
132
 
                extras.append(name)
133
 
        if shape:
134
 
            self.fail("expected paths not found in inventory: %r" % shape)
135
 
        if extras:
136
 
            self.fail("unexpected paths found in inventory: %r" % extras)
137
 
     
138
 
    def _get_log(self):
139
 
        """Get the log the test case used. This can only be called once,
140
 
        after which an exception will be raised.
141
 
        """
142
 
        self.TEST_LOG.flush()
143
 
        log = open(self.TEST_LOG.name, 'rt').read()
144
 
        self.TEST_LOG.close()
145
 
        return log
146
 
 
147
 
 
148
 
class FunctionalTestCase(TestCase):
149
 
    """Base class for tests that perform function testing - running bzr,
150
 
    using files on disk, and similar activities.
151
 
 
152
 
    InTempDir is an old alias for FunctionalTestCase.
153
 
    """
154
 
 
155
 
    TEST_ROOT = None
156
 
    _TEST_NAME = 'test'
157
 
 
158
 
    def check_file_contents(self, filename, expect):
159
 
        self.log("check contents of file %s" % filename)
160
 
        contents = file(filename, 'r').read()
161
 
        if contents != expect:
162
 
            self.log("expected: %r" % expect)
163
 
            self.log("actually: %r" % contents)
164
 
            self.fail("contents of %s not as expected")
165
 
 
166
 
    def _make_test_root(self):
167
 
        import os
168
 
        import shutil
169
 
        import tempfile
170
 
        
171
 
        if FunctionalTestCase.TEST_ROOT is not None:
172
 
            return
173
 
        FunctionalTestCase.TEST_ROOT = os.path.abspath(
174
 
                                 tempfile.mkdtemp(suffix='.tmp',
175
 
                                                  prefix=self._TEST_NAME + '-',
176
 
                                                  dir=os.curdir))
177
 
    
178
 
        # make a fake bzr directory there to prevent any tests propagating
179
 
        # up onto the source directory's real branch
180
 
        os.mkdir(os.path.join(FunctionalTestCase.TEST_ROOT, '.bzr'))
181
 
 
182
 
    def setUp(self):
183
 
        super(FunctionalTestCase, self).setUp()
184
 
        import os
185
 
        self._make_test_root()
186
 
        self._currentdir = os.getcwdu()
187
 
        self.test_dir = os.path.join(self.TEST_ROOT, self.id())
188
 
        os.mkdir(self.test_dir)
189
 
        os.chdir(self.test_dir)
190
 
        
191
 
    def tearDown(self):
192
 
        import os
193
 
        os.chdir(self._currentdir)
194
 
        super(FunctionalTestCase, self).tearDown()
195
 
 
196
 
    def formcmd(self, cmd):
197
 
        if isinstance(cmd, basestring):
198
 
            cmd = cmd.split()
199
 
        if cmd[0] == 'bzr':
200
 
            cmd[0] = self.BZRPATH
201
 
            if self.OVERRIDE_PYTHON:
202
 
                cmd.insert(0, self.OVERRIDE_PYTHON)
203
 
        self.log('$ %r' % cmd)
204
 
        return cmd
205
 
 
206
 
    def runcmd(self, cmd, retcode=0):
207
 
        """Run one command and check the return code.
208
 
 
209
 
        Returns a tuple of (stdout,stderr) strings.
210
 
 
211
 
        If a single string is based, it is split into words.
212
 
        For commands that are not simple space-separated words, please
213
 
        pass a list instead."""
214
 
        try:
215
 
            import shutil
216
 
            from subprocess import call
217
 
        except ImportError, e:
218
 
            _need_subprocess()
219
 
            raise
220
 
        cmd = self.formcmd(cmd)
221
 
        self.log('$ ' + ' '.join(cmd))
222
 
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
223
 
        if retcode != actual_retcode:
224
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
225
 
                                % (cmd, actual_retcode, retcode))
226
 
 
227
 
    def backtick(self, cmd, retcode=0):
228
 
        """Run a command and return its output"""
229
 
        try:
230
 
            import shutil
231
 
            from subprocess import Popen, PIPE
232
 
        except ImportError, e:
233
 
            _need_subprocess()
234
 
            raise
235
 
        cmd = self.formcmd(cmd)
236
 
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
237
 
        outd, errd = child.communicate()
238
 
        self.log(outd)
239
 
        actual_retcode = child.wait()
240
 
        outd = outd.replace('\r', '')
241
 
        if retcode != actual_retcode:
242
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
243
 
                                % (cmd, actual_retcode, retcode))
244
 
        return outd
245
 
 
246
 
    def build_tree(self, shape):
247
 
        """Build a test tree according to a pattern.
248
 
 
249
 
        shape is a sequence of file specifications.  If the final
250
 
        character is '/', a directory is created.
251
 
 
252
 
        This doesn't add anything to a branch.
253
 
        """
254
 
        # XXX: It's OK to just create them using forward slashes on windows?
255
 
        import os
256
 
        for name in shape:
257
 
            assert isinstance(name, basestring)
258
 
            if name[-1] == '/':
259
 
                os.mkdir(name[:-1])
260
 
            else:
261
 
                f = file(name, 'wt')
262
 
                print >>f, "contents of", name
263
 
                f.close()
264
 
                
265
 
InTempDir = FunctionalTestCase
266
 
 
267
 
 
268
 
class EarlyStoppingTestResultAdapter(object):
269
 
    """An adapter for TestResult to stop at the first first failure or error"""
270
 
 
271
 
    def __init__(self, result):
272
 
        self._result = result
273
 
 
274
 
    def addError(self, test, err):
275
 
        self._result.addError(test, err)
276
 
        self._result.stop()
277
 
 
278
 
    def addFailure(self, test, err):
279
 
        self._result.addFailure(test, err)
280
 
        self._result.stop()
281
 
 
282
 
    def __getattr__(self, name):
283
 
        return getattr(self._result, name)
284
 
 
285
 
    def __setattr__(self, name, value):
286
 
        if name == '_result':
287
 
            object.__setattr__(self, name, value)
288
 
        return setattr(self._result, name, value)
289
 
 
290
 
 
291
 
class _MyResult(unittest._TextTestResult):
292
 
    """
293
 
    Custom TestResult.
294
 
 
295
 
    No special behaviour for now.
296
 
    """
297
 
 
298
 
    def startTest(self, test):
299
 
        unittest.TestResult.startTest(self, test)
300
 
        # TODO: Maybe show test.shortDescription somewhere?
301
 
        what = test.id()
302
 
        # python2.3 has the bad habit of just "runit" for doctests
303
 
        if what == 'runit':
304
 
            what = test.shortDescription()
305
 
        if self.showAll:
306
 
            self.stream.write('%-60.60s' % what)
307
 
        self.stream.flush()
308
 
 
309
 
    def addError(self, test, err):
310
 
        super(_MyResult, self).addError(test, err)
311
 
        self.stream.flush()
312
 
 
313
 
    def addFailure(self, test, err):
314
 
        super(_MyResult, self).addFailure(test, err)
315
 
        self.stream.flush()
316
 
 
317
 
    def addSuccess(self, test):
318
 
        if self.showAll:
319
 
            self.stream.writeln('OK')
320
 
        elif self.dots:
321
 
            self.stream.write('~')
322
 
        self.stream.flush()
323
 
        unittest.TestResult.addSuccess(self, test)
324
 
 
325
 
    def printErrorList(self, flavour, errors):
326
 
        for test, err in errors:
327
 
            self.stream.writeln(self.separator1)
328
 
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
329
 
            self.stream.writeln(self.separator2)
330
 
            self.stream.writeln("%s" % err)
331
 
            if isinstance(test, TestCase):
332
 
                self.stream.writeln()
333
 
                self.stream.writeln('log from this test:')
334
 
                print >>self.stream, test._get_log()
335
 
 
336
 
 
337
 
class TextTestRunner(unittest.TextTestRunner):
338
 
 
339
 
    def _makeResult(self):
340
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
341
 
        return EarlyStoppingTestResultAdapter(result)
342
 
 
343
 
 
344
 
class filteringVisitor(TestUtil.TestVisitor):
345
 
    """I accruse all the testCases I visit that pass a regexp filter on id
346
 
    into my suite
347
 
    """
348
 
 
349
 
    def __init__(self, filter):
350
 
        import re
351
 
        TestUtil.TestVisitor.__init__(self)
352
 
        self._suite=None
353
 
        self.filter=re.compile(filter)
354
 
 
355
 
    def suite(self):
356
 
        """answer the suite we are building"""
357
 
        if self._suite is None:
358
 
            self._suite=TestUtil.TestSuite()
359
 
        return self._suite
360
 
 
361
 
    def visitCase(self, aCase):
362
 
        if self.filter.match(aCase.id()):
363
 
            self.suite().addTest(aCase)
364
 
 
365
 
 
366
 
def run_suite(suite, name='test', verbose=False, pattern=".*"):
367
 
    import shutil
368
 
    FunctionalTestCase._TEST_NAME = name
369
 
    if verbose:
370
 
        verbosity = 2
371
 
    else:
372
 
        verbosity = 1
373
 
    runner = TextTestRunner(stream=sys.stdout,
374
 
                            descriptions=0,
375
 
                            verbosity=verbosity)
376
 
    visitor = filteringVisitor(pattern)
377
 
    suite.visit(visitor)
378
 
    result = runner.run(visitor.suite())
379
 
    # This is still a little bogus, 
380
 
    # but only a little. Folk not using our testrunner will
381
 
    # have to delete their temp directories themselves.
382
 
    if result.wasSuccessful():
383
 
        if FunctionalTestCase.TEST_ROOT is not None:
384
 
            shutil.rmtree(FunctionalTestCase.TEST_ROOT) 
385
 
    else:
386
 
        print "Failed tests working directories are in '%s'\n" % FunctionalTestCase.TEST_ROOT
387
 
    return result.wasSuccessful()