211
212
osutils.set_or_unset_env(var, value)
215
def _clear__type_equality_funcs(test):
216
"""Cleanup bound methods stored on TestCase instances
218
Clear the dict breaking a few (mostly) harmless cycles in the affected
219
unittests released with Python 2.6 and initial Python 2.7 versions.
221
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
222
shipped in Oneiric, an object with no clear method was used, hence the
223
extra complications, see bug 809048 for details.
225
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
226
if type_equality_funcs is not None:
227
tef_clear = getattr(type_equality_funcs, "clear", None)
228
if tef_clear is None:
229
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
230
if tef_instance_dict is not None:
231
tef_clear = tef_instance_dict.clear
232
if tef_clear is not None:
214
236
class ExtendedTestResult(testtools.TextTestResult):
215
237
"""Accepts, reports and accumulates the results of running tests.
384
406
getDetails = getattr(test, "getDetails", None)
385
407
if getDetails is not None:
386
408
getDetails().clear()
387
# Clear _type_equality_funcs to try to stop TestCase instances
388
# from wasting memory. 'clear' is not available in all Python
389
# versions (bug 809048)
390
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
391
if type_equality_funcs is not None:
392
tef_clear = getattr(type_equality_funcs, "clear", None)
393
if tef_clear is None:
394
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
395
if tef_instance_dict is not None:
396
tef_clear = tef_instance_dict.clear
397
if tef_clear is not None:
409
_clear__type_equality_funcs(test)
399
410
self._traceback_from_test = None
401
412
def startTests(self):
502
513
self.not_applicable_count += 1
503
514
self.report_not_applicable(test, reason)
516
def _count_stored_tests(self):
517
"""Count of tests instances kept alive due to not succeeding"""
518
return self.error_count + self.failure_count + self.known_failure_count
505
520
def _post_mortem(self, tb=None):
506
521
"""Start a PDB post mortem session."""
507
522
if os.environ.get('BZR_TEST_PDB', None):
1714
1731
self.addCleanup(self._finishLogFile)
1716
1733
def _finishLogFile(self):
1717
"""Finished with the log file.
1719
Close the file and delete it.
1734
"""Flush and dereference the in-memory log for this testcase"""
1721
1735
if trace._trace_file:
1722
1736
# flush the log file, to get all content
1723
1737
trace._trace_file.flush()
1724
1738
trace.pop_log_file(self._log_memento)
1739
# The logging module now tracks references for cleanup so discard ours
1740
del self._log_memento
1726
1742
def thisFailsStrictLockCheck(self):
1727
1743
"""It is known that this test would fail with -Dstrict_locks.
2563
2579
root = TestCaseWithMemoryTransport.TEST_ROOT
2565
# Make sure we get a readable and accessible home for .bzr.log
2566
# and/or config files, and not fallback to weird defaults (see
2567
# http://pad.lv/825027).
2568
self.assertIs(None, os.environ.get('BZR_HOME', None))
2569
os.environ['BZR_HOME'] = root
2570
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2571
del os.environ['BZR_HOME']
2572
except Exception, e:
2573
self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2580
# Make sure we get a readable and accessible home for .bzr.log
2581
# and/or config files, and not fallback to weird defaults (see
2582
# http://pad.lv/825027).
2583
self.assertIs(None, os.environ.get('BZR_HOME', None))
2584
os.environ['BZR_HOME'] = root
2585
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2586
del os.environ['BZR_HOME']
2574
2587
# Hack for speed: remember the raw bytes of the dirstate file so that
2575
2588
# we don't need to re-open the wt to check it hasn't changed.
2576
2589
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2627
2640
def make_branch(self, relpath, format=None):
2628
2641
"""Create a branch on the transport at relpath."""
2629
2642
repo = self.make_repository(relpath, format=format)
2630
return repo.bzrdir.create_branch()
2643
return repo.bzrdir.create_branch(append_revisions_only=False)
2645
def get_default_format(self):
2648
def resolve_format(self, format):
2649
"""Resolve an object to a ControlDir format object.
2651
The initial format object can either already be
2652
a ControlDirFormat, None (for the default format),
2653
or a string with the name of the control dir format.
2655
:param format: Object to resolve
2656
:return A ControlDirFormat instance
2659
format = self.get_default_format()
2660
if isinstance(format, basestring):
2661
format = bzrdir.format_registry.make_bzrdir(format)
2632
2664
def make_bzrdir(self, relpath, format=None):
2637
2669
t = _mod_transport.get_transport(maybe_a_url)
2638
2670
if len(segments) > 1 and segments[-1] not in ('', '.'):
2639
2671
t.ensure_base()
2642
if isinstance(format, basestring):
2643
format = bzrdir.format_registry.make_bzrdir(format)
2672
format = self.resolve_format(format)
2644
2673
return format.initialize_on_transport(t)
2645
2674
except errors.UninitializableFormat:
2646
2675
raise TestSkipped("Format %s is not initializable." % format)
2648
def make_repository(self, relpath, shared=False, format=None):
2677
def make_repository(self, relpath, shared=None, format=None):
2649
2678
"""Create a repository on our default transport at relpath.
2651
2680
Note that relpath must be a relative path, not a full url.
2925
2954
# this obviously requires a format that supports branch references
2926
2955
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2957
format = self.resolve_format(format=format)
2958
if not format.supports_workingtrees:
2959
b = self.make_branch(relpath+'.branch', format=format)
2960
return b.create_checkout(relpath, lightweight=True)
2928
2961
b = self.make_branch(relpath, format=format)
2930
2963
return b.bzrdir.create_workingtree()
3229
3262
result_decorators=result_decorators,
3231
3264
runner.stop_on_failure=stop_on_failure
3265
if isinstance(suite, unittest.TestSuite):
3266
# Empty out _tests list of passed suite and populate new TestSuite
3267
suite._tests[:], suite = [], TestSuite(suite)
3232
3268
# built in decorator factories:
3234
3270
random_order(random_seed, runner),
3333
3369
class TestDecorator(TestUtil.TestSuite):
3334
3370
"""A decorator for TestCase/TestSuite objects.
3336
Usually, subclasses should override __iter__(used when flattening test
3337
suites), which we do to filter, reorder, parallelise and so on, run() and
3372
Contains rather than flattening suite passed on construction
3341
def __init__(self, suite):
3342
TestUtil.TestSuite.__init__(self)
3345
def countTestCases(self):
3348
cases += test.countTestCases()
3355
def run(self, result):
3356
# Use iteration on self, not self._tests, to allow subclasses to hook
3359
if result.shouldStop:
3375
def __init__(self, suite=None):
3376
super(TestDecorator, self).__init__()
3377
if suite is not None:
3380
# Don't need subclass run method with suite emptying
3381
run = unittest.TestSuite.run
3365
3384
class CountingDecorator(TestDecorator):
3376
3395
"""A decorator which excludes test matching an exclude pattern."""
3378
3397
def __init__(self, suite, exclude_pattern):
3379
TestDecorator.__init__(self, suite)
3380
self.exclude_pattern = exclude_pattern
3381
self.excluded = False
3385
return iter(self._tests)
3386
self.excluded = True
3387
suite = exclude_tests_by_re(self, self.exclude_pattern)
3389
self.addTests(suite)
3390
return iter(self._tests)
3398
super(ExcludeDecorator, self).__init__(
3399
exclude_tests_by_re(suite, exclude_pattern))
3393
3402
class FilterTestsDecorator(TestDecorator):
3394
3403
"""A decorator which filters tests to those matching a pattern."""
3396
3405
def __init__(self, suite, pattern):
3397
TestDecorator.__init__(self, suite)
3398
self.pattern = pattern
3399
self.filtered = False
3403
return iter(self._tests)
3404
self.filtered = True
3405
suite = filter_suite_by_re(self, self.pattern)
3407
self.addTests(suite)
3408
return iter(self._tests)
3406
super(FilterTestsDecorator, self).__init__(
3407
filter_suite_by_re(suite, pattern))
3411
3410
class RandomDecorator(TestDecorator):
3412
3411
"""A decorator which randomises the order of its tests."""
3414
3413
def __init__(self, suite, random_seed, stream):
3415
TestDecorator.__init__(self, suite)
3416
self.random_seed = random_seed
3417
self.randomised = False
3418
self.stream = stream
3422
return iter(self._tests)
3423
self.randomised = True
3424
self.stream.write("Randomizing test order using seed %s\n\n" %
3425
(self.actual_seed()))
3414
random_seed = self.actual_seed(random_seed)
3415
stream.write("Randomizing test order using seed %s\n\n" %
3426
3417
# Initialise the random number generator.
3427
random.seed(self.actual_seed())
3428
suite = randomize_suite(self)
3430
self.addTests(suite)
3431
return iter(self._tests)
3418
random.seed(random_seed)
3419
super(RandomDecorator, self).__init__(randomize_suite(suite))
3433
def actual_seed(self):
3434
if self.random_seed == "now":
3422
def actual_seed(seed):
3435
3424
# We convert the seed to a long to make it reuseable across
3436
3425
# invocations (because the user can reenter it).
3437
self.random_seed = long(time.time())
3426
return long(time.time())
3439
3428
# Convert the seed to a long if we can
3441
self.random_seed = long(self.random_seed)
3431
except (TypeError, ValueError):
3444
return self.random_seed
3447
3436
class TestFirstDecorator(TestDecorator):
3448
3437
"""A decorator which moves named tests to the front."""
3450
3439
def __init__(self, suite, pattern):
3451
TestDecorator.__init__(self, suite)
3452
self.pattern = pattern
3453
self.filtered = False
3457
return iter(self._tests)
3458
self.filtered = True
3459
suites = split_suite_by_re(self, self.pattern)
3461
self.addTests(suites)
3462
return iter(self._tests)
3440
super(TestFirstDecorator, self).__init__()
3441
self.addTests(split_suite_by_re(suite, pattern))
3465
3444
def partition_tests(suite, count):
3510
3489
ProtocolTestCase.run(self, result)
3512
os.waitpid(self.pid, 0)
3491
pid, status = os.waitpid(self.pid, 0)
3492
# GZ 2011-10-18: If status is nonzero, should report to the result
3493
# that something went wrong.
3514
3495
test_blocks = partition_tests(suite, concurrency)
3496
# Clear the tests from the original suite so it doesn't keep them alive
3497
suite._tests[:] = []
3515
3498
for process_tests in test_blocks:
3516
process_suite = TestUtil.TestSuite()
3517
process_suite.addTests(process_tests)
3499
process_suite = TestUtil.TestSuite(process_tests)
3500
# Also clear each split list so new suite has only reference
3501
process_tests[:] = []
3518
3502
c2pread, c2pwrite = os.pipe()
3519
3503
pid = os.fork()
3521
workaround_zealous_crypto_random()
3506
stream = os.fdopen(c2pwrite, 'wb', 1)
3507
workaround_zealous_crypto_random()
3523
3508
os.close(c2pread)
3524
3509
# Leave stderr and stdout open so we can see test noise
3525
3510
# Close stdin so that the child goes away if it decides to
3526
3511
# read from stdin (otherwise its a roulette to see what
3527
3512
# child actually gets keystrokes for pdb etc).
3528
3513
sys.stdin.close()
3530
stream = os.fdopen(c2pwrite, 'wb', 1)
3531
3514
subunit_result = AutoTimingTestResultDecorator(
3532
TestProtocolClient(stream))
3515
SubUnitBzrProtocolClient(stream))
3533
3516
process_suite.run(subunit_result)
3518
# Try and report traceback on stream, but exit with error even
3519
# if stream couldn't be created or something else goes wrong
3521
traceback.print_exc(file=stream)
3537
3526
os.close(c2pwrite)
3538
3527
stream = os.fdopen(c2pread, 'rb', 1)
3644
3633
# with proper exclusion rules.
3645
3634
# -Ethreads Will display thread ident at creation/join time to
3646
3635
# help track thread leaks
3636
# -Euncollected_cases Display the identity of any test cases that weren't
3637
# deallocated after being completed.
3648
3638
# -Econfig_stats Will collect statistics using addDetail
3649
3639
selftest_debug_flags = set()
3955
3945
'bzrlib.tests.test_email_message',
3956
3946
'bzrlib.tests.test_eol_filters',
3957
3947
'bzrlib.tests.test_errors',
3948
'bzrlib.tests.test_estimate_compressed_size',
3958
3949
'bzrlib.tests.test_export',
3959
3950
'bzrlib.tests.test_export_pot',
3960
3951
'bzrlib.tests.test_extract',
4044
4035
'bzrlib.tests.test_smart',
4045
4036
'bzrlib.tests.test_smart_add',
4046
4037
'bzrlib.tests.test_smart_request',
4038
'bzrlib.tests.test_smart_signals',
4047
4039
'bzrlib.tests.test_smart_transport',
4048
4040
'bzrlib.tests.test_smtp_connection',
4049
4041
'bzrlib.tests.test_source',
4446
4438
from subunit.test_results import AutoTimingTestResultDecorator
4447
4439
class SubUnitBzrProtocolClient(TestProtocolClient):
4441
def stopTest(self, test):
4442
super(SubUnitBzrProtocolClient, self).stopTest(test)
4443
_clear__type_equality_funcs(test)
4449
4445
def addSuccess(self, test, details=None):
4450
4446
# The subunit client always includes the details in the subunit
4451
4447
# stream, but we don't want to include it in ours.