14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
106
103
import bzrlib.trace
107
104
from bzrlib.transport import (
109
import bzrlib.transport
111
110
from bzrlib.trace import mutter, note
112
111
from bzrlib.tests import (
115
from bzrlib.tests.http_server import HttpServer
116
from bzrlib.tests.TestUtil import (
120
from bzrlib.tests.treeshape import build_tree_contents
117
121
from bzrlib.ui import NullProgressView
118
122
from bzrlib.ui.text import TextUIFactory
119
123
import bzrlib.version_info_formats.format_custom
163
167
:param bench_history: Optionally, a writable file object to accumulate
164
168
benchmark results.
166
testtools.TextTestResult.__init__(self, stream)
170
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
167
171
if bench_history is not None:
168
172
from bzrlib.version import _get_bzr_source_tree
169
173
src_tree = _get_bzr_source_tree()
196
200
actionTaken = "Ran"
197
201
stopTime = time.time()
198
202
timeTaken = stopTime - self.startTime
199
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
200
# the parent class method is similar have to duplicate
201
self._show_list('ERROR', self.errors)
202
self._show_list('FAIL', self.failures)
203
self.stream.write(self.sep2)
204
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
204
self.stream.writeln(self.separator2)
205
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
205
206
run, run != 1 and "s" or "", timeTaken))
207
self.stream.writeln()
206
208
if not self.wasSuccessful():
207
209
self.stream.write("FAILED (")
208
210
failed, errored = map(len, (self.failures, self.errors))
215
217
if failed or errored: self.stream.write(", ")
216
218
self.stream.write("known_failure_count=%d" %
217
219
self.known_failure_count)
218
self.stream.write(")\n")
220
self.stream.writeln(")")
220
222
if self.known_failure_count:
221
self.stream.write("OK (known_failures=%d)\n" %
223
self.stream.writeln("OK (known_failures=%d)" %
222
224
self.known_failure_count)
224
self.stream.write("OK\n")
226
self.stream.writeln("OK")
225
227
if self.skip_count > 0:
226
228
skipped = self.skip_count
227
self.stream.write('%d test%s skipped\n' %
229
self.stream.writeln('%d test%s skipped' %
228
230
(skipped, skipped != 1 and "s" or ""))
229
231
if self.unsupported:
230
232
for feature, count in sorted(self.unsupported.items()):
231
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
233
self.stream.writeln("Missing feature '%s' skipped %d tests." %
232
234
(feature, count))
234
236
ok = self.wasStrictlySuccessful()
273
275
def _shortened_test_description(self, test):
275
what = re.sub(r'^bzrlib\.tests\.', '', what)
277
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
278
280
def startTest(self, test):
279
super(ExtendedTestResult, self).startTest(test)
281
unittest.TestResult.startTest(self, test)
280
282
if self.count == 0:
281
283
self.startTests()
282
284
self.report_test_start(test)
355
357
self.report_success(test)
356
358
self._cleanupLogFile(test)
357
super(ExtendedTestResult, self).addSuccess(test)
359
unittest.TestResult.addSuccess(self, test)
358
360
test._log_contents = ''
360
362
def addExpectedFailure(self, test, err):
487
489
return self._shortened_test_description(test)
489
491
def report_error(self, test, err):
490
self.stream.write('ERROR: %s\n %s\n' % (
492
self.ui.note('ERROR: %s\n %s\n' % (
491
493
self._test_description(test),
495
497
def report_failure(self, test, err):
496
self.stream.write('FAIL: %s\n %s\n' % (
498
self.ui.note('FAIL: %s\n %s\n' % (
497
499
self._test_description(test),
548
550
return '%s%s' % (indent, err[1])
550
552
def report_error(self, test, err):
551
self.stream.write('ERROR %s\n%s\n'
553
self.stream.writeln('ERROR %s\n%s'
552
554
% (self._testTimeString(test),
553
555
self._error_summary(err)))
555
557
def report_failure(self, test, err):
556
self.stream.write(' FAIL %s\n%s\n'
558
self.stream.writeln(' FAIL %s\n%s'
557
559
% (self._testTimeString(test),
558
560
self._error_summary(err)))
560
562
def report_known_failure(self, test, err):
561
self.stream.write('XFAIL %s\n%s\n'
563
self.stream.writeln('XFAIL %s\n%s'
562
564
% (self._testTimeString(test),
563
565
self._error_summary(err)))
565
567
def report_success(self, test):
566
self.stream.write(' OK %s\n' % self._testTimeString(test))
568
self.stream.writeln(' OK %s' % self._testTimeString(test))
567
569
for bench_called, stats in getattr(test, '_benchcalls', []):
568
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
570
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
569
571
stats.pprint(file=self.stream)
570
572
# flush the stream so that we get smooth output. This verbose mode is
571
573
# used to show the output in PQM.
572
574
self.stream.flush()
574
576
def report_skip(self, test, reason):
575
self.stream.write(' SKIP %s\n%s\n'
577
self.stream.writeln(' SKIP %s\n%s'
576
578
% (self._testTimeString(test), reason))
578
580
def report_not_applicable(self, test, reason):
579
self.stream.write(' N/A %s\n %s\n'
581
self.stream.writeln(' N/A %s\n %s'
580
582
% (self._testTimeString(test), reason))
582
584
def report_unsupported(self, test, feature):
583
585
"""test cannot be run because feature is missing."""
584
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
586
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
585
587
%(self._testTimeString(test), feature))
616
618
encode = codec.encode
617
619
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
618
620
stream.encoding = new_encoding
621
self.stream = unittest._WritelnDecorator(stream)
620
622
self.descriptions = descriptions
621
623
self.verbosity = verbosity
622
624
self._bench_history = bench_history
746
748
# XXX: Should probably unify more with CannedInputUIFactory or a
747
749
# particular configuration of TextUIFactory, or otherwise have a clearer
748
750
# idea of how they're supposed to be different.
749
# See https://bugs.launchpad.net/bzr/+bug/408213
751
# See https://bugs.edge.launchpad.net/bzr/+bug/408213
751
753
def __init__(self, stdout=None, stderr=None, stdin=None):
752
754
if stdin is not None:
843
845
# going away but leak one) but it seems less likely than the actual
844
846
# false positives (the test see threads going away and does not leak).
845
847
if leaked_threads > 0:
846
if 'threads' in selftest_debug_flags:
847
print '%s is leaking, active is now %d' % (self.id(), active)
848
848
TestCase._leaking_threads_tests += 1
849
849
if TestCase._first_thread_leaker_id is None:
850
850
TestCase._first_thread_leaker_id = self.id()
1027
1027
self.addCleanup(transport_server.stop_server)
1028
1028
# Obtain a real transport because if the server supplies a password, it
1029
1029
# will be hidden from the base on the client side.
1030
t = _mod_transport.get_transport(transport_server.get_url())
1030
t = get_transport(transport_server.get_url())
1031
1031
# Some transport servers effectively chroot the backing transport;
1032
1032
# others like SFTPServer don't - users of the transport can walk up the
1033
1033
# transport to read the entire backing transport. This wouldn't matter
1313
1313
self.assertEqualDiff(content, s)
1315
def assertDocstring(self, expected_docstring, obj):
1316
"""Fail if obj does not have expected_docstring"""
1318
# With -OO the docstring should be None instead
1319
self.assertIs(obj.__doc__, None)
1321
self.assertEqual(expected_docstring, obj.__doc__)
1323
1315
def failUnlessExists(self, path):
1324
1316
"""Fail unless path or paths, which may be abs or relative, exist."""
1325
1317
if not isinstance(path, basestring):
2386
2376
# might be a relative or absolute path
2387
2377
maybe_a_url = self.get_url(relpath)
2388
2378
segments = maybe_a_url.rsplit('/', 1)
2389
t = _mod_transport.get_transport(maybe_a_url)
2379
t = get_transport(maybe_a_url)
2390
2380
if len(segments) > 1 and segments[-1] not in ('', '.'):
2391
2381
t.ensure_base()
2392
2382
if format is None:
2409
2399
made_control = self.make_bzrdir(relpath, format=format)
2410
2400
return made_control.create_repository(shared=shared)
2412
def make_smart_server(self, path, backing_server=None):
2413
if backing_server is None:
2414
backing_server = self.get_server()
2402
def make_smart_server(self, path):
2415
2403
smart_server = test_server.SmartTCPServer_for_testing()
2416
self.start_server(smart_server, backing_server)
2417
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2404
self.start_server(smart_server, self.get_server())
2405
remote_transport = get_transport(smart_server.get_url()).clone(path)
2419
2406
return remote_transport
2421
2408
def make_branch_and_memory_tree(self, relpath, format=None):
2437
2424
def setUp(self):
2438
2425
super(TestCaseWithMemoryTransport, self).setUp()
2439
# Ensure that ConnectedTransport doesn't leak sockets
2440
def get_transport_with_cleanup(*args, **kwargs):
2441
t = orig_get_transport(*args, **kwargs)
2442
if isinstance(t, _mod_transport.ConnectedTransport):
2443
self.addCleanup(t.disconnect)
2446
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2447
get_transport_with_cleanup)
2448
2426
self._make_test_root()
2449
2427
self.addCleanup(os.chdir, os.getcwdu())
2450
2428
self.makeAndChdirToTestDir()
2579
2553
"a list or a tuple. Got %r instead" % (shape,))
2580
2554
# It's OK to just create them using forward slashes on windows.
2581
2555
if transport is None or transport.is_readonly():
2582
transport = _mod_transport.get_transport(".")
2556
transport = get_transport(".")
2583
2557
for name in shape:
2584
2558
self.assertIsInstance(name, basestring)
2585
2559
if name[-1] == '/':
2595
2569
content = "contents of %s%s" % (name.encode('utf-8'), end)
2596
2570
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2598
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2572
def build_tree_contents(self, shape):
2573
build_tree_contents(shape)
2600
2575
def assertInWorkingTree(self, path, root_path='.', tree=None):
2601
2576
"""Assert whether path or paths are in the WorkingTree"""
2744
2719
def setUp(self):
2745
from bzrlib.tests import http_server
2746
2720
super(ChrootedTestCase, self).setUp()
2747
2721
if not self.vfs_transport_factory == memory.MemoryServer:
2748
self.transport_readonly_server = http_server.HttpServer
2722
self.transport_readonly_server = HttpServer
2751
2725
def condition_id_re(pattern):
3207
3182
def partition_tests(suite, count):
3208
3183
"""Partition suite into count lists of tests."""
3209
# This just assigns tests in a round-robin fashion. On one hand this
3210
# splits up blocks of related tests that might run faster if they shared
3211
# resources, but on the other it avoids assigning blocks of slow tests to
3212
# just one partition. So the slowest partition shouldn't be much slower
3214
partitions = [list() for i in range(count)]
3215
tests = iter_suite_tests(suite)
3216
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3217
partition.append(test)
3221
def workaround_zealous_crypto_random():
3222
"""Crypto.Random want to help us being secure, but we don't care here.
3224
This workaround some test failure related to the sftp server. Once paramiko
3225
stop using the controversial API in Crypto.Random, we may get rid of it.
3228
from Crypto.Random import atfork
3185
tests = list(iter_suite_tests(suite))
3186
tests_per_process = int(math.ceil(float(len(tests)) / count))
3187
for block in range(count):
3188
low_test = block * tests_per_process
3189
high_test = low_test + tests_per_process
3190
process_tests = tests[low_test:high_test]
3191
result.append(process_tests)
3234
3195
def fork_for_tests(suite):
3252
3213
ProtocolTestCase.run(self, result)
3254
os.waitpid(self.pid, 0)
3215
os.waitpid(self.pid, os.WNOHANG)
3256
3217
test_blocks = partition_tests(suite, concurrency)
3257
3218
for process_tests in test_blocks:
3258
process_suite = TestUtil.TestSuite()
3219
process_suite = TestSuite()
3259
3220
process_suite.addTests(process_tests)
3260
3221
c2pread, c2pwrite = os.pipe()
3261
3222
pid = os.fork()
3263
workaround_zealous_crypto_random()
3265
3225
os.close(c2pread)
3266
3226
# Leave stderr and stdout open so we can see test noise
3328
3288
if '--no-plugins' in sys.argv:
3329
3289
argv.append('--no-plugins')
3330
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3331
# noise on stderr it can interrupt the subunit protocol.
3332
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3333
stdout=subprocess.PIPE,
3334
stderr=subprocess.PIPE,
3290
# stderr=STDOUT would be ideal, but until we prevent noise on
3291
# stderr it can interrupt the subunit protocol.
3292
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3336
3294
test = TestInSubprocess(process, test_list_file_name)
3337
3295
result.append(test)
3657
3610
'bzrlib.tests.blackbox',
3658
3611
'bzrlib.tests.commands',
3659
'bzrlib.tests.doc_generate',
3660
3612
'bzrlib.tests.per_branch',
3661
'bzrlib.tests.per_controldir',
3662
'bzrlib.tests.per_controldir_colo',
3613
'bzrlib.tests.per_bzrdir',
3663
3614
'bzrlib.tests.per_foreign_vcs',
3664
3615
'bzrlib.tests.per_interrepository',
3665
3616
'bzrlib.tests.per_intertree',
3678
3629
'bzrlib.tests.per_workingtree',
3679
3630
'bzrlib.tests.test__annotator',
3680
3631
'bzrlib.tests.test__bencode',
3681
'bzrlib.tests.test__btree_serializer',
3682
3632
'bzrlib.tests.test__chk_map',
3683
3633
'bzrlib.tests.test__dirstate_helpers',
3684
3634
'bzrlib.tests.test__groupcompress',
3727
3677
'bzrlib.tests.test_export',
3728
3678
'bzrlib.tests.test_extract',
3729
3679
'bzrlib.tests.test_fetch',
3730
'bzrlib.tests.test_fixtures',
3731
3680
'bzrlib.tests.test_fifo_cache',
3732
3681
'bzrlib.tests.test_filters',
3733
3682
'bzrlib.tests.test_ftp_transport',
3754
3703
'bzrlib.tests.test_knit',
3755
3704
'bzrlib.tests.test_lazy_import',
3756
3705
'bzrlib.tests.test_lazy_regex',
3757
'bzrlib.tests.test_library_state',
3758
3706
'bzrlib.tests.test_lock',
3759
3707
'bzrlib.tests.test_lockable_files',
3760
3708
'bzrlib.tests.test_lockdir',
3762
3710
'bzrlib.tests.test_lru_cache',
3763
3711
'bzrlib.tests.test_lsprof',
3764
3712
'bzrlib.tests.test_mail_client',
3765
'bzrlib.tests.test_matchers',
3766
3713
'bzrlib.tests.test_memorytree',
3767
3714
'bzrlib.tests.test_merge',
3768
3715
'bzrlib.tests.test_merge3',
3817
3764
'bzrlib.tests.test_switch',
3818
3765
'bzrlib.tests.test_symbol_versioning',
3819
3766
'bzrlib.tests.test_tag',
3820
'bzrlib.tests.test_test_server',
3821
3767
'bzrlib.tests.test_testament',
3822
3768
'bzrlib.tests.test_textfile',
3823
3769
'bzrlib.tests.test_textmerge',
3829
3775
'bzrlib.tests.test_transport_log',
3830
3776
'bzrlib.tests.test_tree',
3831
3777
'bzrlib.tests.test_treebuilder',
3832
'bzrlib.tests.test_treeshape',
3833
3778
'bzrlib.tests.test_tsort',
3834
3779
'bzrlib.tests.test_tuned_gzip',
3835
3780
'bzrlib.tests.test_ui',
3839
3784
'bzrlib.tests.test_urlutils',
3840
3785
'bzrlib.tests.test_version',
3841
3786
'bzrlib.tests.test_version_info',
3842
'bzrlib.tests.test_versionedfile',
3843
3787
'bzrlib.tests.test_weave',
3844
3788
'bzrlib.tests.test_whitebox',
3845
3789
'bzrlib.tests.test_win32utils',
4014
3954
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4015
3955
... [('one', dict(param=1)),
4016
3956
... ('two', dict(param=2))],
4017
... TestUtil.TestSuite())
4018
3958
>>> tests = list(iter_suite_tests(r))
4134
4074
if test_id != None:
4135
4075
ui.ui_factory.clear_term()
4136
4076
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4137
# Ugly, but the last thing we want here is fail, so bear with it.
4138
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4139
).encode('ascii', 'replace')
4140
4077
sys.stderr.write('Unable to remove testing dir %s\n%s'
4141
% (os.path.basename(dirname), printable_e))
4078
% (os.path.basename(dirname), e))
4144
4081
class Feature(object):
4509
4435
except ImportError:
4512
class _PosixPermissionsFeature(Feature):
4516
# create temporary file and check if specified perms are maintained.
4519
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4520
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4523
os.chmod(name, write_perms)
4525
read_perms = os.stat(name).st_mode & 0777
4527
return (write_perms == read_perms)
4529
return (os.name == 'posix') and has_perms()
4531
def feature_name(self):
4532
return 'POSIX permissions support'
4534
posix_permissions_feature = _PosixPermissionsFeature()