211
217
osutils.set_or_unset_env(var, value)
220
def _clear__type_equality_funcs(test):
221
"""Cleanup bound methods stored on TestCase instances
223
Clear the dict breaking a few (mostly) harmless cycles in the affected
224
unittests released with Python 2.6 and initial Python 2.7 versions.
226
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
227
shipped in Oneiric, an object with no clear method was used, hence the
228
extra complications, see bug 809048 for details.
230
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
231
if type_equality_funcs is not None:
232
tef_clear = getattr(type_equality_funcs, "clear", None)
233
if tef_clear is None:
234
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
235
if tef_instance_dict is not None:
236
tef_clear = tef_instance_dict.clear
237
if tef_clear is not None:
214
241
class ExtendedTestResult(testtools.TextTestResult):
215
242
"""Accepts, reports and accumulates the results of running tests.
384
411
getDetails = getattr(test, "getDetails", None)
385
412
if getDetails is not None:
386
413
getDetails().clear()
387
# Clear _type_equality_funcs to try to stop TestCase instances
388
# from wasting memory. 'clear' is not available in all Python
389
# versions (bug 809048)
390
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
391
if type_equality_funcs is not None:
392
tef_clear = getattr(type_equality_funcs, "clear", None)
393
if tef_clear is None:
394
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
395
if tef_instance_dict is not None:
396
tef_clear = tef_instance_dict.clear
397
if tef_clear is not None:
414
_clear__type_equality_funcs(test)
399
415
self._traceback_from_test = None
401
417
def startTests(self):
502
518
self.not_applicable_count += 1
503
519
self.report_not_applicable(test, reason)
521
def _count_stored_tests(self):
522
"""Count of tests instances kept alive due to not succeeding"""
523
return self.error_count + self.failure_count + self.known_failure_count
505
525
def _post_mortem(self, tb=None):
506
526
"""Start a PDB post mortem session."""
507
527
if os.environ.get('BZR_TEST_PDB', None):
980
1000
def setUp(self):
981
1001
super(TestCase, self).setUp()
1003
timeout = config.GlobalStack().get('selftest.timeout')
1005
timeout_fixture = fixtures.TimeoutFixture(timeout)
1006
timeout_fixture.setUp()
1007
self.addCleanup(timeout_fixture.cleanUp)
982
1009
for feature in getattr(self, '_test_needs_features', []):
983
1010
self.requireFeature(feature)
984
1011
self._cleanEnvironment()
1013
if bzrlib.global_state is not None:
1014
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1015
config.CommandLineStore())
985
1017
self._silenceUI()
986
1018
self._startLogFile()
987
1019
self._benchcalls = []
1714
1745
self.addCleanup(self._finishLogFile)
1716
1747
def _finishLogFile(self):
1717
"""Finished with the log file.
1719
Close the file and delete it.
1748
"""Flush and dereference the in-memory log for this testcase"""
1721
1749
if trace._trace_file:
1722
1750
# flush the log file, to get all content
1723
1751
trace._trace_file.flush()
1724
1752
trace.pop_log_file(self._log_memento)
1753
# The logging module now tracks references for cleanup so discard ours
1754
del self._log_memento
1726
1756
def thisFailsStrictLockCheck(self):
1727
1757
"""It is known that this test would fail with -Dstrict_locks.
1963
1993
self.log('run bzr: %r', args)
1964
1994
# FIXME: don't call into logging here
1965
handler = logging.StreamHandler(stderr)
1966
handler.setLevel(logging.INFO)
1995
handler = trace.EncodedStreamHandler(stderr, errors="replace",
1967
1997
logger = logging.getLogger('')
1968
1998
logger.addHandler(handler)
1969
1999
old_ui_factory = ui.ui_factory
2353
2388
from bzrlib.smart import request
2354
2389
request_handlers = request.request_handlers
2355
2390
orig_method = request_handlers.get(verb)
2391
orig_info = request_handlers.get_info(verb)
2356
2392
request_handlers.remove(verb)
2357
self.addCleanup(request_handlers.register, verb, orig_method)
2393
self.addCleanup(request_handlers.register, verb, orig_method,
2360
2397
class CapturedCall(object):
2563
2600
root = TestCaseWithMemoryTransport.TEST_ROOT
2601
# Make sure we get a readable and accessible home for .bzr.log
2602
# and/or config files, and not fallback to weird defaults (see
2603
# http://pad.lv/825027).
2604
self.assertIs(None, os.environ.get('BZR_HOME', None))
2605
os.environ['BZR_HOME'] = root
2564
2606
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2607
del os.environ['BZR_HOME']
2565
2608
# Hack for speed: remember the raw bytes of the dirstate file so that
2566
2609
# we don't need to re-open the wt to check it hasn't changed.
2567
2610
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2615
2658
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2616
2659
self.permit_dir(self.test_dir)
2618
def make_branch(self, relpath, format=None):
2661
def make_branch(self, relpath, format=None, name=None):
2619
2662
"""Create a branch on the transport at relpath."""
2620
2663
repo = self.make_repository(relpath, format=format)
2621
return repo.bzrdir.create_branch()
2664
return repo.bzrdir.create_branch(append_revisions_only=False,
2667
def get_default_format(self):
2670
def resolve_format(self, format):
2671
"""Resolve an object to a ControlDir format object.
2673
The initial format object can either already be
2674
a ControlDirFormat, None (for the default format),
2675
or a string with the name of the control dir format.
2677
:param format: Object to resolve
2678
:return A ControlDirFormat instance
2681
format = self.get_default_format()
2682
if isinstance(format, basestring):
2683
format = bzrdir.format_registry.make_bzrdir(format)
2623
2686
def make_bzrdir(self, relpath, format=None):
2628
2691
t = _mod_transport.get_transport(maybe_a_url)
2629
2692
if len(segments) > 1 and segments[-1] not in ('', '.'):
2630
2693
t.ensure_base()
2633
if isinstance(format, basestring):
2634
format = bzrdir.format_registry.make_bzrdir(format)
2694
format = self.resolve_format(format)
2635
2695
return format.initialize_on_transport(t)
2636
2696
except errors.UninitializableFormat:
2637
2697
raise TestSkipped("Format %s is not initializable." % format)
2639
def make_repository(self, relpath, shared=False, format=None):
2699
def make_repository(self, relpath, shared=None, format=None):
2640
2700
"""Create a repository on our default transport at relpath.
2642
2702
Note that relpath must be a relative path, not a full url.
2676
2736
def setUp(self):
2677
2737
super(TestCaseWithMemoryTransport, self).setUp()
2678
# Ensure that ConnectedTransport doesn't leak sockets
2679
def get_transport_from_url_with_cleanup(*args, **kwargs):
2680
t = orig_get_transport_from_url(*args, **kwargs)
2681
if isinstance(t, _mod_transport.ConnectedTransport):
2682
self.addCleanup(t.disconnect)
2685
orig_get_transport_from_url = self.overrideAttr(
2686
_mod_transport, 'get_transport_from_url',
2687
get_transport_from_url_with_cleanup)
2739
def _add_disconnect_cleanup(transport):
2740
"""Schedule disconnection of given transport at test cleanup
2742
This needs to happen for all connected transports or leaks occur.
2744
Note reconnections may mean we call disconnect multiple times per
2745
transport which is suboptimal but seems harmless.
2747
self.addCleanup(transport.disconnect)
2749
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2750
_add_disconnect_cleanup, None)
2688
2752
self._make_test_root()
2689
2753
self.addCleanup(os.chdir, os.getcwdu())
2690
2754
self.makeAndChdirToTestDir()
2704
2769
def capture_hpss_call(params):
2705
2770
self.hpss_calls.append(
2706
2771
CapturedCall(params, prefix_length))
2772
def capture_connect(transport):
2773
self.hpss_connections.append(transport)
2707
2774
client._SmartClient.hooks.install_named_hook(
2708
2775
'call', capture_hpss_call, None)
2776
_mod_transport.Transport.hooks.install_named_hook(
2777
'post_connect', capture_connect, None)
2710
2779
def reset_smart_call_log(self):
2711
2780
self.hpss_calls = []
2781
self.hpss_connections = []
2714
2784
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2916
2986
# this obviously requires a format that supports branch references
2917
2987
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2989
format = self.resolve_format(format=format)
2990
if not format.supports_workingtrees:
2991
b = self.make_branch(relpath+'.branch', format=format)
2992
return b.create_checkout(relpath, lightweight=True)
2919
2993
b = self.make_branch(relpath, format=format)
2921
2995
return b.bzrdir.create_workingtree()
3220
3294
result_decorators=result_decorators,
3222
3296
runner.stop_on_failure=stop_on_failure
3297
if isinstance(suite, unittest.TestSuite):
3298
# Empty out _tests list of passed suite and populate new TestSuite
3299
suite._tests[:], suite = [], TestSuite(suite)
3223
3300
# built in decorator factories:
3225
3302
random_order(random_seed, runner),
3324
3401
class TestDecorator(TestUtil.TestSuite):
3325
3402
"""A decorator for TestCase/TestSuite objects.
3327
Usually, subclasses should override __iter__(used when flattening test
3328
suites), which we do to filter, reorder, parallelise and so on, run() and
3404
Contains rather than flattening suite passed on construction
3332
def __init__(self, suite):
3333
TestUtil.TestSuite.__init__(self)
3336
def countTestCases(self):
3339
cases += test.countTestCases()
3346
def run(self, result):
3347
# Use iteration on self, not self._tests, to allow subclasses to hook
3350
if result.shouldStop:
3407
def __init__(self, suite=None):
3408
super(TestDecorator, self).__init__()
3409
if suite is not None:
3412
# Don't need subclass run method with suite emptying
3413
run = unittest.TestSuite.run
3356
3416
class CountingDecorator(TestDecorator):
3367
3427
"""A decorator which excludes test matching an exclude pattern."""
3369
3429
def __init__(self, suite, exclude_pattern):
3370
TestDecorator.__init__(self, suite)
3371
self.exclude_pattern = exclude_pattern
3372
self.excluded = False
3376
return iter(self._tests)
3377
self.excluded = True
3378
suite = exclude_tests_by_re(self, self.exclude_pattern)
3380
self.addTests(suite)
3381
return iter(self._tests)
3430
super(ExcludeDecorator, self).__init__(
3431
exclude_tests_by_re(suite, exclude_pattern))
3384
3434
class FilterTestsDecorator(TestDecorator):
3385
3435
"""A decorator which filters tests to those matching a pattern."""
3387
3437
def __init__(self, suite, pattern):
3388
TestDecorator.__init__(self, suite)
3389
self.pattern = pattern
3390
self.filtered = False
3394
return iter(self._tests)
3395
self.filtered = True
3396
suite = filter_suite_by_re(self, self.pattern)
3398
self.addTests(suite)
3399
return iter(self._tests)
3438
super(FilterTestsDecorator, self).__init__(
3439
filter_suite_by_re(suite, pattern))
3402
3442
class RandomDecorator(TestDecorator):
3403
3443
"""A decorator which randomises the order of its tests."""
3405
3445
def __init__(self, suite, random_seed, stream):
3406
TestDecorator.__init__(self, suite)
3407
self.random_seed = random_seed
3408
self.randomised = False
3409
self.stream = stream
3413
return iter(self._tests)
3414
self.randomised = True
3415
self.stream.write("Randomizing test order using seed %s\n\n" %
3416
(self.actual_seed()))
3446
random_seed = self.actual_seed(random_seed)
3447
stream.write("Randomizing test order using seed %s\n\n" %
3417
3449
# Initialise the random number generator.
3418
random.seed(self.actual_seed())
3419
suite = randomize_suite(self)
3421
self.addTests(suite)
3422
return iter(self._tests)
3450
random.seed(random_seed)
3451
super(RandomDecorator, self).__init__(randomize_suite(suite))
3424
def actual_seed(self):
3425
if self.random_seed == "now":
3454
def actual_seed(seed):
3426
3456
# We convert the seed to a long to make it reuseable across
3427
3457
# invocations (because the user can reenter it).
3428
self.random_seed = long(time.time())
3458
return long(time.time())
3430
3460
# Convert the seed to a long if we can
3432
self.random_seed = long(self.random_seed)
3463
except (TypeError, ValueError):
3435
return self.random_seed
3438
3468
class TestFirstDecorator(TestDecorator):
3439
3469
"""A decorator which moves named tests to the front."""
3441
3471
def __init__(self, suite, pattern):
3442
TestDecorator.__init__(self, suite)
3443
self.pattern = pattern
3444
self.filtered = False
3448
return iter(self._tests)
3449
self.filtered = True
3450
suites = split_suite_by_re(self, self.pattern)
3452
self.addTests(suites)
3453
return iter(self._tests)
3472
super(TestFirstDecorator, self).__init__()
3473
self.addTests(split_suite_by_re(suite, pattern))
3456
3476
def partition_tests(suite, count):
3501
3521
ProtocolTestCase.run(self, result)
3503
os.waitpid(self.pid, 0)
3523
pid, status = os.waitpid(self.pid, 0)
3524
# GZ 2011-10-18: If status is nonzero, should report to the result
3525
# that something went wrong.
3505
3527
test_blocks = partition_tests(suite, concurrency)
3528
# Clear the tests from the original suite so it doesn't keep them alive
3529
suite._tests[:] = []
3506
3530
for process_tests in test_blocks:
3507
process_suite = TestUtil.TestSuite()
3508
process_suite.addTests(process_tests)
3531
process_suite = TestUtil.TestSuite(process_tests)
3532
# Also clear each split list so new suite has only reference
3533
process_tests[:] = []
3509
3534
c2pread, c2pwrite = os.pipe()
3510
3535
pid = os.fork()
3512
workaround_zealous_crypto_random()
3538
stream = os.fdopen(c2pwrite, 'wb', 1)
3539
workaround_zealous_crypto_random()
3514
3540
os.close(c2pread)
3515
3541
# Leave stderr and stdout open so we can see test noise
3516
3542
# Close stdin so that the child goes away if it decides to
3517
3543
# read from stdin (otherwise its a roulette to see what
3518
3544
# child actually gets keystrokes for pdb etc).
3519
3545
sys.stdin.close()
3521
stream = os.fdopen(c2pwrite, 'wb', 1)
3522
3546
subunit_result = AutoTimingTestResultDecorator(
3523
TestProtocolClient(stream))
3547
SubUnitBzrProtocolClient(stream))
3524
3548
process_suite.run(subunit_result)
3550
# Try and report traceback on stream, but exit with error even
3551
# if stream couldn't be created or something else goes wrong.
3552
# The traceback is formatted to a string and written in one go
3553
# to avoid interleaving lines from multiple failing children.
3555
stream.write(traceback.format_exc())
3528
3560
os.close(c2pwrite)
3529
3561
stream = os.fdopen(c2pread, 'rb', 1)
3635
3667
# with proper exclusion rules.
3636
3668
# -Ethreads Will display thread ident at creation/join time to
3637
3669
# help track thread leaks
3670
# -Euncollected_cases Display the identity of any test cases that weren't
3671
# deallocated after being completed.
3639
3672
# -Econfig_stats Will collect statistics using addDetail
3640
3673
selftest_debug_flags = set()
3946
3979
'bzrlib.tests.test_email_message',
3947
3980
'bzrlib.tests.test_eol_filters',
3948
3981
'bzrlib.tests.test_errors',
3982
'bzrlib.tests.test_estimate_compressed_size',
3949
3983
'bzrlib.tests.test_export',
3950
3984
'bzrlib.tests.test_export_pot',
3951
3985
'bzrlib.tests.test_extract',
3969
4003
'bzrlib.tests.test_http',
3970
4004
'bzrlib.tests.test_http_response',
3971
4005
'bzrlib.tests.test_https_ca_bundle',
4006
'bzrlib.tests.test_https_urllib',
3972
4007
'bzrlib.tests.test_i18n',
3973
4008
'bzrlib.tests.test_identitymap',
3974
4009
'bzrlib.tests.test_ignores',
4023
4058
'bzrlib.tests.test_revisiontree',
4024
4059
'bzrlib.tests.test_rio',
4025
4060
'bzrlib.tests.test_rules',
4061
'bzrlib.tests.test_url_policy_open',
4026
4062
'bzrlib.tests.test_sampler',
4027
4063
'bzrlib.tests.test_scenarios',
4028
4064
'bzrlib.tests.test_script',
4035
4071
'bzrlib.tests.test_smart',
4036
4072
'bzrlib.tests.test_smart_add',
4037
4073
'bzrlib.tests.test_smart_request',
4074
'bzrlib.tests.test_smart_signals',
4038
4075
'bzrlib.tests.test_smart_transport',
4039
4076
'bzrlib.tests.test_smtp_connection',
4040
4077
'bzrlib.tests.test_source',
4071
4108
'bzrlib.tests.test_version',
4072
4109
'bzrlib.tests.test_version_info',
4073
4110
'bzrlib.tests.test_versionedfile',
4111
'bzrlib.tests.test_vf_search',
4074
4112
'bzrlib.tests.test_weave',
4075
4113
'bzrlib.tests.test_whitebox',
4076
4114
'bzrlib.tests.test_win32utils',
4437
4475
from subunit.test_results import AutoTimingTestResultDecorator
4438
4476
class SubUnitBzrProtocolClient(TestProtocolClient):
4478
def stopTest(self, test):
4479
super(SubUnitBzrProtocolClient, self).stopTest(test)
4480
_clear__type_equality_funcs(test)
4440
4482
def addSuccess(self, test, details=None):
4441
4483
# The subunit client always includes the details in the subunit
4442
4484
# stream, but we don't want to include it in ours.
4458
@deprecated_function(deprecated_in((2, 5, 0)))
4459
def ModuleAvailableFeature(name):
4460
from bzrlib.tests import features
4461
return features.ModuleAvailableFeature(name)
4500
# API compatibility for old plugins; see bug 892622.
4503
'HTTPServerFeature',
4504
'ModuleAvailableFeature',
4505
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4506
'OsFifoFeature', 'UnicodeFilenameFeature',
4507
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4508
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4509
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4510
'posix_permissions_feature',
4512
globals()[name] = _CompatabilityThunkFeature(
4513
symbol_versioning.deprecated_in((2, 5, 0)),
4514
'bzrlib.tests', name,
4515
name, 'bzrlib.tests.features')
4518
for (old_name, new_name) in [
4519
('UnicodeFilename', 'UnicodeFilenameFeature'),
4521
globals()[name] = _CompatabilityThunkFeature(
4522
symbol_versioning.deprecated_in((2, 5, 0)),
4523
'bzrlib.tests', old_name,
4524
new_name, 'bzrlib.tests.features')