~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: Robert Collins
  • Date: 2005-08-25 00:02:12 UTC
  • mto: (974.1.50) (1185.1.10) (1092.3.1)
  • mto: This revision was merged to the branch mainline in revision 1139.
  • Revision ID: robertc@robertcollins.net-20050825000212-b95ea0dafb96ddb2
start writing star-topology test, realise we need smart-add change

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
"""Enhanced layer on unittest.
 
19
 
 
20
This does several things:
 
21
 
 
22
* nicer reporting as tests run
 
23
 
 
24
* test code can log messages into a buffer that is recorded to disk
 
25
  and displayed if the test fails
 
26
 
 
27
* tests can be run in a separate directory, which is useful for code that
 
28
  wants to create files
 
29
 
 
30
* utilities to run external commands and check their return code
 
31
  and/or output
 
32
 
 
33
Test cases should normally subclass testsweet.TestCase.  The test runner should
 
34
call runsuite().
 
35
 
 
36
This is meant to become independent of bzr, though that's not quite
 
37
true yet.
 
38
"""  
 
39
 
 
40
import unittest
 
41
import sys
 
42
from bzrlib.selftest import TestUtil
 
43
 
 
44
# XXX: Don't need this anymore now we depend on python2.4
 
45
def _need_subprocess():
 
46
    sys.stderr.write("sorry, this test suite requires the subprocess module\n"
 
47
                     "this is shipped with python2.4 and available separately for 2.3\n")
 
48
    
 
49
 
 
50
class CommandFailed(Exception):
 
51
    pass
 
52
 
 
53
 
 
54
class TestSkipped(Exception):
 
55
    """Indicates that a test was intentionally skipped, rather than failing."""
 
56
    # XXX: Not used yet
 
57
 
 
58
 
 
59
class TestCase(unittest.TestCase):
 
60
    """Base class for bzr unit tests.
 
61
    
 
62
    Tests that need access to disk resources should subclass 
 
63
    FunctionalTestCase not TestCase.
 
64
    """
 
65
    
 
66
    # TODO: Special methods to invoke bzr, so that we can run it
 
67
    # through a specified Python intepreter
 
68
 
 
69
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
 
70
    BZRPATH = 'bzr'
 
71
 
 
72
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
73
                         a_callable=None, *args, **kwargs):
 
74
        """Call callable with redirected std io pipes.
 
75
 
 
76
        Returns the return code."""
 
77
        from StringIO import StringIO
 
78
        if not callable(a_callable):
 
79
            raise ValueError("a_callable must be callable.")
 
80
        if stdin is None:
 
81
            stdin = StringIO("")
 
82
        if stdout is None:
 
83
            stdout = self.TEST_LOG
 
84
        if stderr is None:
 
85
            stderr = self.TEST_LOG
 
86
        real_stdin = sys.stdin
 
87
        real_stdout = sys.stdout
 
88
        real_stderr = sys.stderr
 
89
        result = None
 
90
        try:
 
91
            sys.stdout = stdout
 
92
            sys.stderr = stderr
 
93
            sys.stdin = stdin
 
94
            result = a_callable(*args, **kwargs)
 
95
        finally:
 
96
            sys.stdout = real_stdout
 
97
            sys.stderr = real_stderr
 
98
            sys.stdin = real_stdin
 
99
        return result
 
100
 
 
101
    def setUp(self):
 
102
        super(TestCase, self).setUp()
 
103
        # setup a temporary log for the test 
 
104
        import tempfile
 
105
        self.TEST_LOG = tempfile.NamedTemporaryFile(mode='wt', bufsize=0)
 
106
        self.log("%s setup" % self.id())
 
107
 
 
108
    def tearDown(self):
 
109
        self.log("%s teardown" % self.id())
 
110
        self.log('')
 
111
        super(TestCase, self).tearDown()
 
112
 
 
113
    def log(self, msg):
 
114
        """Log a message to a progress file"""
 
115
        print >>self.TEST_LOG, msg
 
116
 
 
117
    def check_inventory_shape(self, inv, shape):
 
118
        """
 
119
        Compare an inventory to a list of expected names.
 
120
 
 
121
        Fail if they are not precisely equal.
 
122
        """
 
123
        extras = []
 
124
        shape = list(shape)             # copy
 
125
        for path, ie in inv.entries():
 
126
            name = path.replace('\\', '/')
 
127
            if ie.kind == 'dir':
 
128
                name = name + '/'
 
129
            if name in shape:
 
130
                shape.remove(name)
 
131
            else:
 
132
                extras.append(name)
 
133
        if shape:
 
134
            self.fail("expected paths not found in inventory: %r" % shape)
 
135
        if extras:
 
136
            self.fail("unexpected paths found in inventory: %r" % extras)
 
137
     
 
138
    def _get_log(self):
 
139
        """Get the log the test case used. This can only be called once,
 
140
        after which an exception will be raised.
 
141
        """
 
142
        self.TEST_LOG.flush()
 
143
        log = open(self.TEST_LOG.name, 'rt').read()
 
144
        self.TEST_LOG.close()
 
145
        return log
 
146
 
 
147
 
 
148
class FunctionalTestCase(TestCase):
 
149
    """Base class for tests that perform function testing - running bzr,
 
150
    using files on disk, and similar activities.
 
151
 
 
152
    InTempDir is an old alias for FunctionalTestCase.
 
153
    """
 
154
 
 
155
    TEST_ROOT = None
 
156
    _TEST_NAME = 'test'
 
157
 
 
158
    def check_file_contents(self, filename, expect):
 
159
        self.log("check contents of file %s" % filename)
 
160
        contents = file(filename, 'r').read()
 
161
        if contents != expect:
 
162
            self.log("expected: %r" % expect)
 
163
            self.log("actually: %r" % contents)
 
164
            self.fail("contents of %s not as expected")
 
165
 
 
166
    def _make_test_root(self):
 
167
        import os
 
168
        import shutil
 
169
        import tempfile
 
170
        
 
171
        if FunctionalTestCase.TEST_ROOT is not None:
 
172
            return
 
173
        FunctionalTestCase.TEST_ROOT = os.path.abspath(
 
174
                                 tempfile.mkdtemp(suffix='.tmp',
 
175
                                                  prefix=self._TEST_NAME + '-',
 
176
                                                  dir=os.curdir))
 
177
    
 
178
        # make a fake bzr directory there to prevent any tests propagating
 
179
        # up onto the source directory's real branch
 
180
        os.mkdir(os.path.join(FunctionalTestCase.TEST_ROOT, '.bzr'))
 
181
 
 
182
    def setUp(self):
 
183
        super(FunctionalTestCase, self).setUp()
 
184
        import os
 
185
        self._make_test_root()
 
186
        self._currentdir = os.getcwdu()
 
187
        self.test_dir = os.path.join(self.TEST_ROOT, self.id())
 
188
        os.mkdir(self.test_dir)
 
189
        os.chdir(self.test_dir)
 
190
        
 
191
    def tearDown(self):
 
192
        import os
 
193
        os.chdir(self._currentdir)
 
194
        super(FunctionalTestCase, self).tearDown()
 
195
 
 
196
    def formcmd(self, cmd):
 
197
        if isinstance(cmd, basestring):
 
198
            cmd = cmd.split()
 
199
        if cmd[0] == 'bzr':
 
200
            cmd[0] = self.BZRPATH
 
201
            if self.OVERRIDE_PYTHON:
 
202
                cmd.insert(0, self.OVERRIDE_PYTHON)
 
203
        self.log('$ %r' % cmd)
 
204
        return cmd
 
205
 
 
206
    def runcmd(self, cmd, retcode=0):
 
207
        """Run one command and check the return code.
 
208
 
 
209
        Returns a tuple of (stdout,stderr) strings.
 
210
 
 
211
        If a single string is based, it is split into words.
 
212
        For commands that are not simple space-separated words, please
 
213
        pass a list instead."""
 
214
        try:
 
215
            import shutil
 
216
            from subprocess import call
 
217
        except ImportError, e:
 
218
            _need_subprocess()
 
219
            raise
 
220
        cmd = self.formcmd(cmd)
 
221
        self.log('$ ' + ' '.join(cmd))
 
222
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
 
223
        if retcode != actual_retcode:
 
224
            raise CommandFailed("test failed: %r returned %d, expected %d"
 
225
                                % (cmd, actual_retcode, retcode))
 
226
 
 
227
    def backtick(self, cmd, retcode=0):
 
228
        """Run a command and return its output"""
 
229
        try:
 
230
            import shutil
 
231
            from subprocess import Popen, PIPE
 
232
        except ImportError, e:
 
233
            _need_subprocess()
 
234
            raise
 
235
        cmd = self.formcmd(cmd)
 
236
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
 
237
        outd, errd = child.communicate()
 
238
        self.log(outd)
 
239
        actual_retcode = child.wait()
 
240
        outd = outd.replace('\r', '')
 
241
        if retcode != actual_retcode:
 
242
            raise CommandFailed("test failed: %r returned %d, expected %d"
 
243
                                % (cmd, actual_retcode, retcode))
 
244
        return outd
 
245
 
 
246
    def build_tree(self, shape):
 
247
        """Build a test tree according to a pattern.
 
248
 
 
249
        shape is a sequence of file specifications.  If the final
 
250
        character is '/', a directory is created.
 
251
 
 
252
        This doesn't add anything to a branch.
 
253
        """
 
254
        # XXX: It's OK to just create them using forward slashes on windows?
 
255
        import os
 
256
        for name in shape:
 
257
            assert isinstance(name, basestring)
 
258
            if name[-1] == '/':
 
259
                os.mkdir(name[:-1])
 
260
            else:
 
261
                f = file(name, 'wt')
 
262
                print >>f, "contents of", name
 
263
                f.close()
 
264
                
 
265
InTempDir = FunctionalTestCase
 
266
 
 
267
 
 
268
class EarlyStoppingTestResultAdapter(object):
 
269
    """An adapter for TestResult to stop at the first first failure or error"""
 
270
 
 
271
    def __init__(self, result):
 
272
        self._result = result
 
273
 
 
274
    def addError(self, test, err):
 
275
        self._result.addError(test, err)
 
276
        self._result.stop()
 
277
 
 
278
    def addFailure(self, test, err):
 
279
        self._result.addFailure(test, err)
 
280
        self._result.stop()
 
281
 
 
282
    def __getattr__(self, name):
 
283
        return getattr(self._result, name)
 
284
 
 
285
    def __setattr__(self, name, value):
 
286
        if name == '_result':
 
287
            object.__setattr__(self, name, value)
 
288
        return setattr(self._result, name, value)
 
289
 
 
290
 
 
291
class _MyResult(unittest._TextTestResult):
 
292
    """
 
293
    Custom TestResult.
 
294
 
 
295
    No special behaviour for now.
 
296
    """
 
297
 
 
298
    def startTest(self, test):
 
299
        unittest.TestResult.startTest(self, test)
 
300
        # TODO: Maybe show test.shortDescription somewhere?
 
301
        what = test.id()
 
302
        # python2.3 has the bad habit of just "runit" for doctests
 
303
        if what == 'runit':
 
304
            what = test.shortDescription()
 
305
        if self.showAll:
 
306
            self.stream.write('%-60.60s' % what)
 
307
        self.stream.flush()
 
308
 
 
309
    def addError(self, test, err):
 
310
        super(_MyResult, self).addError(test, err)
 
311
        self.stream.flush()
 
312
 
 
313
    def addFailure(self, test, err):
 
314
        super(_MyResult, self).addFailure(test, err)
 
315
        self.stream.flush()
 
316
 
 
317
    def addSuccess(self, test):
 
318
        if self.showAll:
 
319
            self.stream.writeln('OK')
 
320
        elif self.dots:
 
321
            self.stream.write('~')
 
322
        self.stream.flush()
 
323
        unittest.TestResult.addSuccess(self, test)
 
324
 
 
325
    def printErrorList(self, flavour, errors):
 
326
        for test, err in errors:
 
327
            self.stream.writeln(self.separator1)
 
328
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
 
329
            self.stream.writeln(self.separator2)
 
330
            self.stream.writeln("%s" % err)
 
331
            if isinstance(test, TestCase):
 
332
                self.stream.writeln()
 
333
                self.stream.writeln('log from this test:')
 
334
                print >>self.stream, test._get_log()
 
335
 
 
336
 
 
337
class TextTestRunner(unittest.TextTestRunner):
 
338
 
 
339
    def _makeResult(self):
 
340
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
341
        return EarlyStoppingTestResultAdapter(result)
 
342
 
 
343
 
 
344
class filteringVisitor(TestUtil.TestVisitor):
 
345
    """I accruse all the testCases I visit that pass a regexp filter on id
 
346
    into my suite
 
347
    """
 
348
 
 
349
    def __init__(self, filter):
 
350
        import re
 
351
        TestUtil.TestVisitor.__init__(self)
 
352
        self._suite=None
 
353
        self.filter=re.compile(filter)
 
354
 
 
355
    def suite(self):
 
356
        """answer the suite we are building"""
 
357
        if self._suite is None:
 
358
            self._suite=TestUtil.TestSuite()
 
359
        return self._suite
 
360
 
 
361
    def visitCase(self, aCase):
 
362
        if self.filter.match(aCase.id()):
 
363
            self.suite().addTest(aCase)
 
364
 
 
365
 
 
366
def run_suite(suite, name='test', verbose=False, pattern=".*"):
 
367
    import shutil
 
368
    FunctionalTestCase._TEST_NAME = name
 
369
    if verbose:
 
370
        verbosity = 2
 
371
    else:
 
372
        verbosity = 1
 
373
    runner = TextTestRunner(stream=sys.stdout,
 
374
                            descriptions=0,
 
375
                            verbosity=verbosity)
 
376
    visitor = filteringVisitor(pattern)
 
377
    suite.visit(visitor)
 
378
    result = runner.run(visitor.suite())
 
379
    # This is still a little bogus, 
 
380
    # but only a little. Folk not using our testrunner will
 
381
    # have to delete their temp directories themselves.
 
382
    if result.wasSuccessful():
 
383
        if FunctionalTestCase.TEST_ROOT is not None:
 
384
            shutil.rmtree(FunctionalTestCase.TEST_ROOT) 
 
385
    else:
 
386
        print "Failed tests working directories are in '%s'\n" % FunctionalTestCase.TEST_ROOT
 
387
    return result.wasSuccessful()