~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: Robert Collins
  • Date: 2005-08-23 06:52:09 UTC
  • mto: (974.1.50) (1185.1.10) (1092.3.1)
  • mto: This revision was merged to the branch mainline in revision 1139.
  • Revision ID: robertc@robertcollins.net-20050823065209-81cd5962c401751b
move io redirection into each test case from the global runner

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
 
40
40
import unittest
41
41
import sys
42
 
from bzrlib.selftest import TestUtil
43
42
 
44
43
# XXX: Don't need this anymore now we depend on python2.4
45
44
def _need_subprocess():
57
56
 
58
57
 
59
58
class TestCase(unittest.TestCase):
60
 
    """Base class for bzr unit tests.
61
 
    
62
 
    Tests that need access to disk resources should subclass 
63
 
    FunctionalTestCase not TestCase.
 
59
    """Base class for bzr test cases.
 
60
 
 
61
    Just defines some useful helper functions; doesn't actually test
 
62
    anything.
64
63
    """
65
64
    
66
65
    # TODO: Special methods to invoke bzr, so that we can run it
69
68
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
70
69
    BZRPATH = 'bzr'
71
70
 
72
 
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
73
 
                         a_callable=None, *args, **kwargs):
74
 
        """Call callable with redirected std io pipes.
 
71
    _log_buf = ""
75
72
 
76
 
        Returns the return code."""
77
 
        from StringIO import StringIO
78
 
        if not callable(a_callable):
79
 
            raise ValueError("a_callable must be callable.")
80
 
        if stdin is None:
81
 
            stdin = StringIO("")
82
 
        if stdout is None:
83
 
            stdout = self.TEST_LOG
84
 
        if stderr is None:
85
 
            stderr = self.TEST_LOG
86
 
        real_stdin = sys.stdin
87
 
        real_stdout = sys.stdout
88
 
        real_stderr = sys.stderr
89
 
        result = None
90
 
        try:
91
 
            sys.stdout = stdout
92
 
            sys.stderr = stderr
93
 
            sys.stdin = stdin
94
 
            result = a_callable(*args, **kwargs)
95
 
        finally:
96
 
            sys.stdout = real_stdout
97
 
            sys.stderr = real_stderr
98
 
            sys.stdin = real_stdin
99
 
        return result
100
73
 
101
74
    def setUp(self):
102
75
        super(TestCase, self).setUp()
103
 
        # setup a temporary log for the test 
104
 
        import tempfile
105
 
        self.TEST_LOG = tempfile.NamedTemporaryFile(mode='wt', bufsize=0)
 
76
        # save stdout & stderr so there's no leakage from code-under-test
 
77
        self.real_stdout = sys.stdout
 
78
        self.real_stderr = sys.stderr
 
79
        sys.stdout = sys.stderr = TestCase.TEST_LOG
106
80
        self.log("%s setup" % self.id())
107
81
 
108
82
    def tearDown(self):
 
83
        sys.stdout = self.real_stdout
 
84
        sys.stderr = self.real_stderr
109
85
        self.log("%s teardown" % self.id())
110
86
        self.log('')
111
87
        super(TestCase, self).tearDown()
112
88
 
113
 
    def log(self, msg):
114
 
        """Log a message to a progress file"""
115
 
        print >>self.TEST_LOG, msg
116
 
 
117
 
    def check_inventory_shape(self, inv, shape):
118
 
        """
119
 
        Compare an inventory to a list of expected names.
120
 
 
121
 
        Fail if they are not precisely equal.
122
 
        """
123
 
        extras = []
124
 
        shape = list(shape)             # copy
125
 
        for path, ie in inv.entries():
126
 
            name = path.replace('\\', '/')
127
 
            if ie.kind == 'dir':
128
 
                name = name + '/'
129
 
            if name in shape:
130
 
                shape.remove(name)
131
 
            else:
132
 
                extras.append(name)
133
 
        if shape:
134
 
            self.fail("expected paths not found in inventory: %r" % shape)
135
 
        if extras:
136
 
            self.fail("unexpected paths found in inventory: %r" % extras)
137
 
     
138
 
    def _get_log(self):
139
 
        """Get the log the test case used. This can only be called once,
140
 
        after which an exception will be raised.
141
 
        """
142
 
        self.TEST_LOG.flush()
143
 
        log = open(self.TEST_LOG.name, 'rt').read()
144
 
        self.TEST_LOG.close()
145
 
        return log
146
 
 
147
 
 
148
 
class FunctionalTestCase(TestCase):
149
 
    """Base class for tests that perform function testing - running bzr,
150
 
    using files on disk, and similar activities.
151
 
 
152
 
    InTempDir is an old alias for FunctionalTestCase.
153
 
    """
154
 
 
155
 
    TEST_ROOT = None
156
 
    _TEST_NAME = 'test'
157
 
 
158
 
    def check_file_contents(self, filename, expect):
159
 
        self.log("check contents of file %s" % filename)
160
 
        contents = file(filename, 'r').read()
161
 
        if contents != expect:
162
 
            self.log("expected: %r" % expect)
163
 
            self.log("actually: %r" % contents)
164
 
            self.fail("contents of %s not as expected")
165
 
 
166
 
    def _make_test_root(self):
167
 
        import os
168
 
        import shutil
169
 
        import tempfile
170
 
        
171
 
        if FunctionalTestCase.TEST_ROOT is not None:
172
 
            return
173
 
        FunctionalTestCase.TEST_ROOT = os.path.abspath(
174
 
                                 tempfile.mkdtemp(suffix='.tmp',
175
 
                                                  prefix=self._TEST_NAME + '-',
176
 
                                                  dir=os.curdir))
177
 
    
178
 
        # make a fake bzr directory there to prevent any tests propagating
179
 
        # up onto the source directory's real branch
180
 
        os.mkdir(os.path.join(FunctionalTestCase.TEST_ROOT, '.bzr'))
181
 
 
182
 
    def setUp(self):
183
 
        super(FunctionalTestCase, self).setUp()
184
 
        import os
185
 
        self._make_test_root()
186
 
        self._currentdir = os.getcwdu()
187
 
        self.test_dir = os.path.join(self.TEST_ROOT, self.id())
188
 
        os.mkdir(self.test_dir)
189
 
        os.chdir(self.test_dir)
190
 
        
191
 
    def tearDown(self):
192
 
        import os
193
 
        os.chdir(self._currentdir)
194
 
        super(FunctionalTestCase, self).tearDown()
195
 
 
196
89
    def formcmd(self, cmd):
197
90
        if isinstance(cmd, basestring):
198
91
            cmd = cmd.split()
 
92
 
199
93
        if cmd[0] == 'bzr':
200
94
            cmd[0] = self.BZRPATH
201
95
            if self.OVERRIDE_PYTHON:
202
96
                cmd.insert(0, self.OVERRIDE_PYTHON)
 
97
 
203
98
        self.log('$ %r' % cmd)
 
99
 
204
100
        return cmd
205
101
 
 
102
 
206
103
    def runcmd(self, cmd, retcode=0):
207
104
        """Run one command and check the return code.
208
105
 
217
114
        except ImportError, e:
218
115
            _need_subprocess()
219
116
            raise
 
117
 
 
118
 
220
119
        cmd = self.formcmd(cmd)
 
120
 
221
121
        self.log('$ ' + ' '.join(cmd))
222
122
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
 
123
 
223
124
        if retcode != actual_retcode:
224
125
            raise CommandFailed("test failed: %r returned %d, expected %d"
225
126
                                % (cmd, actual_retcode, retcode))
226
127
 
 
128
 
227
129
    def backtick(self, cmd, retcode=0):
228
130
        """Run a command and return its output"""
229
131
        try:
232
134
        except ImportError, e:
233
135
            _need_subprocess()
234
136
            raise
 
137
 
235
138
        cmd = self.formcmd(cmd)
236
139
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
237
140
        outd, errd = child.communicate()
238
141
        self.log(outd)
239
142
        actual_retcode = child.wait()
 
143
 
240
144
        outd = outd.replace('\r', '')
 
145
 
241
146
        if retcode != actual_retcode:
242
147
            raise CommandFailed("test failed: %r returned %d, expected %d"
243
148
                                % (cmd, actual_retcode, retcode))
 
149
 
244
150
        return outd
245
151
 
 
152
 
 
153
 
246
154
    def build_tree(self, shape):
247
155
        """Build a test tree according to a pattern.
248
156
 
261
169
                f = file(name, 'wt')
262
170
                print >>f, "contents of", name
263
171
                f.close()
264
 
                
265
 
InTempDir = FunctionalTestCase
266
 
 
267
 
 
268
 
class EarlyStoppingTestResultAdapter(object):
269
 
    """An adapter for TestResult to stop at the first first failure or error"""
270
 
 
271
 
    def __init__(self, result):
272
 
        self._result = result
273
 
 
274
 
    def addError(self, test, err):
275
 
        self._result.addError(test, err)
276
 
        self._result.stop()
277
 
 
278
 
    def addFailure(self, test, err):
279
 
        self._result.addFailure(test, err)
280
 
        self._result.stop()
281
 
 
282
 
    def __getattr__(self, name):
283
 
        return getattr(self._result, name)
284
 
 
285
 
    def __setattr__(self, name, value):
286
 
        if name == '_result':
287
 
            object.__setattr__(self, name, value)
288
 
        return setattr(self._result, name, value)
 
172
 
 
173
 
 
174
    def log(self, msg):
 
175
        """Log a message to a progress file"""
 
176
        # XXX: The problem with this is that code that writes straight
 
177
        # to the log file won't be shown when we display the log
 
178
        # buffer; would be better to not have the in-memory buffer and
 
179
        # instead just a log file per test, which is read in and
 
180
        # displayed if the test fails.  That seems to imply one log
 
181
        # per test case, not globally.  OK?
 
182
        self._log_buf = self._log_buf + str(msg) + '\n'
 
183
        print >>self.TEST_LOG, msg
 
184
 
 
185
 
 
186
    def check_inventory_shape(self, inv, shape):
 
187
        """
 
188
        Compare an inventory to a list of expected names.
 
189
 
 
190
        Fail if they are not precisely equal.
 
191
        """
 
192
        extras = []
 
193
        shape = list(shape)             # copy
 
194
        for path, ie in inv.entries():
 
195
            name = path.replace('\\', '/')
 
196
            if ie.kind == 'dir':
 
197
                name = name + '/'
 
198
            if name in shape:
 
199
                shape.remove(name)
 
200
            else:
 
201
                extras.append(name)
 
202
        if shape:
 
203
            self.fail("expected paths not found in inventory: %r" % shape)
 
204
        if extras:
 
205
            self.fail("unexpected paths found in inventory: %r" % extras)
 
206
 
 
207
 
 
208
    def check_file_contents(self, filename, expect):
 
209
        self.log("check contents of file %s" % filename)
 
210
        contents = file(filename, 'r').read()
 
211
        if contents != expect:
 
212
            self.log("expected: %r" % expect)
 
213
            self.log("actually: %r" % contents)
 
214
            self.fail("contents of %s not as expected")
 
215
            
 
216
 
 
217
 
 
218
class InTempDir(TestCase):
 
219
    """Base class for tests run in a temporary branch."""
 
220
    def setUp(self):
 
221
        super(InTempDir, self).setUp()
 
222
        import os
 
223
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
 
224
        os.mkdir(self.test_dir)
 
225
        os.chdir(self.test_dir)
 
226
        
 
227
    def tearDown(self):
 
228
        import os
 
229
        os.chdir(self.TEST_ROOT)
 
230
        super(InTempDir, self).tearDown()
289
231
 
290
232
 
291
233
class _MyResult(unittest._TextTestResult):
294
236
 
295
237
    No special behaviour for now.
296
238
    """
 
239
    def __init__(self, out, style):
 
240
        super(_MyResult, self).__init__(out, False, 0)
 
241
        self.out = out
 
242
        assert style in ('none', 'progress', 'verbose')
 
243
        self.style = style
297
244
 
298
245
    def startTest(self, test):
299
 
        unittest.TestResult.startTest(self, test)
 
246
        super(_MyResult, self).startTest(test)
300
247
        # TODO: Maybe show test.shortDescription somewhere?
301
248
        what = test.id()
302
249
        # python2.3 has the bad habit of just "runit" for doctests
303
250
        if what == 'runit':
304
251
            what = test.shortDescription()
305
 
        if self.showAll:
306
 
            self.stream.write('%-60.60s' % what)
307
 
        self.stream.flush()
 
252
        if self.style == 'verbose':
 
253
            print >>self.out, '%-60.60s' % what,
 
254
            self.out.flush()
308
255
 
309
256
    def addError(self, test, err):
 
257
        if self.style == 'verbose':
 
258
            print >>self.out, 'ERROR'
 
259
        elif self.style == 'progress':
 
260
            self.stream.write('E')
 
261
        self.stream.flush()
310
262
        super(_MyResult, self).addError(test, err)
311
 
        self.stream.flush()
312
263
 
313
264
    def addFailure(self, test, err):
 
265
        if self.style == 'verbose':
 
266
            print >>self.out, 'FAILURE'
 
267
        elif self.style == 'progress':
 
268
            self.stream.write('F')
 
269
        self.stream.flush()
314
270
        super(_MyResult, self).addFailure(test, err)
315
 
        self.stream.flush()
316
271
 
317
272
    def addSuccess(self, test):
318
 
        if self.showAll:
319
 
            self.stream.writeln('OK')
320
 
        elif self.dots:
 
273
        if self.style == 'verbose':
 
274
            print >>self.out, 'OK'
 
275
        elif self.style == 'progress':
321
276
            self.stream.write('~')
322
277
        self.stream.flush()
323
 
        unittest.TestResult.addSuccess(self, test)
 
278
        super(_MyResult, self).addSuccess(test)
 
279
 
 
280
    def printErrors(self):
 
281
        if self.style == 'progress':
 
282
            self.stream.writeln()
 
283
        super(_MyResult, self).printErrors()
324
284
 
325
285
    def printErrorList(self, flavour, errors):
326
286
        for test, err in errors:
331
291
            if isinstance(test, TestCase):
332
292
                self.stream.writeln()
333
293
                self.stream.writeln('log from this test:')
334
 
                print >>self.stream, test._get_log()
 
294
                print >>self.stream, test._log_buf
 
295
 
 
296
 
 
297
class TestSuite(unittest.TestSuite):
 
298
    
 
299
    def __init__(self, tests=(), name='test'):
 
300
        super(TestSuite, self).__init__(tests)
 
301
        self._name = name
 
302
 
 
303
    def run(self, result):
 
304
        import os
 
305
        import shutil
 
306
        import time
 
307
        
 
308
        self._setup_test_log()
 
309
        self._setup_test_dir()
 
310
        print
 
311
    
 
312
        return super(TestSuite,self).run(result)
 
313
 
 
314
    def _setup_test_log(self):
 
315
        import time
 
316
        import os
 
317
        
 
318
        log_filename = os.path.abspath(self._name + '.log')
 
319
        # line buffered
 
320
        TestCase.TEST_LOG = open(log_filename, 'wt', buffering=1)
 
321
    
 
322
        print >>TestCase.TEST_LOG, "tests run at " + time.ctime()
 
323
        print '%-30s %s' % ('test log', log_filename)
 
324
 
 
325
    def _setup_test_dir(self):
 
326
        import os
 
327
        import shutil
 
328
        
 
329
        TestCase.ORIG_DIR = os.getcwdu()
 
330
        TestCase.TEST_ROOT = os.path.abspath(self._name + '.tmp')
 
331
    
 
332
        print '%-30s %s' % ('running tests in', TestCase.TEST_ROOT)
 
333
    
 
334
        if os.path.exists(TestCase.TEST_ROOT):
 
335
            shutil.rmtree(TestCase.TEST_ROOT)
 
336
        os.mkdir(TestCase.TEST_ROOT)
 
337
        os.chdir(TestCase.TEST_ROOT)
 
338
    
 
339
        # make a fake bzr directory there to prevent any tests propagating
 
340
        # up onto the source directory's real branch
 
341
        os.mkdir(os.path.join(TestCase.TEST_ROOT, '.bzr'))
335
342
 
336
343
 
337
344
class TextTestRunner(unittest.TextTestRunner):
338
345
 
 
346
    def __init__(self, stream=sys.stderr, descriptions=1, verbosity=0, style='progress'):
 
347
        super(TextTestRunner, self).__init__(stream, descriptions, verbosity)
 
348
        self.style = style
 
349
 
339
350
    def _makeResult(self):
340
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
341
 
        return EarlyStoppingTestResultAdapter(result)
342
 
 
343
 
 
344
 
class filteringVisitor(TestUtil.TestVisitor):
345
 
    """I accruse all the testCases I visit that pass a regexp filter on id
346
 
    into my suite
347
 
    """
348
 
 
349
 
    def __init__(self, filter):
350
 
        import re
351
 
        TestUtil.TestVisitor.__init__(self)
352
 
        self._suite=None
353
 
        self.filter=re.compile(filter)
354
 
 
355
 
    def suite(self):
356
 
        """answer the suite we are building"""
357
 
        if self._suite is None:
358
 
            self._suite=TestUtil.TestSuite()
359
 
        return self._suite
360
 
 
361
 
    def visitCase(self, aCase):
362
 
        if self.filter.match(aCase.id()):
363
 
            self.suite().addTest(aCase)
364
 
 
365
 
 
366
 
def run_suite(suite, name='test', verbose=False, pattern=".*"):
367
 
    import shutil
368
 
    FunctionalTestCase._TEST_NAME = name
 
351
        return _MyResult(self.stream, self.style)
 
352
 
 
353
    # If we want the old 4 line summary output (count, 0 failures, 0 errors)
 
354
    # we can override run() too.
 
355
 
 
356
 
 
357
def run_suite(a_suite, name='test', verbose=False):
 
358
    suite = TestSuite((a_suite,),name)
369
359
    if verbose:
370
 
        verbosity = 2
371
 
    else:
372
 
        verbosity = 1
373
 
    runner = TextTestRunner(stream=sys.stdout,
374
 
                            descriptions=0,
375
 
                            verbosity=verbosity)
376
 
    visitor = filteringVisitor(pattern)
377
 
    suite.visit(visitor)
378
 
    result = runner.run(visitor.suite())
379
 
    # This is still a little bogus, 
380
 
    # but only a little. Folk not using our testrunner will
381
 
    # have to delete their temp directories themselves.
382
 
    if result.wasSuccessful():
383
 
        if FunctionalTestCase.TEST_ROOT is not None:
384
 
            shutil.rmtree(FunctionalTestCase.TEST_ROOT) 
385
 
    else:
386
 
        print "Failed tests working directories are in '%s'\n" % FunctionalTestCase.TEST_ROOT
 
360
        style = 'verbose'
 
361
    else:
 
362
        style = 'progress'
 
363
    runner = TextTestRunner(stream=sys.stdout, style=style)
 
364
    result = runner.run(suite)
387
365
    return result.wasSuccessful()