~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: Robert Collins
  • Date: 2005-08-23 06:52:09 UTC
  • mto: (974.1.50) (1185.1.10) (1092.3.1)
  • mto: This revision was merged to the branch mainline in revision 1139.
  • Revision ID: robertc@robertcollins.net-20050823065209-81cd5962c401751b
move io redirection into each test case from the global runner

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
 
40
40
import unittest
41
41
import sys
42
 
from bzrlib.selftest import TestUtil
43
42
 
44
43
# XXX: Don't need this anymore now we depend on python2.4
45
44
def _need_subprocess():
56
55
    # XXX: Not used yet
57
56
 
58
57
 
59
 
 
60
 
class EarlyStoppingTestResultAdapter(object):
61
 
    """An adapter for TestResult to stop at the first first failure or error"""
62
 
 
63
 
    def __init__(self, result):
64
 
        self._result = result
65
 
 
66
 
    def addError(self, test, err):
67
 
        self._result.addError(test, err)
68
 
        self._result.stop()
69
 
 
70
 
    def addFailure(self, test, err):
71
 
        self._result.addFailure(test, err)
72
 
        self._result.stop()
73
 
 
74
 
    def __getattr__(self, name):
75
 
        return getattr(self._result, name)
76
 
 
77
 
    def __setattr__(self, name, value):
78
 
        if name == '_result':
79
 
            object.__setattr__(self, name, value)
80
 
        return setattr(self._result, name, value)
 
58
class TestCase(unittest.TestCase):
 
59
    """Base class for bzr test cases.
 
60
 
 
61
    Just defines some useful helper functions; doesn't actually test
 
62
    anything.
 
63
    """
 
64
    
 
65
    # TODO: Special methods to invoke bzr, so that we can run it
 
66
    # through a specified Python intepreter
 
67
 
 
68
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
 
69
    BZRPATH = 'bzr'
 
70
 
 
71
    _log_buf = ""
 
72
 
 
73
 
 
74
    def setUp(self):
 
75
        super(TestCase, self).setUp()
 
76
        # save stdout & stderr so there's no leakage from code-under-test
 
77
        self.real_stdout = sys.stdout
 
78
        self.real_stderr = sys.stderr
 
79
        sys.stdout = sys.stderr = TestCase.TEST_LOG
 
80
        self.log("%s setup" % self.id())
 
81
 
 
82
    def tearDown(self):
 
83
        sys.stdout = self.real_stdout
 
84
        sys.stderr = self.real_stderr
 
85
        self.log("%s teardown" % self.id())
 
86
        self.log('')
 
87
        super(TestCase, self).tearDown()
 
88
 
 
89
    def formcmd(self, cmd):
 
90
        if isinstance(cmd, basestring):
 
91
            cmd = cmd.split()
 
92
 
 
93
        if cmd[0] == 'bzr':
 
94
            cmd[0] = self.BZRPATH
 
95
            if self.OVERRIDE_PYTHON:
 
96
                cmd.insert(0, self.OVERRIDE_PYTHON)
 
97
 
 
98
        self.log('$ %r' % cmd)
 
99
 
 
100
        return cmd
 
101
 
 
102
 
 
103
    def runcmd(self, cmd, retcode=0):
 
104
        """Run one command and check the return code.
 
105
 
 
106
        Returns a tuple of (stdout,stderr) strings.
 
107
 
 
108
        If a single string is based, it is split into words.
 
109
        For commands that are not simple space-separated words, please
 
110
        pass a list instead."""
 
111
        try:
 
112
            import shutil
 
113
            from subprocess import call
 
114
        except ImportError, e:
 
115
            _need_subprocess()
 
116
            raise
 
117
 
 
118
 
 
119
        cmd = self.formcmd(cmd)
 
120
 
 
121
        self.log('$ ' + ' '.join(cmd))
 
122
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
 
123
 
 
124
        if retcode != actual_retcode:
 
125
            raise CommandFailed("test failed: %r returned %d, expected %d"
 
126
                                % (cmd, actual_retcode, retcode))
 
127
 
 
128
 
 
129
    def backtick(self, cmd, retcode=0):
 
130
        """Run a command and return its output"""
 
131
        try:
 
132
            import shutil
 
133
            from subprocess import Popen, PIPE
 
134
        except ImportError, e:
 
135
            _need_subprocess()
 
136
            raise
 
137
 
 
138
        cmd = self.formcmd(cmd)
 
139
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
 
140
        outd, errd = child.communicate()
 
141
        self.log(outd)
 
142
        actual_retcode = child.wait()
 
143
 
 
144
        outd = outd.replace('\r', '')
 
145
 
 
146
        if retcode != actual_retcode:
 
147
            raise CommandFailed("test failed: %r returned %d, expected %d"
 
148
                                % (cmd, actual_retcode, retcode))
 
149
 
 
150
        return outd
 
151
 
 
152
 
 
153
 
 
154
    def build_tree(self, shape):
 
155
        """Build a test tree according to a pattern.
 
156
 
 
157
        shape is a sequence of file specifications.  If the final
 
158
        character is '/', a directory is created.
 
159
 
 
160
        This doesn't add anything to a branch.
 
161
        """
 
162
        # XXX: It's OK to just create them using forward slashes on windows?
 
163
        import os
 
164
        for name in shape:
 
165
            assert isinstance(name, basestring)
 
166
            if name[-1] == '/':
 
167
                os.mkdir(name[:-1])
 
168
            else:
 
169
                f = file(name, 'wt')
 
170
                print >>f, "contents of", name
 
171
                f.close()
 
172
 
 
173
 
 
174
    def log(self, msg):
 
175
        """Log a message to a progress file"""
 
176
        # XXX: The problem with this is that code that writes straight
 
177
        # to the log file won't be shown when we display the log
 
178
        # buffer; would be better to not have the in-memory buffer and
 
179
        # instead just a log file per test, which is read in and
 
180
        # displayed if the test fails.  That seems to imply one log
 
181
        # per test case, not globally.  OK?
 
182
        self._log_buf = self._log_buf + str(msg) + '\n'
 
183
        print >>self.TEST_LOG, msg
 
184
 
 
185
 
 
186
    def check_inventory_shape(self, inv, shape):
 
187
        """
 
188
        Compare an inventory to a list of expected names.
 
189
 
 
190
        Fail if they are not precisely equal.
 
191
        """
 
192
        extras = []
 
193
        shape = list(shape)             # copy
 
194
        for path, ie in inv.entries():
 
195
            name = path.replace('\\', '/')
 
196
            if ie.kind == 'dir':
 
197
                name = name + '/'
 
198
            if name in shape:
 
199
                shape.remove(name)
 
200
            else:
 
201
                extras.append(name)
 
202
        if shape:
 
203
            self.fail("expected paths not found in inventory: %r" % shape)
 
204
        if extras:
 
205
            self.fail("unexpected paths found in inventory: %r" % extras)
 
206
 
 
207
 
 
208
    def check_file_contents(self, filename, expect):
 
209
        self.log("check contents of file %s" % filename)
 
210
        contents = file(filename, 'r').read()
 
211
        if contents != expect:
 
212
            self.log("expected: %r" % expect)
 
213
            self.log("actually: %r" % contents)
 
214
            self.fail("contents of %s not as expected")
 
215
            
 
216
 
 
217
 
 
218
class InTempDir(TestCase):
 
219
    """Base class for tests run in a temporary branch."""
 
220
    def setUp(self):
 
221
        super(InTempDir, self).setUp()
 
222
        import os
 
223
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
 
224
        os.mkdir(self.test_dir)
 
225
        os.chdir(self.test_dir)
 
226
        
 
227
    def tearDown(self):
 
228
        import os
 
229
        os.chdir(self.TEST_ROOT)
 
230
        super(InTempDir, self).tearDown()
81
231
 
82
232
 
83
233
class _MyResult(unittest._TextTestResult):
86
236
 
87
237
    No special behaviour for now.
88
238
    """
 
239
    def __init__(self, out, style):
 
240
        super(_MyResult, self).__init__(out, False, 0)
 
241
        self.out = out
 
242
        assert style in ('none', 'progress', 'verbose')
 
243
        self.style = style
89
244
 
90
245
    def startTest(self, test):
91
 
        unittest.TestResult.startTest(self, test)
 
246
        super(_MyResult, self).startTest(test)
92
247
        # TODO: Maybe show test.shortDescription somewhere?
93
248
        what = test.id()
94
249
        # python2.3 has the bad habit of just "runit" for doctests
95
250
        if what == 'runit':
96
251
            what = test.shortDescription()
97
 
        if self.showAll:
98
 
            self.stream.write('%-60.60s' % what)
99
 
        self.stream.flush()
 
252
        if self.style == 'verbose':
 
253
            print >>self.out, '%-60.60s' % what,
 
254
            self.out.flush()
100
255
 
101
256
    def addError(self, test, err):
 
257
        if self.style == 'verbose':
 
258
            print >>self.out, 'ERROR'
 
259
        elif self.style == 'progress':
 
260
            self.stream.write('E')
 
261
        self.stream.flush()
102
262
        super(_MyResult, self).addError(test, err)
103
 
        self.stream.flush()
104
263
 
105
264
    def addFailure(self, test, err):
 
265
        if self.style == 'verbose':
 
266
            print >>self.out, 'FAILURE'
 
267
        elif self.style == 'progress':
 
268
            self.stream.write('F')
 
269
        self.stream.flush()
106
270
        super(_MyResult, self).addFailure(test, err)
107
 
        self.stream.flush()
108
271
 
109
272
    def addSuccess(self, test):
110
 
        if self.showAll:
111
 
            self.stream.writeln('OK')
112
 
        elif self.dots:
 
273
        if self.style == 'verbose':
 
274
            print >>self.out, 'OK'
 
275
        elif self.style == 'progress':
113
276
            self.stream.write('~')
114
277
        self.stream.flush()
115
 
        unittest.TestResult.addSuccess(self, test)
 
278
        super(_MyResult, self).addSuccess(test)
 
279
 
 
280
    def printErrors(self):
 
281
        if self.style == 'progress':
 
282
            self.stream.writeln()
 
283
        super(_MyResult, self).printErrors()
116
284
 
117
285
    def printErrorList(self, flavour, errors):
118
286
        for test, err in errors:
120
288
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
121
289
            self.stream.writeln(self.separator2)
122
290
            self.stream.writeln("%s" % err)
123
 
            if hasattr(test, '_get_log'):
 
291
            if isinstance(test, TestCase):
124
292
                self.stream.writeln()
125
293
                self.stream.writeln('log from this test:')
126
 
                print >>self.stream, test._get_log()
 
294
                print >>self.stream, test._log_buf
 
295
 
 
296
 
 
297
class TestSuite(unittest.TestSuite):
 
298
    
 
299
    def __init__(self, tests=(), name='test'):
 
300
        super(TestSuite, self).__init__(tests)
 
301
        self._name = name
 
302
 
 
303
    def run(self, result):
 
304
        import os
 
305
        import shutil
 
306
        import time
 
307
        
 
308
        self._setup_test_log()
 
309
        self._setup_test_dir()
 
310
        print
 
311
    
 
312
        return super(TestSuite,self).run(result)
 
313
 
 
314
    def _setup_test_log(self):
 
315
        import time
 
316
        import os
 
317
        
 
318
        log_filename = os.path.abspath(self._name + '.log')
 
319
        # line buffered
 
320
        TestCase.TEST_LOG = open(log_filename, 'wt', buffering=1)
 
321
    
 
322
        print >>TestCase.TEST_LOG, "tests run at " + time.ctime()
 
323
        print '%-30s %s' % ('test log', log_filename)
 
324
 
 
325
    def _setup_test_dir(self):
 
326
        import os
 
327
        import shutil
 
328
        
 
329
        TestCase.ORIG_DIR = os.getcwdu()
 
330
        TestCase.TEST_ROOT = os.path.abspath(self._name + '.tmp')
 
331
    
 
332
        print '%-30s %s' % ('running tests in', TestCase.TEST_ROOT)
 
333
    
 
334
        if os.path.exists(TestCase.TEST_ROOT):
 
335
            shutil.rmtree(TestCase.TEST_ROOT)
 
336
        os.mkdir(TestCase.TEST_ROOT)
 
337
        os.chdir(TestCase.TEST_ROOT)
 
338
    
 
339
        # make a fake bzr directory there to prevent any tests propagating
 
340
        # up onto the source directory's real branch
 
341
        os.mkdir(os.path.join(TestCase.TEST_ROOT, '.bzr'))
127
342
 
128
343
 
129
344
class TextTestRunner(unittest.TextTestRunner):
130
345
 
 
346
    def __init__(self, stream=sys.stderr, descriptions=1, verbosity=0, style='progress'):
 
347
        super(TextTestRunner, self).__init__(stream, descriptions, verbosity)
 
348
        self.style = style
 
349
 
131
350
    def _makeResult(self):
132
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
133
 
        return EarlyStoppingTestResultAdapter(result)
134
 
 
135
 
 
136
 
class filteringVisitor(TestUtil.TestVisitor):
137
 
    """I accruse all the testCases I visit that pass a regexp filter on id
138
 
    into my suite
139
 
    """
140
 
 
141
 
    def __init__(self, filter):
142
 
        import re
143
 
        TestUtil.TestVisitor.__init__(self)
144
 
        self._suite=None
145
 
        self.filter=re.compile(filter)
146
 
 
147
 
    def suite(self):
148
 
        """answer the suite we are building"""
149
 
        if self._suite is None:
150
 
            self._suite=TestUtil.TestSuite()
151
 
        return self._suite
152
 
 
153
 
    def visitCase(self, aCase):
154
 
        if self.filter.match(aCase.id()):
155
 
            self.suite().addTest(aCase)
156
 
 
157
 
 
158
 
def run_suite(suite, name='test', verbose=False, pattern=".*"):
159
 
    import shutil
160
 
    from bzrlib.selftest import TestCaseInTempDir
161
 
    TestCaseInTempDir._TEST_NAME = name
 
351
        return _MyResult(self.stream, self.style)
 
352
 
 
353
    # If we want the old 4 line summary output (count, 0 failures, 0 errors)
 
354
    # we can override run() too.
 
355
 
 
356
 
 
357
def run_suite(a_suite, name='test', verbose=False):
 
358
    suite = TestSuite((a_suite,),name)
162
359
    if verbose:
163
 
        verbosity = 2
164
 
    else:
165
 
        verbosity = 1
166
 
    runner = TextTestRunner(stream=sys.stdout,
167
 
                            descriptions=0,
168
 
                            verbosity=verbosity)
169
 
    visitor = filteringVisitor(pattern)
170
 
    suite.visit(visitor)
171
 
    result = runner.run(visitor.suite())
172
 
    # This is still a little bogus, 
173
 
    # but only a little. Folk not using our testrunner will
174
 
    # have to delete their temp directories themselves.
175
 
    if result.wasSuccessful():
176
 
        if TestCaseInTempDir.TEST_ROOT:
177
 
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
178
 
    else:
179
 
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
 
360
        style = 'verbose'
 
361
    else:
 
362
        style = 'progress'
 
363
    runner = TextTestRunner(stream=sys.stdout, style=style)
 
364
    result = runner.run(suite)
180
365
    return result.wasSuccessful()