212
212
osutils.set_or_unset_env(var, value)
215
def _clear__type_equality_funcs(test):
216
"""Cleanup bound methods stored on TestCase instances
218
Clear the dict breaking a few (mostly) harmless cycles in the affected
219
unittests released with Python 2.6 and initial Python 2.7 versions.
221
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
222
shipped in Oneiric, an object with no clear method was used, hence the
223
extra complications, see bug 809048 for details.
225
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
226
if type_equality_funcs is not None:
227
tef_clear = getattr(type_equality_funcs, "clear", None)
228
if tef_clear is None:
229
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
230
if tef_instance_dict is not None:
231
tef_clear = tef_instance_dict.clear
232
if tef_clear is not None:
236
215
class ExtendedTestResult(testtools.TextTestResult):
237
216
"""Accepts, reports and accumulates the results of running tests.
406
385
getDetails = getattr(test, "getDetails", None)
407
386
if getDetails is not None:
408
387
getDetails().clear()
409
_clear__type_equality_funcs(test)
388
# Clear _type_equality_funcs to try to stop TestCase instances
389
# from wasting memory. 'clear' is not available in all Python
390
# versions (bug 809048)
391
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
392
if type_equality_funcs is not None:
393
tef_clear = getattr(type_equality_funcs, "clear", None)
394
if tef_clear is None:
395
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
if tef_instance_dict is not None:
397
tef_clear = tef_instance_dict.clear
398
if tef_clear is not None:
410
400
self._traceback_from_test = None
412
402
def startTests(self):
513
503
self.not_applicable_count += 1
514
504
self.report_not_applicable(test, reason)
516
def _count_stored_tests(self):
517
"""Count of tests instances kept alive due to not succeeding"""
518
return self.error_count + self.failure_count + self.known_failure_count
520
506
def _post_mortem(self, tb=None):
521
507
"""Start a PDB post mortem session."""
522
508
if os.environ.get('BZR_TEST_PDB', None):
997
983
for feature in getattr(self, '_test_needs_features', []):
998
984
self.requireFeature(feature)
999
985
self._cleanEnvironment()
1000
if bzrlib.global_state is not None:
1001
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1002
config.CommandLineSection())
986
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
987
config.CommandLineSection())
1003
988
self._silenceUI()
1004
989
self._startLogFile()
1005
990
self._benchcalls = []
2583
2568
root = TestCaseWithMemoryTransport.TEST_ROOT
2584
# Make sure we get a readable and accessible home for .bzr.log
2585
# and/or config files, and not fallback to weird defaults (see
2586
# http://pad.lv/825027).
2587
self.assertIs(None, os.environ.get('BZR_HOME', None))
2588
os.environ['BZR_HOME'] = root
2589
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2590
del os.environ['BZR_HOME']
2570
# Make sure we get a readable and accessible home for .bzr.log
2571
# and/or config files, and not fallback to weird defaults (see
2572
# http://pad.lv/825027).
2573
self.assertIs(None, os.environ.get('BZR_HOME', None))
2574
os.environ['BZR_HOME'] = root
2575
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2576
del os.environ['BZR_HOME']
2577
except Exception, e:
2578
self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2591
2579
# Hack for speed: remember the raw bytes of the dirstate file so that
2592
2580
# we don't need to re-open the wt to check it hasn't changed.
2593
2581
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2959
2947
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2961
2949
format = self.resolve_format(format=format)
2962
if not format.supports_workingtrees:
2963
b = self.make_branch(relpath+'.branch', format=format)
2964
return b.create_checkout(relpath, lightweight=True)
2965
2950
b = self.make_branch(relpath, format=format)
2967
2952
return b.bzrdir.create_workingtree()
3266
3251
result_decorators=result_decorators,
3268
3253
runner.stop_on_failure=stop_on_failure
3269
if isinstance(suite, unittest.TestSuite):
3270
# Empty out _tests list of passed suite and populate new TestSuite
3271
suite._tests[:], suite = [], TestSuite(suite)
3272
3254
# built in decorator factories:
3274
3256
random_order(random_seed, runner),
3373
3355
class TestDecorator(TestUtil.TestSuite):
3374
3356
"""A decorator for TestCase/TestSuite objects.
3376
Contains rather than flattening suite passed on construction
3358
Usually, subclasses should override __iter__(used when flattening test
3359
suites), which we do to filter, reorder, parallelise and so on, run() and
3379
def __init__(self, suite=None):
3380
super(TestDecorator, self).__init__()
3381
if suite is not None:
3384
# Don't need subclass run method with suite emptying
3385
run = unittest.TestSuite.run
3363
def __init__(self, suite):
3364
TestUtil.TestSuite.__init__(self)
3367
def countTestCases(self):
3370
cases += test.countTestCases()
3377
def run(self, result):
3378
# Use iteration on self, not self._tests, to allow subclasses to hook
3381
if result.shouldStop:
3388
3387
class CountingDecorator(TestDecorator):
3399
3398
"""A decorator which excludes test matching an exclude pattern."""
3401
3400
def __init__(self, suite, exclude_pattern):
3402
super(ExcludeDecorator, self).__init__(
3403
exclude_tests_by_re(suite, exclude_pattern))
3401
TestDecorator.__init__(self, suite)
3402
self.exclude_pattern = exclude_pattern
3403
self.excluded = False
3407
return iter(self._tests)
3408
self.excluded = True
3409
suite = exclude_tests_by_re(self, self.exclude_pattern)
3411
self.addTests(suite)
3412
return iter(self._tests)
3406
3415
class FilterTestsDecorator(TestDecorator):
3407
3416
"""A decorator which filters tests to those matching a pattern."""
3409
3418
def __init__(self, suite, pattern):
3410
super(FilterTestsDecorator, self).__init__(
3411
filter_suite_by_re(suite, pattern))
3419
TestDecorator.__init__(self, suite)
3420
self.pattern = pattern
3421
self.filtered = False
3425
return iter(self._tests)
3426
self.filtered = True
3427
suite = filter_suite_by_re(self, self.pattern)
3429
self.addTests(suite)
3430
return iter(self._tests)
3414
3433
class RandomDecorator(TestDecorator):
3415
3434
"""A decorator which randomises the order of its tests."""
3417
3436
def __init__(self, suite, random_seed, stream):
3418
random_seed = self.actual_seed(random_seed)
3419
stream.write("Randomizing test order using seed %s\n\n" %
3437
TestDecorator.__init__(self, suite)
3438
self.random_seed = random_seed
3439
self.randomised = False
3440
self.stream = stream
3444
return iter(self._tests)
3445
self.randomised = True
3446
self.stream.write("Randomizing test order using seed %s\n\n" %
3447
(self.actual_seed()))
3421
3448
# Initialise the random number generator.
3422
random.seed(random_seed)
3423
super(RandomDecorator, self).__init__(randomize_suite(suite))
3449
random.seed(self.actual_seed())
3450
suite = randomize_suite(self)
3452
self.addTests(suite)
3453
return iter(self._tests)
3426
def actual_seed(seed):
3455
def actual_seed(self):
3456
if self.random_seed == "now":
3428
3457
# We convert the seed to a long to make it reuseable across
3429
3458
# invocations (because the user can reenter it).
3430
return long(time.time())
3459
self.random_seed = long(time.time())
3432
3461
# Convert the seed to a long if we can
3435
except (TypeError, ValueError):
3463
self.random_seed = long(self.random_seed)
3466
return self.random_seed
3440
3469
class TestFirstDecorator(TestDecorator):
3441
3470
"""A decorator which moves named tests to the front."""
3443
3472
def __init__(self, suite, pattern):
3444
super(TestFirstDecorator, self).__init__()
3445
self.addTests(split_suite_by_re(suite, pattern))
3473
TestDecorator.__init__(self, suite)
3474
self.pattern = pattern
3475
self.filtered = False
3479
return iter(self._tests)
3480
self.filtered = True
3481
suites = split_suite_by_re(self, self.pattern)
3483
self.addTests(suites)
3484
return iter(self._tests)
3448
3487
def partition_tests(suite, count):
3495
3534
os.waitpid(self.pid, 0)
3497
3536
test_blocks = partition_tests(suite, concurrency)
3498
# Clear the tests from the original suite so it doesn't keep them alive
3499
suite._tests[:] = []
3500
3537
for process_tests in test_blocks:
3501
process_suite = TestUtil.TestSuite(process_tests)
3502
# Also clear each split list so new suite has only reference
3503
process_tests[:] = []
3538
process_suite = TestUtil.TestSuite()
3539
process_suite.addTests(process_tests)
3504
3540
c2pread, c2pwrite = os.pipe()
3505
3541
pid = os.fork()
3512
3548
# read from stdin (otherwise its a roulette to see what
3513
3549
# child actually gets keystrokes for pdb etc).
3514
3550
sys.stdin.close()
3515
# GZ 2011-06-16: Why set stdin to None? Breaks multi fork.
3517
3552
stream = os.fdopen(c2pwrite, 'wb', 1)
3518
3553
subunit_result = AutoTimingTestResultDecorator(
3519
SubUnitBzrProtocolClient(stream))
3554
TestProtocolClient(stream))
3520
3555
process_suite.run(subunit_result)
3522
# GZ 2011-06-16: Is always exiting with silent success
3523
# really the right thing? Hurts debugging.
3526
3559
os.close(c2pwrite)
3633
3666
# with proper exclusion rules.
3634
3667
# -Ethreads Will display thread ident at creation/join time to
3635
3668
# help track thread leaks
3636
# -Euncollected_cases Display the identity of any test cases that weren't
3637
# deallocated after being completed.
3638
3670
# -Econfig_stats Will collect statistics using addDetail
3639
3671
selftest_debug_flags = set()
4438
4470
from subunit.test_results import AutoTimingTestResultDecorator
4439
4471
class SubUnitBzrProtocolClient(TestProtocolClient):
4441
def stopTest(self, test):
4442
super(SubUnitBzrProtocolClient, self).stopTest(test)
4443
_clear__type_equality_funcs(test)
4445
4473
def addSuccess(self, test, details=None):
4446
4474
# The subunit client always includes the details in the subunit
4447
4475
# stream, but we don't want to include it in ours.