~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: Aaron Bentley
  • Date: 2005-08-25 15:23:10 UTC
  • mto: (1092.1.42) (1185.3.4)
  • mto: This revision was merged to the branch mainline in revision 1178.
  • Revision ID: abentley@panoramicfeedback.com-20050825152310-897f26086c7fb592
Fixed negative revision handling

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
* utilities to run external commands and check their return code
31
31
  and/or output
32
32
 
33
 
Test cases should normally subclass testsweet.TestCase.  The test runner should
 
33
Test cases should normally subclass TestBase.  The test runner should
34
34
call runsuite().
35
35
 
36
36
This is meant to become independent of bzr, though that's not quite
37
37
true yet.
38
38
"""  
39
39
 
40
 
import unittest
41
 
import sys
42
 
from bzrlib.selftest import TestUtil
 
40
 
 
41
from unittest import TestResult, TestCase
43
42
 
44
43
# XXX: Don't need this anymore now we depend on python2.4
45
44
def _need_subprocess():
51
50
    pass
52
51
 
53
52
 
 
53
 
54
54
class TestSkipped(Exception):
55
55
    """Indicates that a test was intentionally skipped, rather than failing."""
56
56
    # XXX: Not used yet
57
57
 
58
58
 
59
 
class TestCase(unittest.TestCase):
60
 
    """Base class for bzr unit tests.
61
 
    
62
 
    Tests that need access to disk resources should subclass 
63
 
    FunctionalTestCase not TestCase.
 
59
class TestBase(TestCase):
 
60
    """Base class for bzr test cases.
 
61
 
 
62
    Just defines some useful helper functions; doesn't actually test
 
63
    anything.
64
64
    """
65
65
    
66
66
    # TODO: Special methods to invoke bzr, so that we can run it
69
69
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
70
70
    BZRPATH = 'bzr'
71
71
 
72
 
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
73
 
                         a_callable=None, *args, **kwargs):
74
 
        """Call callable with redirected std io pipes.
 
72
    _log_buf = ""
75
73
 
76
 
        Returns the return code."""
77
 
        from StringIO import StringIO
78
 
        if not callable(a_callable):
79
 
            raise ValueError("a_callable must be callable.")
80
 
        if stdin is None:
81
 
            stdin = StringIO("")
82
 
        if stdout is None:
83
 
            stdout = self.TEST_LOG
84
 
        if stderr is None:
85
 
            stderr = self.TEST_LOG
86
 
        real_stdin = sys.stdin
87
 
        real_stdout = sys.stdout
88
 
        real_stderr = sys.stderr
89
 
        result = None
90
 
        try:
91
 
            sys.stdout = stdout
92
 
            sys.stderr = stderr
93
 
            sys.stdin = stdin
94
 
            result = a_callable(*args, **kwargs)
95
 
        finally:
96
 
            sys.stdout = real_stdout
97
 
            sys.stderr = real_stderr
98
 
            sys.stdin = real_stdin
99
 
        return result
100
74
 
101
75
    def setUp(self):
102
 
        super(TestCase, self).setUp()
103
 
        # setup a temporary log for the test 
104
 
        import tempfile
105
 
        self.TEST_LOG = tempfile.NamedTemporaryFile(mode='wt', bufsize=0)
 
76
        super(TestBase, self).setUp()
106
77
        self.log("%s setup" % self.id())
107
78
 
 
79
 
108
80
    def tearDown(self):
 
81
        super(TestBase, self).tearDown()
109
82
        self.log("%s teardown" % self.id())
110
83
        self.log('')
111
 
        super(TestCase, self).tearDown()
112
 
 
113
 
    def log(self, msg):
114
 
        """Log a message to a progress file"""
115
 
        print >>self.TEST_LOG, msg
116
 
 
117
 
    def check_inventory_shape(self, inv, shape):
118
 
        """
119
 
        Compare an inventory to a list of expected names.
120
 
 
121
 
        Fail if they are not precisely equal.
122
 
        """
123
 
        extras = []
124
 
        shape = list(shape)             # copy
125
 
        for path, ie in inv.entries():
126
 
            name = path.replace('\\', '/')
127
 
            if ie.kind == 'dir':
128
 
                name = name + '/'
129
 
            if name in shape:
130
 
                shape.remove(name)
131
 
            else:
132
 
                extras.append(name)
133
 
        if shape:
134
 
            self.fail("expected paths not found in inventory: %r" % shape)
135
 
        if extras:
136
 
            self.fail("unexpected paths found in inventory: %r" % extras)
137
 
     
138
 
    def _get_log(self):
139
 
        """Get the log the test case used. This can only be called once,
140
 
        after which an exception will be raised.
141
 
        """
142
 
        self.TEST_LOG.flush()
143
 
        log = open(self.TEST_LOG.name, 'rt').read()
144
 
        self.TEST_LOG.close()
145
 
        return log
146
 
 
147
 
 
148
 
class FunctionalTestCase(TestCase):
149
 
    """Base class for tests that perform function testing - running bzr,
150
 
    using files on disk, and similar activities.
151
 
 
152
 
    InTempDir is an old alias for FunctionalTestCase.
153
 
    """
154
 
 
155
 
    TEST_ROOT = None
156
 
    _TEST_NAME = 'test'
157
 
 
158
 
    def check_file_contents(self, filename, expect):
159
 
        self.log("check contents of file %s" % filename)
160
 
        contents = file(filename, 'r').read()
161
 
        if contents != expect:
162
 
            self.log("expected: %r" % expect)
163
 
            self.log("actually: %r" % contents)
164
 
            self.fail("contents of %s not as expected")
165
 
 
166
 
    def _make_test_root(self):
167
 
        import os
168
 
        import shutil
169
 
        import tempfile
170
 
        
171
 
        if FunctionalTestCase.TEST_ROOT is not None:
172
 
            return
173
 
        FunctionalTestCase.TEST_ROOT = os.path.abspath(
174
 
                                 tempfile.mkdtemp(suffix='.tmp',
175
 
                                                  prefix=self._TEST_NAME + '-',
176
 
                                                  dir=os.curdir))
177
 
    
178
 
        # make a fake bzr directory there to prevent any tests propagating
179
 
        # up onto the source directory's real branch
180
 
        os.mkdir(os.path.join(FunctionalTestCase.TEST_ROOT, '.bzr'))
181
 
 
182
 
    def setUp(self):
183
 
        super(FunctionalTestCase, self).setUp()
184
 
        import os
185
 
        self._make_test_root()
186
 
        self._currentdir = os.getcwdu()
187
 
        self.test_dir = os.path.join(self.TEST_ROOT, self.id())
188
 
        os.mkdir(self.test_dir)
189
 
        os.chdir(self.test_dir)
190
 
        
191
 
    def tearDown(self):
192
 
        import os
193
 
        os.chdir(self._currentdir)
194
 
        super(FunctionalTestCase, self).tearDown()
 
84
 
195
85
 
196
86
    def formcmd(self, cmd):
197
87
        if isinstance(cmd, basestring):
198
88
            cmd = cmd.split()
 
89
 
199
90
        if cmd[0] == 'bzr':
200
91
            cmd[0] = self.BZRPATH
201
92
            if self.OVERRIDE_PYTHON:
202
93
                cmd.insert(0, self.OVERRIDE_PYTHON)
 
94
 
203
95
        self.log('$ %r' % cmd)
 
96
 
204
97
        return cmd
205
98
 
 
99
 
206
100
    def runcmd(self, cmd, retcode=0):
207
101
        """Run one command and check the return code.
208
102
 
217
111
        except ImportError, e:
218
112
            _need_subprocess()
219
113
            raise
 
114
 
 
115
 
220
116
        cmd = self.formcmd(cmd)
 
117
 
221
118
        self.log('$ ' + ' '.join(cmd))
222
119
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
 
120
 
223
121
        if retcode != actual_retcode:
224
122
            raise CommandFailed("test failed: %r returned %d, expected %d"
225
123
                                % (cmd, actual_retcode, retcode))
226
124
 
 
125
 
227
126
    def backtick(self, cmd, retcode=0):
228
127
        """Run a command and return its output"""
229
128
        try:
232
131
        except ImportError, e:
233
132
            _need_subprocess()
234
133
            raise
 
134
 
235
135
        cmd = self.formcmd(cmd)
236
136
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
237
137
        outd, errd = child.communicate()
238
138
        self.log(outd)
239
139
        actual_retcode = child.wait()
 
140
 
240
141
        outd = outd.replace('\r', '')
 
142
 
241
143
        if retcode != actual_retcode:
242
144
            raise CommandFailed("test failed: %r returned %d, expected %d"
243
145
                                % (cmd, actual_retcode, retcode))
 
146
 
244
147
        return outd
245
148
 
 
149
 
 
150
 
246
151
    def build_tree(self, shape):
247
152
        """Build a test tree according to a pattern.
248
153
 
261
166
                f = file(name, 'wt')
262
167
                print >>f, "contents of", name
263
168
                f.close()
264
 
                
265
 
InTempDir = FunctionalTestCase
266
 
 
267
 
 
268
 
class EarlyStoppingTestResultAdapter(object):
269
 
    """An adapter for TestResult to stop at the first first failure or error"""
270
 
 
271
 
    def __init__(self, result):
272
 
        self._result = result
273
 
 
274
 
    def addError(self, test, err):
275
 
        self._result.addError(test, err)
276
 
        self._result.stop()
277
 
 
278
 
    def addFailure(self, test, err):
279
 
        self._result.addFailure(test, err)
280
 
        self._result.stop()
281
 
 
282
 
    def __getattr__(self, name):
283
 
        return getattr(self._result, name)
284
 
 
285
 
    def __setattr__(self, name, value):
286
 
        if name == '_result':
287
 
            object.__setattr__(self, name, value)
288
 
        return setattr(self._result, name, value)
289
 
 
290
 
 
291
 
class _MyResult(unittest._TextTestResult):
 
169
 
 
170
 
 
171
    def log(self, msg):
 
172
        """Log a message to a progress file"""
 
173
        # XXX: The problem with this is that code that writes straight
 
174
        # to the log file won't be shown when we display the log
 
175
        # buffer; would be better to not have the in-memory buffer and
 
176
        # instead just a log file per test, which is read in and
 
177
        # displayed if the test fails.  That seems to imply one log
 
178
        # per test case, not globally.  OK?
 
179
        self._log_buf = self._log_buf + str(msg) + '\n'
 
180
        print >>self.TEST_LOG, msg
 
181
 
 
182
 
 
183
    def check_inventory_shape(self, inv, shape):
 
184
        """
 
185
        Compare an inventory to a list of expected names.
 
186
 
 
187
        Fail if they are not precisely equal.
 
188
        """
 
189
        extras = []
 
190
        shape = list(shape)             # copy
 
191
        for path, ie in inv.entries():
 
192
            name = path.replace('\\', '/')
 
193
            if ie.kind == 'dir':
 
194
                name = name + '/'
 
195
            if name in shape:
 
196
                shape.remove(name)
 
197
            else:
 
198
                extras.append(name)
 
199
        if shape:
 
200
            self.fail("expected paths not found in inventory: %r" % shape)
 
201
        if extras:
 
202
            self.fail("unexpected paths found in inventory: %r" % extras)
 
203
 
 
204
 
 
205
    def check_file_contents(self, filename, expect):
 
206
        self.log("check contents of file %s" % filename)
 
207
        contents = file(filename, 'r').read()
 
208
        if contents != expect:
 
209
            self.log("expected: %r" % expect)
 
210
            self.log("actually: %r" % contents)
 
211
            self.fail("contents of %s not as expected")
 
212
            
 
213
 
 
214
 
 
215
class InTempDir(TestBase):
 
216
    """Base class for tests run in a temporary branch."""
 
217
    def setUp(self):
 
218
        import os
 
219
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
 
220
        os.mkdir(self.test_dir)
 
221
        os.chdir(self.test_dir)
 
222
        
 
223
    def tearDown(self):
 
224
        import os
 
225
        os.chdir(self.TEST_ROOT)
 
226
 
 
227
 
 
228
 
 
229
 
 
230
 
 
231
class _MyResult(TestResult):
292
232
    """
293
233
    Custom TestResult.
294
234
 
295
235
    No special behaviour for now.
296
236
    """
 
237
    def __init__(self, out, style):
 
238
        self.out = out
 
239
        TestResult.__init__(self)
 
240
        assert style in ('none', 'progress', 'verbose')
 
241
        self.style = style
 
242
 
297
243
 
298
244
    def startTest(self, test):
299
 
        unittest.TestResult.startTest(self, test)
300
245
        # TODO: Maybe show test.shortDescription somewhere?
301
246
        what = test.id()
302
247
        # python2.3 has the bad habit of just "runit" for doctests
303
248
        if what == 'runit':
304
249
            what = test.shortDescription()
305
 
        if self.showAll:
306
 
            self.stream.write('%-60.60s' % what)
307
 
        self.stream.flush()
 
250
        
 
251
        if self.style == 'verbose':
 
252
            print >>self.out, '%-60.60s' % what,
 
253
            self.out.flush()
 
254
        elif self.style == 'progress':
 
255
            self.out.write('~')
 
256
            self.out.flush()
 
257
        TestResult.startTest(self, test)
 
258
 
 
259
 
 
260
    def stopTest(self, test):
 
261
        # print
 
262
        TestResult.stopTest(self, test)
 
263
 
308
264
 
309
265
    def addError(self, test, err):
310
 
        super(_MyResult, self).addError(test, err)
311
 
        self.stream.flush()
 
266
        if self.style == 'verbose':
 
267
            print >>self.out, 'ERROR'
 
268
        TestResult.addError(self, test, err)
 
269
        _show_test_failure('error', test, err, self.out)
312
270
 
313
271
    def addFailure(self, test, err):
314
 
        super(_MyResult, self).addFailure(test, err)
315
 
        self.stream.flush()
 
272
        if self.style == 'verbose':
 
273
            print >>self.out, 'FAILURE'
 
274
        TestResult.addFailure(self, test, err)
 
275
        _show_test_failure('failure', test, err, self.out)
316
276
 
317
277
    def addSuccess(self, test):
318
 
        if self.showAll:
319
 
            self.stream.writeln('OK')
320
 
        elif self.dots:
321
 
            self.stream.write('~')
322
 
        self.stream.flush()
323
 
        unittest.TestResult.addSuccess(self, test)
324
 
 
325
 
    def printErrorList(self, flavour, errors):
326
 
        for test, err in errors:
327
 
            self.stream.writeln(self.separator1)
328
 
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
329
 
            self.stream.writeln(self.separator2)
330
 
            self.stream.writeln("%s" % err)
331
 
            if isinstance(test, TestCase):
332
 
                self.stream.writeln()
333
 
                self.stream.writeln('log from this test:')
334
 
                print >>self.stream, test._get_log()
335
 
 
336
 
 
337
 
class TextTestRunner(unittest.TextTestRunner):
338
 
 
339
 
    def _makeResult(self):
340
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
341
 
        return EarlyStoppingTestResultAdapter(result)
342
 
 
343
 
 
344
 
class filteringVisitor(TestUtil.TestVisitor):
345
 
    """I accruse all the testCases I visit that pass a regexp filter on id
346
 
    into my suite
347
 
    """
348
 
 
349
 
    def __init__(self, filter):
350
 
        import re
351
 
        TestUtil.TestVisitor.__init__(self)
352
 
        self._suite=None
353
 
        self.filter=re.compile(filter)
354
 
 
355
 
    def suite(self):
356
 
        """answer the suite we are building"""
357
 
        if self._suite is None:
358
 
            self._suite=TestUtil.TestSuite()
359
 
        return self._suite
360
 
 
361
 
    def visitCase(self, aCase):
362
 
        if self.filter.match(aCase.id()):
363
 
            self.suite().addTest(aCase)
364
 
 
365
 
 
366
 
def run_suite(suite, name='test', verbose=False, pattern=".*"):
 
278
        if self.style == 'verbose':
 
279
            print >>self.out, 'OK'
 
280
        TestResult.addSuccess(self, test)
 
281
 
 
282
 
 
283
 
 
284
def run_suite(suite, name='test', verbose=False):
 
285
    import os
367
286
    import shutil
368
 
    FunctionalTestCase._TEST_NAME = name
369
 
    if verbose:
370
 
        verbosity = 2
371
 
    else:
372
 
        verbosity = 1
373
 
    runner = TextTestRunner(stream=sys.stdout,
374
 
                            descriptions=0,
375
 
                            verbosity=verbosity)
376
 
    visitor = filteringVisitor(pattern)
377
 
    suite.visit(visitor)
378
 
    result = runner.run(visitor.suite())
379
 
    # This is still a little bogus, 
380
 
    # but only a little. Folk not using our testrunner will
381
 
    # have to delete their temp directories themselves.
382
 
    if result.wasSuccessful():
383
 
        if FunctionalTestCase.TEST_ROOT is not None:
384
 
            shutil.rmtree(FunctionalTestCase.TEST_ROOT) 
385
 
    else:
386
 
        print "Failed tests working directories are in '%s'\n" % FunctionalTestCase.TEST_ROOT
 
287
    import time
 
288
    import sys
 
289
    
 
290
    _setup_test_log(name)
 
291
    _setup_test_dir(name)
 
292
    print
 
293
 
 
294
    # save stdout & stderr so there's no leakage from code-under-test
 
295
    real_stdout = sys.stdout
 
296
    real_stderr = sys.stderr
 
297
    sys.stdout = sys.stderr = TestBase.TEST_LOG
 
298
    try:
 
299
        if verbose:
 
300
            style = 'verbose'
 
301
        else:
 
302
            style = 'progress'
 
303
        result = _MyResult(real_stdout, style)
 
304
        suite.run(result)
 
305
    finally:
 
306
        sys.stdout = real_stdout
 
307
        sys.stderr = real_stderr
 
308
 
 
309
    _show_results(result)
 
310
 
387
311
    return result.wasSuccessful()
 
312
 
 
313
 
 
314
 
 
315
def _setup_test_log(name):
 
316
    import time
 
317
    import os
 
318
    
 
319
    log_filename = os.path.abspath(name + '.log')
 
320
    TestBase.TEST_LOG = open(log_filename, 'wt', buffering=1) # line buffered
 
321
 
 
322
    print >>TestBase.TEST_LOG, "tests run at " + time.ctime()
 
323
    print '%-30s %s' % ('test log', log_filename)
 
324
 
 
325
 
 
326
def _setup_test_dir(name):
 
327
    import os
 
328
    import shutil
 
329
    
 
330
    TestBase.ORIG_DIR = os.getcwdu()
 
331
    TestBase.TEST_ROOT = os.path.abspath(name + '.tmp')
 
332
 
 
333
    print '%-30s %s' % ('running tests in', TestBase.TEST_ROOT)
 
334
 
 
335
    if os.path.exists(TestBase.TEST_ROOT):
 
336
        shutil.rmtree(TestBase.TEST_ROOT)
 
337
    os.mkdir(TestBase.TEST_ROOT)
 
338
    os.chdir(TestBase.TEST_ROOT)
 
339
 
 
340
    # make a fake bzr directory there to prevent any tests propagating
 
341
    # up onto the source directory's real branch
 
342
    os.mkdir(os.path.join(TestBase.TEST_ROOT, '.bzr'))
 
343
 
 
344
    
 
345
 
 
346
def _show_results(result):
 
347
     print
 
348
     print '%4d tests run' % result.testsRun
 
349
     print '%4d errors' % len(result.errors)
 
350
     print '%4d failures' % len(result.failures)
 
351
 
 
352
 
 
353
 
 
354
def _show_test_failure(kind, case, exc_info, out):
 
355
    from traceback import print_exception
 
356
 
 
357
    print >>out
 
358
    print >>out, '-' * 60
 
359
    print >>out, case
 
360
    
 
361
    desc = case.shortDescription()
 
362
    if desc:
 
363
        print >>out, '   (%s)' % desc
 
364
         
 
365
    print_exception(exc_info[0], exc_info[1], exc_info[2], None, out)
 
366
        
 
367
    if isinstance(case, TestBase):
 
368
        print >>out
 
369
        print >>out, 'log from this test:'
 
370
        print >>out, case._log_buf
 
371
         
 
372
    print >>out, '-' * 60
 
373
    
 
374