~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to testsweet.py

  • Committer: aaron.bentley at utoronto
  • Date: 2005-09-04 03:32:17 UTC
  • mfrom: (974.1.52)
  • mto: (1185.3.4)
  • mto: This revision was merged to the branch mainline in revision 1178.
  • Revision ID: aaron.bentley@utoronto.ca-20050904033217-821a797652305c14
Disabled urlgrabber

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
 
40
40
import unittest
41
41
import sys
 
42
from bzrlib.selftest import TestUtil
42
43
 
43
44
# XXX: Don't need this anymore now we depend on python2.4
44
45
def _need_subprocess():
55
56
    # XXX: Not used yet
56
57
 
57
58
 
58
 
class TestCase(unittest.TestCase):
59
 
    """Base class for bzr test cases.
60
 
 
61
 
    Just defines some useful helper functions; doesn't actually test
62
 
    anything.
63
 
    """
64
 
    
65
 
    # TODO: Special methods to invoke bzr, so that we can run it
66
 
    # through a specified Python intepreter
67
 
 
68
 
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
69
 
    BZRPATH = 'bzr'
70
 
 
71
 
    _log_buf = ""
72
 
 
73
 
 
74
 
    def setUp(self):
75
 
        super(TestCase, self).setUp()
76
 
        # save stdout & stderr so there's no leakage from code-under-test
77
 
        self.real_stdout = sys.stdout
78
 
        self.real_stderr = sys.stderr
79
 
        sys.stdout = sys.stderr = TestCase.TEST_LOG
80
 
        self.log("%s setup" % self.id())
81
 
 
82
 
    def tearDown(self):
83
 
        sys.stdout = self.real_stdout
84
 
        sys.stderr = self.real_stderr
85
 
        self.log("%s teardown" % self.id())
86
 
        self.log('')
87
 
        super(TestCase, self).tearDown()
88
 
 
89
 
    def formcmd(self, cmd):
90
 
        if isinstance(cmd, basestring):
91
 
            cmd = cmd.split()
92
 
 
93
 
        if cmd[0] == 'bzr':
94
 
            cmd[0] = self.BZRPATH
95
 
            if self.OVERRIDE_PYTHON:
96
 
                cmd.insert(0, self.OVERRIDE_PYTHON)
97
 
 
98
 
        self.log('$ %r' % cmd)
99
 
 
100
 
        return cmd
101
 
 
102
 
 
103
 
    def runcmd(self, cmd, retcode=0):
104
 
        """Run one command and check the return code.
105
 
 
106
 
        Returns a tuple of (stdout,stderr) strings.
107
 
 
108
 
        If a single string is based, it is split into words.
109
 
        For commands that are not simple space-separated words, please
110
 
        pass a list instead."""
111
 
        try:
112
 
            import shutil
113
 
            from subprocess import call
114
 
        except ImportError, e:
115
 
            _need_subprocess()
116
 
            raise
117
 
 
118
 
 
119
 
        cmd = self.formcmd(cmd)
120
 
 
121
 
        self.log('$ ' + ' '.join(cmd))
122
 
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
123
 
 
124
 
        if retcode != actual_retcode:
125
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
126
 
                                % (cmd, actual_retcode, retcode))
127
 
 
128
 
 
129
 
    def backtick(self, cmd, retcode=0):
130
 
        """Run a command and return its output"""
131
 
        try:
132
 
            import shutil
133
 
            from subprocess import Popen, PIPE
134
 
        except ImportError, e:
135
 
            _need_subprocess()
136
 
            raise
137
 
 
138
 
        cmd = self.formcmd(cmd)
139
 
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
140
 
        outd, errd = child.communicate()
141
 
        self.log(outd)
142
 
        actual_retcode = child.wait()
143
 
 
144
 
        outd = outd.replace('\r', '')
145
 
 
146
 
        if retcode != actual_retcode:
147
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
148
 
                                % (cmd, actual_retcode, retcode))
149
 
 
150
 
        return outd
151
 
 
152
 
 
153
 
 
154
 
    def build_tree(self, shape):
155
 
        """Build a test tree according to a pattern.
156
 
 
157
 
        shape is a sequence of file specifications.  If the final
158
 
        character is '/', a directory is created.
159
 
 
160
 
        This doesn't add anything to a branch.
161
 
        """
162
 
        # XXX: It's OK to just create them using forward slashes on windows?
163
 
        import os
164
 
        for name in shape:
165
 
            assert isinstance(name, basestring)
166
 
            if name[-1] == '/':
167
 
                os.mkdir(name[:-1])
168
 
            else:
169
 
                f = file(name, 'wt')
170
 
                print >>f, "contents of", name
171
 
                f.close()
172
 
 
173
 
 
174
 
    def log(self, msg):
175
 
        """Log a message to a progress file"""
176
 
        # XXX: The problem with this is that code that writes straight
177
 
        # to the log file won't be shown when we display the log
178
 
        # buffer; would be better to not have the in-memory buffer and
179
 
        # instead just a log file per test, which is read in and
180
 
        # displayed if the test fails.  That seems to imply one log
181
 
        # per test case, not globally.  OK?
182
 
        self._log_buf = self._log_buf + str(msg) + '\n'
183
 
        print >>self.TEST_LOG, msg
184
 
 
185
 
 
186
 
    def check_inventory_shape(self, inv, shape):
187
 
        """
188
 
        Compare an inventory to a list of expected names.
189
 
 
190
 
        Fail if they are not precisely equal.
191
 
        """
192
 
        extras = []
193
 
        shape = list(shape)             # copy
194
 
        for path, ie in inv.entries():
195
 
            name = path.replace('\\', '/')
196
 
            if ie.kind == 'dir':
197
 
                name = name + '/'
198
 
            if name in shape:
199
 
                shape.remove(name)
200
 
            else:
201
 
                extras.append(name)
202
 
        if shape:
203
 
            self.fail("expected paths not found in inventory: %r" % shape)
204
 
        if extras:
205
 
            self.fail("unexpected paths found in inventory: %r" % extras)
206
 
 
207
 
 
208
 
    def check_file_contents(self, filename, expect):
209
 
        self.log("check contents of file %s" % filename)
210
 
        contents = file(filename, 'r').read()
211
 
        if contents != expect:
212
 
            self.log("expected: %r" % expect)
213
 
            self.log("actually: %r" % contents)
214
 
            self.fail("contents of %s not as expected")
215
 
            
216
 
 
217
 
 
218
 
class InTempDir(TestCase):
219
 
    """Base class for tests run in a temporary branch."""
220
 
    def setUp(self):
221
 
        super(InTempDir, self).setUp()
222
 
        import os
223
 
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
224
 
        os.mkdir(self.test_dir)
225
 
        os.chdir(self.test_dir)
226
 
        
227
 
    def tearDown(self):
228
 
        import os
229
 
        os.chdir(self.TEST_ROOT)
230
 
        super(InTempDir, self).tearDown()
 
59
 
 
60
class EarlyStoppingTestResultAdapter(object):
 
61
    """An adapter for TestResult to stop at the first first failure or error"""
 
62
 
 
63
    def __init__(self, result):
 
64
        self._result = result
 
65
 
 
66
    def addError(self, test, err):
 
67
        self._result.addError(test, err)
 
68
        self._result.stop()
 
69
 
 
70
    def addFailure(self, test, err):
 
71
        self._result.addFailure(test, err)
 
72
        self._result.stop()
 
73
 
 
74
    def __getattr__(self, name):
 
75
        return getattr(self._result, name)
 
76
 
 
77
    def __setattr__(self, name, value):
 
78
        if name == '_result':
 
79
            object.__setattr__(self, name, value)
 
80
        return setattr(self._result, name, value)
231
81
 
232
82
 
233
83
class _MyResult(unittest._TextTestResult):
236
86
 
237
87
    No special behaviour for now.
238
88
    """
239
 
    def __init__(self, out, style):
240
 
        super(_MyResult, self).__init__(out, False, 0)
241
 
        self.out = out
242
 
        assert style in ('none', 'progress', 'verbose')
243
 
        self.style = style
244
89
 
245
90
    def startTest(self, test):
246
 
        super(_MyResult, self).startTest(test)
 
91
        unittest.TestResult.startTest(self, test)
247
92
        # TODO: Maybe show test.shortDescription somewhere?
248
93
        what = test.id()
249
94
        # python2.3 has the bad habit of just "runit" for doctests
250
95
        if what == 'runit':
251
96
            what = test.shortDescription()
252
 
        if self.style == 'verbose':
253
 
            print >>self.out, '%-60.60s' % what,
254
 
            self.out.flush()
 
97
        if self.showAll:
 
98
            self.stream.write('%-60.60s' % what)
 
99
        self.stream.flush()
255
100
 
256
101
    def addError(self, test, err):
257
 
        if self.style == 'verbose':
258
 
            print >>self.out, 'ERROR'
259
 
        elif self.style == 'progress':
260
 
            self.stream.write('E')
261
 
        self.stream.flush()
262
102
        super(_MyResult, self).addError(test, err)
 
103
        self.stream.flush()
263
104
 
264
105
    def addFailure(self, test, err):
265
 
        if self.style == 'verbose':
266
 
            print >>self.out, 'FAILURE'
267
 
        elif self.style == 'progress':
268
 
            self.stream.write('F')
269
 
        self.stream.flush()
270
106
        super(_MyResult, self).addFailure(test, err)
 
107
        self.stream.flush()
271
108
 
272
109
    def addSuccess(self, test):
273
 
        if self.style == 'verbose':
274
 
            print >>self.out, 'OK'
275
 
        elif self.style == 'progress':
 
110
        if self.showAll:
 
111
            self.stream.writeln('OK')
 
112
        elif self.dots:
276
113
            self.stream.write('~')
277
114
        self.stream.flush()
278
 
        super(_MyResult, self).addSuccess(test)
279
 
 
280
 
    def printErrors(self):
281
 
        if self.style == 'progress':
282
 
            self.stream.writeln()
283
 
        super(_MyResult, self).printErrors()
 
115
        unittest.TestResult.addSuccess(self, test)
284
116
 
285
117
    def printErrorList(self, flavour, errors):
286
118
        for test, err in errors:
288
120
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
289
121
            self.stream.writeln(self.separator2)
290
122
            self.stream.writeln("%s" % err)
291
 
            if isinstance(test, TestCase):
 
123
            if hasattr(test, '_get_log'):
292
124
                self.stream.writeln()
293
125
                self.stream.writeln('log from this test:')
294
 
                print >>self.stream, test._log_buf
295
 
 
296
 
 
297
 
class TestSuite(unittest.TestSuite):
298
 
    
299
 
    def __init__(self, tests=(), name='test'):
300
 
        super(TestSuite, self).__init__(tests)
301
 
        self._name = name
302
 
 
303
 
    def run(self, result):
304
 
        import os
305
 
        import shutil
306
 
        import time
307
 
        
308
 
        self._setup_test_log()
309
 
        self._setup_test_dir()
310
 
        print
311
 
    
312
 
        return super(TestSuite,self).run(result)
313
 
 
314
 
    def _setup_test_log(self):
315
 
        import time
316
 
        import os
317
 
        
318
 
        log_filename = os.path.abspath(self._name + '.log')
319
 
        # line buffered
320
 
        TestCase.TEST_LOG = open(log_filename, 'wt', buffering=1)
321
 
    
322
 
        print >>TestCase.TEST_LOG, "tests run at " + time.ctime()
323
 
        print '%-30s %s' % ('test log', log_filename)
324
 
 
325
 
    def _setup_test_dir(self):
326
 
        import os
327
 
        import shutil
328
 
        
329
 
        TestCase.ORIG_DIR = os.getcwdu()
330
 
        TestCase.TEST_ROOT = os.path.abspath(self._name + '.tmp')
331
 
    
332
 
        print '%-30s %s' % ('running tests in', TestCase.TEST_ROOT)
333
 
    
334
 
        if os.path.exists(TestCase.TEST_ROOT):
335
 
            shutil.rmtree(TestCase.TEST_ROOT)
336
 
        os.mkdir(TestCase.TEST_ROOT)
337
 
        os.chdir(TestCase.TEST_ROOT)
338
 
    
339
 
        # make a fake bzr directory there to prevent any tests propagating
340
 
        # up onto the source directory's real branch
341
 
        os.mkdir(os.path.join(TestCase.TEST_ROOT, '.bzr'))
 
126
                print >>self.stream, test._get_log()
342
127
 
343
128
 
344
129
class TextTestRunner(unittest.TextTestRunner):
345
130
 
346
 
    def __init__(self, stream=sys.stderr, descriptions=1, verbosity=0, style='progress'):
347
 
        super(TextTestRunner, self).__init__(stream, descriptions, verbosity)
348
 
        self.style = style
349
 
 
350
131
    def _makeResult(self):
351
 
        return _MyResult(self.stream, self.style)
352
 
 
353
 
    # If we want the old 4 line summary output (count, 0 failures, 0 errors)
354
 
    # we can override run() too.
355
 
 
356
 
 
357
 
def run_suite(a_suite, name='test', verbose=False):
358
 
    suite = TestSuite((a_suite,),name)
 
132
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
133
        return EarlyStoppingTestResultAdapter(result)
 
134
 
 
135
 
 
136
class filteringVisitor(TestUtil.TestVisitor):
 
137
    """I accruse all the testCases I visit that pass a regexp filter on id
 
138
    into my suite
 
139
    """
 
140
 
 
141
    def __init__(self, filter):
 
142
        import re
 
143
        TestUtil.TestVisitor.__init__(self)
 
144
        self._suite=None
 
145
        self.filter=re.compile(filter)
 
146
 
 
147
    def suite(self):
 
148
        """answer the suite we are building"""
 
149
        if self._suite is None:
 
150
            self._suite=TestUtil.TestSuite()
 
151
        return self._suite
 
152
 
 
153
    def visitCase(self, aCase):
 
154
        if self.filter.match(aCase.id()):
 
155
            self.suite().addTest(aCase)
 
156
 
 
157
 
 
158
def run_suite(suite, name='test', verbose=False, pattern=".*"):
 
159
    import shutil
 
160
    from bzrlib.selftest import TestCaseInTempDir
 
161
    TestCaseInTempDir._TEST_NAME = name
359
162
    if verbose:
360
 
        style = 'verbose'
361
 
    else:
362
 
        style = 'progress'
363
 
    runner = TextTestRunner(stream=sys.stdout, style=style)
364
 
    result = runner.run(suite)
 
163
        verbosity = 2
 
164
    else:
 
165
        verbosity = 1
 
166
    runner = TextTestRunner(stream=sys.stdout,
 
167
                            descriptions=0,
 
168
                            verbosity=verbosity)
 
169
    visitor = filteringVisitor(pattern)
 
170
    suite.visit(visitor)
 
171
    result = runner.run(visitor.suite())
 
172
    # This is still a little bogus, 
 
173
    # but only a little. Folk not using our testrunner will
 
174
    # have to delete their temp directories themselves.
 
175
    if result.wasSuccessful():
 
176
        if TestCaseInTempDir.TEST_ROOT:
 
177
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
 
178
    else:
 
179
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
365
180
    return result.wasSuccessful()