175
175
self._overall_start_time = time.time()
176
176
self._strict = strict
179
# nb: called stopTestRun in the version of this that Python merged
180
# upstream, according to lifeless 20090803
178
def stopTestRun(self):
181
stopTime = time.time()
182
timeTaken = stopTime - self.startTime
184
self.stream.writeln(self.separator2)
185
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
186
run, run != 1 and "s" or "", timeTaken))
187
self.stream.writeln()
188
if not self.wasSuccessful():
189
self.stream.write("FAILED (")
190
failed, errored = map(len, (self.failures, self.errors))
192
self.stream.write("failures=%d" % failed)
194
if failed: self.stream.write(", ")
195
self.stream.write("errors=%d" % errored)
196
if self.known_failure_count:
197
if failed or errored: self.stream.write(", ")
198
self.stream.write("known_failure_count=%d" %
199
self.known_failure_count)
200
self.stream.writeln(")")
202
if self.known_failure_count:
203
self.stream.writeln("OK (known_failures=%d)" %
204
self.known_failure_count)
206
self.stream.writeln("OK")
207
if self.skip_count > 0:
208
skipped = self.skip_count
209
self.stream.writeln('%d test%s skipped' %
210
(skipped, skipped != 1 and "s" or ""))
212
for feature, count in sorted(self.unsupported.items()):
213
self.stream.writeln("Missing feature '%s' skipped %d tests." %
182
216
ok = self.wasStrictlySuccessful()
184
218
ok = self.wasSuccessful()
186
self.stream.write('tests passed\n')
188
self.stream.write('tests failed\n')
189
219
if TestCase._first_thread_leaker_id:
190
220
self.stream.write(
191
221
'%s is leaking threads among %d leaking tests.\n' % (
421
451
self.pb.update_latency = 0
422
452
self.pb.show_transport_activity = False
454
def stopTestRun(self):
425
455
# called when the tests that are going to run have run
427
super(TextTestResult, self).done()
430
457
self.pb.finished()
458
super(TextTestResult, self).stopTestRun()
432
def report_starting(self):
460
def startTestRun(self):
461
super(TextTestResult, self).startTestRun()
433
462
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
435
464
def printErrors(self):
593
622
self.descriptions = descriptions
594
623
self.verbosity = verbosity
595
624
self._bench_history = bench_history
596
self.list_only = list_only
597
625
self._strict = strict
598
626
self._result_decorators = result_decorators or []
600
628
def run(self, test):
601
629
"Run the given test case or test suite."
602
startTime = time.time()
603
630
if self.verbosity == 1:
604
631
result_class = TextTestResult
605
632
elif self.verbosity >= 2:
606
633
result_class = VerboseTestResult
607
result = result_class(self.stream,
634
original_result = result_class(self.stream,
608
635
self.descriptions,
610
637
bench_history=self._bench_history,
611
638
strict=self._strict,
640
# Signal to result objects that look at stop early policy to stop,
641
original_result.stop_early = self.stop_on_failure
642
result = original_result
614
643
for decorator in self._result_decorators:
615
run_result = decorator(run_result)
616
result.stop_early = self.stop_on_failure
617
result.report_starting()
619
if self.verbosity >= 2:
620
self.stream.writeln("Listing tests only ...\n")
622
for t in iter_suite_tests(test):
623
self.stream.writeln("%s" % (t.id()))
632
if isinstance(test, testtools.ConcurrentTestSuite):
633
# We need to catch bzr specific behaviors
634
test.run(BZRTransformingResult(run_result))
637
run = result.testsRun
639
stopTime = time.time()
640
timeTaken = stopTime - startTime
642
self.stream.writeln(result.separator2)
643
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
644
run, run != 1 and "s" or "", timeTaken))
645
self.stream.writeln()
646
if not result.wasSuccessful():
647
self.stream.write("FAILED (")
648
failed, errored = map(len, (result.failures, result.errors))
650
self.stream.write("failures=%d" % failed)
652
if failed: self.stream.write(", ")
653
self.stream.write("errors=%d" % errored)
654
if result.known_failure_count:
655
if failed or errored: self.stream.write(", ")
656
self.stream.write("known_failure_count=%d" %
657
result.known_failure_count)
658
self.stream.writeln(")")
660
if result.known_failure_count:
661
self.stream.writeln("OK (known_failures=%d)" %
662
result.known_failure_count)
664
self.stream.writeln("OK")
665
if result.skip_count > 0:
666
skipped = result.skip_count
667
self.stream.writeln('%d test%s skipped' %
668
(skipped, skipped != 1 and "s" or ""))
669
if result.unsupported:
670
for feature, count in sorted(result.unsupported.items()):
671
self.stream.writeln("Missing feature '%s' skipped %d tests." %
644
result = decorator(result)
645
result.stop_early = self.stop_on_failure
651
if isinstance(test, testtools.ConcurrentTestSuite):
652
# We need to catch bzr specific behaviors
653
result = BZRTransformingResult(result)
654
result.startTestRun()
659
# higher level code uses our extended protocol to determine
660
# what exit code to give.
661
return original_result
677
664
def iter_suite_tests(suite):
2830
2816
decorators.append(CountingDecorator)
2831
2817
for decorator in decorators:
2832
2818
suite = decorator(suite)
2833
result = runner.run(suite)
2820
# Done after test suite decoration to allow randomisation etc
2821
# to take effect, though that is of marginal benefit.
2823
stream.write("Listing tests only ...\n")
2824
for t in iter_suite_tests(suite):
2825
stream.write("%s\n" % (t.id()))
2827
result = runner.run(suite)
2838
2829
return result.wasStrictlySuccessful()