~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: Robert Collins
  • Date: 2005-08-24 12:13:13 UTC
  • mto: (974.1.50) (1185.1.10) (1092.3.1)
  • mto: This revision was merged to the branch mainline in revision 1139.
  • Revision ID: robertc@robertcollins.net-20050824121313-f604a90d56310911
merge up with mpool

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
 
40
40
import unittest
41
41
import sys
 
42
from bzrlib.selftest import TestUtil
42
43
 
43
44
# XXX: Don't need this anymore now we depend on python2.4
44
45
def _need_subprocess():
56
57
 
57
58
 
58
59
class TestCase(unittest.TestCase):
59
 
    """Base class for bzr test cases.
60
 
 
61
 
    Just defines some useful helper functions; doesn't actually test
62
 
    anything.
 
60
    """Base class for bzr unit tests.
 
61
    
 
62
    Tests that need access to disk resources should subclass 
 
63
    FunctionalTestCase not TestCase.
63
64
    """
64
65
    
65
66
    # TODO: Special methods to invoke bzr, so that we can run it
68
69
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
69
70
    BZRPATH = 'bzr'
70
71
 
71
 
    _log_buf = ""
 
72
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
73
                         a_callable=None, *args, **kwargs):
 
74
        """Call callable with redirected std io pipes.
72
75
 
 
76
        Returns the return code."""
 
77
        from StringIO import StringIO
 
78
        if not callable(a_callable):
 
79
            raise ValueError("a_callable must be callable.")
 
80
        if stdin is None:
 
81
            stdin = StringIO("")
 
82
        if stdout is None:
 
83
            stdout = self.TEST_LOG
 
84
        if stderr is None:
 
85
            stderr = self.TEST_LOG
 
86
        real_stdin = sys.stdin
 
87
        real_stdout = sys.stdout
 
88
        real_stderr = sys.stderr
 
89
        result = None
 
90
        try:
 
91
            sys.stdout = stdout
 
92
            sys.stderr = stderr
 
93
            sys.stdin = stdin
 
94
            result = a_callable(*args, **kwargs)
 
95
        finally:
 
96
            sys.stdout = real_stdout
 
97
            sys.stderr = real_stderr
 
98
            sys.stdin = real_stdin
 
99
        return result
73
100
 
74
101
    def setUp(self):
75
102
        super(TestCase, self).setUp()
76
 
        # save stdout & stderr so there's no leakage from code-under-test
77
 
        self.real_stdout = sys.stdout
78
 
        self.real_stderr = sys.stderr
79
 
        sys.stdout = sys.stderr = TestCase.TEST_LOG
 
103
        # setup a temporary log for the test 
 
104
        import tempfile
 
105
        self.TEST_LOG = tempfile.NamedTemporaryFile(mode='wt', bufsize=0)
80
106
        self.log("%s setup" % self.id())
81
107
 
82
108
    def tearDown(self):
83
 
        sys.stdout = self.real_stdout
84
 
        sys.stderr = self.real_stderr
85
109
        self.log("%s teardown" % self.id())
86
110
        self.log('')
87
111
        super(TestCase, self).tearDown()
88
112
 
 
113
    def log(self, msg):
 
114
        """Log a message to a progress file"""
 
115
        print >>self.TEST_LOG, msg
 
116
 
 
117
    def check_inventory_shape(self, inv, shape):
 
118
        """
 
119
        Compare an inventory to a list of expected names.
 
120
 
 
121
        Fail if they are not precisely equal.
 
122
        """
 
123
        extras = []
 
124
        shape = list(shape)             # copy
 
125
        for path, ie in inv.entries():
 
126
            name = path.replace('\\', '/')
 
127
            if ie.kind == 'dir':
 
128
                name = name + '/'
 
129
            if name in shape:
 
130
                shape.remove(name)
 
131
            else:
 
132
                extras.append(name)
 
133
        if shape:
 
134
            self.fail("expected paths not found in inventory: %r" % shape)
 
135
        if extras:
 
136
            self.fail("unexpected paths found in inventory: %r" % extras)
 
137
     
 
138
    def _get_log(self):
 
139
        """Get the log the test case used. This can only be called once,
 
140
        after which an exception will be raised.
 
141
        """
 
142
        self.TEST_LOG.flush()
 
143
        log = open(self.TEST_LOG.name, 'rt').read()
 
144
        self.TEST_LOG.close()
 
145
        return log
 
146
 
 
147
 
 
148
class FunctionalTestCase(TestCase):
 
149
    """Base class for tests that perform function testing - running bzr,
 
150
    using files on disk, and similar activities.
 
151
 
 
152
    InTempDir is an old alias for FunctionalTestCase.
 
153
    """
 
154
 
 
155
    TEST_ROOT = None
 
156
    _TEST_NAME = 'test'
 
157
 
 
158
    def check_file_contents(self, filename, expect):
 
159
        self.log("check contents of file %s" % filename)
 
160
        contents = file(filename, 'r').read()
 
161
        if contents != expect:
 
162
            self.log("expected: %r" % expect)
 
163
            self.log("actually: %r" % contents)
 
164
            self.fail("contents of %s not as expected")
 
165
 
 
166
    def _make_test_root(self):
 
167
        import os
 
168
        import shutil
 
169
        import tempfile
 
170
        
 
171
        if FunctionalTestCase.TEST_ROOT is not None:
 
172
            return
 
173
        FunctionalTestCase.TEST_ROOT = os.path.abspath(
 
174
                                 tempfile.mkdtemp(suffix='.tmp',
 
175
                                                  prefix=self._TEST_NAME + '-',
 
176
                                                  dir=os.curdir))
 
177
    
 
178
        # make a fake bzr directory there to prevent any tests propagating
 
179
        # up onto the source directory's real branch
 
180
        os.mkdir(os.path.join(FunctionalTestCase.TEST_ROOT, '.bzr'))
 
181
 
 
182
    def setUp(self):
 
183
        super(FunctionalTestCase, self).setUp()
 
184
        import os
 
185
        self._make_test_root()
 
186
        self._currentdir = os.getcwdu()
 
187
        self.test_dir = os.path.join(self.TEST_ROOT, self.id())
 
188
        os.mkdir(self.test_dir)
 
189
        os.chdir(self.test_dir)
 
190
        
 
191
    def tearDown(self):
 
192
        import os
 
193
        os.chdir(self._currentdir)
 
194
        super(FunctionalTestCase, self).tearDown()
 
195
 
89
196
    def formcmd(self, cmd):
90
197
        if isinstance(cmd, basestring):
91
198
            cmd = cmd.split()
92
 
 
93
199
        if cmd[0] == 'bzr':
94
200
            cmd[0] = self.BZRPATH
95
201
            if self.OVERRIDE_PYTHON:
96
202
                cmd.insert(0, self.OVERRIDE_PYTHON)
97
 
 
98
203
        self.log('$ %r' % cmd)
99
 
 
100
204
        return cmd
101
205
 
102
 
 
103
206
    def runcmd(self, cmd, retcode=0):
104
207
        """Run one command and check the return code.
105
208
 
114
217
        except ImportError, e:
115
218
            _need_subprocess()
116
219
            raise
117
 
 
118
 
 
119
220
        cmd = self.formcmd(cmd)
120
 
 
121
221
        self.log('$ ' + ' '.join(cmd))
122
222
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
123
 
 
124
223
        if retcode != actual_retcode:
125
224
            raise CommandFailed("test failed: %r returned %d, expected %d"
126
225
                                % (cmd, actual_retcode, retcode))
127
226
 
128
 
 
129
227
    def backtick(self, cmd, retcode=0):
130
228
        """Run a command and return its output"""
131
229
        try:
134
232
        except ImportError, e:
135
233
            _need_subprocess()
136
234
            raise
137
 
 
138
235
        cmd = self.formcmd(cmd)
139
236
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
140
237
        outd, errd = child.communicate()
141
238
        self.log(outd)
142
239
        actual_retcode = child.wait()
143
 
 
144
240
        outd = outd.replace('\r', '')
145
 
 
146
241
        if retcode != actual_retcode:
147
242
            raise CommandFailed("test failed: %r returned %d, expected %d"
148
243
                                % (cmd, actual_retcode, retcode))
149
 
 
150
244
        return outd
151
245
 
152
 
 
153
 
 
154
246
    def build_tree(self, shape):
155
247
        """Build a test tree according to a pattern.
156
248
 
169
261
                f = file(name, 'wt')
170
262
                print >>f, "contents of", name
171
263
                f.close()
172
 
 
173
 
 
174
 
    def log(self, msg):
175
 
        """Log a message to a progress file"""
176
 
        # XXX: The problem with this is that code that writes straight
177
 
        # to the log file won't be shown when we display the log
178
 
        # buffer; would be better to not have the in-memory buffer and
179
 
        # instead just a log file per test, which is read in and
180
 
        # displayed if the test fails.  That seems to imply one log
181
 
        # per test case, not globally.  OK?
182
 
        self._log_buf = self._log_buf + str(msg) + '\n'
183
 
        print >>self.TEST_LOG, msg
184
 
 
185
 
 
186
 
    def check_inventory_shape(self, inv, shape):
187
 
        """
188
 
        Compare an inventory to a list of expected names.
189
 
 
190
 
        Fail if they are not precisely equal.
191
 
        """
192
 
        extras = []
193
 
        shape = list(shape)             # copy
194
 
        for path, ie in inv.entries():
195
 
            name = path.replace('\\', '/')
196
 
            if ie.kind == 'dir':
197
 
                name = name + '/'
198
 
            if name in shape:
199
 
                shape.remove(name)
200
 
            else:
201
 
                extras.append(name)
202
 
        if shape:
203
 
            self.fail("expected paths not found in inventory: %r" % shape)
204
 
        if extras:
205
 
            self.fail("unexpected paths found in inventory: %r" % extras)
206
 
 
207
 
 
208
 
    def check_file_contents(self, filename, expect):
209
 
        self.log("check contents of file %s" % filename)
210
 
        contents = file(filename, 'r').read()
211
 
        if contents != expect:
212
 
            self.log("expected: %r" % expect)
213
 
            self.log("actually: %r" % contents)
214
 
            self.fail("contents of %s not as expected")
215
 
            
216
 
 
217
 
 
218
 
class InTempDir(TestCase):
219
 
    """Base class for tests run in a temporary branch."""
220
 
    def setUp(self):
221
 
        super(InTempDir, self).setUp()
222
 
        import os
223
 
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
224
 
        os.mkdir(self.test_dir)
225
 
        os.chdir(self.test_dir)
226
 
        
227
 
    def tearDown(self):
228
 
        import os
229
 
        os.chdir(self.TEST_ROOT)
230
 
        super(InTempDir, self).tearDown()
 
264
                
 
265
InTempDir = FunctionalTestCase
 
266
 
 
267
 
 
268
class EarlyStoppingTestResultAdapter(object):
 
269
    """An adapter for TestResult to stop at the first first failure or error"""
 
270
 
 
271
    def __init__(self, result):
 
272
        self._result = result
 
273
 
 
274
    def addError(self, test, err):
 
275
        self._result.addError(test, err)
 
276
        self._result.stop()
 
277
 
 
278
    def addFailure(self, test, err):
 
279
        self._result.addFailure(test, err)
 
280
        self._result.stop()
 
281
 
 
282
    def __getattr__(self, name):
 
283
        return getattr(self._result, name)
 
284
 
 
285
    def __setattr__(self, name, value):
 
286
        if name == '_result':
 
287
            object.__setattr__(self, name, value)
 
288
        return setattr(self._result, name, value)
231
289
 
232
290
 
233
291
class _MyResult(unittest._TextTestResult):
236
294
 
237
295
    No special behaviour for now.
238
296
    """
239
 
    def __init__(self, out, style):
240
 
        super(_MyResult, self).__init__(out, False, 0)
241
 
        self.out = out
242
 
        assert style in ('none', 'progress', 'verbose')
243
 
        self.style = style
244
297
 
245
298
    def startTest(self, test):
246
 
        super(_MyResult, self).startTest(test)
 
299
        unittest.TestResult.startTest(self, test)
247
300
        # TODO: Maybe show test.shortDescription somewhere?
248
301
        what = test.id()
249
302
        # python2.3 has the bad habit of just "runit" for doctests
250
303
        if what == 'runit':
251
304
            what = test.shortDescription()
252
 
        if self.style == 'verbose':
253
 
            print >>self.out, '%-60.60s' % what,
254
 
            self.out.flush()
 
305
        if self.showAll:
 
306
            self.stream.write('%-60.60s' % what)
 
307
        self.stream.flush()
255
308
 
256
309
    def addError(self, test, err):
257
 
        if self.style == 'verbose':
258
 
            print >>self.out, 'ERROR'
259
 
        elif self.style == 'progress':
260
 
            self.stream.write('E')
261
 
        self.stream.flush()
262
310
        super(_MyResult, self).addError(test, err)
 
311
        self.stream.flush()
263
312
 
264
313
    def addFailure(self, test, err):
265
 
        if self.style == 'verbose':
266
 
            print >>self.out, 'FAILURE'
267
 
        elif self.style == 'progress':
268
 
            self.stream.write('F')
269
 
        self.stream.flush()
270
314
        super(_MyResult, self).addFailure(test, err)
 
315
        self.stream.flush()
271
316
 
272
317
    def addSuccess(self, test):
273
 
        if self.style == 'verbose':
274
 
            print >>self.out, 'OK'
275
 
        elif self.style == 'progress':
 
318
        if self.showAll:
 
319
            self.stream.writeln('OK')
 
320
        elif self.dots:
276
321
            self.stream.write('~')
277
322
        self.stream.flush()
278
 
        super(_MyResult, self).addSuccess(test)
279
 
 
280
 
    def printErrors(self):
281
 
        if self.style == 'progress':
282
 
            self.stream.writeln()
283
 
        super(_MyResult, self).printErrors()
 
323
        unittest.TestResult.addSuccess(self, test)
284
324
 
285
325
    def printErrorList(self, flavour, errors):
286
326
        for test, err in errors:
291
331
            if isinstance(test, TestCase):
292
332
                self.stream.writeln()
293
333
                self.stream.writeln('log from this test:')
294
 
                print >>self.stream, test._log_buf
295
 
 
296
 
 
297
 
class TestSuite(unittest.TestSuite):
298
 
    
299
 
    def __init__(self, tests=(), name='test'):
300
 
        super(TestSuite, self).__init__(tests)
301
 
        self._name = name
302
 
 
303
 
    def run(self, result):
304
 
        import os
305
 
        import shutil
306
 
        import time
307
 
        
308
 
        self._setup_test_log()
309
 
        self._setup_test_dir()
310
 
        print
311
 
    
312
 
        return super(TestSuite,self).run(result)
313
 
 
314
 
    def _setup_test_log(self):
315
 
        import time
316
 
        import os
317
 
        
318
 
        log_filename = os.path.abspath(self._name + '.log')
319
 
        # line buffered
320
 
        TestCase.TEST_LOG = open(log_filename, 'wt', buffering=1)
321
 
    
322
 
        print >>TestCase.TEST_LOG, "tests run at " + time.ctime()
323
 
        print '%-30s %s' % ('test log', log_filename)
324
 
 
325
 
    def _setup_test_dir(self):
326
 
        import os
327
 
        import shutil
328
 
        
329
 
        TestCase.ORIG_DIR = os.getcwdu()
330
 
        TestCase.TEST_ROOT = os.path.abspath(self._name + '.tmp')
331
 
    
332
 
        print '%-30s %s' % ('running tests in', TestCase.TEST_ROOT)
333
 
    
334
 
        if os.path.exists(TestCase.TEST_ROOT):
335
 
            shutil.rmtree(TestCase.TEST_ROOT)
336
 
        os.mkdir(TestCase.TEST_ROOT)
337
 
        os.chdir(TestCase.TEST_ROOT)
338
 
    
339
 
        # make a fake bzr directory there to prevent any tests propagating
340
 
        # up onto the source directory's real branch
341
 
        os.mkdir(os.path.join(TestCase.TEST_ROOT, '.bzr'))
 
334
                print >>self.stream, test._get_log()
342
335
 
343
336
 
344
337
class TextTestRunner(unittest.TextTestRunner):
345
338
 
346
 
    def __init__(self, stream=sys.stderr, descriptions=1, verbosity=0, style='progress'):
347
 
        super(TextTestRunner, self).__init__(stream, descriptions, verbosity)
348
 
        self.style = style
349
 
 
350
339
    def _makeResult(self):
351
 
        return _MyResult(self.stream, self.style)
352
 
 
353
 
    # If we want the old 4 line summary output (count, 0 failures, 0 errors)
354
 
    # we can override run() too.
355
 
 
356
 
 
357
 
def run_suite(a_suite, name='test', verbose=False):
358
 
    suite = TestSuite((a_suite,),name)
 
340
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
341
        return EarlyStoppingTestResultAdapter(result)
 
342
 
 
343
 
 
344
class filteringVisitor(TestUtil.TestVisitor):
 
345
    """I accruse all the testCases I visit that pass a regexp filter on id
 
346
    into my suite
 
347
    """
 
348
 
 
349
    def __init__(self, filter):
 
350
        import re
 
351
        TestUtil.TestVisitor.__init__(self)
 
352
        self._suite=None
 
353
        self.filter=re.compile(filter)
 
354
 
 
355
    def suite(self):
 
356
        """answer the suite we are building"""
 
357
        if self._suite is None:
 
358
            self._suite=TestUtil.TestSuite()
 
359
        return self._suite
 
360
 
 
361
    def visitCase(self, aCase):
 
362
        if self.filter.match(aCase.id()):
 
363
            self.suite().addTest(aCase)
 
364
 
 
365
 
 
366
def run_suite(suite, name='test', verbose=False, pattern=".*"):
 
367
    import shutil
 
368
    FunctionalTestCase._TEST_NAME = name
359
369
    if verbose:
360
 
        style = 'verbose'
361
 
    else:
362
 
        style = 'progress'
363
 
    runner = TextTestRunner(stream=sys.stdout, style=style)
364
 
    result = runner.run(suite)
 
370
        verbosity = 2
 
371
    else:
 
372
        verbosity = 1
 
373
    runner = TextTestRunner(stream=sys.stdout,
 
374
                            descriptions=0,
 
375
                            verbosity=verbosity)
 
376
    visitor = filteringVisitor(pattern)
 
377
    suite.visit(visitor)
 
378
    result = runner.run(visitor.suite())
 
379
    # This is still a little bogus, 
 
380
    # but only a little. Folk not using our testrunner will
 
381
    # have to delete their temp directories themselves.
 
382
    if result.wasSuccessful():
 
383
        if FunctionalTestCase.TEST_ROOT is not None:
 
384
            shutil.rmtree(FunctionalTestCase.TEST_ROOT) 
 
385
    else:
 
386
        print "Failed tests working directories are in '%s'\n" % FunctionalTestCase.TEST_ROOT
365
387
    return result.wasSuccessful()