~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

Merge With main tree, move the NEWS entry to the good place.

Show diffs side-by-side

added added

removed removed

Lines of Context:
33
33
import logging
34
34
import os
35
35
import re
36
 
import shutil
37
36
import stat
38
37
import sys
39
38
import tempfile
53
52
import bzrlib.osutils
54
53
import bzrlib.osutils as osutils
55
54
import bzrlib.plugin
 
55
import bzrlib.progress as progress
56
56
from bzrlib.revision import common_ancestor
57
57
import bzrlib.store
58
58
import bzrlib.trace
114
114
    Shows output in a different format, including displaying runtime for tests.
115
115
    """
116
116
    stop_early = False
 
117
    
 
118
    def __init__(self, stream, descriptions, verbosity, pb=None):
 
119
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
120
        self.pb = pb
117
121
 
118
122
    def _elapsedTime(self):
119
123
        return "%5dms" % (1000 * (time.time() - self._start_time))
120
124
 
 
125
    def _ellipsise_unimportant_words(self, a_string, final_width,
 
126
                                   keep_start=False):
 
127
        """Add ellipsese (sp?) for overly long strings.
 
128
        
 
129
        :param keep_start: If true preserve the start of a_string rather
 
130
                           than the end of it.
 
131
        """
 
132
        if keep_start:
 
133
            if len(a_string) > final_width:
 
134
                result = a_string[:final_width-3] + '...'
 
135
            else:
 
136
                result = a_string
 
137
        else:
 
138
            if len(a_string) > final_width:
 
139
                result = '...' + a_string[3-final_width:]
 
140
            else:
 
141
                result = a_string
 
142
        return result.ljust(final_width)
 
143
 
121
144
    def startTest(self, test):
122
145
        unittest.TestResult.startTest(self, test)
123
146
        # In a short description, the important words are in
124
147
        # the beginning, but in an id, the important words are
125
148
        # at the end
126
149
        SHOW_DESCRIPTIONS = False
 
150
 
 
151
        if not self.showAll and self.dots and self.pb is not None:
 
152
            final_width = 13
 
153
        else:
 
154
            final_width = osutils.terminal_width()
 
155
            final_width = final_width - 15
 
156
        what = None
 
157
        if SHOW_DESCRIPTIONS:
 
158
            what = test.shortDescription()
 
159
            if what:
 
160
                what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
 
161
        if what is None:
 
162
            what = test.id()
 
163
            if what.startswith('bzrlib.tests.'):
 
164
                what = what[13:]
 
165
            what = self._ellipsise_unimportant_words(what, final_width)
127
166
        if self.showAll:
128
 
            width = osutils.terminal_width()
129
 
            name_width = width - 15
130
 
            what = None
131
 
            if SHOW_DESCRIPTIONS:
132
 
                what = test.shortDescription()
133
 
                if what:
134
 
                    if len(what) > name_width:
135
 
                        what = what[:name_width-3] + '...'
136
 
            if what is None:
137
 
                what = test.id()
138
 
                if what.startswith('bzrlib.tests.'):
139
 
                    what = what[13:]
140
 
                if len(what) > name_width:
141
 
                    what = '...' + what[3-name_width:]
142
 
            what = what.ljust(name_width)
143
167
            self.stream.write(what)
 
168
        elif self.dots and self.pb is not None:
 
169
            self.pb.update(what, self.testsRun - 1, None)
144
170
        self.stream.flush()
145
171
        self._start_time = time.time()
146
172
 
150
176
        unittest.TestResult.addError(self, test, err)
151
177
        if self.showAll:
152
178
            self.stream.writeln("ERROR %s" % self._elapsedTime())
153
 
        elif self.dots:
 
179
        elif self.dots and self.pb is None:
154
180
            self.stream.write('E')
 
181
        elif self.dots:
 
182
            self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
155
183
        self.stream.flush()
156
184
        if self.stop_early:
157
185
            self.stop()
160
188
        unittest.TestResult.addFailure(self, test, err)
161
189
        if self.showAll:
162
190
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
163
 
        elif self.dots:
 
191
        elif self.dots and self.pb is None:
164
192
            self.stream.write('F')
 
193
        elif self.dots:
 
194
            self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
165
195
        self.stream.flush()
166
196
        if self.stop_early:
167
197
            self.stop()
169
199
    def addSuccess(self, test):
170
200
        if self.showAll:
171
201
            self.stream.writeln('   OK %s' % self._elapsedTime())
172
 
        elif self.dots:
 
202
        elif self.dots and self.pb is None:
173
203
            self.stream.write('~')
 
204
        elif self.dots:
 
205
            self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
174
206
        self.stream.flush()
175
207
        unittest.TestResult.addSuccess(self, test)
176
208
 
178
210
        if self.showAll:
179
211
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
180
212
            print >>self.stream, '     %s' % skip_excinfo[1]
181
 
        elif self.dots:
 
213
        elif self.dots and self.pb is None:
182
214
            self.stream.write('S')
 
215
        elif self.dots:
 
216
            self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
183
217
        self.stream.flush()
184
218
        # seems best to treat this as success from point-of-view of unittest
185
219
        # -- it actually does nothing so it barely matters :)
200
234
            self.stream.writeln("%s" % err)
201
235
 
202
236
 
203
 
class TextTestRunner(unittest.TextTestRunner):
 
237
class TextTestRunner(object):
204
238
    stop_on_failure = False
205
239
 
 
240
    def __init__(self,
 
241
                 stream=sys.stderr,
 
242
                 descriptions=0,
 
243
                 verbosity=1,
 
244
                 keep_output=False,
 
245
                 pb=None):
 
246
        self.stream = unittest._WritelnDecorator(stream)
 
247
        self.descriptions = descriptions
 
248
        self.verbosity = verbosity
 
249
        self.keep_output = keep_output
 
250
        self.pb = pb
 
251
 
206
252
    def _makeResult(self):
207
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
253
        result = _MyResult(self.stream,
 
254
                           self.descriptions,
 
255
                           self.verbosity,
 
256
                           pb=self.pb)
208
257
        result.stop_early = self.stop_on_failure
209
258
        return result
210
259
 
 
260
    def run(self, test):
 
261
        "Run the given test case or test suite."
 
262
        result = self._makeResult()
 
263
        startTime = time.time()
 
264
        if self.pb is not None:
 
265
            self.pb.update('Running tests', 0, test.countTestCases())
 
266
        test.run(result)
 
267
        stopTime = time.time()
 
268
        timeTaken = stopTime - startTime
 
269
        result.printErrors()
 
270
        self.stream.writeln(result.separator2)
 
271
        run = result.testsRun
 
272
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
273
                            (run, run != 1 and "s" or "", timeTaken))
 
274
        self.stream.writeln()
 
275
        if not result.wasSuccessful():
 
276
            self.stream.write("FAILED (")
 
277
            failed, errored = map(len, (result.failures, result.errors))
 
278
            if failed:
 
279
                self.stream.write("failures=%d" % failed)
 
280
            if errored:
 
281
                if failed: self.stream.write(", ")
 
282
                self.stream.write("errors=%d" % errored)
 
283
            self.stream.writeln(")")
 
284
        else:
 
285
            self.stream.writeln("OK")
 
286
        if self.pb is not None:
 
287
            self.pb.update('Cleaning up', 0, 1)
 
288
        # This is still a little bogus, 
 
289
        # but only a little. Folk not using our testrunner will
 
290
        # have to delete their temp directories themselves.
 
291
        test_root = TestCaseInTempDir.TEST_ROOT
 
292
        if result.wasSuccessful() or not self.keep_output:
 
293
            if test_root is not None:
 
294
                    osutils.rmtree(test_root)
 
295
        else:
 
296
            if self.pb is not None:
 
297
                self.pb.note("Failed tests working directories are in '%s'\n",
 
298
                             test_root)
 
299
            else:
 
300
                self.stream.writeln(
 
301
                    "Failed tests working directories are in '%s'\n" %
 
302
                    test_root)
 
303
        TestCaseInTempDir.TEST_ROOT = None
 
304
        if self.pb is not None:
 
305
            self.pb.clear()
 
306
        return result
 
307
 
211
308
 
212
309
def iter_suite_tests(suite):
213
310
    """Return all tests in a suite, recursing through nested suites"""
303
400
            raise AssertionError('string %r does not start with %r' % (s, prefix))
304
401
 
305
402
    def assertEndsWith(self, s, suffix):
306
 
        if not s.endswith(prefix):
 
403
        """Asserts that s ends with suffix."""
 
404
        if not s.endswith(suffix):
307
405
            raise AssertionError('string %r does not end with %r' % (s, suffix))
308
406
 
309
407
    def assertContainsRe(self, haystack, needle_re):
442
540
        """Shortcut that splits cmd into words, runs, and returns stdout"""
443
541
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
444
542
 
445
 
    def run_bzr_captured(self, argv, retcode=0):
 
543
    def run_bzr_captured(self, argv, retcode=0, stdin=None):
446
544
        """Invoke bzr and return (stdout, stderr).
447
545
 
448
546
        Useful for code that wants to check the contents of the
461
559
 
462
560
        argv -- arguments to invoke bzr
463
561
        retcode -- expected return code, or None for don't-care.
 
562
        :param stdin: A string to be used as stdin for the command.
464
563
        """
 
564
        if stdin is not None:
 
565
            stdin = StringIO(stdin)
465
566
        stdout = StringIO()
466
567
        stderr = StringIO()
467
568
        self.log('run bzr: %s', ' '.join(argv))
471
572
        handler.setLevel(logging.INFO)
472
573
        logger = logging.getLogger('')
473
574
        logger.addHandler(handler)
 
575
        old_ui_factory = bzrlib.ui.ui_factory
 
576
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
 
577
            stdout=stdout,
 
578
            stderr=stderr)
 
579
        bzrlib.ui.ui_factory.stdin = stdin
474
580
        try:
475
 
            result = self.apply_redirected(None, stdout, stderr,
 
581
            result = self.apply_redirected(stdin, stdout, stderr,
476
582
                                           bzrlib.commands.run_bzr_catch_errors,
477
583
                                           argv)
478
584
        finally:
479
585
            logger.removeHandler(handler)
 
586
            bzrlib.ui.ui_factory = old_ui_factory
480
587
        out = stdout.getvalue()
481
588
        err = stderr.getvalue()
482
589
        if out:
496
603
 
497
604
        This sends the stdout/stderr results into the test's log,
498
605
        where it may be useful for debugging.  See also run_captured.
 
606
 
 
607
        :param stdin: A string to be used as stdin for the command.
499
608
        """
500
609
        retcode = kwargs.pop('retcode', 0)
501
 
        return self.run_bzr_captured(args, retcode)
 
610
        stdin = kwargs.pop('stdin', None)
 
611
        return self.run_bzr_captured(args, retcode, stdin)
502
612
 
503
613
    def check_inventory_shape(self, inv, shape):
504
614
        """Compare an inventory to a list of expected names.
890
1000
    TestCaseInTempDir._TEST_NAME = name
891
1001
    if verbose:
892
1002
        verbosity = 2
 
1003
        pb = None
893
1004
    else:
894
1005
        verbosity = 1
 
1006
        pb = progress.ProgressBar()
895
1007
    runner = TextTestRunner(stream=sys.stdout,
896
1008
                            descriptions=0,
897
 
                            verbosity=verbosity)
 
1009
                            verbosity=verbosity,
 
1010
                            keep_output=keep_output,
 
1011
                            pb=pb)
898
1012
    runner.stop_on_failure=stop_on_failure
899
1013
    if pattern != '.*':
900
1014
        suite = filter_suite_by_re(suite, pattern)
901
1015
    result = runner.run(suite)
902
 
    # This is still a little bogus, 
903
 
    # but only a little. Folk not using our testrunner will
904
 
    # have to delete their temp directories themselves.
905
 
    test_root = TestCaseInTempDir.TEST_ROOT
906
 
    if result.wasSuccessful() or not keep_output:
907
 
        if test_root is not None:
908
 
            print 'Deleting test root %s...' % test_root
909
 
            try:
910
 
                shutil.rmtree(test_root)
911
 
            finally:
912
 
                print
913
 
    else:
914
 
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
915
1016
    return result.wasSuccessful()
916
1017
 
917
1018
 
942
1043
 
943
1044
    testmod_names = [ \
944
1045
                   'bzrlib.tests.test_ancestry',
945
 
                   'bzrlib.tests.test_annotate',
946
1046
                   'bzrlib.tests.test_api',
947
1047
                   'bzrlib.tests.test_bad_files',
948
1048
                   'bzrlib.tests.test_branch',