14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
18
# TODO: Perhaps there should be an API to find out if bzr is running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
104
101
deprecated_passed,
106
103
import bzrlib.trace
107
from bzrlib.transport import (
104
from bzrlib.transport import get_transport, pathfilter
105
import bzrlib.transport
106
from bzrlib.transport.local import LocalURLServer
107
from bzrlib.transport.memory import MemoryServer
108
from bzrlib.transport.readonly import ReadonlyServer
111
109
from bzrlib.trace import mutter, note
112
from bzrlib.tests import (
110
from bzrlib.tests import TestUtil
111
from bzrlib.tests.http_server import HttpServer
112
from bzrlib.tests.TestUtil import (
116
from bzrlib.tests.treeshape import build_tree_contents
117
117
from bzrlib.ui import NullProgressView
118
118
from bzrlib.ui.text import TextUIFactory
119
119
import bzrlib.version_info_formats.format_custom
124
124
# shown frame is the test code, not our assertXYZ.
127
default_transport = test_server.LocalURLServer
130
_unitialized_attr = object()
131
"""A sentinel needed to act as a default value in a method signature."""
127
default_transport = LocalURLServer
134
129
# Subunit result codes, defined here to prevent a hard dependency on subunit.
135
130
SUBUNIT_SEEK_SET = 0
136
131
SUBUNIT_SEEK_CUR = 1
139
class ExtendedTestResult(testtools.TextTestResult):
134
class ExtendedTestResult(unittest._TextTestResult):
140
135
"""Accepts, reports and accumulates the results of running tests.
142
137
Compared to the unittest version this class adds support for
163
158
:param bench_history: Optionally, a writable file object to accumulate
164
159
benchmark results.
166
testtools.TextTestResult.__init__(self, stream)
161
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
167
162
if bench_history is not None:
168
163
from bzrlib.version import _get_bzr_source_tree
169
164
src_tree = _get_bzr_source_tree()
196
191
actionTaken = "Ran"
197
192
stopTime = time.time()
198
193
timeTaken = stopTime - self.startTime
199
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
200
# the parent class method is similar have to duplicate
201
self._show_list('ERROR', self.errors)
202
self._show_list('FAIL', self.failures)
203
self.stream.write(self.sep2)
204
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
195
self.stream.writeln(self.separator2)
196
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
205
197
run, run != 1 and "s" or "", timeTaken))
198
self.stream.writeln()
206
199
if not self.wasSuccessful():
207
200
self.stream.write("FAILED (")
208
201
failed, errored = map(len, (self.failures, self.errors))
215
208
if failed or errored: self.stream.write(", ")
216
209
self.stream.write("known_failure_count=%d" %
217
210
self.known_failure_count)
218
self.stream.write(")\n")
211
self.stream.writeln(")")
220
213
if self.known_failure_count:
221
self.stream.write("OK (known_failures=%d)\n" %
214
self.stream.writeln("OK (known_failures=%d)" %
222
215
self.known_failure_count)
224
self.stream.write("OK\n")
217
self.stream.writeln("OK")
225
218
if self.skip_count > 0:
226
219
skipped = self.skip_count
227
self.stream.write('%d test%s skipped\n' %
220
self.stream.writeln('%d test%s skipped' %
228
221
(skipped, skipped != 1 and "s" or ""))
229
222
if self.unsupported:
230
223
for feature, count in sorted(self.unsupported.items()):
231
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
224
self.stream.writeln("Missing feature '%s' skipped %d tests." %
232
225
(feature, count))
234
227
ok = self.wasStrictlySuccessful()
273
266
def _shortened_test_description(self, test):
275
what = re.sub(r'^bzrlib\.tests\.', '', what)
268
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
278
271
def startTest(self, test):
279
super(ExtendedTestResult, self).startTest(test)
272
unittest.TestResult.startTest(self, test)
280
273
if self.count == 0:
281
274
self.startTests()
282
275
self.report_test_start(test)
355
348
self.report_success(test)
356
349
self._cleanupLogFile(test)
357
super(ExtendedTestResult, self).addSuccess(test)
350
unittest.TestResult.addSuccess(self, test)
358
351
test._log_contents = ''
360
353
def addExpectedFailure(self, test, err):
487
480
return self._shortened_test_description(test)
489
482
def report_error(self, test, err):
490
self.stream.write('ERROR: %s\n %s\n' % (
483
ui.ui_factory.note('ERROR: %s\n %s\n' % (
491
484
self._test_description(test),
495
488
def report_failure(self, test, err):
496
self.stream.write('FAIL: %s\n %s\n' % (
489
ui.ui_factory.note('FAIL: %s\n %s\n' % (
497
490
self._test_description(test),
548
541
return '%s%s' % (indent, err[1])
550
543
def report_error(self, test, err):
551
self.stream.write('ERROR %s\n%s\n'
544
self.stream.writeln('ERROR %s\n%s'
552
545
% (self._testTimeString(test),
553
546
self._error_summary(err)))
555
548
def report_failure(self, test, err):
556
self.stream.write(' FAIL %s\n%s\n'
549
self.stream.writeln(' FAIL %s\n%s'
557
550
% (self._testTimeString(test),
558
551
self._error_summary(err)))
560
553
def report_known_failure(self, test, err):
561
self.stream.write('XFAIL %s\n%s\n'
554
self.stream.writeln('XFAIL %s\n%s'
562
555
% (self._testTimeString(test),
563
556
self._error_summary(err)))
565
558
def report_success(self, test):
566
self.stream.write(' OK %s\n' % self._testTimeString(test))
559
self.stream.writeln(' OK %s' % self._testTimeString(test))
567
560
for bench_called, stats in getattr(test, '_benchcalls', []):
568
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
561
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
569
562
stats.pprint(file=self.stream)
570
563
# flush the stream so that we get smooth output. This verbose mode is
571
564
# used to show the output in PQM.
572
565
self.stream.flush()
574
567
def report_skip(self, test, reason):
575
self.stream.write(' SKIP %s\n%s\n'
568
self.stream.writeln(' SKIP %s\n%s'
576
569
% (self._testTimeString(test), reason))
578
571
def report_not_applicable(self, test, reason):
579
self.stream.write(' N/A %s\n %s\n'
572
self.stream.writeln(' N/A %s\n %s'
580
573
% (self._testTimeString(test), reason))
582
575
def report_unsupported(self, test, feature):
583
576
"""test cannot be run because feature is missing."""
584
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
577
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
585
578
%(self._testTimeString(test), feature))
616
609
encode = codec.encode
617
610
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
618
611
stream.encoding = new_encoding
612
self.stream = unittest._WritelnDecorator(stream)
620
613
self.descriptions = descriptions
621
614
self.verbosity = verbosity
622
615
self._bench_history = bench_history
746
743
# XXX: Should probably unify more with CannedInputUIFactory or a
747
744
# particular configuration of TextUIFactory, or otherwise have a clearer
748
745
# idea of how they're supposed to be different.
749
# See https://bugs.launchpad.net/bzr/+bug/408213
746
# See https://bugs.edge.launchpad.net/bzr/+bug/408213
751
748
def __init__(self, stdout=None, stderr=None, stdin=None):
752
749
if stdin is not None:
843
840
# going away but leak one) but it seems less likely than the actual
844
841
# false positives (the test see threads going away and does not leak).
845
842
if leaked_threads > 0:
846
if 'threads' in selftest_debug_flags:
847
print '%s is leaking, active is now %d' % (self.id(), active)
848
843
TestCase._leaking_threads_tests += 1
849
844
if TestCase._first_thread_leaker_id is None:
850
845
TestCase._first_thread_leaker_id = self.id()
855
850
Tests that want to use debug flags can just set them in the
856
851
debug_flags set during setup/teardown.
858
# Start with a copy of the current debug flags we can safely modify.
859
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
853
self._preserved_debug_flags = set(debug.debug_flags)
860
854
if 'allow_debug' not in selftest_debug_flags:
861
855
debug.debug_flags.clear()
862
856
if 'disable_lock_checks' not in selftest_debug_flags:
863
857
debug.debug_flags.add('strict_locks')
858
self.addCleanup(self._restore_debug_flags)
865
860
def _clear_hooks(self):
866
861
# prevent hooks affecting tests
887
882
def _silenceUI(self):
888
883
"""Turn off UI for duration of test"""
889
884
# by default the UI is off; tests can turn it on if they want it.
890
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
885
saved = ui.ui_factory
887
ui.ui_factory = saved
888
ui.ui_factory = ui.SilentUIFactory()
889
self.addCleanup(_restore)
892
891
def _check_locks(self):
893
892
"""Check that all lock take/release actions have been paired."""
1027
1026
self.addCleanup(transport_server.stop_server)
1028
1027
# Obtain a real transport because if the server supplies a password, it
1029
1028
# will be hidden from the base on the client side.
1030
t = _mod_transport.get_transport(transport_server.get_url())
1029
t = get_transport(transport_server.get_url())
1031
1030
# Some transport servers effectively chroot the backing transport;
1032
1031
# others like SFTPServer don't - users of the transport can walk up the
1033
1032
# transport to read the entire backing transport. This wouldn't matter
1044
1043
if t.base.endswith('/work/'):
1045
1044
# we have safety net/test root/work
1046
1045
t = t.clone('../..')
1047
elif isinstance(transport_server,
1048
test_server.SmartTCPServer_for_testing):
1046
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1049
1047
# The smart server adds a path similar to work, which is traversed
1050
1048
# up from by the client. But the server is chrooted - the actual
1051
1049
# backing transport is not escaped from, and VFS requests to the
1206
1204
raise AssertionError('pattern "%s" found in "%s"'
1207
1205
% (needle_re, haystack))
1209
def assertContainsString(self, haystack, needle):
1210
if haystack.find(needle) == -1:
1211
self.fail("string %r not found in '''%s'''" % (needle, haystack))
1213
1207
def assertSubset(self, sublist, superlist):
1214
1208
"""Assert that every entry in sublist is present in superlist."""
1215
1209
missing = set(sublist) - set(superlist)
1313
1307
self.assertEqualDiff(content, s)
1315
def assertDocstring(self, expected_docstring, obj):
1316
"""Fail if obj does not have expected_docstring"""
1318
# With -OO the docstring should be None instead
1319
self.assertIs(obj.__doc__, None)
1321
self.assertEqual(expected_docstring, obj.__doc__)
1323
1309
def failUnlessExists(self, path):
1324
1310
"""Fail unless path or paths, which may be abs or relative, exist."""
1325
1311
if not isinstance(path, basestring):
1494
1480
self._cleanups.append((callable, args, kwargs))
1496
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1497
"""Overrides an object attribute restoring it after the test.
1499
:param obj: The object that will be mutated.
1501
:param attr_name: The attribute name we want to preserve/override in
1504
:param new: The optional value we want to set the attribute to.
1506
:returns: The actual attr value.
1508
value = getattr(obj, attr_name)
1509
# The actual value is captured by the call below
1510
self.addCleanup(setattr, obj, attr_name, value)
1511
if new is not _unitialized_attr:
1512
setattr(obj, attr_name, new)
1515
1482
def _cleanEnvironment(self):
1517
1484
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1523
1490
'EDITOR': None,
1524
1491
'BZR_EMAIL': None,
1525
1492
'BZREMAIL': None, # may still be present in the environment
1526
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1527
1494
'BZR_PROGRESS_BAR': None,
1528
1495
'BZR_LOG': None,
1529
1496
'BZR_PLUGIN_PATH': None,
1530
'BZR_DISABLE_PLUGINS': None,
1531
'BZR_PLUGINS_AT': None,
1532
1497
'BZR_CONCURRENCY': None,
1533
1498
# Make sure that any text ui tests are consistent regardless of
1534
1499
# the environment the test case is run in; you may want tests that
1555
1520
'ftp_proxy': None,
1556
1521
'FTP_PROXY': None,
1557
1522
'BZR_REMOTE_PATH': None,
1558
# Generally speaking, we don't want apport reporting on crashes in
1559
# the test environment unless we're specifically testing apport,
1560
# so that it doesn't leak into the real system environment. We
1561
# use an env var so it propagates to subprocesses.
1562
'APPORT_DISABLE': '1',
1565
1525
self.addCleanup(self._restoreEnvironment)
1566
1526
for name, value in new_env.iteritems():
1567
1527
self._captureVar(name, value)
1569
1529
def _captureVar(self, name, newvalue):
1570
1530
"""Set an environment variable, and reset it when finished."""
1571
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1531
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1533
def _restore_debug_flags(self):
1534
debug.debug_flags.clear()
1535
debug.debug_flags.update(self._preserved_debug_flags)
1573
1537
def _restoreEnvironment(self):
1574
for name, value in self._old_env.iteritems():
1538
for name, value in self.__old_env.iteritems():
1575
1539
osutils.set_or_unset_env(name, value)
1577
1541
def _restoreHooks(self):
1681
1645
unicodestr = log_contents.decode('utf8', 'replace')
1682
1646
log_contents = unicodestr.encode('utf8')
1683
1647
if not keep_log_file:
1685
max_close_attempts = 100
1686
first_close_error = None
1687
while close_attempts < max_close_attempts:
1690
self._log_file.close()
1691
except IOError, ioe:
1692
if ioe.errno is None:
1693
# No errno implies 'close() called during
1694
# concurrent operation on the same file object', so
1695
# retry. Probably a thread is trying to write to
1697
if first_close_error is None:
1698
first_close_error = ioe
1703
if close_attempts > 1:
1705
'Unable to close log file on first attempt, '
1706
'will retry: %s\n' % (first_close_error,))
1707
if close_attempts == max_close_attempts:
1709
'Unable to close log file after %d attempts.\n'
1710
% (max_close_attempts,))
1648
self._log_file.close()
1711
1649
self._log_file = None
1712
1650
# Permit multiple calls to get_log until we clean it up in
1713
1651
# finishLogFile
2102
2038
Tests that expect to provoke LockContention errors should call this.
2104
self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2040
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
2042
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
2043
self.addCleanup(resetTimeout)
2044
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
2106
2046
def make_utf8_encoded_stringio(self, encoding_type=None):
2107
2047
"""Return a StringIOWrapper instance, that will encode Unicode
2121
2061
request_handlers = request.request_handlers
2122
2062
orig_method = request_handlers.get(verb)
2123
2063
request_handlers.remove(verb)
2124
self.addCleanup(request_handlers.register, verb, orig_method)
2065
request_handlers.register(verb, orig_method)
2066
self.addCleanup(restoreVerb)
2127
2069
class CapturedCall(object):
2218
2160
if self.__readonly_server is None:
2219
2161
if self.transport_readonly_server is None:
2220
2162
# readonly decorator requested
2221
self.__readonly_server = test_server.ReadonlyServer()
2163
self.__readonly_server = ReadonlyServer()
2223
2165
# explicit readonly transport.
2224
2166
self.__readonly_server = self.create_transport_readonly_server()
2386
2328
# might be a relative or absolute path
2387
2329
maybe_a_url = self.get_url(relpath)
2388
2330
segments = maybe_a_url.rsplit('/', 1)
2389
t = _mod_transport.get_transport(maybe_a_url)
2331
t = get_transport(maybe_a_url)
2390
2332
if len(segments) > 1 and segments[-1] not in ('', '.'):
2391
2333
t.ensure_base()
2392
2334
if format is None:
2409
2351
made_control = self.make_bzrdir(relpath, format=format)
2410
2352
return made_control.create_repository(shared=shared)
2412
def make_smart_server(self, path, backing_server=None):
2413
if backing_server is None:
2414
backing_server = self.get_server()
2415
smart_server = test_server.SmartTCPServer_for_testing()
2416
self.start_server(smart_server, backing_server)
2417
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2354
def make_smart_server(self, path):
2355
smart_server = server.SmartTCPServer_for_testing()
2356
self.start_server(smart_server, self.get_server())
2357
remote_transport = get_transport(smart_server.get_url()).clone(path)
2419
2358
return remote_transport
2421
2360
def make_branch_and_memory_tree(self, relpath, format=None):
2437
2376
def setUp(self):
2438
2377
super(TestCaseWithMemoryTransport, self).setUp()
2439
# Ensure that ConnectedTransport doesn't leak sockets
2440
def get_transport_with_cleanup(*args, **kwargs):
2441
t = orig_get_transport(*args, **kwargs)
2442
if isinstance(t, _mod_transport.ConnectedTransport):
2443
self.addCleanup(t.disconnect)
2446
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2447
get_transport_with_cleanup)
2448
2378
self._make_test_root()
2449
self.addCleanup(os.chdir, os.getcwdu())
2379
_currentdir = os.getcwdu()
2380
def _leaveDirectory():
2381
os.chdir(_currentdir)
2382
self.addCleanup(_leaveDirectory)
2450
2383
self.makeAndChdirToTestDir()
2451
2384
self.overrideEnvironmentForTesting()
2452
2385
self.__readonly_server = None
2579
2508
"a list or a tuple. Got %r instead" % (shape,))
2580
2509
# It's OK to just create them using forward slashes on windows.
2581
2510
if transport is None or transport.is_readonly():
2582
transport = _mod_transport.get_transport(".")
2511
transport = get_transport(".")
2583
2512
for name in shape:
2584
2513
self.assertIsInstance(name, basestring)
2585
2514
if name[-1] == '/':
2595
2524
content = "contents of %s%s" % (name.encode('utf-8'), end)
2596
2525
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2598
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2527
def build_tree_contents(self, shape):
2528
build_tree_contents(shape)
2600
2530
def assertInWorkingTree(self, path, root_path='.', tree=None):
2601
2531
"""Assert whether path or paths are in the WorkingTree"""
2677
2607
# We can only make working trees locally at the moment. If the
2678
2608
# transport can't support them, then we keep the non-disk-backed
2679
2609
# branch and create a local checkout.
2680
if self.vfs_transport_factory is test_server.LocalURLServer:
2610
if self.vfs_transport_factory is LocalURLServer:
2681
2611
# the branch is colocated on disk, we cannot create a checkout.
2682
2612
# hopefully callers will expect this.
2683
2613
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2744
2674
def setUp(self):
2745
from bzrlib.tests import http_server
2746
2675
super(ChrootedTestCase, self).setUp()
2747
if not self.vfs_transport_factory == memory.MemoryServer:
2748
self.transport_readonly_server = http_server.HttpServer
2676
if not self.vfs_transport_factory == MemoryServer:
2677
self.transport_readonly_server = HttpServer
2751
2680
def condition_id_re(pattern):
3207
3137
def partition_tests(suite, count):
3208
3138
"""Partition suite into count lists of tests."""
3209
# This just assigns tests in a round-robin fashion. On one hand this
3210
# splits up blocks of related tests that might run faster if they shared
3211
# resources, but on the other it avoids assigning blocks of slow tests to
3212
# just one partition. So the slowest partition shouldn't be much slower
3214
partitions = [list() for i in range(count)]
3215
tests = iter_suite_tests(suite)
3216
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3217
partition.append(test)
3221
def workaround_zealous_crypto_random():
3222
"""Crypto.Random want to help us being secure, but we don't care here.
3224
This workaround some test failure related to the sftp server. Once paramiko
3225
stop using the controversial API in Crypto.Random, we may get rid of it.
3228
from Crypto.Random import atfork
3140
tests = list(iter_suite_tests(suite))
3141
tests_per_process = int(math.ceil(float(len(tests)) / count))
3142
for block in range(count):
3143
low_test = block * tests_per_process
3144
high_test = low_test + tests_per_process
3145
process_tests = tests[low_test:high_test]
3146
result.append(process_tests)
3234
3150
def fork_for_tests(suite):
3252
3168
ProtocolTestCase.run(self, result)
3254
os.waitpid(self.pid, 0)
3170
os.waitpid(self.pid, os.WNOHANG)
3256
3172
test_blocks = partition_tests(suite, concurrency)
3257
3173
for process_tests in test_blocks:
3258
process_suite = TestUtil.TestSuite()
3174
process_suite = TestSuite()
3259
3175
process_suite.addTests(process_tests)
3260
3176
c2pread, c2pwrite = os.pipe()
3261
3177
pid = os.fork()
3263
workaround_zealous_crypto_random()
3265
3180
os.close(c2pread)
3266
3181
# Leave stderr and stdout open so we can see test noise
3328
3243
if '--no-plugins' in sys.argv:
3329
3244
argv.append('--no-plugins')
3330
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3331
# noise on stderr it can interrupt the subunit protocol.
3332
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3333
stdout=subprocess.PIPE,
3334
stderr=subprocess.PIPE,
3245
# stderr=STDOUT would be ideal, but until we prevent noise on
3246
# stderr it can interrupt the subunit protocol.
3247
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3336
3249
test = TestInSubprocess(process, test_list_file_name)
3337
3250
result.append(test)
3642
3550
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3643
3551
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3645
# Obvious highest levels prefixes, feel free to add your own via a plugin
3553
# Obvious highest level prefixes, feel free to add your own via a plugin
3646
3554
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3647
3555
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3648
3556
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3657
3565
'bzrlib.tests.blackbox',
3658
3566
'bzrlib.tests.commands',
3659
'bzrlib.tests.doc_generate',
3660
3567
'bzrlib.tests.per_branch',
3661
'bzrlib.tests.per_controldir',
3662
'bzrlib.tests.per_controldir_colo',
3568
'bzrlib.tests.per_bzrdir',
3663
3569
'bzrlib.tests.per_foreign_vcs',
3664
3570
'bzrlib.tests.per_interrepository',
3665
3571
'bzrlib.tests.per_intertree',
3678
3584
'bzrlib.tests.per_workingtree',
3679
3585
'bzrlib.tests.test__annotator',
3680
3586
'bzrlib.tests.test__bencode',
3681
'bzrlib.tests.test__btree_serializer',
3682
3587
'bzrlib.tests.test__chk_map',
3683
3588
'bzrlib.tests.test__dirstate_helpers',
3684
3589
'bzrlib.tests.test__groupcompress',
3706
3611
'bzrlib.tests.test_chunk_writer',
3707
3612
'bzrlib.tests.test_clean_tree',
3708
3613
'bzrlib.tests.test_cleanup',
3709
'bzrlib.tests.test_cmdline',
3710
3614
'bzrlib.tests.test_commands',
3711
3615
'bzrlib.tests.test_commit',
3712
3616
'bzrlib.tests.test_commit_merge',
3727
3631
'bzrlib.tests.test_export',
3728
3632
'bzrlib.tests.test_extract',
3729
3633
'bzrlib.tests.test_fetch',
3730
'bzrlib.tests.test_fixtures',
3731
3634
'bzrlib.tests.test_fifo_cache',
3732
3635
'bzrlib.tests.test_filters',
3733
3636
'bzrlib.tests.test_ftp_transport',
3747
3650
'bzrlib.tests.test_identitymap',
3748
3651
'bzrlib.tests.test_ignores',
3749
3652
'bzrlib.tests.test_index',
3750
'bzrlib.tests.test_import_tariff',
3751
3653
'bzrlib.tests.test_info',
3752
3654
'bzrlib.tests.test_inv',
3753
3655
'bzrlib.tests.test_inventory_delta',
3754
3656
'bzrlib.tests.test_knit',
3755
3657
'bzrlib.tests.test_lazy_import',
3756
3658
'bzrlib.tests.test_lazy_regex',
3757
'bzrlib.tests.test_library_state',
3758
3659
'bzrlib.tests.test_lock',
3759
3660
'bzrlib.tests.test_lockable_files',
3760
3661
'bzrlib.tests.test_lockdir',
3762
3663
'bzrlib.tests.test_lru_cache',
3763
3664
'bzrlib.tests.test_lsprof',
3764
3665
'bzrlib.tests.test_mail_client',
3765
'bzrlib.tests.test_matchers',
3766
3666
'bzrlib.tests.test_memorytree',
3767
3667
'bzrlib.tests.test_merge',
3768
3668
'bzrlib.tests.test_merge3',
3817
3717
'bzrlib.tests.test_switch',
3818
3718
'bzrlib.tests.test_symbol_versioning',
3819
3719
'bzrlib.tests.test_tag',
3820
'bzrlib.tests.test_test_server',
3821
3720
'bzrlib.tests.test_testament',
3822
3721
'bzrlib.tests.test_textfile',
3823
3722
'bzrlib.tests.test_textmerge',
3829
3728
'bzrlib.tests.test_transport_log',
3830
3729
'bzrlib.tests.test_tree',
3831
3730
'bzrlib.tests.test_treebuilder',
3832
'bzrlib.tests.test_treeshape',
3833
3731
'bzrlib.tests.test_tsort',
3834
3732
'bzrlib.tests.test_tuned_gzip',
3835
3733
'bzrlib.tests.test_ui',
3839
3737
'bzrlib.tests.test_urlutils',
3840
3738
'bzrlib.tests.test_version',
3841
3739
'bzrlib.tests.test_version_info',
3842
'bzrlib.tests.test_versionedfile',
3843
3740
'bzrlib.tests.test_weave',
3844
3741
'bzrlib.tests.test_whitebox',
3845
3742
'bzrlib.tests.test_win32utils',
4014
3907
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4015
3908
... [('one', dict(param=1)),
4016
3909
... ('two', dict(param=2))],
4017
... TestUtil.TestSuite())
4018
3911
>>> tests = list(iter_suite_tests(r))
4134
4027
if test_id != None:
4135
4028
ui.ui_factory.clear_term()
4136
4029
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4137
# Ugly, but the last thing we want here is fail, so bear with it.
4138
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4139
).encode('ascii', 'replace')
4140
4030
sys.stderr.write('Unable to remove testing dir %s\n%s'
4141
% (os.path.basename(dirname), printable_e))
4031
% (os.path.basename(dirname), e))
4144
4034
class Feature(object):
4233
4123
should really use a different feature.
4236
def __init__(self, dep_version, module, name,
4237
replacement_name, replacement_module=None):
4126
def __init__(self, module, name, this_name, dep_version):
4238
4127
super(_CompatabilityThunkFeature, self).__init__()
4239
4128
self._module = module
4240
if replacement_module is None:
4241
replacement_module = module
4242
self._replacement_module = replacement_module
4243
4129
self._name = name
4244
self._replacement_name = replacement_name
4130
self._this_name = this_name
4245
4131
self._dep_version = dep_version
4246
4132
self._feature = None
4248
4134
def _ensure(self):
4249
4135
if self._feature is None:
4250
depr_msg = self._dep_version % ('%s.%s'
4251
% (self._module, self._name))
4252
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4253
self._replacement_name)
4254
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4255
# Import the new feature and use it as a replacement for the
4257
mod = __import__(self._replacement_module, {}, {},
4258
[self._replacement_name])
4259
self._feature = getattr(mod, self._replacement_name)
4136
msg = (self._dep_version % self._this_name) + (
4137
' Use %s.%s instead.' % (self._module, self._name))
4138
symbol_versioning.warn(msg, DeprecationWarning)
4139
mod = __import__(self._module, {}, {}, [self._name])
4140
self._feature = getattr(mod, self._name)
4261
4142
def _probe(self):
4288
4169
if self.available(): # Make sure the probe has been done
4289
4170
return self._module
4292
4173
def feature_name(self):
4293
4174
return self.module_name
4296
4177
# This is kept here for compatibility, it is recommended to use
4297
4178
# 'bzrlib.tests.feature.paramiko' instead
4298
ParamikoFeature = _CompatabilityThunkFeature(
4299
deprecated_in((2,1,0)),
4300
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4179
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
4180
'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
4303
4183
def probe_unicode_in_user_encoding():
4495
4364
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4496
SubUnitFeature = _CompatabilityThunkFeature(
4497
deprecated_in((2,1,0)),
4498
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4365
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
4366
'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4499
4367
# Only define SubUnitBzrRunner if subunit is available.
4501
4369
from subunit import TestProtocolClient
4509
4377
except ImportError:
4512
class _PosixPermissionsFeature(Feature):
4516
# create temporary file and check if specified perms are maintained.
4519
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4520
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4523
os.chmod(name, write_perms)
4525
read_perms = os.stat(name).st_mode & 0777
4527
return (write_perms == read_perms)
4529
return (os.name == 'posix') and has_perms()
4531
def feature_name(self):
4532
return 'POSIX permissions support'
4534
posix_permissions_feature = _PosixPermissionsFeature()