~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2006-11-02 08:06:11 UTC
  • mfrom: (2095.4.6 test-progress)
  • Revision ID: pqm@pqm.ubuntu.com-20061102080611-5462d2713bf4bc45
(mbp) Better test progress bar

Show diffs side-by-side

added added

removed removed

Lines of Context:
73
73
from bzrlib.transport.local import LocalURLServer
74
74
from bzrlib.transport.memory import MemoryServer
75
75
from bzrlib.transport.readonly import ReadonlyServer
76
 
from bzrlib.trace import mutter
 
76
from bzrlib.trace import mutter, note
77
77
from bzrlib.tests import TestUtil
78
78
from bzrlib.tests.TestUtil import (
79
79
                          TestSuite,
131
131
            ]
132
132
 
133
133
 
134
 
class _MyResult(unittest._TextTestResult):
135
 
    """Custom TestResult.
 
134
class ExtendedTestResult(unittest._TextTestResult):
 
135
    """Accepts, reports and accumulates the results of running tests.
136
136
 
137
 
    Shows output in a different format, including displaying runtime for tests.
 
137
    Compared to this unittest version this class adds support for profiling,
 
138
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
139
    There are further-specialized subclasses for different types of display.
138
140
    """
 
141
 
139
142
    stop_early = False
140
143
    
141
 
    def __init__(self, stream, descriptions, verbosity, pb=None,
142
 
                 bench_history=None):
 
144
    def __init__(self, stream, descriptions, verbosity,
 
145
                 bench_history=None,
 
146
                 num_tests=None,
 
147
                 ):
143
148
        """Construct new TestResult.
144
149
 
145
150
        :param bench_history: Optionally, a writable file object to accumulate
146
151
            benchmark results.
147
152
        """
148
153
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
149
 
        self.pb = pb
150
154
        if bench_history is not None:
151
155
            from bzrlib.version import _get_bzr_source_tree
152
156
            src_tree = _get_bzr_source_tree()
162
166
                revision_id = ''
163
167
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
164
168
        self._bench_history = bench_history
 
169
        self.ui = bzrlib.ui.ui_factory
 
170
        self.num_tests = num_tests
 
171
        self.error_count = 0
 
172
        self.failure_count = 0
 
173
        self.skip_count = 0
 
174
        self.count = 0
 
175
        self._overall_start_time = time.time()
165
176
    
166
177
    def extractBenchmarkTime(self, testCase):
167
178
        """Add a benchmark time for the current test case."""
183
194
        """Format seconds as milliseconds with leading spaces."""
184
195
        return "%5dms" % (1000 * seconds)
185
196
 
186
 
    def _ellipsise_unimportant_words(self, a_string, final_width,
187
 
                                   keep_start=False):
188
 
        """Add ellipses (sp?) for overly long strings.
189
 
        
190
 
        :param keep_start: If true preserve the start of a_string rather
191
 
                           than the end of it.
192
 
        """
193
 
        if keep_start:
194
 
            if len(a_string) > final_width:
195
 
                result = a_string[:final_width-3] + '...'
196
 
            else:
197
 
                result = a_string
198
 
        else:
199
 
            if len(a_string) > final_width:
200
 
                result = '...' + a_string[3-final_width:]
201
 
            else:
202
 
                result = a_string
203
 
        return result.ljust(final_width)
 
197
    def _shortened_test_description(self, test):
 
198
        what = test.id()
 
199
        what = re.sub(r'^bzrlib\.(tests|benchmark)\.', '', what)
 
200
        return what
204
201
 
205
202
    def startTest(self, test):
206
203
        unittest.TestResult.startTest(self, test)
207
 
        # In a short description, the important words are in
208
 
        # the beginning, but in an id, the important words are
209
 
        # at the end
210
 
        SHOW_DESCRIPTIONS = False
211
 
 
212
 
        if not self.showAll and self.dots and self.pb is not None:
213
 
            final_width = 13
214
 
        else:
215
 
            final_width = osutils.terminal_width()
216
 
            final_width = final_width - 15 - 8
217
 
        what = None
218
 
        if SHOW_DESCRIPTIONS:
219
 
            what = test.shortDescription()
220
 
            if what:
221
 
                what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
222
 
        if what is None:
223
 
            what = test.id()
224
 
            if what.startswith('bzrlib.tests.'):
225
 
                what = what[13:]
226
 
            what = self._ellipsise_unimportant_words(what, final_width)
227
 
        if self.showAll:
228
 
            self.stream.write(what)
229
 
        elif self.dots and self.pb is not None:
230
 
            self.pb.update(what, self.testsRun - 1, None)
231
 
        self.stream.flush()
 
204
        self.report_test_start(test)
232
205
        self._recordTestStartTime()
233
206
 
234
207
    def _recordTestStartTime(self):
245
218
        if setKeepLogfile is not None:
246
219
            setKeepLogfile()
247
220
        self.extractBenchmarkTime(test)
248
 
        if self.showAll:
249
 
            self.stream.writeln("ERROR %s" % self._testTimeString())
250
 
        elif self.dots and self.pb is None:
251
 
            self.stream.write('E')
252
 
        elif self.dots:
253
 
            self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
254
 
            self.pb.note(self._ellipsise_unimportant_words(
255
 
                            test.id() + ': ERROR',
256
 
                            osutils.terminal_width()))
257
 
        self.stream.flush()
 
221
        self.report_error(test, err)
258
222
        if self.stop_early:
259
223
            self.stop()
260
224
 
266
230
        if setKeepLogfile is not None:
267
231
            setKeepLogfile()
268
232
        self.extractBenchmarkTime(test)
269
 
        if self.showAll:
270
 
            self.stream.writeln(" FAIL %s" % self._testTimeString())
271
 
        elif self.dots and self.pb is None:
272
 
            self.stream.write('F')
273
 
        elif self.dots:
274
 
            self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
275
 
            self.pb.note(self._ellipsise_unimportant_words(
276
 
                            test.id() + ': FAIL',
277
 
                            osutils.terminal_width()))
278
 
        self.stream.flush()
 
233
        self.report_failure(test, err)
279
234
        if self.stop_early:
280
235
            self.stop()
281
236
 
286
241
                self._bench_history.write("%s %s\n" % (
287
242
                    self._formatTime(self._benchmarkTime),
288
243
                    test.id()))
289
 
        if self.showAll:
290
 
            self.stream.writeln('   OK %s' % self._testTimeString())
291
 
            for bench_called, stats in getattr(test, '_benchcalls', []):
292
 
                self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
293
 
                stats.pprint(file=self.stream)
294
 
        elif self.dots and self.pb is None:
295
 
            self.stream.write('~')
296
 
        elif self.dots:
297
 
            self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
298
 
        self.stream.flush()
 
244
        self.report_success(test)
299
245
        unittest.TestResult.addSuccess(self, test)
300
246
 
301
247
    def addSkipped(self, test, skip_excinfo):
302
248
        self.extractBenchmarkTime(test)
303
 
        if self.showAll:
304
 
            print >>self.stream, ' SKIP %s' % self._testTimeString()
305
 
            print >>self.stream, '     %s' % skip_excinfo[1]
306
 
        elif self.dots and self.pb is None:
307
 
            self.stream.write('S')
308
 
        elif self.dots:
309
 
            self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
310
 
        self.stream.flush()
 
249
        self.report_skip(test, skip_excinfo)
311
250
        # seems best to treat this as success from point-of-view of unittest
312
251
        # -- it actually does nothing so it barely matters :)
313
252
        try:
333
272
            self.stream.writeln(self.separator2)
334
273
            self.stream.writeln("%s" % err)
335
274
 
 
275
    def finished(self):
 
276
        pass
 
277
 
 
278
    def report_cleaning_up(self):
 
279
        pass
 
280
 
 
281
    def report_success(self, test):
 
282
        pass
 
283
 
 
284
 
 
285
class TextTestResult(ExtendedTestResult):
 
286
    """Displays progress and results of tests in text form"""
 
287
 
 
288
    def __init__(self, *args, **kw):
 
289
        ExtendedTestResult.__init__(self, *args, **kw)
 
290
        self.pb = self.ui.nested_progress_bar()
 
291
        self.pb.show_pct = False
 
292
        self.pb.show_spinner = False
 
293
        self.pb.show_eta = False, 
 
294
        self.pb.show_count = False
 
295
        self.pb.show_bar = False
 
296
 
 
297
    def report_starting(self):
 
298
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
299
 
 
300
    def _progress_prefix_text(self):
 
301
        a = '[%d' % self.count
 
302
        if self.num_tests is not None:
 
303
            a +='/%d' % self.num_tests
 
304
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
305
        if self.error_count:
 
306
            a += ', %d errors' % self.error_count
 
307
        if self.failure_count:
 
308
            a += ', %d failed' % self.failure_count
 
309
        if self.skip_count:
 
310
            a += ', %d skipped' % self.skip_count
 
311
        a += ']'
 
312
        return a
 
313
 
 
314
    def report_test_start(self, test):
 
315
        self.count += 1
 
316
        self.pb.update(
 
317
                self._progress_prefix_text()
 
318
                + ' ' 
 
319
                + self._shortened_test_description(test))
 
320
 
 
321
    def report_error(self, test, err):
 
322
        self.error_count += 1
 
323
        self.pb.note('ERROR: %s\n    %s\n' % (
 
324
            self._shortened_test_description(test),
 
325
            err[1],
 
326
            ))
 
327
 
 
328
    def report_failure(self, test, err):
 
329
        self.failure_count += 1
 
330
        self.pb.note('FAIL: %s\n    %s\n' % (
 
331
            self._shortened_test_description(test),
 
332
            err[1],
 
333
            ))
 
334
 
 
335
    def report_skip(self, test, skip_excinfo):
 
336
        self.skip_count += 1
 
337
        if False:
 
338
            # at the moment these are mostly not things we can fix
 
339
            # and so they just produce stipple; use the verbose reporter
 
340
            # to see them.
 
341
            if False:
 
342
                # show test and reason for skip
 
343
                self.pb.note('SKIP: %s\n    %s\n' % (
 
344
                    self._shortened_test_description(test),
 
345
                    skip_excinfo[1]))
 
346
            else:
 
347
                # since the class name was left behind in the still-visible
 
348
                # progress bar...
 
349
                self.pb.note('SKIP: %s' % (skip_excinfo[1]))
 
350
 
 
351
    def report_cleaning_up(self):
 
352
        self.pb.update('cleaning up...')
 
353
 
 
354
    def finished(self):
 
355
        self.pb.finished()
 
356
 
 
357
 
 
358
class VerboseTestResult(ExtendedTestResult):
 
359
    """Produce long output, with one line per test run plus times"""
 
360
 
 
361
    def _ellipsize_to_right(self, a_string, final_width):
 
362
        """Truncate and pad a string, keeping the right hand side"""
 
363
        if len(a_string) > final_width:
 
364
            result = '...' + a_string[3-final_width:]
 
365
        else:
 
366
            result = a_string
 
367
        return result.ljust(final_width)
 
368
 
 
369
    def report_starting(self):
 
370
        self.stream.write('running %d tests...\n' % self.num_tests)
 
371
 
 
372
    def report_test_start(self, test):
 
373
        self.count += 1
 
374
        name = self._shortened_test_description(test)
 
375
        self.stream.write(self._ellipsize_to_right(name, 60))
 
376
        self.stream.flush()
 
377
 
 
378
    def report_error(self, test, err):
 
379
        self.error_count += 1
 
380
        self.stream.writeln('ERROR %s\n    %s' 
 
381
                % (self._testTimeString(), err[1]))
 
382
 
 
383
    def report_failure(self, test, err):
 
384
        self.failure_count += 1
 
385
        self.stream.writeln('FAIL %s\n    %s'
 
386
                % (self._testTimeString(), err[1]))
 
387
 
 
388
    def report_success(self, test):
 
389
        self.stream.writeln('   OK %s' % self._testTimeString())
 
390
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
391
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
392
            stats.pprint(file=self.stream)
 
393
        self.stream.flush()
 
394
 
 
395
    def report_skip(self, test, skip_excinfo):
 
396
        print >>self.stream, ' SKIP %s' % self._testTimeString()
 
397
        print >>self.stream, '     %s' % skip_excinfo[1]
 
398
 
336
399
 
337
400
class TextTestRunner(object):
338
401
    stop_on_failure = False
342
405
                 descriptions=0,
343
406
                 verbosity=1,
344
407
                 keep_output=False,
345
 
                 pb=None,
346
408
                 bench_history=None):
347
409
        self.stream = unittest._WritelnDecorator(stream)
348
410
        self.descriptions = descriptions
349
411
        self.verbosity = verbosity
350
412
        self.keep_output = keep_output
351
 
        self.pb = pb
352
413
        self._bench_history = bench_history
353
414
 
354
 
    def _makeResult(self):
355
 
        result = _MyResult(self.stream,
356
 
                           self.descriptions,
357
 
                           self.verbosity,
358
 
                           pb=self.pb,
359
 
                           bench_history=self._bench_history)
360
 
        result.stop_early = self.stop_on_failure
361
 
        return result
362
 
 
363
415
    def run(self, test):
364
416
        "Run the given test case or test suite."
365
 
        result = self._makeResult()
366
417
        startTime = time.time()
367
 
        if self.pb is not None:
368
 
            self.pb.update('Running tests', 0, test.countTestCases())
 
418
        if self.verbosity == 1:
 
419
            result_class = TextTestResult
 
420
        elif self.verbosity >= 2:
 
421
            result_class = VerboseTestResult
 
422
        result = result_class(self.stream,
 
423
                              self.descriptions,
 
424
                              self.verbosity,
 
425
                              bench_history=self._bench_history,
 
426
                              num_tests=test.countTestCases(),
 
427
                              )
 
428
        result.stop_early = self.stop_on_failure
 
429
        result.report_starting()
369
430
        test.run(result)
370
431
        stopTime = time.time()
371
432
        timeTaken = stopTime - startTime
386
447
            self.stream.writeln(")")
387
448
        else:
388
449
            self.stream.writeln("OK")
389
 
        if self.pb is not None:
390
 
            self.pb.update('Cleaning up', 0, 1)
 
450
        result.report_cleaning_up()
391
451
        # This is still a little bogus, 
392
452
        # but only a little. Folk not using our testrunner will
393
453
        # have to delete their temp directories themselves.
408
468
                        sys.getfilesystemencoding())
409
469
                osutils.rmtree(test_root)
410
470
        else:
411
 
            if self.pb is not None:
412
 
                self.pb.note("Failed tests working directories are in '%s'\n",
413
 
                             test_root)
414
 
            else:
415
 
                self.stream.writeln(
416
 
                    "Failed tests working directories are in '%s'\n" %
417
 
                    test_root)
 
471
            note("Failed tests working directories are in '%s'\n", test_root)
418
472
        TestCaseWithMemoryTransport.TEST_ROOT = None
419
 
        if self.pb is not None:
420
 
            self.pb.clear()
 
473
        result.finished()
421
474
        return result
422
475
 
423
476
 
503
556
        unittest.TestCase.setUp(self)
504
557
        self._cleanEnvironment()
505
558
        bzrlib.trace.disable_default_logging()
 
559
        self._silenceUI()
506
560
        self._startLogFile()
507
561
        self._benchcalls = []
508
562
        self._benchtime = None
509
563
 
 
564
    def _silenceUI(self):
 
565
        """Turn off UI for duration of test"""
 
566
        # by default the UI is off; tests can turn it on if they want it.
 
567
        saved = bzrlib.ui.ui_factory
 
568
        def _restore():
 
569
            bzrlib.ui.ui_factory = saved
 
570
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
571
        self.addCleanup(_restore)
 
572
 
510
573
    def _ndiff_strings(self, a, b):
511
574
        """Return ndiff between two strings containing lines.
512
575
        
1546
1609
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1547
1610
    if verbose:
1548
1611
        verbosity = 2
1549
 
        pb = None
1550
1612
    else:
1551
1613
        verbosity = 1
1552
 
        pb = progress.ProgressBar()
1553
1614
    runner = TextTestRunner(stream=sys.stdout,
1554
1615
                            descriptions=0,
1555
1616
                            verbosity=verbosity,
1556
1617
                            keep_output=keep_output,
1557
 
                            pb=pb,
1558
1618
                            bench_history=bench_history)
1559
1619
    runner.stop_on_failure=stop_on_failure
1560
1620
    if pattern != '.*':