~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: John Arbash Meinel
  • Date: 2005-09-16 14:57:59 UTC
  • mto: (1393.2.1)
  • mto: This revision was merged to the branch mainline in revision 1396.
  • Revision ID: john@arbash-meinel.com-20050916145758-ad06c9ae86840f17
Merged up-to-date against mainline, still broken.

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
* utilities to run external commands and check their return code
31
31
  and/or output
32
32
 
33
 
Test cases should normally subclass TestBase.  The test runner should
 
33
Test cases should normally subclass testsweet.TestCase.  The test runner should
34
34
call runsuite().
35
35
 
36
36
This is meant to become independent of bzr, though that's not quite
37
37
true yet.
38
38
"""  
39
39
 
40
 
 
41
 
from unittest import TestResult, TestCase
42
 
 
 
40
import unittest
 
41
import sys
 
42
from bzrlib.selftest import TestUtil
 
43
 
 
44
# XXX: Don't need this anymore now we depend on python2.4
43
45
def _need_subprocess():
44
46
    sys.stderr.write("sorry, this test suite requires the subprocess module\n"
45
47
                     "this is shipped with python2.4 and available separately for 2.3\n")
49
51
    pass
50
52
 
51
53
 
52
 
 
53
54
class TestSkipped(Exception):
54
55
    """Indicates that a test was intentionally skipped, rather than failing."""
55
56
    # XXX: Not used yet
56
57
 
57
58
 
58
 
class TestBase(TestCase):
59
 
    """Base class for bzr test cases.
60
 
 
61
 
    Just defines some useful helper functions; doesn't actually test
62
 
    anything.
63
 
    """
64
 
    
65
 
    # TODO: Special methods to invoke bzr, so that we can run it
66
 
    # through a specified Python intepreter
67
 
 
68
 
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
69
 
    BZRPATH = 'bzr'
70
 
 
71
 
    _log_buf = ""
72
 
 
73
 
 
74
 
    def setUp(self):
75
 
        super(TestBase, self).setUp()
76
 
        self.log("%s setup" % self.id())
77
 
 
78
 
 
79
 
    def tearDown(self):
80
 
        super(TestBase, self).tearDown()
81
 
        self.log("%s teardown" % self.id())
82
 
        self.log('')
83
 
 
84
 
 
85
 
    def formcmd(self, cmd):
86
 
        if isinstance(cmd, basestring):
87
 
            cmd = cmd.split()
88
 
 
89
 
        if cmd[0] == 'bzr':
90
 
            cmd[0] = self.BZRPATH
91
 
            if self.OVERRIDE_PYTHON:
92
 
                cmd.insert(0, self.OVERRIDE_PYTHON)
93
 
 
94
 
        self.log('$ %r' % cmd)
95
 
 
96
 
        return cmd
97
 
 
98
 
 
99
 
    def runcmd(self, cmd, retcode=0):
100
 
        """Run one command and check the return code.
101
 
 
102
 
        Returns a tuple of (stdout,stderr) strings.
103
 
 
104
 
        If a single string is based, it is split into words.
105
 
        For commands that are not simple space-separated words, please
106
 
        pass a list instead."""
107
 
        try:
108
 
            import shutil
109
 
            from subprocess import call
110
 
        except ImportError, e:
111
 
            _need_subprocess()
112
 
            raise
113
 
 
114
 
 
115
 
        cmd = self.formcmd(cmd)
116
 
 
117
 
        self.log('$ ' + ' '.join(cmd))
118
 
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
119
 
 
120
 
        if retcode != actual_retcode:
121
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
122
 
                                % (cmd, actual_retcode, retcode))
123
 
 
124
 
 
125
 
    def backtick(self, cmd, retcode=0):
126
 
        """Run a command and return its output"""
127
 
        try:
128
 
            import shutil
129
 
            from subprocess import Popen, PIPE
130
 
        except ImportError, e:
131
 
            _need_subprocess()
132
 
            raise
133
 
 
134
 
        cmd = self.formcmd(cmd)
135
 
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
136
 
        outd, errd = child.communicate()
137
 
        self.log(outd)
138
 
        actual_retcode = child.wait()
139
 
 
140
 
        outd = outd.replace('\r', '')
141
 
 
142
 
        if retcode != actual_retcode:
143
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
144
 
                                % (cmd, actual_retcode, retcode))
145
 
 
146
 
        return outd
147
 
 
148
 
 
149
 
 
150
 
    def build_tree(self, shape):
151
 
        """Build a test tree according to a pattern.
152
 
 
153
 
        shape is a sequence of file specifications.  If the final
154
 
        character is '/', a directory is created.
155
 
 
156
 
        This doesn't add anything to a branch.
157
 
        """
158
 
        # XXX: It's OK to just create them using forward slashes on windows?
159
 
        import os
160
 
        for name in shape:
161
 
            assert isinstance(name, basestring)
162
 
            if name[-1] == '/':
163
 
                os.mkdir(name[:-1])
164
 
            else:
165
 
                f = file(name, 'wt')
166
 
                print >>f, "contents of", name
167
 
                f.close()
168
 
 
169
 
 
170
 
    def log(self, msg):
171
 
        """Log a message to a progress file"""
172
 
        self._log_buf = self._log_buf + str(msg) + '\n'
173
 
        print >>self.TEST_LOG, msg
174
 
 
175
 
 
176
 
    def check_inventory_shape(self, inv, shape):
177
 
        """
178
 
        Compare an inventory to a list of expected names.
179
 
 
180
 
        Fail if they are not precisely equal.
181
 
        """
182
 
        extras = []
183
 
        shape = list(shape)             # copy
184
 
        for path, ie in inv.entries():
185
 
            name = path.replace('\\', '/')
186
 
            if ie.kind == 'dir':
187
 
                name = name + '/'
188
 
            if name in shape:
189
 
                shape.remove(name)
190
 
            else:
191
 
                extras.append(name)
192
 
        if shape:
193
 
            self.fail("expected paths not found in inventory: %r" % shape)
194
 
        if extras:
195
 
            self.fail("unexpected paths found in inventory: %r" % extras)
196
 
 
197
 
 
198
 
    def check_file_contents(self, filename, expect):
199
 
        self.log("check contents of file %s" % filename)
200
 
        contents = file(filename, 'r').read()
201
 
        if contents != expect:
202
 
            self.log("expected: %r" % expect)
203
 
            self.log("actually: %r" % contents)
204
 
            self.fail("contents of %s not as expected")
205
 
            
206
 
 
207
 
 
208
 
class InTempDir(TestBase):
209
 
    """Base class for tests run in a temporary branch."""
210
 
    def setUp(self):
211
 
        import os
212
 
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
213
 
        os.mkdir(self.test_dir)
214
 
        os.chdir(self.test_dir)
215
 
        
216
 
    def tearDown(self):
217
 
        import os
218
 
        os.chdir(self.TEST_ROOT)
219
 
 
220
 
 
221
 
 
222
 
 
223
 
 
224
 
class _MyResult(TestResult):
 
59
class EarlyStoppingTestResultAdapter(object):
 
60
    """An adapter for TestResult to stop at the first first failure or error"""
 
61
 
 
62
    def __init__(self, result):
 
63
        self._result = result
 
64
 
 
65
    def addError(self, test, err):
 
66
        self._result.addError(test, err)
 
67
        self._result.stop()
 
68
 
 
69
    def addFailure(self, test, err):
 
70
        self._result.addFailure(test, err)
 
71
        self._result.stop()
 
72
 
 
73
    def __getattr__(self, name):
 
74
        return getattr(self._result, name)
 
75
 
 
76
    def __setattr__(self, name, value):
 
77
        if name == '_result':
 
78
            object.__setattr__(self, name, value)
 
79
        return setattr(self._result, name, value)
 
80
 
 
81
 
 
82
class _MyResult(unittest._TextTestResult):
225
83
    """
226
84
    Custom TestResult.
227
85
 
228
86
    No special behaviour for now.
229
87
    """
230
 
    def __init__(self, out, style):
231
 
        self.out = out
232
 
        TestResult.__init__(self)
233
 
        assert style in ('none', 'progress', 'verbose')
234
 
        self.style = style
235
 
 
236
88
 
237
89
    def startTest(self, test):
 
90
        unittest.TestResult.startTest(self, test)
238
91
        # TODO: Maybe show test.shortDescription somewhere?
239
92
        what = test.id()
240
93
        # python2.3 has the bad habit of just "runit" for doctests
241
94
        if what == 'runit':
242
95
            what = test.shortDescription()
243
 
        
244
 
        if self.style == 'verbose':
245
 
            print >>self.out, '%-60.60s' % what,
246
 
            self.out.flush()
247
 
        elif self.style == 'progress':
248
 
            self.out.write('~')
249
 
            self.out.flush()
250
 
        TestResult.startTest(self, test)
251
 
 
252
 
 
253
 
    def stopTest(self, test):
254
 
        # print
255
 
        TestResult.stopTest(self, test)
256
 
 
 
96
        if self.showAll:
 
97
            self.stream.write('%-60.60s' % what)
 
98
        self.stream.flush()
257
99
 
258
100
    def addError(self, test, err):
259
 
        if self.style == 'verbose':
260
 
            print >>self.out, 'ERROR'
261
 
        TestResult.addError(self, test, err)
262
 
        _show_test_failure('error', test, err, self.out)
 
101
        super(_MyResult, self).addError(test, err)
 
102
        self.stream.flush()
263
103
 
264
104
    def addFailure(self, test, err):
265
 
        if self.style == 'verbose':
266
 
            print >>self.out, 'FAILURE'
267
 
        TestResult.addFailure(self, test, err)
268
 
        _show_test_failure('failure', test, err, self.out)
 
105
        super(_MyResult, self).addFailure(test, err)
 
106
        self.stream.flush()
269
107
 
270
108
    def addSuccess(self, test):
271
 
        if self.style == 'verbose':
272
 
            print >>self.out, 'OK'
273
 
        TestResult.addSuccess(self, test)
274
 
 
275
 
 
276
 
 
277
 
def run_suite(suite, name='test', verbose=False):
278
 
    import os
 
109
        if self.showAll:
 
110
            self.stream.writeln('OK')
 
111
        elif self.dots:
 
112
            self.stream.write('~')
 
113
        self.stream.flush()
 
114
        unittest.TestResult.addSuccess(self, test)
 
115
 
 
116
    def printErrorList(self, flavour, errors):
 
117
        for test, err in errors:
 
118
            self.stream.writeln(self.separator1)
 
119
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
 
120
            self.stream.writeln(self.separator2)
 
121
            self.stream.writeln("%s" % err)
 
122
            if hasattr(test, '_get_log'):
 
123
                self.stream.writeln()
 
124
                self.stream.writeln('log from this test:')
 
125
                print >>self.stream, test._get_log()
 
126
 
 
127
 
 
128
class TextTestRunner(unittest.TextTestRunner):
 
129
 
 
130
    def _makeResult(self):
 
131
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
132
        return EarlyStoppingTestResultAdapter(result)
 
133
 
 
134
 
 
135
class filteringVisitor(TestUtil.TestVisitor):
 
136
    """I accruse all the testCases I visit that pass a regexp filter on id
 
137
    into my suite
 
138
    """
 
139
 
 
140
    def __init__(self, filter):
 
141
        import re
 
142
        TestUtil.TestVisitor.__init__(self)
 
143
        self._suite=None
 
144
        self.filter=re.compile(filter)
 
145
 
 
146
    def suite(self):
 
147
        """answer the suite we are building"""
 
148
        if self._suite is None:
 
149
            self._suite=TestUtil.TestSuite()
 
150
        return self._suite
 
151
 
 
152
    def visitCase(self, aCase):
 
153
        if self.filter.match(aCase.id()):
 
154
            self.suite().addTest(aCase)
 
155
 
 
156
 
 
157
def run_suite(suite, name='test', verbose=False, pattern=".*"):
279
158
    import shutil
280
 
    import time
281
 
    import sys
282
 
    
283
 
    _setup_test_log(name)
284
 
    _setup_test_dir(name)
285
 
    print
286
 
 
287
 
    # save stdout & stderr so there's no leakage from code-under-test
288
 
    real_stdout = sys.stdout
289
 
    real_stderr = sys.stderr
290
 
    sys.stdout = sys.stderr = TestBase.TEST_LOG
291
 
    try:
292
 
        if verbose:
293
 
            style = 'verbose'
294
 
        else:
295
 
            style = 'progress'
296
 
        result = _MyResult(real_stdout, style)
297
 
        suite.run(result)
298
 
    finally:
299
 
        sys.stdout = real_stdout
300
 
        sys.stderr = real_stderr
301
 
 
302
 
    _show_results(result)
303
 
 
 
159
    from bzrlib.selftest import TestCaseInTempDir
 
160
    TestCaseInTempDir._TEST_NAME = name
 
161
    if verbose:
 
162
        verbosity = 2
 
163
    else:
 
164
        verbosity = 1
 
165
    runner = TextTestRunner(stream=sys.stdout,
 
166
                            descriptions=0,
 
167
                            verbosity=verbosity)
 
168
    visitor = filteringVisitor(pattern)
 
169
    suite.visit(visitor)
 
170
    result = runner.run(visitor.suite())
 
171
    # This is still a little bogus, 
 
172
    # but only a little. Folk not using our testrunner will
 
173
    # have to delete their temp directories themselves.
 
174
    if result.wasSuccessful():
 
175
        if TestCaseInTempDir.TEST_ROOT is not None:
 
176
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
 
177
    else:
 
178
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
304
179
    return result.wasSuccessful()
305
 
 
306
 
 
307
 
 
308
 
def _setup_test_log(name):
309
 
    import time
310
 
    import os
311
 
    
312
 
    log_filename = os.path.abspath(name + '.log')
313
 
    TestBase.TEST_LOG = open(log_filename, 'wt', buffering=1) # line buffered
314
 
 
315
 
    print >>TestBase.TEST_LOG, "tests run at " + time.ctime()
316
 
    print '%-30s %s' % ('test log', log_filename)
317
 
 
318
 
 
319
 
def _setup_test_dir(name):
320
 
    import os
321
 
    import shutil
322
 
    
323
 
    TestBase.ORIG_DIR = os.getcwdu()
324
 
    TestBase.TEST_ROOT = os.path.abspath(name + '.tmp')
325
 
 
326
 
    print '%-30s %s' % ('running tests in', TestBase.TEST_ROOT)
327
 
 
328
 
    if os.path.exists(TestBase.TEST_ROOT):
329
 
        shutil.rmtree(TestBase.TEST_ROOT)
330
 
    os.mkdir(TestBase.TEST_ROOT)
331
 
    os.chdir(TestBase.TEST_ROOT)
332
 
 
333
 
    # make a fake bzr directory there to prevent any tests propagating
334
 
    # up onto the source directory's real branch
335
 
    os.mkdir(os.path.join(TestBase.TEST_ROOT, '.bzr'))
336
 
 
337
 
    
338
 
 
339
 
def _show_results(result):
340
 
     print
341
 
     print '%4d tests run' % result.testsRun
342
 
     print '%4d errors' % len(result.errors)
343
 
     print '%4d failures' % len(result.failures)
344
 
 
345
 
 
346
 
 
347
 
def _show_test_failure(kind, case, exc_info, out):
348
 
    from traceback import print_exception
349
 
    
350
 
    print >>out, '-' * 60
351
 
    print >>out, case
352
 
    
353
 
    desc = case.shortDescription()
354
 
    if desc:
355
 
        print >>out, '   (%s)' % desc
356
 
         
357
 
    print_exception(exc_info[0], exc_info[1], exc_info[2], None, out)
358
 
        
359
 
    if isinstance(case, TestBase):
360
 
        print >>out
361
 
        print >>out, 'log from this test:'
362
 
        print >>out, case._log_buf
363
 
         
364
 
    print >>out, '-' * 60
365
 
    
366