~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: Aaron Bentley
  • Date: 2005-07-26 15:25:58 UTC
  • mto: (1092.1.41) (1185.3.4) (974.1.47)
  • mto: This revision was merged to the branch mainline in revision 1020.
  • Revision ID: abentley@panoramicfeedback.com-20050726152558-c47bb2bd0163e1a8
Replaced popen with subprocess in patch..py

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
* utilities to run external commands and check their return code
31
31
  and/or output
32
32
 
33
 
Test cases should normally subclass testsweet.TestCase.  The test runner should
 
33
Test cases should normally subclass TestBase.  The test runner should
34
34
call runsuite().
35
35
 
36
36
This is meant to become independent of bzr, though that's not quite
37
37
true yet.
38
38
"""  
39
39
 
40
 
import unittest
41
 
import sys
42
 
from bzrlib.selftest import TestUtil
43
 
 
44
 
# XXX: Don't need this anymore now we depend on python2.4
 
40
 
 
41
from unittest import TestResult, TestCase
 
42
 
45
43
def _need_subprocess():
46
44
    sys.stderr.write("sorry, this test suite requires the subprocess module\n"
47
45
                     "this is shipped with python2.4 and available separately for 2.3\n")
51
49
    pass
52
50
 
53
51
 
 
52
 
54
53
class TestSkipped(Exception):
55
54
    """Indicates that a test was intentionally skipped, rather than failing."""
56
55
    # XXX: Not used yet
57
56
 
58
57
 
59
 
 
60
 
class EarlyStoppingTestResultAdapter(object):
61
 
    """An adapter for TestResult to stop at the first first failure or error"""
62
 
 
63
 
    def __init__(self, result):
64
 
        self._result = result
65
 
 
66
 
    def addError(self, test, err):
67
 
        self._result.addError(test, err)
68
 
        self._result.stop()
69
 
 
70
 
    def addFailure(self, test, err):
71
 
        self._result.addFailure(test, err)
72
 
        self._result.stop()
73
 
 
74
 
    def __getattr__(self, name):
75
 
        return getattr(self._result, name)
76
 
 
77
 
    def __setattr__(self, name, value):
78
 
        if name == '_result':
79
 
            object.__setattr__(self, name, value)
80
 
        return setattr(self._result, name, value)
81
 
 
82
 
 
83
 
class _MyResult(unittest._TextTestResult):
 
58
class TestBase(TestCase):
 
59
    """Base class for bzr test cases.
 
60
 
 
61
    Just defines some useful helper functions; doesn't actually test
 
62
    anything.
 
63
    """
 
64
    
 
65
    # TODO: Special methods to invoke bzr, so that we can run it
 
66
    # through a specified Python intepreter
 
67
 
 
68
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
 
69
    BZRPATH = 'bzr'
 
70
 
 
71
    _log_buf = ""
 
72
 
 
73
 
 
74
    def setUp(self):
 
75
        super(TestBase, self).setUp()
 
76
        self.log("%s setup" % self.id())
 
77
 
 
78
 
 
79
    def tearDown(self):
 
80
        super(TestBase, self).tearDown()
 
81
        self.log("%s teardown" % self.id())
 
82
        self.log('')
 
83
 
 
84
 
 
85
    def formcmd(self, cmd):
 
86
        if isinstance(cmd, basestring):
 
87
            cmd = cmd.split()
 
88
 
 
89
        if cmd[0] == 'bzr':
 
90
            cmd[0] = self.BZRPATH
 
91
            if self.OVERRIDE_PYTHON:
 
92
                cmd.insert(0, self.OVERRIDE_PYTHON)
 
93
 
 
94
        self.log('$ %r' % cmd)
 
95
 
 
96
        return cmd
 
97
 
 
98
 
 
99
    def runcmd(self, cmd, retcode=0):
 
100
        """Run one command and check the return code.
 
101
 
 
102
        Returns a tuple of (stdout,stderr) strings.
 
103
 
 
104
        If a single string is based, it is split into words.
 
105
        For commands that are not simple space-separated words, please
 
106
        pass a list instead."""
 
107
        try:
 
108
            import shutil
 
109
            from subprocess import call
 
110
        except ImportError, e:
 
111
            _need_subprocess()
 
112
            raise
 
113
 
 
114
 
 
115
        cmd = self.formcmd(cmd)
 
116
 
 
117
        self.log('$ ' + ' '.join(cmd))
 
118
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
 
119
 
 
120
        if retcode != actual_retcode:
 
121
            raise CommandFailed("test failed: %r returned %d, expected %d"
 
122
                                % (cmd, actual_retcode, retcode))
 
123
 
 
124
 
 
125
    def backtick(self, cmd, retcode=0):
 
126
        """Run a command and return its output"""
 
127
        try:
 
128
            import shutil
 
129
            from subprocess import Popen, PIPE
 
130
        except ImportError, e:
 
131
            _need_subprocess()
 
132
            raise
 
133
 
 
134
        cmd = self.formcmd(cmd)
 
135
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
 
136
        outd, errd = child.communicate()
 
137
        self.log(outd)
 
138
        actual_retcode = child.wait()
 
139
 
 
140
        outd = outd.replace('\r', '')
 
141
 
 
142
        if retcode != actual_retcode:
 
143
            raise CommandFailed("test failed: %r returned %d, expected %d"
 
144
                                % (cmd, actual_retcode, retcode))
 
145
 
 
146
        return outd
 
147
 
 
148
 
 
149
 
 
150
    def build_tree(self, shape):
 
151
        """Build a test tree according to a pattern.
 
152
 
 
153
        shape is a sequence of file specifications.  If the final
 
154
        character is '/', a directory is created.
 
155
 
 
156
        This doesn't add anything to a branch.
 
157
        """
 
158
        # XXX: It's OK to just create them using forward slashes on windows?
 
159
        import os
 
160
        for name in shape:
 
161
            assert isinstance(name, basestring)
 
162
            if name[-1] == '/':
 
163
                os.mkdir(name[:-1])
 
164
            else:
 
165
                f = file(name, 'wt')
 
166
                print >>f, "contents of", name
 
167
                f.close()
 
168
 
 
169
 
 
170
    def log(self, msg):
 
171
        """Log a message to a progress file"""
 
172
        self._log_buf = self._log_buf + str(msg) + '\n'
 
173
        print >>self.TEST_LOG, msg
 
174
 
 
175
 
 
176
    def check_inventory_shape(self, inv, shape):
 
177
        """
 
178
        Compare an inventory to a list of expected names.
 
179
 
 
180
        Fail if they are not precisely equal.
 
181
        """
 
182
        extras = []
 
183
        shape = list(shape)             # copy
 
184
        for path, ie in inv.entries():
 
185
            name = path.replace('\\', '/')
 
186
            if ie.kind == 'dir':
 
187
                name = name + '/'
 
188
            if name in shape:
 
189
                shape.remove(name)
 
190
            else:
 
191
                extras.append(name)
 
192
        if shape:
 
193
            self.fail("expected paths not found in inventory: %r" % shape)
 
194
        if extras:
 
195
            self.fail("unexpected paths found in inventory: %r" % extras)
 
196
 
 
197
 
 
198
    def check_file_contents(self, filename, expect):
 
199
        self.log("check contents of file %s" % filename)
 
200
        contents = file(filename, 'r').read()
 
201
        if contents != expect:
 
202
            self.log("expected: %r" % expect)
 
203
            self.log("actually: %r" % contents)
 
204
            self.fail("contents of %s not as expected")
 
205
            
 
206
 
 
207
 
 
208
class InTempDir(TestBase):
 
209
    """Base class for tests run in a temporary branch."""
 
210
    def setUp(self):
 
211
        import os
 
212
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
 
213
        os.mkdir(self.test_dir)
 
214
        os.chdir(self.test_dir)
 
215
        
 
216
    def tearDown(self):
 
217
        import os
 
218
        os.chdir(self.TEST_ROOT)
 
219
 
 
220
 
 
221
 
 
222
 
 
223
 
 
224
class _MyResult(TestResult):
84
225
    """
85
226
    Custom TestResult.
86
227
 
87
228
    No special behaviour for now.
88
229
    """
 
230
    def __init__(self, out, style):
 
231
        self.out = out
 
232
        TestResult.__init__(self)
 
233
        assert style in ('none', 'progress', 'verbose')
 
234
        self.style = style
 
235
 
89
236
 
90
237
    def startTest(self, test):
91
 
        unittest.TestResult.startTest(self, test)
92
238
        # TODO: Maybe show test.shortDescription somewhere?
93
239
        what = test.id()
94
240
        # python2.3 has the bad habit of just "runit" for doctests
95
241
        if what == 'runit':
96
242
            what = test.shortDescription()
97
 
        if self.showAll:
98
 
            self.stream.write('%-60.60s' % what)
99
 
        self.stream.flush()
 
243
        
 
244
        if self.style == 'verbose':
 
245
            print >>self.out, '%-60.60s' % what,
 
246
            self.out.flush()
 
247
        elif self.style == 'progress':
 
248
            self.out.write('~')
 
249
            self.out.flush()
 
250
        TestResult.startTest(self, test)
 
251
 
 
252
 
 
253
    def stopTest(self, test):
 
254
        # print
 
255
        TestResult.stopTest(self, test)
 
256
 
100
257
 
101
258
    def addError(self, test, err):
102
 
        super(_MyResult, self).addError(test, err)
103
 
        self.stream.flush()
 
259
        if self.style == 'verbose':
 
260
            print >>self.out, 'ERROR'
 
261
        TestResult.addError(self, test, err)
 
262
        _show_test_failure('error', test, err, self.out)
104
263
 
105
264
    def addFailure(self, test, err):
106
 
        super(_MyResult, self).addFailure(test, err)
107
 
        self.stream.flush()
 
265
        if self.style == 'verbose':
 
266
            print >>self.out, 'FAILURE'
 
267
        TestResult.addFailure(self, test, err)
 
268
        _show_test_failure('failure', test, err, self.out)
108
269
 
109
270
    def addSuccess(self, test):
110
 
        if self.showAll:
111
 
            self.stream.writeln('OK')
112
 
        elif self.dots:
113
 
            self.stream.write('~')
114
 
        self.stream.flush()
115
 
        unittest.TestResult.addSuccess(self, test)
116
 
 
117
 
    def printErrorList(self, flavour, errors):
118
 
        for test, err in errors:
119
 
            self.stream.writeln(self.separator1)
120
 
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
121
 
            self.stream.writeln(self.separator2)
122
 
            self.stream.writeln("%s" % err)
123
 
            if hasattr(test, '_get_log'):
124
 
                self.stream.writeln()
125
 
                self.stream.writeln('log from this test:')
126
 
                print >>self.stream, test._get_log()
127
 
 
128
 
 
129
 
class TextTestRunner(unittest.TextTestRunner):
130
 
 
131
 
    def _makeResult(self):
132
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
133
 
        return EarlyStoppingTestResultAdapter(result)
134
 
 
135
 
 
136
 
class filteringVisitor(TestUtil.TestVisitor):
137
 
    """I accruse all the testCases I visit that pass a regexp filter on id
138
 
    into my suite
139
 
    """
140
 
 
141
 
    def __init__(self, filter):
142
 
        import re
143
 
        TestUtil.TestVisitor.__init__(self)
144
 
        self._suite=None
145
 
        self.filter=re.compile(filter)
146
 
 
147
 
    def suite(self):
148
 
        """answer the suite we are building"""
149
 
        if self._suite is None:
150
 
            self._suite=TestUtil.TestSuite()
151
 
        return self._suite
152
 
 
153
 
    def visitCase(self, aCase):
154
 
        if self.filter.match(aCase.id()):
155
 
            self.suite().addTest(aCase)
156
 
 
157
 
 
158
 
def run_suite(suite, name='test', verbose=False, pattern=".*"):
 
271
        if self.style == 'verbose':
 
272
            print >>self.out, 'OK'
 
273
        TestResult.addSuccess(self, test)
 
274
 
 
275
 
 
276
 
 
277
def run_suite(suite, name='test', verbose=False):
 
278
    import os
159
279
    import shutil
160
 
    from bzrlib.selftest import TestCaseInTempDir
161
 
    TestCaseInTempDir._TEST_NAME = name
162
 
    if verbose:
163
 
        verbosity = 2
164
 
    else:
165
 
        verbosity = 1
166
 
    runner = TextTestRunner(stream=sys.stdout,
167
 
                            descriptions=0,
168
 
                            verbosity=verbosity)
169
 
    visitor = filteringVisitor(pattern)
170
 
    suite.visit(visitor)
171
 
    result = runner.run(visitor.suite())
172
 
    # This is still a little bogus, 
173
 
    # but only a little. Folk not using our testrunner will
174
 
    # have to delete their temp directories themselves.
175
 
    if result.wasSuccessful():
176
 
        if TestCaseInTempDir.TEST_ROOT:
177
 
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
178
 
    else:
179
 
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
 
280
    import time
 
281
    import sys
 
282
    
 
283
    _setup_test_log(name)
 
284
    _setup_test_dir(name)
 
285
    print
 
286
 
 
287
    # save stdout & stderr so there's no leakage from code-under-test
 
288
    real_stdout = sys.stdout
 
289
    real_stderr = sys.stderr
 
290
    sys.stdout = sys.stderr = TestBase.TEST_LOG
 
291
    try:
 
292
        if verbose:
 
293
            style = 'verbose'
 
294
        else:
 
295
            style = 'progress'
 
296
        result = _MyResult(real_stdout, style)
 
297
        suite.run(result)
 
298
    finally:
 
299
        sys.stdout = real_stdout
 
300
        sys.stderr = real_stderr
 
301
 
 
302
    _show_results(result)
 
303
 
180
304
    return result.wasSuccessful()
 
305
 
 
306
 
 
307
 
 
308
def _setup_test_log(name):
 
309
    import time
 
310
    import os
 
311
    
 
312
    log_filename = os.path.abspath(name + '.log')
 
313
    TestBase.TEST_LOG = open(log_filename, 'wt', buffering=1) # line buffered
 
314
 
 
315
    print >>TestBase.TEST_LOG, "tests run at " + time.ctime()
 
316
    print '%-30s %s' % ('test log', log_filename)
 
317
 
 
318
 
 
319
def _setup_test_dir(name):
 
320
    import os
 
321
    import shutil
 
322
    
 
323
    TestBase.ORIG_DIR = os.getcwdu()
 
324
    TestBase.TEST_ROOT = os.path.abspath(name + '.tmp')
 
325
 
 
326
    print '%-30s %s' % ('running tests in', TestBase.TEST_ROOT)
 
327
 
 
328
    if os.path.exists(TestBase.TEST_ROOT):
 
329
        shutil.rmtree(TestBase.TEST_ROOT)
 
330
    os.mkdir(TestBase.TEST_ROOT)
 
331
    os.chdir(TestBase.TEST_ROOT)
 
332
 
 
333
    # make a fake bzr directory there to prevent any tests propagating
 
334
    # up onto the source directory's real branch
 
335
    os.mkdir(os.path.join(TestBase.TEST_ROOT, '.bzr'))
 
336
 
 
337
    
 
338
 
 
339
def _show_results(result):
 
340
     print
 
341
     print '%4d tests run' % result.testsRun
 
342
     print '%4d errors' % len(result.errors)
 
343
     print '%4d failures' % len(result.failures)
 
344
 
 
345
 
 
346
 
 
347
def _show_test_failure(kind, case, exc_info, out):
 
348
    from traceback import print_exception
 
349
    
 
350
    print >>out, '-' * 60
 
351
    print >>out, case
 
352
    
 
353
    desc = case.shortDescription()
 
354
    if desc:
 
355
        print >>out, '   (%s)' % desc
 
356
         
 
357
    print_exception(exc_info[0], exc_info[1], exc_info[2], None, out)
 
358
        
 
359
    if isinstance(case, TestBase):
 
360
        print >>out
 
361
        print >>out, 'log from this test:'
 
362
        print >>out, case._log_buf
 
363
         
 
364
    print >>out, '-' * 60
 
365
    
 
366