~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: Martin Pool
  • Date: 2005-07-22 23:32:00 UTC
  • Revision ID: mbp@sourcefrog.net-20050722233200-ccdeca985093a9fb
- now needs python 2.4
- update instructions for running selftest

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
* utilities to run external commands and check their return code
31
31
  and/or output
32
32
 
33
 
Test cases should normally subclass testsweet.TestCase.  The test runner should
 
33
Test cases should normally subclass TestBase.  The test runner should
34
34
call runsuite().
35
35
 
36
36
This is meant to become independent of bzr, though that's not quite
37
37
true yet.
38
38
"""  
39
39
 
40
 
import unittest
41
 
import sys
42
 
from bzrlib.selftest import TestUtil
43
 
 
44
 
# XXX: Don't need this anymore now we depend on python2.4
 
40
 
 
41
from unittest import TestResult, TestCase
 
42
 
45
43
def _need_subprocess():
46
44
    sys.stderr.write("sorry, this test suite requires the subprocess module\n"
47
45
                     "this is shipped with python2.4 and available separately for 2.3\n")
51
49
    pass
52
50
 
53
51
 
 
52
 
54
53
class TestSkipped(Exception):
55
54
    """Indicates that a test was intentionally skipped, rather than failing."""
56
55
    # XXX: Not used yet
57
56
 
58
57
 
59
 
class TestCase(unittest.TestCase):
60
 
    """Base class for bzr unit tests.
61
 
    
62
 
    Tests that need access to disk resources should subclass 
63
 
    FunctionalTestCase not TestCase.
 
58
class TestBase(TestCase):
 
59
    """Base class for bzr test cases.
 
60
 
 
61
    Just defines some useful helper functions; doesn't actually test
 
62
    anything.
64
63
    """
65
64
    
66
65
    # TODO: Special methods to invoke bzr, so that we can run it
69
68
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
70
69
    BZRPATH = 'bzr'
71
70
 
72
 
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
73
 
                         a_callable=None, *args, **kwargs):
74
 
        """Call callable with redirected std io pipes.
 
71
    _log_buf = ""
75
72
 
76
 
        Returns the return code."""
77
 
        from StringIO import StringIO
78
 
        if not callable(a_callable):
79
 
            raise ValueError("a_callable must be callable.")
80
 
        if stdin is None:
81
 
            stdin = StringIO("")
82
 
        if stdout is None:
83
 
            stdout = self.TEST_LOG
84
 
        if stderr is None:
85
 
            stderr = self.TEST_LOG
86
 
        real_stdin = sys.stdin
87
 
        real_stdout = sys.stdout
88
 
        real_stderr = sys.stderr
89
 
        result = None
90
 
        try:
91
 
            sys.stdout = stdout
92
 
            sys.stderr = stderr
93
 
            sys.stdin = stdin
94
 
            result = a_callable(*args, **kwargs)
95
 
        finally:
96
 
            sys.stdout = real_stdout
97
 
            sys.stderr = real_stderr
98
 
            sys.stdin = real_stdin
99
 
        return result
100
73
 
101
74
    def setUp(self):
102
 
        super(TestCase, self).setUp()
103
 
        # setup a temporary log for the test 
104
 
        import tempfile
105
 
        self.TEST_LOG = tempfile.NamedTemporaryFile(mode='wt', bufsize=0)
 
75
        super(TestBase, self).setUp()
106
76
        self.log("%s setup" % self.id())
107
77
 
 
78
 
108
79
    def tearDown(self):
 
80
        super(TestBase, self).tearDown()
109
81
        self.log("%s teardown" % self.id())
110
82
        self.log('')
111
 
        super(TestCase, self).tearDown()
112
 
 
113
 
    def log(self, msg):
114
 
        """Log a message to a progress file"""
115
 
        print >>self.TEST_LOG, msg
116
 
 
117
 
    def check_inventory_shape(self, inv, shape):
118
 
        """
119
 
        Compare an inventory to a list of expected names.
120
 
 
121
 
        Fail if they are not precisely equal.
122
 
        """
123
 
        extras = []
124
 
        shape = list(shape)             # copy
125
 
        for path, ie in inv.entries():
126
 
            name = path.replace('\\', '/')
127
 
            if ie.kind == 'dir':
128
 
                name = name + '/'
129
 
            if name in shape:
130
 
                shape.remove(name)
131
 
            else:
132
 
                extras.append(name)
133
 
        if shape:
134
 
            self.fail("expected paths not found in inventory: %r" % shape)
135
 
        if extras:
136
 
            self.fail("unexpected paths found in inventory: %r" % extras)
137
 
     
138
 
    def _get_log(self):
139
 
        """Get the log the test case used. This can only be called once,
140
 
        after which an exception will be raised.
141
 
        """
142
 
        self.TEST_LOG.flush()
143
 
        log = open(self.TEST_LOG.name, 'rt').read()
144
 
        self.TEST_LOG.close()
145
 
        return log
146
 
 
147
 
 
148
 
class FunctionalTestCase(TestCase):
149
 
    """Base class for tests that perform function testing - running bzr,
150
 
    using files on disk, and similar activities.
151
 
 
152
 
    InTempDir is an old alias for FunctionalTestCase.
153
 
    """
154
 
 
155
 
    TEST_ROOT = None
156
 
    _TEST_NAME = 'test'
157
 
 
158
 
    def check_file_contents(self, filename, expect):
159
 
        self.log("check contents of file %s" % filename)
160
 
        contents = file(filename, 'r').read()
161
 
        if contents != expect:
162
 
            self.log("expected: %r" % expect)
163
 
            self.log("actually: %r" % contents)
164
 
            self.fail("contents of %s not as expected")
165
 
 
166
 
    def _make_test_root(self):
167
 
        import os
168
 
        import shutil
169
 
        import tempfile
170
 
        
171
 
        if FunctionalTestCase.TEST_ROOT is not None:
172
 
            return
173
 
        FunctionalTestCase.TEST_ROOT = os.path.abspath(
174
 
                                 tempfile.mkdtemp(suffix='.tmp',
175
 
                                                  prefix=self._TEST_NAME + '-',
176
 
                                                  dir=os.curdir))
177
 
    
178
 
        # make a fake bzr directory there to prevent any tests propagating
179
 
        # up onto the source directory's real branch
180
 
        os.mkdir(os.path.join(FunctionalTestCase.TEST_ROOT, '.bzr'))
181
 
 
182
 
    def setUp(self):
183
 
        super(FunctionalTestCase, self).setUp()
184
 
        import os
185
 
        self._make_test_root()
186
 
        self._currentdir = os.getcwdu()
187
 
        self.test_dir = os.path.join(self.TEST_ROOT, self.id())
188
 
        os.mkdir(self.test_dir)
189
 
        os.chdir(self.test_dir)
190
 
        
191
 
    def tearDown(self):
192
 
        import os
193
 
        os.chdir(self._currentdir)
194
 
        super(FunctionalTestCase, self).tearDown()
 
83
 
195
84
 
196
85
    def formcmd(self, cmd):
197
86
        if isinstance(cmd, basestring):
198
87
            cmd = cmd.split()
 
88
 
199
89
        if cmd[0] == 'bzr':
200
90
            cmd[0] = self.BZRPATH
201
91
            if self.OVERRIDE_PYTHON:
202
92
                cmd.insert(0, self.OVERRIDE_PYTHON)
 
93
 
203
94
        self.log('$ %r' % cmd)
 
95
 
204
96
        return cmd
205
97
 
 
98
 
206
99
    def runcmd(self, cmd, retcode=0):
207
100
        """Run one command and check the return code.
208
101
 
217
110
        except ImportError, e:
218
111
            _need_subprocess()
219
112
            raise
 
113
 
 
114
 
220
115
        cmd = self.formcmd(cmd)
 
116
 
221
117
        self.log('$ ' + ' '.join(cmd))
222
118
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
 
119
 
223
120
        if retcode != actual_retcode:
224
121
            raise CommandFailed("test failed: %r returned %d, expected %d"
225
122
                                % (cmd, actual_retcode, retcode))
226
123
 
 
124
 
227
125
    def backtick(self, cmd, retcode=0):
228
126
        """Run a command and return its output"""
229
127
        try:
232
130
        except ImportError, e:
233
131
            _need_subprocess()
234
132
            raise
 
133
 
235
134
        cmd = self.formcmd(cmd)
236
135
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
237
136
        outd, errd = child.communicate()
238
137
        self.log(outd)
239
138
        actual_retcode = child.wait()
 
139
 
240
140
        outd = outd.replace('\r', '')
 
141
 
241
142
        if retcode != actual_retcode:
242
143
            raise CommandFailed("test failed: %r returned %d, expected %d"
243
144
                                % (cmd, actual_retcode, retcode))
 
145
 
244
146
        return outd
245
147
 
 
148
 
 
149
 
246
150
    def build_tree(self, shape):
247
151
        """Build a test tree according to a pattern.
248
152
 
261
165
                f = file(name, 'wt')
262
166
                print >>f, "contents of", name
263
167
                f.close()
264
 
                
265
 
InTempDir = FunctionalTestCase
266
 
 
267
 
 
268
 
class EarlyStoppingTestResultAdapter(object):
269
 
    """An adapter for TestResult to stop at the first first failure or error"""
270
 
 
271
 
    def __init__(self, result):
272
 
        self._result = result
273
 
 
274
 
    def addError(self, test, err):
275
 
        self._result.addError(test, err)
276
 
        self._result.stop()
277
 
 
278
 
    def addFailure(self, test, err):
279
 
        self._result.addFailure(test, err)
280
 
        self._result.stop()
281
 
 
282
 
    def __getattr__(self, name):
283
 
        return getattr(self._result, name)
284
 
 
285
 
    def __setattr__(self, name, value):
286
 
        if name == '_result':
287
 
            object.__setattr__(self, name, value)
288
 
        return setattr(self._result, name, value)
289
 
 
290
 
 
291
 
class _MyResult(unittest._TextTestResult):
 
168
 
 
169
 
 
170
    def log(self, msg):
 
171
        """Log a message to a progress file"""
 
172
        self._log_buf = self._log_buf + str(msg) + '\n'
 
173
        print >>self.TEST_LOG, msg
 
174
 
 
175
 
 
176
    def check_inventory_shape(self, inv, shape):
 
177
        """
 
178
        Compare an inventory to a list of expected names.
 
179
 
 
180
        Fail if they are not precisely equal.
 
181
        """
 
182
        extras = []
 
183
        shape = list(shape)             # copy
 
184
        for path, ie in inv.entries():
 
185
            name = path.replace('\\', '/')
 
186
            if ie.kind == 'dir':
 
187
                name = name + '/'
 
188
            if name in shape:
 
189
                shape.remove(name)
 
190
            else:
 
191
                extras.append(name)
 
192
        if shape:
 
193
            self.fail("expected paths not found in inventory: %r" % shape)
 
194
        if extras:
 
195
            self.fail("unexpected paths found in inventory: %r" % extras)
 
196
 
 
197
 
 
198
    def check_file_contents(self, filename, expect):
 
199
        self.log("check contents of file %s" % filename)
 
200
        contents = file(filename, 'r').read()
 
201
        if contents != expect:
 
202
            self.log("expected: %r" % expect)
 
203
            self.log("actually: %r" % contents)
 
204
            self.fail("contents of %s not as expected")
 
205
            
 
206
 
 
207
 
 
208
class InTempDir(TestBase):
 
209
    """Base class for tests run in a temporary branch."""
 
210
    def setUp(self):
 
211
        import os
 
212
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
 
213
        os.mkdir(self.test_dir)
 
214
        os.chdir(self.test_dir)
 
215
        
 
216
    def tearDown(self):
 
217
        import os
 
218
        os.chdir(self.TEST_ROOT)
 
219
 
 
220
 
 
221
 
 
222
 
 
223
 
 
224
class _MyResult(TestResult):
292
225
    """
293
226
    Custom TestResult.
294
227
 
295
228
    No special behaviour for now.
296
229
    """
 
230
    def __init__(self, out, style):
 
231
        self.out = out
 
232
        TestResult.__init__(self)
 
233
        assert style in ('none', 'progress', 'verbose')
 
234
        self.style = style
 
235
 
297
236
 
298
237
    def startTest(self, test):
299
 
        unittest.TestResult.startTest(self, test)
300
238
        # TODO: Maybe show test.shortDescription somewhere?
301
239
        what = test.id()
302
240
        # python2.3 has the bad habit of just "runit" for doctests
303
241
        if what == 'runit':
304
242
            what = test.shortDescription()
305
 
        if self.showAll:
306
 
            self.stream.write('%-60.60s' % what)
307
 
        self.stream.flush()
 
243
        
 
244
        if self.style == 'verbose':
 
245
            print >>self.out, '%-60.60s' % what,
 
246
            self.out.flush()
 
247
        elif self.style == 'progress':
 
248
            self.out.write('~')
 
249
            self.out.flush()
 
250
        TestResult.startTest(self, test)
 
251
 
 
252
 
 
253
    def stopTest(self, test):
 
254
        # print
 
255
        TestResult.stopTest(self, test)
 
256
 
308
257
 
309
258
    def addError(self, test, err):
310
 
        super(_MyResult, self).addError(test, err)
311
 
        self.stream.flush()
 
259
        if self.style == 'verbose':
 
260
            print >>self.out, 'ERROR'
 
261
        TestResult.addError(self, test, err)
 
262
        _show_test_failure('error', test, err, self.out)
312
263
 
313
264
    def addFailure(self, test, err):
314
 
        super(_MyResult, self).addFailure(test, err)
315
 
        self.stream.flush()
 
265
        if self.style == 'verbose':
 
266
            print >>self.out, 'FAILURE'
 
267
        TestResult.addFailure(self, test, err)
 
268
        _show_test_failure('failure', test, err, self.out)
316
269
 
317
270
    def addSuccess(self, test):
318
 
        if self.showAll:
319
 
            self.stream.writeln('OK')
320
 
        elif self.dots:
321
 
            self.stream.write('~')
322
 
        self.stream.flush()
323
 
        unittest.TestResult.addSuccess(self, test)
324
 
 
325
 
    def printErrorList(self, flavour, errors):
326
 
        for test, err in errors:
327
 
            self.stream.writeln(self.separator1)
328
 
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
329
 
            self.stream.writeln(self.separator2)
330
 
            self.stream.writeln("%s" % err)
331
 
            if isinstance(test, TestCase):
332
 
                self.stream.writeln()
333
 
                self.stream.writeln('log from this test:')
334
 
                print >>self.stream, test._get_log()
335
 
 
336
 
 
337
 
class TextTestRunner(unittest.TextTestRunner):
338
 
 
339
 
    def _makeResult(self):
340
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
341
 
        return EarlyStoppingTestResultAdapter(result)
342
 
 
343
 
 
344
 
class filteringVisitor(TestUtil.TestVisitor):
345
 
    """I accruse all the testCases I visit that pass a regexp filter on id
346
 
    into my suite
347
 
    """
348
 
 
349
 
    def __init__(self, filter):
350
 
        import re
351
 
        TestUtil.TestVisitor.__init__(self)
352
 
        self._suite=None
353
 
        self.filter=re.compile(filter)
354
 
 
355
 
    def suite(self):
356
 
        """answer the suite we are building"""
357
 
        if self._suite is None:
358
 
            self._suite=TestUtil.TestSuite()
359
 
        return self._suite
360
 
 
361
 
    def visitCase(self, aCase):
362
 
        if self.filter.match(aCase.id()):
363
 
            self.suite().addTest(aCase)
364
 
 
365
 
 
366
 
def run_suite(suite, name='test', verbose=False, pattern=".*"):
 
271
        if self.style == 'verbose':
 
272
            print >>self.out, 'OK'
 
273
        TestResult.addSuccess(self, test)
 
274
 
 
275
 
 
276
 
 
277
def run_suite(suite, name='test', verbose=False):
 
278
    import os
367
279
    import shutil
368
 
    FunctionalTestCase._TEST_NAME = name
369
 
    if verbose:
370
 
        verbosity = 2
371
 
    else:
372
 
        verbosity = 1
373
 
    runner = TextTestRunner(stream=sys.stdout,
374
 
                            descriptions=0,
375
 
                            verbosity=verbosity)
376
 
    visitor = filteringVisitor(pattern)
377
 
    suite.visit(visitor)
378
 
    result = runner.run(visitor.suite())
379
 
    # This is still a little bogus, 
380
 
    # but only a little. Folk not using our testrunner will
381
 
    # have to delete their temp directories themselves.
382
 
    if result.wasSuccessful():
383
 
        if FunctionalTestCase.TEST_ROOT is not None:
384
 
            shutil.rmtree(FunctionalTestCase.TEST_ROOT) 
385
 
    else:
386
 
        print "Failed tests working directories are in '%s'\n" % FunctionalTestCase.TEST_ROOT
 
280
    import time
 
281
    import sys
 
282
    
 
283
    _setup_test_log(name)
 
284
    _setup_test_dir(name)
 
285
    print
 
286
 
 
287
    # save stdout & stderr so there's no leakage from code-under-test
 
288
    real_stdout = sys.stdout
 
289
    real_stderr = sys.stderr
 
290
    sys.stdout = sys.stderr = TestBase.TEST_LOG
 
291
    try:
 
292
        if verbose:
 
293
            style = 'verbose'
 
294
        else:
 
295
            style = 'progress'
 
296
        result = _MyResult(real_stdout, style)
 
297
        suite.run(result)
 
298
    finally:
 
299
        sys.stdout = real_stdout
 
300
        sys.stderr = real_stderr
 
301
 
 
302
    _show_results(result)
 
303
 
387
304
    return result.wasSuccessful()
 
305
 
 
306
 
 
307
 
 
308
def _setup_test_log(name):
 
309
    import time
 
310
    import os
 
311
    
 
312
    log_filename = os.path.abspath(name + '.log')
 
313
    TestBase.TEST_LOG = open(log_filename, 'wt', buffering=1) # line buffered
 
314
 
 
315
    print >>TestBase.TEST_LOG, "tests run at " + time.ctime()
 
316
    print '%-30s %s' % ('test log', log_filename)
 
317
 
 
318
 
 
319
def _setup_test_dir(name):
 
320
    import os
 
321
    import shutil
 
322
    
 
323
    TestBase.ORIG_DIR = os.getcwdu()
 
324
    TestBase.TEST_ROOT = os.path.abspath(name + '.tmp')
 
325
 
 
326
    print '%-30s %s' % ('running tests in', TestBase.TEST_ROOT)
 
327
 
 
328
    if os.path.exists(TestBase.TEST_ROOT):
 
329
        shutil.rmtree(TestBase.TEST_ROOT)
 
330
    os.mkdir(TestBase.TEST_ROOT)
 
331
    os.chdir(TestBase.TEST_ROOT)
 
332
 
 
333
    # make a fake bzr directory there to prevent any tests propagating
 
334
    # up onto the source directory's real branch
 
335
    os.mkdir(os.path.join(TestBase.TEST_ROOT, '.bzr'))
 
336
 
 
337
    
 
338
 
 
339
def _show_results(result):
 
340
     print
 
341
     print '%4d tests run' % result.testsRun
 
342
     print '%4d errors' % len(result.errors)
 
343
     print '%4d failures' % len(result.failures)
 
344
 
 
345
 
 
346
 
 
347
def _show_test_failure(kind, case, exc_info, out):
 
348
    from traceback import print_exception
 
349
    
 
350
    print >>out, '-' * 60
 
351
    print >>out, case
 
352
    
 
353
    desc = case.shortDescription()
 
354
    if desc:
 
355
        print >>out, '   (%s)' % desc
 
356
         
 
357
    print_exception(exc_info[0], exc_info[1], exc_info[2], None, out)
 
358
        
 
359
    if isinstance(case, TestBase):
 
360
        print >>out
 
361
        print >>out, 'log from this test:'
 
362
        print >>out, case._log_buf
 
363
         
 
364
    print >>out, '-' * 60
 
365
    
 
366