213
217
osutils.set_or_unset_env(var, value)
220
def _clear__type_equality_funcs(test):
221
"""Cleanup bound methods stored on TestCase instances
223
Clear the dict breaking a few (mostly) harmless cycles in the affected
224
unittests released with Python 2.6 and initial Python 2.7 versions.
226
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
227
shipped in Oneiric, an object with no clear method was used, hence the
228
extra complications, see bug 809048 for details.
230
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
231
if type_equality_funcs is not None:
232
tef_clear = getattr(type_equality_funcs, "clear", None)
233
if tef_clear is None:
234
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
235
if tef_instance_dict is not None:
236
tef_clear = tef_instance_dict.clear
237
if tef_clear is not None:
216
241
class ExtendedTestResult(testtools.TextTestResult):
217
242
"""Accepts, reports and accumulates the results of running tests.
1760
1798
self.addCleanup(osutils.set_or_unset_env, name, value)
1801
def recordCalls(self, obj, attr_name):
1802
"""Monkeypatch in a wrapper that will record calls.
1804
The monkeypatch is automatically removed when the test concludes.
1806
:param obj: The namespace holding the reference to be replaced;
1807
typically a module, class, or object.
1808
:param attr_name: A string for the name of the attribute to
1810
:returns: A list that will be extended with one item every time the
1811
function is called, with a tuple of (args, kwargs).
1815
def decorator(*args, **kwargs):
1816
calls.append((args, kwargs))
1817
return orig(*args, **kwargs)
1818
orig = self.overrideAttr(obj, attr_name, decorator)
1763
1821
def _cleanEnvironment(self):
1764
1822
for name, value in isolated_environ.iteritems():
1765
1823
self.overrideEnv(name, value)
1772
1830
self._preserved_lazy_hooks.clear()
1774
1832
def knownFailure(self, reason):
1775
"""This test has failed for some known reason."""
1776
raise KnownFailure(reason)
1833
"""Declare that this test fails for a known reason
1835
Tests that are known to fail should generally be using expectedFailure
1836
with an appropriate reverse assertion if a change could cause the test
1837
to start passing. Conversely if the test has no immediate prospect of
1838
succeeding then using skip is more suitable.
1840
When this method is called while an exception is being handled, that
1841
traceback will be used, otherwise a new exception will be thrown to
1842
provide one but won't be reported.
1844
self._add_reason(reason)
1846
exc_info = sys.exc_info()
1847
if exc_info != (None, None, None):
1848
self._report_traceback(exc_info)
1851
raise self.failureException(reason)
1852
except self.failureException:
1853
exc_info = sys.exc_info()
1854
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1855
raise testtools.testcase._ExpectedFailure(exc_info)
1778
1859
def _suppress_log(self):
1779
1860
"""Remove the log info from details."""
2507
2596
root = TestCaseWithMemoryTransport.TEST_ROOT
2508
bzrdir.BzrDir.create_standalone_workingtree(root)
2597
# Make sure we get a readable and accessible home for .bzr.log
2598
# and/or config files, and not fallback to weird defaults (see
2599
# http://pad.lv/825027).
2600
self.assertIs(None, os.environ.get('BZR_HOME', None))
2601
os.environ['BZR_HOME'] = root
2602
wt = controldir.ControlDir.create_standalone_workingtree(root)
2603
del os.environ['BZR_HOME']
2604
# Hack for speed: remember the raw bytes of the dirstate file so that
2605
# we don't need to re-open the wt to check it hasn't changed.
2606
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2607
wt.control_transport.get_bytes('dirstate'))
2510
2609
def _check_safety_net(self):
2511
2610
"""Check that the safety .bzr directory have not been touched.
2555
2654
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2556
2655
self.permit_dir(self.test_dir)
2558
def make_branch(self, relpath, format=None):
2657
def make_branch(self, relpath, format=None, name=None):
2559
2658
"""Create a branch on the transport at relpath."""
2560
2659
repo = self.make_repository(relpath, format=format)
2561
return repo.bzrdir.create_branch()
2660
return repo.bzrdir.create_branch(append_revisions_only=False,
2663
def get_default_format(self):
2666
def resolve_format(self, format):
2667
"""Resolve an object to a ControlDir format object.
2669
The initial format object can either already be
2670
a ControlDirFormat, None (for the default format),
2671
or a string with the name of the control dir format.
2673
:param format: Object to resolve
2674
:return A ControlDirFormat instance
2677
format = self.get_default_format()
2678
if isinstance(format, basestring):
2679
format = controldir.format_registry.make_bzrdir(format)
2563
2682
def make_bzrdir(self, relpath, format=None):
2568
2687
t = _mod_transport.get_transport(maybe_a_url)
2569
2688
if len(segments) > 1 and segments[-1] not in ('', '.'):
2570
2689
t.ensure_base()
2573
if isinstance(format, basestring):
2574
format = bzrdir.format_registry.make_bzrdir(format)
2690
format = self.resolve_format(format)
2575
2691
return format.initialize_on_transport(t)
2576
2692
except errors.UninitializableFormat:
2577
2693
raise TestSkipped("Format %s is not initializable." % format)
2579
def make_repository(self, relpath, shared=False, format=None):
2695
def make_repository(self, relpath, shared=None, format=None):
2580
2696
"""Create a repository on our default transport at relpath.
2582
2698
Note that relpath must be a relative path, not a full url.
2616
2732
def setUp(self):
2617
2733
super(TestCaseWithMemoryTransport, self).setUp()
2618
# Ensure that ConnectedTransport doesn't leak sockets
2619
def get_transport_with_cleanup(*args, **kwargs):
2620
t = orig_get_transport(*args, **kwargs)
2621
if isinstance(t, _mod_transport.ConnectedTransport):
2622
self.addCleanup(t.disconnect)
2625
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2626
get_transport_with_cleanup)
2735
def _add_disconnect_cleanup(transport):
2736
"""Schedule disconnection of given transport at test cleanup
2738
This needs to happen for all connected transports or leaks occur.
2740
Note reconnections may mean we call disconnect multiple times per
2741
transport which is suboptimal but seems harmless.
2743
self.addCleanup(transport.disconnect)
2745
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2746
_add_disconnect_cleanup, None)
2627
2748
self._make_test_root()
2628
2749
self.addCleanup(os.chdir, os.getcwdu())
2629
2750
self.makeAndChdirToTestDir()
3263
3398
class TestDecorator(TestUtil.TestSuite):
3264
3399
"""A decorator for TestCase/TestSuite objects.
3266
Usually, subclasses should override __iter__(used when flattening test
3267
suites), which we do to filter, reorder, parallelise and so on, run() and
3401
Contains rather than flattening suite passed on construction
3271
def __init__(self, suite):
3272
TestUtil.TestSuite.__init__(self)
3275
def countTestCases(self):
3278
cases += test.countTestCases()
3285
def run(self, result):
3286
# Use iteration on self, not self._tests, to allow subclasses to hook
3289
if result.shouldStop:
3404
def __init__(self, suite=None):
3405
super(TestDecorator, self).__init__()
3406
if suite is not None:
3409
# Don't need subclass run method with suite emptying
3410
run = unittest.TestSuite.run
3295
3413
class CountingDecorator(TestDecorator):
3306
3424
"""A decorator which excludes test matching an exclude pattern."""
3308
3426
def __init__(self, suite, exclude_pattern):
3309
TestDecorator.__init__(self, suite)
3310
self.exclude_pattern = exclude_pattern
3311
self.excluded = False
3315
return iter(self._tests)
3316
self.excluded = True
3317
suite = exclude_tests_by_re(self, self.exclude_pattern)
3319
self.addTests(suite)
3320
return iter(self._tests)
3427
super(ExcludeDecorator, self).__init__(
3428
exclude_tests_by_re(suite, exclude_pattern))
3323
3431
class FilterTestsDecorator(TestDecorator):
3324
3432
"""A decorator which filters tests to those matching a pattern."""
3326
3434
def __init__(self, suite, pattern):
3327
TestDecorator.__init__(self, suite)
3328
self.pattern = pattern
3329
self.filtered = False
3333
return iter(self._tests)
3334
self.filtered = True
3335
suite = filter_suite_by_re(self, self.pattern)
3337
self.addTests(suite)
3338
return iter(self._tests)
3435
super(FilterTestsDecorator, self).__init__(
3436
filter_suite_by_re(suite, pattern))
3341
3439
class RandomDecorator(TestDecorator):
3342
3440
"""A decorator which randomises the order of its tests."""
3344
3442
def __init__(self, suite, random_seed, stream):
3345
TestDecorator.__init__(self, suite)
3346
self.random_seed = random_seed
3347
self.randomised = False
3348
self.stream = stream
3352
return iter(self._tests)
3353
self.randomised = True
3354
self.stream.write("Randomizing test order using seed %s\n\n" %
3355
(self.actual_seed()))
3443
random_seed = self.actual_seed(random_seed)
3444
stream.write("Randomizing test order using seed %s\n\n" %
3356
3446
# Initialise the random number generator.
3357
random.seed(self.actual_seed())
3358
suite = randomize_suite(self)
3360
self.addTests(suite)
3361
return iter(self._tests)
3447
random.seed(random_seed)
3448
super(RandomDecorator, self).__init__(randomize_suite(suite))
3363
def actual_seed(self):
3364
if self.random_seed == "now":
3451
def actual_seed(seed):
3365
3453
# We convert the seed to a long to make it reuseable across
3366
3454
# invocations (because the user can reenter it).
3367
self.random_seed = long(time.time())
3455
return long(time.time())
3369
3457
# Convert the seed to a long if we can
3371
self.random_seed = long(self.random_seed)
3460
except (TypeError, ValueError):
3374
return self.random_seed
3377
3465
class TestFirstDecorator(TestDecorator):
3378
3466
"""A decorator which moves named tests to the front."""
3380
3468
def __init__(self, suite, pattern):
3381
TestDecorator.__init__(self, suite)
3382
self.pattern = pattern
3383
self.filtered = False
3387
return iter(self._tests)
3388
self.filtered = True
3389
suites = split_suite_by_re(self, self.pattern)
3391
self.addTests(suites)
3392
return iter(self._tests)
3469
super(TestFirstDecorator, self).__init__()
3470
self.addTests(split_suite_by_re(suite, pattern))
3395
3473
def partition_tests(suite, count):
3440
3518
ProtocolTestCase.run(self, result)
3442
os.waitpid(self.pid, 0)
3520
pid, status = os.waitpid(self.pid, 0)
3521
# GZ 2011-10-18: If status is nonzero, should report to the result
3522
# that something went wrong.
3444
3524
test_blocks = partition_tests(suite, concurrency)
3525
# Clear the tests from the original suite so it doesn't keep them alive
3526
suite._tests[:] = []
3445
3527
for process_tests in test_blocks:
3446
process_suite = TestUtil.TestSuite()
3447
process_suite.addTests(process_tests)
3528
process_suite = TestUtil.TestSuite(process_tests)
3529
# Also clear each split list so new suite has only reference
3530
process_tests[:] = []
3448
3531
c2pread, c2pwrite = os.pipe()
3449
3532
pid = os.fork()
3451
workaround_zealous_crypto_random()
3535
stream = os.fdopen(c2pwrite, 'wb', 1)
3536
workaround_zealous_crypto_random()
3453
3537
os.close(c2pread)
3454
3538
# Leave stderr and stdout open so we can see test noise
3455
3539
# Close stdin so that the child goes away if it decides to
3456
3540
# read from stdin (otherwise its a roulette to see what
3457
3541
# child actually gets keystrokes for pdb etc).
3458
3542
sys.stdin.close()
3460
stream = os.fdopen(c2pwrite, 'wb', 1)
3461
3543
subunit_result = AutoTimingTestResultDecorator(
3462
TestProtocolClient(stream))
3544
SubUnitBzrProtocolClient(stream))
3463
3545
process_suite.run(subunit_result)
3547
# Try and report traceback on stream, but exit with error even
3548
# if stream couldn't be created or something else goes wrong.
3549
# The traceback is formatted to a string and written in one go
3550
# to avoid interleaving lines from multiple failing children.
3552
stream.write(traceback.format_exc())
3467
3557
os.close(c2pwrite)
3468
3558
stream = os.fdopen(c2pread, 'rb', 1)
3885
3975
'bzrlib.tests.test_email_message',
3886
3976
'bzrlib.tests.test_eol_filters',
3887
3977
'bzrlib.tests.test_errors',
3978
'bzrlib.tests.test_estimate_compressed_size',
3888
3979
'bzrlib.tests.test_export',
3889
3980
'bzrlib.tests.test_export_pot',
3890
3981
'bzrlib.tests.test_extract',
3982
'bzrlib.tests.test_features',
3891
3983
'bzrlib.tests.test_fetch',
3892
3984
'bzrlib.tests.test_fixtures',
3893
3985
'bzrlib.tests.test_fifo_cache',
3894
3986
'bzrlib.tests.test_filters',
3987
'bzrlib.tests.test_filter_tree',
3895
3988
'bzrlib.tests.test_ftp_transport',
3896
3989
'bzrlib.tests.test_foreign',
3897
3990
'bzrlib.tests.test_generate_docs',
4334
4432
% (os.path.basename(dirname), printable_e))
4337
class Feature(object):
4338
"""An operating system Feature."""
4341
self._available = None
4343
def available(self):
4344
"""Is the feature available?
4346
:return: True if the feature is available.
4348
if self._available is None:
4349
self._available = self._probe()
4350
return self._available
4353
"""Implement this method in concrete features.
4355
:return: True if the feature is available.
4357
raise NotImplementedError
4360
if getattr(self, 'feature_name', None):
4361
return self.feature_name()
4362
return self.__class__.__name__
4365
class _SymlinkFeature(Feature):
4368
return osutils.has_symlinks()
4370
def feature_name(self):
4373
SymlinkFeature = _SymlinkFeature()
4376
class _HardlinkFeature(Feature):
4379
return osutils.has_hardlinks()
4381
def feature_name(self):
4384
HardlinkFeature = _HardlinkFeature()
4387
class _OsFifoFeature(Feature):
4390
return getattr(os, 'mkfifo', None)
4392
def feature_name(self):
4393
return 'filesystem fifos'
4395
OsFifoFeature = _OsFifoFeature()
4398
class _UnicodeFilenameFeature(Feature):
4399
"""Does the filesystem support Unicode filenames?"""
4403
# Check for character combinations unlikely to be covered by any
4404
# single non-unicode encoding. We use the characters
4405
# - greek small letter alpha (U+03B1) and
4406
# - braille pattern dots-123456 (U+283F).
4407
os.stat(u'\u03b1\u283f')
4408
except UnicodeEncodeError:
4410
except (IOError, OSError):
4411
# The filesystem allows the Unicode filename but the file doesn't
4415
# The filesystem allows the Unicode filename and the file exists,
4419
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4422
class _CompatabilityThunkFeature(Feature):
4423
"""This feature is just a thunk to another feature.
4425
It issues a deprecation warning if it is accessed, to let you know that you
4426
should really use a different feature.
4429
def __init__(self, dep_version, module, name,
4430
replacement_name, replacement_module=None):
4431
super(_CompatabilityThunkFeature, self).__init__()
4432
self._module = module
4433
if replacement_module is None:
4434
replacement_module = module
4435
self._replacement_module = replacement_module
4437
self._replacement_name = replacement_name
4438
self._dep_version = dep_version
4439
self._feature = None
4442
if self._feature is None:
4443
depr_msg = self._dep_version % ('%s.%s'
4444
% (self._module, self._name))
4445
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4446
self._replacement_name)
4447
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4448
# Import the new feature and use it as a replacement for the
4450
self._feature = pyutils.get_named_object(
4451
self._replacement_module, self._replacement_name)
4455
return self._feature._probe()
4458
class ModuleAvailableFeature(Feature):
4459
"""This is a feature than describes a module we want to be available.
4461
Declare the name of the module in __init__(), and then after probing, the
4462
module will be available as 'self.module'.
4464
:ivar module: The module if it is available, else None.
4467
def __init__(self, module_name):
4468
super(ModuleAvailableFeature, self).__init__()
4469
self.module_name = module_name
4473
self._module = __import__(self.module_name, {}, {}, [''])
4480
if self.available(): # Make sure the probe has been done
4484
def feature_name(self):
4485
return self.module_name
4488
4435
def probe_unicode_in_user_encoding():
4489
4436
"""Try to encode several unicode strings to use in unicode-aware tests.
4490
4437
Return first successfull match.
4521
class _HTTPSServerFeature(Feature):
4522
"""Some tests want an https Server, check if one is available.
4524
Right now, the only way this is available is under python2.6 which provides
4535
def feature_name(self):
4536
return 'HTTPSServer'
4539
HTTPSServerFeature = _HTTPSServerFeature()
4542
class _UnicodeFilename(Feature):
4543
"""Does the filesystem support Unicode filenames?"""
4548
except UnicodeEncodeError:
4550
except (IOError, OSError):
4551
# The filesystem allows the Unicode filename but the file doesn't
4555
# The filesystem allows the Unicode filename and the file exists,
4559
UnicodeFilename = _UnicodeFilename()
4562
class _ByteStringNamedFilesystem(Feature):
4563
"""Is the filesystem based on bytes?"""
4566
if os.name == "posix":
4570
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4573
class _UTF8Filesystem(Feature):
4574
"""Is the filesystem UTF-8?"""
4577
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4581
UTF8Filesystem = _UTF8Filesystem()
4584
class _BreakinFeature(Feature):
4585
"""Does this platform support the breakin feature?"""
4588
from bzrlib import breakin
4589
if breakin.determine_signal() is None:
4591
if sys.platform == 'win32':
4592
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4593
# We trigger SIGBREAK via a Console api so we need ctypes to
4594
# access the function
4601
def feature_name(self):
4602
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4605
BreakinFeature = _BreakinFeature()
4608
class _CaseInsCasePresFilenameFeature(Feature):
4609
"""Is the file-system case insensitive, but case-preserving?"""
4612
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4614
# first check truly case-preserving for created files, then check
4615
# case insensitive when opening existing files.
4616
name = osutils.normpath(name)
4617
base, rel = osutils.split(name)
4618
found_rel = osutils.canonical_relpath(base, name)
4619
return (found_rel == rel
4620
and os.path.isfile(name.upper())
4621
and os.path.isfile(name.lower()))
4626
def feature_name(self):
4627
return "case-insensitive case-preserving filesystem"
4629
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4632
class _CaseInsensitiveFilesystemFeature(Feature):
4633
"""Check if underlying filesystem is case-insensitive but *not* case
4636
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4637
# more likely to be case preserving, so this case is rare.
4640
if CaseInsCasePresFilenameFeature.available():
4643
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4644
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4645
TestCaseWithMemoryTransport.TEST_ROOT = root
4647
root = TestCaseWithMemoryTransport.TEST_ROOT
4648
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4650
name_a = osutils.pathjoin(tdir, 'a')
4651
name_A = osutils.pathjoin(tdir, 'A')
4653
result = osutils.isdir(name_A)
4654
_rmtree_temp_dir(tdir)
4657
def feature_name(self):
4658
return 'case-insensitive filesystem'
4660
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4663
class _CaseSensitiveFilesystemFeature(Feature):
4666
if CaseInsCasePresFilenameFeature.available():
4668
elif CaseInsensitiveFilesystemFeature.available():
4673
def feature_name(self):
4674
return 'case-sensitive filesystem'
4676
# new coding style is for feature instances to be lowercase
4677
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4680
4468
# Only define SubUnitBzrRunner if subunit is available.
4682
4470
from subunit import TestProtocolClient
4683
4471
from subunit.test_results import AutoTimingTestResultDecorator
4684
4472
class SubUnitBzrProtocolClient(TestProtocolClient):
4474
def stopTest(self, test):
4475
super(SubUnitBzrProtocolClient, self).stopTest(test)
4476
_clear__type_equality_funcs(test)
4686
4478
def addSuccess(self, test, details=None):
4687
4479
# The subunit client always includes the details in the subunit
4688
4480
# stream, but we don't want to include it in ours.
4700
4492
except ImportError:
4703
class _PosixPermissionsFeature(Feature):
4707
# create temporary file and check if specified perms are maintained.
4710
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4711
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4714
os.chmod(name, write_perms)
4716
read_perms = os.stat(name).st_mode & 0777
4718
return (write_perms == read_perms)
4720
return (os.name == 'posix') and has_perms()
4722
def feature_name(self):
4723
return 'POSIX permissions support'
4725
posix_permissions_feature = _PosixPermissionsFeature()
4496
# API compatibility for old plugins; see bug 892622.
4499
'HTTPServerFeature',
4500
'ModuleAvailableFeature',
4501
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4502
'OsFifoFeature', 'UnicodeFilenameFeature',
4503
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4504
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4505
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4506
'posix_permissions_feature',
4508
globals()[name] = _CompatabilityThunkFeature(
4509
symbol_versioning.deprecated_in((2, 5, 0)),
4510
'bzrlib.tests', name,
4511
name, 'bzrlib.tests.features')
4514
for (old_name, new_name) in [
4515
('UnicodeFilename', 'UnicodeFilenameFeature'),
4517
globals()[name] = _CompatabilityThunkFeature(
4518
symbol_versioning.deprecated_in((2, 5, 0)),
4519
'bzrlib.tests', old_name,
4520
new_name, 'bzrlib.tests.features')