@@ -139 +135 @@
 SUBUNIT_SEEK_SET = 0
 SUBUNIT_SEEK_CUR = 1
 
+# These are intentionally brought into this namespace. That way plugins, etc
+# can just "from bzrlib.tests import TestCase, TestLoader, etc"
+TestSuite = TestUtil.TestSuite
+TestLoader = TestUtil.TestLoader
 
-class ExtendedTestResult(unittest._TextTestResult):
+class ExtendedTestResult(testtools.TextTestResult):
     """Accepts, reports and accumulates the results of running tests.
 
     Compared to the unittest version this class adds support for
@@ -167 +167 @@
         :param bench_history: Optionally, a writable file object to accumulate
             benchmark results.
         """
-        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
+        testtools.TextTestResult.__init__(self, stream)
         if bench_history is not None:
             from bzrlib.version import _get_bzr_source_tree
             src_tree = _get_bzr_source_tree()
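
The base-class swap above also changes the constructor contract: unittest._TextTestResult took (stream, descriptions, verbosity), while testtools.TextTestResult takes only the stream, so any extra settings now have to be stored by the subclass itself. A minimal standalone sketch of that pattern (VerboseResult and its attributes are illustrative, not bzrlib code):

    import sys
    import testtools

    class VerboseResult(testtools.TextTestResult):
        """Result that keeps the settings the new base class no longer takes."""

        def __init__(self, stream, descriptions, verbosity):
            # testtools.TextTestResult only accepts the stream argument.
            testtools.TextTestResult.__init__(self, stream)
            self.descriptions = descriptions
            self.verbosity = verbosity

    result = VerboseResult(sys.stdout, descriptions=0, verbosity=2)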
@@ -200 +200 @@
         actionTaken = "Ran"
         stopTime = time.time()
         timeTaken = stopTime - self.startTime
@@ -204 +203 @@
-        self.stream.writeln(self.separator2)
-        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
+        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
+        # the parent class method is similar have to duplicate
+        self._show_list('ERROR', self.errors)
+        self._show_list('FAIL', self.failures)
+        self.stream.write(self.sep2)
+        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
             run, run != 1 and "s" or "", timeTaken))
-        self.stream.writeln()
         if not self.wasSuccessful():
             self.stream.write("FAILED (")
             failed, errored = map(len, (self.failures, self.errors))
@@ -217 +219 @@
             if failed or errored: self.stream.write(", ")
             self.stream.write("known_failure_count=%d" %
                 self.known_failure_count)
-            self.stream.writeln(")")
+            self.stream.write(")\n")
         else:
             if self.known_failure_count:
-                self.stream.writeln("OK (known_failures=%d)" %
+                self.stream.write("OK (known_failures=%d)\n" %
                     self.known_failure_count)
             else:
-                self.stream.writeln("OK")
+                self.stream.write("OK\n")
         if self.skip_count > 0:
             skipped = self.skip_count
-            self.stream.writeln('%d test%s skipped' %
+            self.stream.write('%d test%s skipped\n' %
                                 (skipped, skipped != 1 and "s" or ""))
         if self.unsupported:
             for feature, count in sorted(self.unsupported.items()):
-                self.stream.writeln("Missing feature '%s' skipped %d tests." %
+                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
                     (feature, count))
@@ -236 +238 @@
             ok = self.wasStrictlySuccessful()
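
The GZ comment above records why the summary is duplicated here: testtools.TextTestResult has no printErrors(), so the error and failure lists are emitted through a local _show_list helper before the "Ran N tests" line. bzrlib's actual helper isn't shown in this excerpt; a hypothetical equivalent would look roughly like:

    def _show_list(self, prefix, entries):
        # entries holds (test, traceback_text) pairs, the shape used by
        # TestResult.errors and TestResult.failures.
        for test, err_text in entries:
            self.stream.write('%s: %s\n' % (prefix, test.id()))
            self.stream.write('%s\n' % (err_text,))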
@@ -357 +359 @@
         self.report_success(test)
         self._cleanupLogFile(test)
-        unittest.TestResult.addSuccess(self, test)
+        super(ExtendedTestResult, self).addSuccess(test)
         test._log_contents = ''
 
     def addExpectedFailure(self, test, err):
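
Replacing the hard-coded unittest.TestResult.addSuccess(self, test) with super() matters because the base class just changed: super() follows the actual MRO, so testtools' own bookkeeping still runs. The same idiom in isolation (CountingResult is illustrative):

    import testtools

    class CountingResult(testtools.TextTestResult):
        def addSuccess(self, test):
            # Delegate via super() so the correct base implementation runs
            # even if the inheritance chain changes again.
            super(CountingResult, self).addSuccess(test)
            self.success_count = getattr(self, 'success_count', 0) + 1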
@@ -550 +552 @@
         return '%s%s' % (indent, err[1])
 
     def report_error(self, test, err):
-        self.stream.writeln('ERROR %s\n%s'
+        self.stream.write('ERROR %s\n%s\n'
                 % (self._testTimeString(test),
                    self._error_summary(err)))
 
     def report_failure(self, test, err):
-        self.stream.writeln(' FAIL %s\n%s'
+        self.stream.write(' FAIL %s\n%s\n'
                 % (self._testTimeString(test),
                    self._error_summary(err)))
 
     def report_known_failure(self, test, err):
-        self.stream.writeln('XFAIL %s\n%s'
+        self.stream.write('XFAIL %s\n%s\n'
                 % (self._testTimeString(test),
                    self._error_summary(err)))
 
     def report_success(self, test):
-        self.stream.writeln('   OK %s' % self._testTimeString(test))
+        self.stream.write('   OK %s\n' % self._testTimeString(test))
         for bench_called, stats in getattr(test, '_benchcalls', []):
-            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
+            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
             stats.pprint(file=self.stream)
         # flush the stream so that we get smooth output. This verbose mode is
         # used to show the output in PQM.
         self.stream.flush()
 
     def report_skip(self, test, reason):
-        self.stream.writeln(' SKIP %s\n%s'
+        self.stream.write(' SKIP %s\n%s\n'
                 % (self._testTimeString(test), reason))
 
     def report_not_applicable(self, test, reason):
-        self.stream.writeln('  N/A %s\n    %s'
+        self.stream.write('  N/A %s\n    %s\n'
                 % (self._testTimeString(test), reason))
 
     def report_unsupported(self, test, feature):
         """test cannot be run because feature is missing."""
-        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
+        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
                 %(self._testTimeString(test), feature))
@@ -618 +620 @@
             encode = codec.encode
         stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
         stream.encoding = new_encoding
-        self.stream = unittest._WritelnDecorator(stream)
+        self.stream = stream
         self.descriptions = descriptions
         self.verbosity = verbosity
         self._bench_history = bench_history
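
Dropping unittest._WritelnDecorator is what forces the writeln-to-write rewrites throughout this diff: the decorator was the only thing providing .writeln(), and the rule is simply stream.writeln(msg) == stream.write(msg + '\n'). Code that still wants the old convenience could wrap any stream itself; a small sketch (WritelnWrapper is hypothetical, stdlib only):

    import sys

    class WritelnWrapper(object):
        """Add a writeln() method to any file-like object."""

        def __init__(self, stream):
            self._stream = stream

        def __getattr__(self, name):
            # Fall through to the wrapped stream for write(), flush(), etc.
            return getattr(self._stream, name)

        def writeln(self, text=''):
            self._stream.write(text + '\n')

    out = WritelnWrapper(sys.stdout)
    out.writeln('done')  # equivalent to sys.stdout.write('done\n')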
@@ -845 +847 @@
         # going away but leak one) but it seems less likely than the actual
         # false positives (the test see threads going away and does not leak).
         if leaked_threads > 0:
+            if 'threads' in selftest_debug_flags:
+                print '%s is leaking, active is now %d' % (self.id(), active)
             TestCase._leaking_threads_tests += 1
             if TestCase._first_thread_leaker_id is None:
                 TestCase._first_thread_leaker_id = self.id()
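
The added lines only print when 'threads' is in selftest_debug_flags, keeping leak diagnostics opt-in. The underlying check compares the set of live threads before and after a test; stripped of bzrlib's bookkeeping, the technique is roughly:

    import threading

    def count_leaked_threads(run_test):
        # Snapshot live threads, run the test, then see what stuck around.
        before = set(threading.enumerate())
        run_test()
        after = set(threading.enumerate())
        return len([t for t in after - before if t.isAlive()])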
@@ -1455 +1459 @@
         The file is removed as the test is torn down.
         """
-        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
-        self._log_file = os.fdopen(fileno, 'w+')
+        self._log_file = StringIO()
         self._log_memento = bzrlib.trace.push_log_file(self._log_file)
-        self._log_file_name = name
         self.addCleanup(self._finishLogFile)
 
     def _finishLogFile(self):
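
Backing the test log with a StringIO instead of a mkstemp file keeps the whole trace log in memory, which is what lets the temp-file close/delete machinery be removed further down. push_log_file and pop_log_file are the real bzrlib.trace calls; the surrounding scaffolding here is only illustrative:

    from cStringIO import StringIO
    import bzrlib.trace

    log = StringIO()
    memento = bzrlib.trace.push_log_file(log)  # route trace output to memory
    try:
        bzrlib.trace.mutter('noted while the test runs')
    finally:
        bzrlib.trace.pop_log_file(memento)     # restore the previous target
    print log.getvalue()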
@@ -1665 +1667 @@
                 unicodestr = self._log_contents.decode('utf8', 'replace')
                 self._log_contents = unicodestr.encode('utf8')
             return self._log_contents
@@ -1669 +1670 @@
-        if bzrlib.trace._trace_file:
-            # flush the log file, to get all content
-            bzrlib.trace._trace_file.flush()
-        if self._log_file_name is not None:
-            logfile = open(self._log_file_name)
-            log_contents = logfile.read()
+        if self._log_file is not None:
+            log_contents = self._log_file.getvalue()
             try:
                 log_contents.decode('utf8')
             except UnicodeDecodeError:
                 unicodestr = log_contents.decode('utf8', 'replace')
                 log_contents = unicodestr.encode('utf8')
             if not keep_log_file:
-                close_attempts = 0
-                max_close_attempts = 100
-                first_close_error = None
-                while close_attempts < max_close_attempts:
-                    close_attempts += 1
-                    try:
-                        self._log_file.close()
-                    except IOError, ioe:
-                        if ioe.errno is None:
-                            # No errno implies 'close() called during
-                            # concurrent operation on the same file object', so
-                            # retry. Probably a thread is trying to write to
-                            # the log file.
-                            if first_close_error is None:
-                                first_close_error = ioe
-                            continue
-                        raise
-                    else:
-                        break
-                if close_attempts > 1:
-                    sys.stderr.write(
-                        'Unable to close log file on first attempt, '
-                        'will retry: %s\n' % (first_close_error,))
-                    if close_attempts == max_close_attempts:
-                        sys.stderr.write(
-                            'Unable to close log file after %d attempts.\n'
-                            % (max_close_attempts,))
                 self._log_file = None
                 # Permit multiple calls to get_log until we clean it up in
                 # finishLogFile
                 self._log_contents = log_contents
-                try:
-                    os.remove(self._log_file_name)
-                except OSError, e:
-                    if sys.platform == 'win32' and e.errno == errno.EACCES:
-                        sys.stderr.write(('Unable to delete log file '
-                                         ' %r\n' % self._log_file_name))
-                    else:
-                        raise
-                self._log_file_name = None
             return log_contents
         else:
-            return "No log file content and no log file name."
+            return "No log file content."
 
     def get_log(self):
         """Get a unicode string containing the log from bzrlib.trace.
@@ -1944 +1902 @@
             variables. A value of None will unset the env variable.
             The values must be strings. The change will only occur in the
             child, so you don't need to fix the environment after running.
-        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
-            is not available.
+        :param skip_if_plan_to_signal: raise TestSkipped when true and system
+            doesn't support signalling subprocesses.
         :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
         :returns: Popen object for the started process.
         """
         if skip_if_plan_to_signal:
-            if not getattr(os, 'kill', None):
-                raise TestSkipped("os.kill not available.")
+            if os.name != "posix":
+                raise TestSkipped("Sending signals not supported")
 
         if env_changes is None:
             env_changes = {}
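
The old guard only checked that os.kill existed; the new one skips on any non-POSIX platform, where signalling a subprocess is not dependable even when os.kill is present. The guarded pattern outside the test framework (send_interrupt and the RuntimeError stand in for bzrlib's TestSkipped flow):

    import os
    import signal

    def send_interrupt(process):
        # Signalling child processes is only dependable on POSIX systems.
        if os.name != "posix":
            raise RuntimeError("Sending signals not supported")
        os.kill(process.pid, signal.SIGINT)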
@@ -2437 +2395 @@
     def setUp(self):
         super(TestCaseWithMemoryTransport, self).setUp()
+        # Ensure that ConnectedTransport doesn't leak sockets
+        def get_transport_with_cleanup(*args, **kwargs):
+            t = orig_get_transport(*args, **kwargs)
+            if isinstance(t, _mod_transport.ConnectedTransport):
+                self.addCleanup(t.disconnect)
+            return t
+
+        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
+                                               get_transport_with_cleanup)
         self._make_test_root()
         self.addCleanup(os.chdir, os.getcwdu())
         self.makeAndChdirToTestDir()
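
overrideAttr replaces a module attribute for the test's duration and hands back the original, which is why the closure can call orig_get_transport while still registering disconnect cleanups for connected transports. The same wrap-and-restore idea in plain Python (override_attr is a simplified stand-in for the bzrlib TestCase helper):

    def override_attr(obj, name, new_value, cleanups):
        """Swap obj.name for new_value and arrange to restore it later."""
        original = getattr(obj, name)
        setattr(obj, name, new_value)
        cleanups.append(lambda: setattr(obj, name, original))
        return original

    # cleanups is run in reverse at teardown, restoring the real factory.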
@@ -2735 +2702 @@
     def setUp(self):
+        from bzrlib.tests import http_server
         super(ChrootedTestCase, self).setUp()
         if not self.vfs_transport_factory == memory.MemoryServer:
-            self.transport_readonly_server = HttpServer
+            self.transport_readonly_server = http_server.HttpServer
 
 
 def condition_id_re(pattern):
@@ -3197 +3165 @@
 def partition_tests(suite, count):
     """Partition suite into count lists of tests."""
-    result = []
-    tests = list(iter_suite_tests(suite))
-    tests_per_process = int(math.ceil(float(len(tests)) / count))
-    for block in range(count):
-        low_test = block * tests_per_process
-        high_test = low_test + tests_per_process
-        process_tests = tests[low_test:high_test]
-        result.append(process_tests)
-    return result
+    # This just assigns tests in a round-robin fashion. On one hand this
+    # splits up blocks of related tests that might run faster if they shared
+    # resources, but on the other it avoids assigning blocks of slow tests to
+    # just one partition. So the slowest partition shouldn't be much slower
+    # than the fastest.
+    partitions = [list() for i in range(count)]
+    tests = iter_suite_tests(suite)
+    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
+        partition.append(test)
+    return partitions
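
itertools.cycle pairs each test with the next partition in turn, so a run of slow tests that sit together in the suite ends up spread across all processes instead of serialised in one. The effect, as a standalone Python 2 snippet of the same technique:

    import itertools

    tests = ['t1', 't2', 't3', 't4', 't5']
    partitions = [list() for i in range(2)]
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
        partition.append(test)
    print partitions   # [['t1', 't3', 't5'], ['t2', 't4']]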
@@ -3210 +3179 @@
 def workaround_zealous_crypto_random():
@@ -3245 +3214 @@
     test_blocks = partition_tests(suite, concurrency)
     for process_tests in test_blocks:
-        process_suite = TestSuite()
+        process_suite = TestUtil.TestSuite()
         process_suite.addTests(process_tests)
         c2pread, c2pwrite = os.pipe()
         pid = os.fork()
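
Each block of tests goes into its own TestUtil.TestSuite, and the runner forks one child per block with a pipe back to the parent; the child runs its suite, reports through the pipe, and exits. The plumbing, reduced to a sketch (run_in_children and run_block are illustrative, not bzrlib's runner):

    import os

    def run_in_children(blocks, run_block):
        # Fork one child per block; each child reports through its own pipe.
        readers = []
        for block in blocks:
            c2pread, c2pwrite = os.pipe()
            pid = os.fork()
            if pid == 0:
                # Child: write results to the pipe, then exit immediately.
                os.close(c2pread)
                out = os.fdopen(c2pwrite, 'w')
                run_block(block, out)
                out.close()
                os._exit(0)
            os.close(c2pwrite)
            readers.append(os.fdopen(c2pread, 'r'))
        return [r.read() for r in readers]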
@@ -3644 +3615 @@
         'bzrlib.tests.blackbox',
         'bzrlib.tests.commands',
+        'bzrlib.tests.doc_generate',
         'bzrlib.tests.per_branch',
         'bzrlib.tests.per_bzrdir',
-        'bzrlib.tests.per_bzrdir_colo',
+        'bzrlib.tests.per_controldir',
+        'bzrlib.tests.per_controldir_colo',
         'bzrlib.tests.per_foreign_vcs',
         'bzrlib.tests.per_interrepository',
         'bzrlib.tests.per_intertree',
@@ -3664 +3637 @@
         'bzrlib.tests.per_workingtree',
         'bzrlib.tests.test__annotator',
         'bzrlib.tests.test__bencode',
+        'bzrlib.tests.test__btree_serializer',
         'bzrlib.tests.test__chk_map',
         'bzrlib.tests.test__dirstate_helpers',
         'bzrlib.tests.test__groupcompress',
@@ -3802 +3776 @@
         'bzrlib.tests.test_switch',
         'bzrlib.tests.test_symbol_versioning',
         'bzrlib.tests.test_tag',
+        'bzrlib.tests.test_test_server',
         'bzrlib.tests.test_testament',
         'bzrlib.tests.test_textfile',
         'bzrlib.tests.test_textmerge',
@@ -3823 +3798 @@
         'bzrlib.tests.test_urlutils',
         'bzrlib.tests.test_version',
         'bzrlib.tests.test_version_info',
+        'bzrlib.tests.test_versionedfile',
         'bzrlib.tests.test_weave',
         'bzrlib.tests.test_whitebox',
         'bzrlib.tests.test_win32utils',
@@ -3997 +3973 @@
     ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
     ...     [('one', dict(param=1)),
     ...      ('two', dict(param=2))],
-    ...     TestSuite())
+    ...     TestUtil.TestSuite())
     >>> tests = list(iter_suite_tests(r))