211
217
osutils.set_or_unset_env(var, value)
220
def _clear__type_equality_funcs(test):
221
"""Cleanup bound methods stored on TestCase instances
223
Clear the dict breaking a few (mostly) harmless cycles in the affected
224
unittests released with Python 2.6 and initial Python 2.7 versions.
226
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
227
shipped in Oneiric, an object with no clear method was used, hence the
228
extra complications, see bug 809048 for details.
230
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
231
if type_equality_funcs is not None:
232
tef_clear = getattr(type_equality_funcs, "clear", None)
233
if tef_clear is None:
234
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
235
if tef_instance_dict is not None:
236
tef_clear = tef_instance_dict.clear
237
if tef_clear is not None:
214
241
class ExtendedTestResult(testtools.TextTestResult):
215
242
"""Accepts, reports and accumulates the results of running tests.
384
411
getDetails = getattr(test, "getDetails", None)
385
412
if getDetails is not None:
386
413
getDetails().clear()
387
# Clear _type_equality_funcs to try to stop TestCase instances
388
# from wasting memory. 'clear' is not available in all Python
389
# versions (bug 809048)
390
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
391
if type_equality_funcs is not None:
392
tef_clear = getattr(type_equality_funcs, "clear", None)
393
if tef_clear is None:
394
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
395
if tef_instance_dict is not None:
396
tef_clear = tef_instance_dict.clear
397
if tef_clear is not None:
414
_clear__type_equality_funcs(test)
399
415
self._traceback_from_test = None
401
417
def startTests(self):
502
518
self.not_applicable_count += 1
503
519
self.report_not_applicable(test, reason)
521
def _count_stored_tests(self):
522
"""Count of tests instances kept alive due to not succeeding"""
523
return self.error_count + self.failure_count + self.known_failure_count
505
525
def _post_mortem(self, tb=None):
506
526
"""Start a PDB post mortem session."""
507
527
if os.environ.get('BZR_TEST_PDB', None):
980
1000
def setUp(self):
981
1001
super(TestCase, self).setUp()
1003
timeout = config.GlobalStack().get('selftest.timeout')
1005
timeout_fixture = fixtures.TimeoutFixture(timeout)
1006
timeout_fixture.setUp()
1007
self.addCleanup(timeout_fixture.cleanUp)
982
1009
for feature in getattr(self, '_test_needs_features', []):
983
1010
self.requireFeature(feature)
984
1011
self._cleanEnvironment()
1013
if bzrlib.global_state is not None:
1014
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1015
config.CommandLineStore())
985
1017
self._silenceUI()
986
1018
self._startLogFile()
987
1019
self._benchcalls = []
1714
1745
self.addCleanup(self._finishLogFile)
1716
1747
def _finishLogFile(self):
1717
"""Finished with the log file.
1719
Close the file and delete it.
1748
"""Flush and dereference the in-memory log for this testcase"""
1721
1749
if trace._trace_file:
1722
1750
# flush the log file, to get all content
1723
1751
trace._trace_file.flush()
1724
1752
trace.pop_log_file(self._log_memento)
1753
# The logging module now tracks references for cleanup so discard ours
1754
del self._log_memento
1726
1756
def thisFailsStrictLockCheck(self):
1727
1757
"""It is known that this test would fail with -Dstrict_locks.
1963
1993
self.log('run bzr: %r', args)
1964
1994
# FIXME: don't call into logging here
1965
handler = logging.StreamHandler(stderr)
1966
handler.setLevel(logging.INFO)
1995
handler = trace.EncodedStreamHandler(stderr, errors="replace",
1967
1997
logger = logging.getLogger('')
1968
1998
logger.addHandler(handler)
1969
1999
old_ui_factory = ui.ui_factory
2353
2388
from bzrlib.smart import request
2354
2389
request_handlers = request.request_handlers
2355
2390
orig_method = request_handlers.get(verb)
2391
orig_info = request_handlers.get_info(verb)
2356
2392
request_handlers.remove(verb)
2357
self.addCleanup(request_handlers.register, verb, orig_method)
2393
self.addCleanup(request_handlers.register, verb, orig_method,
2360
2397
class CapturedCall(object):
2563
2600
root = TestCaseWithMemoryTransport.TEST_ROOT
2601
# Make sure we get a readable and accessible home for .bzr.log
2602
# and/or config files, and not fallback to weird defaults (see
2603
# http://pad.lv/825027).
2604
self.assertIs(None, os.environ.get('BZR_HOME', None))
2605
os.environ['BZR_HOME'] = root
2564
2606
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2607
del os.environ['BZR_HOME']
2565
2608
# Hack for speed: remember the raw bytes of the dirstate file so that
2566
2609
# we don't need to re-open the wt to check it hasn't changed.
2567
2610
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2618
2661
def make_branch(self, relpath, format=None):
2619
2662
"""Create a branch on the transport at relpath."""
2620
2663
repo = self.make_repository(relpath, format=format)
2621
return repo.bzrdir.create_branch()
2664
return repo.bzrdir.create_branch(append_revisions_only=False)
2666
def get_default_format(self):
2669
def resolve_format(self, format):
2670
"""Resolve an object to a ControlDir format object.
2672
The initial format object can either already be
2673
a ControlDirFormat, None (for the default format),
2674
or a string with the name of the control dir format.
2676
:param format: Object to resolve
2677
:return: A ControlDirFormat instance
2680
format = self.get_default_format()
2681
if isinstance(format, basestring):
2682
format = bzrdir.format_registry.make_bzrdir(format)
2623
2685
def make_bzrdir(self, relpath, format=None):
2628
2690
t = _mod_transport.get_transport(maybe_a_url)
2629
2691
if len(segments) > 1 and segments[-1] not in ('', '.'):
2630
2692
t.ensure_base()
2633
if isinstance(format, basestring):
2634
format = bzrdir.format_registry.make_bzrdir(format)
2693
format = self.resolve_format(format)
2635
2694
return format.initialize_on_transport(t)
2636
2695
except errors.UninitializableFormat:
2637
2696
raise TestSkipped("Format %s is not initializable." % format)
2639
def make_repository(self, relpath, shared=False, format=None):
2698
def make_repository(self, relpath, shared=None, format=None):
2640
2699
"""Create a repository on our default transport at relpath.
2642
2701
Note that relpath must be a relative path, not a full url.
2676
2735
def setUp(self):
2677
2736
super(TestCaseWithMemoryTransport, self).setUp()
2678
# Ensure that ConnectedTransport doesn't leak sockets
2679
def get_transport_from_url_with_cleanup(*args, **kwargs):
2680
t = orig_get_transport_from_url(*args, **kwargs)
2681
if isinstance(t, _mod_transport.ConnectedTransport):
2682
self.addCleanup(t.disconnect)
2685
orig_get_transport_from_url = self.overrideAttr(
2686
_mod_transport, 'get_transport_from_url',
2687
get_transport_from_url_with_cleanup)
2738
def _add_disconnect_cleanup(transport):
2739
"""Schedule disconnection of given transport at test cleanup
2741
This needs to happen for all connected transports or leaks occur.
2743
Note reconnections may mean we call disconnect multiple times per
2744
transport which is suboptimal but seems harmless.
2746
self.addCleanup(transport.disconnect)
2748
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2749
_add_disconnect_cleanup, None)
2688
2751
self._make_test_root()
2689
2752
self.addCleanup(os.chdir, os.getcwdu())
2690
2753
self.makeAndChdirToTestDir()
2704
2768
def capture_hpss_call(params):
2705
2769
self.hpss_calls.append(
2706
2770
CapturedCall(params, prefix_length))
2771
def capture_connect(transport):
2772
self.hpss_connections.append(transport)
2707
2773
client._SmartClient.hooks.install_named_hook(
2708
2774
'call', capture_hpss_call, None)
2775
_mod_transport.Transport.hooks.install_named_hook(
2776
'post_connect', capture_connect, None)
2710
2778
def reset_smart_call_log(self):
    """Discard everything recorded so far in the smart call log.

    Both ``hpss_calls`` and ``hpss_connections`` are rebound to fresh,
    independent empty lists.
    """
    self.hpss_calls = []
    self.hpss_connections = []
2714
2783
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2916
2985
# this obviously requires a format that supports branch references
2917
2986
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2988
format = self.resolve_format(format=format)
2989
if not format.supports_workingtrees:
2990
b = self.make_branch(relpath+'.branch', format=format)
2991
return b.create_checkout(relpath, lightweight=True)
2919
2992
b = self.make_branch(relpath, format=format)
2921
2994
return b.bzrdir.create_workingtree()
3220
3293
result_decorators=result_decorators,
3222
3295
runner.stop_on_failure=stop_on_failure
3296
if isinstance(suite, unittest.TestSuite):
3297
# Empty out _tests list of passed suite and populate new TestSuite
3298
suite._tests[:], suite = [], TestSuite(suite)
3223
3299
# built in decorator factories:
3225
3301
random_order(random_seed, runner),
3324
3400
class TestDecorator(TestUtil.TestSuite):
3325
3401
"""A decorator for TestCase/TestSuite objects.
3327
Usually, subclasses should override __iter__(used when flattening test
3328
suites), which we do to filter, reorder, parallelise and so on, run() and
3403
Contains rather than flattening suite passed on construction
3332
def __init__(self, suite):
3333
TestUtil.TestSuite.__init__(self)
3336
def countTestCases(self):
3339
cases += test.countTestCases()
3346
def run(self, result):
3347
# Use iteration on self, not self._tests, to allow subclasses to hook
3350
if result.shouldStop:
3406
def __init__(self, suite=None):
3407
super(TestDecorator, self).__init__()
3408
if suite is not None:
3411
# Don't need subclass run method with suite emptying
3412
run = unittest.TestSuite.run
3356
3415
class CountingDecorator(TestDecorator):
3367
3426
"""A decorator which excludes test matching an exclude pattern."""
3369
3428
def __init__(self, suite, exclude_pattern):
3370
TestDecorator.__init__(self, suite)
3371
self.exclude_pattern = exclude_pattern
3372
self.excluded = False
3376
return iter(self._tests)
3377
self.excluded = True
3378
suite = exclude_tests_by_re(self, self.exclude_pattern)
3380
self.addTests(suite)
3381
return iter(self._tests)
3429
super(ExcludeDecorator, self).__init__(
3430
exclude_tests_by_re(suite, exclude_pattern))
3384
3433
class FilterTestsDecorator(TestDecorator):
3385
3434
"""A decorator which filters tests to those matching a pattern."""
3387
3436
def __init__(self, suite, pattern):
3388
TestDecorator.__init__(self, suite)
3389
self.pattern = pattern
3390
self.filtered = False
3394
return iter(self._tests)
3395
self.filtered = True
3396
suite = filter_suite_by_re(self, self.pattern)
3398
self.addTests(suite)
3399
return iter(self._tests)
3437
super(FilterTestsDecorator, self).__init__(
3438
filter_suite_by_re(suite, pattern))
3402
3441
class RandomDecorator(TestDecorator):
3403
3442
"""A decorator which randomises the order of its tests."""
3405
3444
def __init__(self, suite, random_seed, stream):
3406
TestDecorator.__init__(self, suite)
3407
self.random_seed = random_seed
3408
self.randomised = False
3409
self.stream = stream
3413
return iter(self._tests)
3414
self.randomised = True
3415
self.stream.write("Randomizing test order using seed %s\n\n" %
3416
(self.actual_seed()))
3445
random_seed = self.actual_seed(random_seed)
3446
stream.write("Randomizing test order using seed %s\n\n" %
3417
3448
# Initialise the random number generator.
3418
random.seed(self.actual_seed())
3419
suite = randomize_suite(self)
3421
self.addTests(suite)
3422
return iter(self._tests)
3449
random.seed(random_seed)
3450
super(RandomDecorator, self).__init__(randomize_suite(suite))
3424
def actual_seed(self):
3425
if self.random_seed == "now":
3453
def actual_seed(seed):
3426
3455
# We convert the seed to a long to make it reusable across
3427
3456
# invocations (because the user can reenter it).
3428
self.random_seed = long(time.time())
3457
return long(time.time())
3430
3459
# Convert the seed to a long if we can
3432
self.random_seed = long(self.random_seed)
3462
except (TypeError, ValueError):
3435
return self.random_seed
3438
3467
class TestFirstDecorator(TestDecorator):
3439
3468
"""A decorator which moves named tests to the front."""
3441
3470
def __init__(self, suite, pattern):
3442
TestDecorator.__init__(self, suite)
3443
self.pattern = pattern
3444
self.filtered = False
3448
return iter(self._tests)
3449
self.filtered = True
3450
suites = split_suite_by_re(self, self.pattern)
3452
self.addTests(suites)
3453
return iter(self._tests)
3471
super(TestFirstDecorator, self).__init__()
3472
self.addTests(split_suite_by_re(suite, pattern))
3456
3475
def partition_tests(suite, count):
3501
3520
ProtocolTestCase.run(self, result)
3503
os.waitpid(self.pid, 0)
3522
pid, status = os.waitpid(self.pid, 0)
3523
# GZ 2011-10-18: If status is nonzero, should report to the result
3524
# that something went wrong.
3505
3526
test_blocks = partition_tests(suite, concurrency)
3527
# Clear the tests from the original suite so it doesn't keep them alive
3528
suite._tests[:] = []
3506
3529
for process_tests in test_blocks:
3507
process_suite = TestUtil.TestSuite()
3508
process_suite.addTests(process_tests)
3530
process_suite = TestUtil.TestSuite(process_tests)
3531
# Also clear each split list so new suite has only reference
3532
process_tests[:] = []
3509
3533
c2pread, c2pwrite = os.pipe()
3510
3534
pid = os.fork()
3512
workaround_zealous_crypto_random()
3537
stream = os.fdopen(c2pwrite, 'wb', 1)
3538
workaround_zealous_crypto_random()
3514
3539
os.close(c2pread)
3515
3540
# Leave stderr and stdout open so we can see test noise
3516
3541
# Close stdin so that the child goes away if it decides to
3517
3542
# read from stdin (otherwise it's a roulette to see what
3518
3543
# child actually gets keystrokes for pdb etc).
3519
3544
sys.stdin.close()
3521
stream = os.fdopen(c2pwrite, 'wb', 1)
3522
3545
subunit_result = AutoTimingTestResultDecorator(
3523
TestProtocolClient(stream))
3546
SubUnitBzrProtocolClient(stream))
3524
3547
process_suite.run(subunit_result)
3549
# Try and report traceback on stream, but exit with error even
3550
# if stream couldn't be created or something else goes wrong.
3551
# The traceback is formatted to a string and written in one go
3552
# to avoid interleaving lines from multiple failing children.
3554
stream.write(traceback.format_exc())
3528
3559
os.close(c2pwrite)
3529
3560
stream = os.fdopen(c2pread, 'rb', 1)
3635
3666
# with proper exclusion rules.
3636
3667
# -Ethreads Will display thread ident at creation/join time to
3637
3668
# help track thread leaks
3669
# -Euncollected_cases Display the identity of any test cases that weren't
3670
# deallocated after being completed.
3639
3671
# -Econfig_stats Will collect statistics using addDetail
3640
3672
selftest_debug_flags = set()
3946
3978
'bzrlib.tests.test_email_message',
3947
3979
'bzrlib.tests.test_eol_filters',
3948
3980
'bzrlib.tests.test_errors',
3981
'bzrlib.tests.test_estimate_compressed_size',
3949
3982
'bzrlib.tests.test_export',
3950
3983
'bzrlib.tests.test_export_pot',
3951
3984
'bzrlib.tests.test_extract',
4035
4068
'bzrlib.tests.test_smart',
4036
4069
'bzrlib.tests.test_smart_add',
4037
4070
'bzrlib.tests.test_smart_request',
4071
'bzrlib.tests.test_smart_signals',
4038
4072
'bzrlib.tests.test_smart_transport',
4039
4073
'bzrlib.tests.test_smtp_connection',
4040
4074
'bzrlib.tests.test_source',
4071
4105
'bzrlib.tests.test_version',
4072
4106
'bzrlib.tests.test_version_info',
4073
4107
'bzrlib.tests.test_versionedfile',
4108
'bzrlib.tests.test_vf_search',
4074
4109
'bzrlib.tests.test_weave',
4075
4110
'bzrlib.tests.test_whitebox',
4076
4111
'bzrlib.tests.test_win32utils',
4437
4472
from subunit.test_results import AutoTimingTestResultDecorator
4438
4473
class SubUnitBzrProtocolClient(TestProtocolClient):
4475
def stopTest(self, test):
    """Finish reporting ``test`` and then drop its type-equality funcs.

    :param test: The test case that has just finished running.
    """
    super(SubUnitBzrProtocolClient, self).stopTest(test)
    # Clear the bound methods stored on the finished test only after the
    # parent class has done its own stopTest handling.
    _clear__type_equality_funcs(test)
4440
4479
def addSuccess(self, test, details=None):
4441
4480
# The subunit client always includes the details in the subunit
4442
4481
# stream, but we don't want to include it in ours.
4458
@deprecated_function(deprecated_in((2, 5, 0)))
def ModuleAvailableFeature(name):
    """Deprecated forwarder to ``bzrlib.tests.features.ModuleAvailableFeature``.

    :param name: Passed through unchanged to
        ``features.ModuleAvailableFeature``.
    :return: Whatever ``features.ModuleAvailableFeature(name)`` returns.
    """
    # Imported at call time rather than at module import time.
    from bzrlib.tests import features
    return features.ModuleAvailableFeature(name)
4497
# API compatibility for old plugins; see bug 892622.
4500
'HTTPServerFeature',
4501
'ModuleAvailableFeature',
4502
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4503
'OsFifoFeature', 'UnicodeFilenameFeature',
4504
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4505
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4506
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4507
'posix_permissions_feature',
4509
globals()[name] = _CompatabilityThunkFeature(
4510
symbol_versioning.deprecated_in((2, 5, 0)),
4511
'bzrlib.tests', name,
4512
name, 'bzrlib.tests.features')
4515
for (old_name, new_name) in [
4516
('UnicodeFilename', 'UnicodeFilenameFeature'),
4518
globals()[name] = _CompatabilityThunkFeature(
4519
symbol_versioning.deprecated_in((2, 5, 0)),
4520
'bzrlib.tests', old_name,
4521
new_name, 'bzrlib.tests.features')