~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: aaron.bentley at utoronto
  • Date: 2005-08-27 04:42:41 UTC
  • mfrom: (1092.1.43)
  • mto: (1185.3.4)
  • mto: This revision was merged to the branch mainline in revision 1178.
  • Revision ID: aaron.bentley@utoronto.ca-20050827044241-23d676133b9fc981
Merge of robertc@robertcollins.net-20050826013321-52eee1f1da679ee9

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
* utilities to run external commands and check their return code
31
31
  and/or output
32
32
 
33
 
Test cases should normally subclass TestBase.  The test runner should
 
33
Test cases should normally subclass testsweet.TestCase.  The test runner should
34
34
call runsuite().
35
35
 
36
36
This is meant to become independent of bzr, though that's not quite
37
37
true yet.
38
38
"""  
39
39
 
40
 
 
41
 
from unittest import TestResult, TestCase
 
40
import unittest
 
41
import sys
 
42
from bzrlib.selftest import TestUtil
42
43
 
43
44
# XXX: Don't need this anymore now we depend on python2.4
44
45
def _need_subprocess():
50
51
    pass
51
52
 
52
53
 
53
 
 
54
54
class TestSkipped(Exception):
55
55
    """Indicates that a test was intentionally skipped, rather than failing."""
56
56
    # XXX: Not used yet
57
57
 
58
58
 
59
 
class TestBase(TestCase):
60
 
    """Base class for bzr test cases.
61
 
 
62
 
    Just defines some useful helper functions; doesn't actually test
63
 
    anything.
 
59
class TestCase(unittest.TestCase):
 
60
    """Base class for bzr unit tests.
 
61
    
 
62
    Tests that need access to disk resources should subclass 
 
63
    FunctionalTestCase not TestCase.
64
64
    """
65
65
    
66
66
    # TODO: Special methods to invoke bzr, so that we can run it
69
69
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
70
70
    BZRPATH = 'bzr'
71
71
 
72
 
    _log_buf = ""
 
72
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
73
                         a_callable=None, *args, **kwargs):
 
74
        """Call callable with redirected std io pipes.
73
75
 
 
76
        Returns the return code."""
 
77
        from StringIO import StringIO
 
78
        if not callable(a_callable):
 
79
            raise ValueError("a_callable must be callable.")
 
80
        if stdin is None:
 
81
            stdin = StringIO("")
 
82
        if stdout is None:
 
83
            stdout = self.TEST_LOG
 
84
        if stderr is None:
 
85
            stderr = self.TEST_LOG
 
86
        real_stdin = sys.stdin
 
87
        real_stdout = sys.stdout
 
88
        real_stderr = sys.stderr
 
89
        result = None
 
90
        try:
 
91
            sys.stdout = stdout
 
92
            sys.stderr = stderr
 
93
            sys.stdin = stdin
 
94
            result = a_callable(*args, **kwargs)
 
95
        finally:
 
96
            sys.stdout = real_stdout
 
97
            sys.stderr = real_stderr
 
98
            sys.stdin = real_stdin
 
99
        return result
74
100
 
75
101
    def setUp(self):
76
 
        super(TestBase, self).setUp()
 
102
        super(TestCase, self).setUp()
 
103
        # setup a temporary log for the test 
 
104
        import tempfile
 
105
        self.TEST_LOG = tempfile.NamedTemporaryFile(mode='wt', bufsize=0)
77
106
        self.log("%s setup" % self.id())
78
107
 
79
 
 
80
108
    def tearDown(self):
81
 
        super(TestBase, self).tearDown()
82
109
        self.log("%s teardown" % self.id())
83
110
        self.log('')
84
 
 
 
111
        super(TestCase, self).tearDown()
 
112
 
 
113
    def log(self, msg):
 
114
        """Log a message to a progress file"""
 
115
        print >>self.TEST_LOG, msg
 
116
 
 
117
    def check_inventory_shape(self, inv, shape):
 
118
        """
 
119
        Compare an inventory to a list of expected names.
 
120
 
 
121
        Fail if they are not precisely equal.
 
122
        """
 
123
        extras = []
 
124
        shape = list(shape)             # copy
 
125
        for path, ie in inv.entries():
 
126
            name = path.replace('\\', '/')
 
127
            if ie.kind == 'dir':
 
128
                name = name + '/'
 
129
            if name in shape:
 
130
                shape.remove(name)
 
131
            else:
 
132
                extras.append(name)
 
133
        if shape:
 
134
            self.fail("expected paths not found in inventory: %r" % shape)
 
135
        if extras:
 
136
            self.fail("unexpected paths found in inventory: %r" % extras)
 
137
     
 
138
    def _get_log(self):
 
139
        """Get the log the test case used. This can only be called once,
 
140
        after which an exception will be raised.
 
141
        """
 
142
        self.TEST_LOG.flush()
 
143
        log = open(self.TEST_LOG.name, 'rt').read()
 
144
        self.TEST_LOG.close()
 
145
        return log
 
146
 
 
147
 
 
148
class FunctionalTestCase(TestCase):
 
149
    """Base class for tests that perform function testing - running bzr,
 
150
    using files on disk, and similar activities.
 
151
 
 
152
    InTempDir is an old alias for FunctionalTestCase.
 
153
    """
 
154
 
 
155
    TEST_ROOT = None
 
156
    _TEST_NAME = 'test'
 
157
 
 
158
    def check_file_contents(self, filename, expect):
 
159
        self.log("check contents of file %s" % filename)
 
160
        contents = file(filename, 'r').read()
 
161
        if contents != expect:
 
162
            self.log("expected: %r" % expect)
 
163
            self.log("actually: %r" % contents)
 
164
            self.fail("contents of %s not as expected")
 
165
 
 
166
    def _make_test_root(self):
 
167
        import os
 
168
        import shutil
 
169
        import tempfile
 
170
        
 
171
        if FunctionalTestCase.TEST_ROOT is not None:
 
172
            return
 
173
        FunctionalTestCase.TEST_ROOT = os.path.abspath(
 
174
                                 tempfile.mkdtemp(suffix='.tmp',
 
175
                                                  prefix=self._TEST_NAME + '-',
 
176
                                                  dir=os.curdir))
 
177
    
 
178
        # make a fake bzr directory there to prevent any tests propagating
 
179
        # up onto the source directory's real branch
 
180
        os.mkdir(os.path.join(FunctionalTestCase.TEST_ROOT, '.bzr'))
 
181
 
 
182
    def setUp(self):
 
183
        super(FunctionalTestCase, self).setUp()
 
184
        import os
 
185
        self._make_test_root()
 
186
        self._currentdir = os.getcwdu()
 
187
        self.test_dir = os.path.join(self.TEST_ROOT, self.id())
 
188
        os.mkdir(self.test_dir)
 
189
        os.chdir(self.test_dir)
 
190
        
 
191
    def tearDown(self):
 
192
        import os
 
193
        os.chdir(self._currentdir)
 
194
        super(FunctionalTestCase, self).tearDown()
85
195
 
86
196
    def formcmd(self, cmd):
87
197
        if isinstance(cmd, basestring):
88
198
            cmd = cmd.split()
89
 
 
90
199
        if cmd[0] == 'bzr':
91
200
            cmd[0] = self.BZRPATH
92
201
            if self.OVERRIDE_PYTHON:
93
202
                cmd.insert(0, self.OVERRIDE_PYTHON)
94
 
 
95
203
        self.log('$ %r' % cmd)
96
 
 
97
204
        return cmd
98
205
 
99
 
 
100
206
    def runcmd(self, cmd, retcode=0):
101
207
        """Run one command and check the return code.
102
208
 
111
217
        except ImportError, e:
112
218
            _need_subprocess()
113
219
            raise
114
 
 
115
 
 
116
220
        cmd = self.formcmd(cmd)
117
 
 
118
221
        self.log('$ ' + ' '.join(cmd))
119
222
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
120
 
 
121
223
        if retcode != actual_retcode:
122
224
            raise CommandFailed("test failed: %r returned %d, expected %d"
123
225
                                % (cmd, actual_retcode, retcode))
124
226
 
125
 
 
126
227
    def backtick(self, cmd, retcode=0):
127
228
        """Run a command and return its output"""
128
229
        try:
131
232
        except ImportError, e:
132
233
            _need_subprocess()
133
234
            raise
134
 
 
135
235
        cmd = self.formcmd(cmd)
136
236
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
137
237
        outd, errd = child.communicate()
138
238
        self.log(outd)
139
239
        actual_retcode = child.wait()
140
 
 
141
240
        outd = outd.replace('\r', '')
142
 
 
143
241
        if retcode != actual_retcode:
144
242
            raise CommandFailed("test failed: %r returned %d, expected %d"
145
243
                                % (cmd, actual_retcode, retcode))
146
 
 
147
244
        return outd
148
245
 
149
 
 
150
 
 
151
246
    def build_tree(self, shape):
152
247
        """Build a test tree according to a pattern.
153
248
 
166
261
                f = file(name, 'wt')
167
262
                print >>f, "contents of", name
168
263
                f.close()
169
 
 
170
 
 
171
 
    def log(self, msg):
172
 
        """Log a message to a progress file"""
173
 
        # XXX: The problem with this is that code that writes straight
174
 
        # to the log file won't be shown when we display the log
175
 
        # buffer; would be better to not have the in-memory buffer and
176
 
        # instead just a log file per test, which is read in and
177
 
        # displayed if the test fails.  That seems to imply one log
178
 
        # per test case, not globally.  OK?
179
 
        self._log_buf = self._log_buf + str(msg) + '\n'
180
 
        print >>self.TEST_LOG, msg
181
 
 
182
 
 
183
 
    def check_inventory_shape(self, inv, shape):
184
 
        """
185
 
        Compare an inventory to a list of expected names.
186
 
 
187
 
        Fail if they are not precisely equal.
188
 
        """
189
 
        extras = []
190
 
        shape = list(shape)             # copy
191
 
        for path, ie in inv.entries():
192
 
            name = path.replace('\\', '/')
193
 
            if ie.kind == 'dir':
194
 
                name = name + '/'
195
 
            if name in shape:
196
 
                shape.remove(name)
197
 
            else:
198
 
                extras.append(name)
199
 
        if shape:
200
 
            self.fail("expected paths not found in inventory: %r" % shape)
201
 
        if extras:
202
 
            self.fail("unexpected paths found in inventory: %r" % extras)
203
 
 
204
 
 
205
 
    def check_file_contents(self, filename, expect):
206
 
        self.log("check contents of file %s" % filename)
207
 
        contents = file(filename, 'r').read()
208
 
        if contents != expect:
209
 
            self.log("expected: %r" % expect)
210
 
            self.log("actually: %r" % contents)
211
 
            self.fail("contents of %s not as expected")
212
 
            
213
 
 
214
 
 
215
 
class InTempDir(TestBase):
216
 
    """Base class for tests run in a temporary branch."""
217
 
    def setUp(self):
218
 
        import os
219
 
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
220
 
        os.mkdir(self.test_dir)
221
 
        os.chdir(self.test_dir)
222
 
        
223
 
    def tearDown(self):
224
 
        import os
225
 
        os.chdir(self.TEST_ROOT)
226
 
 
227
 
 
228
 
 
229
 
 
230
 
 
231
 
class _MyResult(TestResult):
 
264
                
 
265
InTempDir = FunctionalTestCase
 
266
 
 
267
 
 
268
class EarlyStoppingTestResultAdapter(object):
 
269
    """An adapter for TestResult to stop at the first first failure or error"""
 
270
 
 
271
    def __init__(self, result):
 
272
        self._result = result
 
273
 
 
274
    def addError(self, test, err):
 
275
        self._result.addError(test, err)
 
276
        self._result.stop()
 
277
 
 
278
    def addFailure(self, test, err):
 
279
        self._result.addFailure(test, err)
 
280
        self._result.stop()
 
281
 
 
282
    def __getattr__(self, name):
 
283
        return getattr(self._result, name)
 
284
 
 
285
    def __setattr__(self, name, value):
 
286
        if name == '_result':
 
287
            object.__setattr__(self, name, value)
 
288
        return setattr(self._result, name, value)
 
289
 
 
290
 
 
291
class _MyResult(unittest._TextTestResult):
232
292
    """
233
293
    Custom TestResult.
234
294
 
235
295
    No special behaviour for now.
236
296
    """
237
 
    def __init__(self, out, style):
238
 
        self.out = out
239
 
        TestResult.__init__(self)
240
 
        assert style in ('none', 'progress', 'verbose')
241
 
        self.style = style
242
 
 
243
297
 
244
298
    def startTest(self, test):
 
299
        unittest.TestResult.startTest(self, test)
245
300
        # TODO: Maybe show test.shortDescription somewhere?
246
301
        what = test.id()
247
302
        # python2.3 has the bad habit of just "runit" for doctests
248
303
        if what == 'runit':
249
304
            what = test.shortDescription()
250
 
        
251
 
        if self.style == 'verbose':
252
 
            print >>self.out, '%-60.60s' % what,
253
 
            self.out.flush()
254
 
        elif self.style == 'progress':
255
 
            self.out.write('~')
256
 
            self.out.flush()
257
 
        TestResult.startTest(self, test)
258
 
 
259
 
 
260
 
    def stopTest(self, test):
261
 
        # print
262
 
        TestResult.stopTest(self, test)
263
 
 
 
305
        if self.showAll:
 
306
            self.stream.write('%-60.60s' % what)
 
307
        self.stream.flush()
264
308
 
265
309
    def addError(self, test, err):
266
 
        if self.style == 'verbose':
267
 
            print >>self.out, 'ERROR'
268
 
        TestResult.addError(self, test, err)
269
 
        _show_test_failure('error', test, err, self.out)
 
310
        super(_MyResult, self).addError(test, err)
 
311
        self.stream.flush()
270
312
 
271
313
    def addFailure(self, test, err):
272
 
        if self.style == 'verbose':
273
 
            print >>self.out, 'FAILURE'
274
 
        TestResult.addFailure(self, test, err)
275
 
        _show_test_failure('failure', test, err, self.out)
 
314
        super(_MyResult, self).addFailure(test, err)
 
315
        self.stream.flush()
276
316
 
277
317
    def addSuccess(self, test):
278
 
        if self.style == 'verbose':
279
 
            print >>self.out, 'OK'
280
 
        TestResult.addSuccess(self, test)
281
 
 
282
 
 
283
 
 
284
 
def run_suite(suite, name='test', verbose=False):
285
 
    import os
 
318
        if self.showAll:
 
319
            self.stream.writeln('OK')
 
320
        elif self.dots:
 
321
            self.stream.write('~')
 
322
        self.stream.flush()
 
323
        unittest.TestResult.addSuccess(self, test)
 
324
 
 
325
    def printErrorList(self, flavour, errors):
 
326
        for test, err in errors:
 
327
            self.stream.writeln(self.separator1)
 
328
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
 
329
            self.stream.writeln(self.separator2)
 
330
            self.stream.writeln("%s" % err)
 
331
            if isinstance(test, TestCase):
 
332
                self.stream.writeln()
 
333
                self.stream.writeln('log from this test:')
 
334
                print >>self.stream, test._get_log()
 
335
 
 
336
 
 
337
class TextTestRunner(unittest.TextTestRunner):
 
338
 
 
339
    def _makeResult(self):
 
340
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
341
        return EarlyStoppingTestResultAdapter(result)
 
342
 
 
343
 
 
344
class filteringVisitor(TestUtil.TestVisitor):
 
345
    """I accruse all the testCases I visit that pass a regexp filter on id
 
346
    into my suite
 
347
    """
 
348
 
 
349
    def __init__(self, filter):
 
350
        import re
 
351
        TestUtil.TestVisitor.__init__(self)
 
352
        self._suite=None
 
353
        self.filter=re.compile(filter)
 
354
 
 
355
    def suite(self):
 
356
        """answer the suite we are building"""
 
357
        if self._suite is None:
 
358
            self._suite=TestUtil.TestSuite()
 
359
        return self._suite
 
360
 
 
361
    def visitCase(self, aCase):
 
362
        if self.filter.match(aCase.id()):
 
363
            self.suite().addTest(aCase)
 
364
 
 
365
 
 
366
def run_suite(suite, name='test', verbose=False, pattern=".*"):
286
367
    import shutil
287
 
    import time
288
 
    import sys
289
 
    
290
 
    _setup_test_log(name)
291
 
    _setup_test_dir(name)
292
 
    print
293
 
 
294
 
    # save stdout & stderr so there's no leakage from code-under-test
295
 
    real_stdout = sys.stdout
296
 
    real_stderr = sys.stderr
297
 
    sys.stdout = sys.stderr = TestBase.TEST_LOG
298
 
    try:
299
 
        if verbose:
300
 
            style = 'verbose'
301
 
        else:
302
 
            style = 'progress'
303
 
        result = _MyResult(real_stdout, style)
304
 
        suite.run(result)
305
 
    finally:
306
 
        sys.stdout = real_stdout
307
 
        sys.stderr = real_stderr
308
 
 
309
 
    _show_results(result)
310
 
 
 
368
    FunctionalTestCase._TEST_NAME = name
 
369
    if verbose:
 
370
        verbosity = 2
 
371
    else:
 
372
        verbosity = 1
 
373
    runner = TextTestRunner(stream=sys.stdout,
 
374
                            descriptions=0,
 
375
                            verbosity=verbosity)
 
376
    visitor = filteringVisitor(pattern)
 
377
    suite.visit(visitor)
 
378
    result = runner.run(visitor.suite())
 
379
    # This is still a little bogus, 
 
380
    # but only a little. Folk not using our testrunner will
 
381
    # have to delete their temp directories themselves.
 
382
    if result.wasSuccessful():
 
383
        if FunctionalTestCase.TEST_ROOT is not None:
 
384
            shutil.rmtree(FunctionalTestCase.TEST_ROOT) 
 
385
    else:
 
386
        print "Failed tests working directories are in '%s'\n" % FunctionalTestCase.TEST_ROOT
311
387
    return result.wasSuccessful()
312
 
 
313
 
 
314
 
 
315
 
def _setup_test_log(name):
316
 
    import time
317
 
    import os
318
 
    
319
 
    log_filename = os.path.abspath(name + '.log')
320
 
    TestBase.TEST_LOG = open(log_filename, 'wt', buffering=1) # line buffered
321
 
 
322
 
    print >>TestBase.TEST_LOG, "tests run at " + time.ctime()
323
 
    print '%-30s %s' % ('test log', log_filename)
324
 
 
325
 
 
326
 
def _setup_test_dir(name):
327
 
    import os
328
 
    import shutil
329
 
    
330
 
    TestBase.ORIG_DIR = os.getcwdu()
331
 
    TestBase.TEST_ROOT = os.path.abspath(name + '.tmp')
332
 
 
333
 
    print '%-30s %s' % ('running tests in', TestBase.TEST_ROOT)
334
 
 
335
 
    if os.path.exists(TestBase.TEST_ROOT):
336
 
        shutil.rmtree(TestBase.TEST_ROOT)
337
 
    os.mkdir(TestBase.TEST_ROOT)
338
 
    os.chdir(TestBase.TEST_ROOT)
339
 
 
340
 
    # make a fake bzr directory there to prevent any tests propagating
341
 
    # up onto the source directory's real branch
342
 
    os.mkdir(os.path.join(TestBase.TEST_ROOT, '.bzr'))
343
 
 
344
 
    
345
 
 
346
 
def _show_results(result):
347
 
     print
348
 
     print '%4d tests run' % result.testsRun
349
 
     print '%4d errors' % len(result.errors)
350
 
     print '%4d failures' % len(result.failures)
351
 
 
352
 
 
353
 
 
354
 
def _show_test_failure(kind, case, exc_info, out):
355
 
    from traceback import print_exception
356
 
 
357
 
    print >>out
358
 
    print >>out, '-' * 60
359
 
    print >>out, case
360
 
    
361
 
    desc = case.shortDescription()
362
 
    if desc:
363
 
        print >>out, '   (%s)' % desc
364
 
         
365
 
    print_exception(exc_info[0], exc_info[1], exc_info[2], None, out)
366
 
        
367
 
    if isinstance(case, TestBase):
368
 
        print >>out
369
 
        print >>out, 'log from this test:'
370
 
        print >>out, case._log_buf
371
 
         
372
 
    print >>out, '-' * 60
373
 
    
374