212
210
osutils.set_or_unset_env(var, value)
215
def _clear__type_equality_funcs(test):
216
"""Cleanup bound methods stored on TestCase instances
218
Clear the dict breaking a few (mostly) harmless cycles in the affected
219
unittests released with Python 2.6 and initial Python 2.7 versions.
221
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
222
shipped in Oneiric, an object with no clear method was used, hence the
223
extra complications, see bug 809048 for details.
225
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
226
if type_equality_funcs is not None:
227
tef_clear = getattr(type_equality_funcs, "clear", None)
228
if tef_clear is None:
229
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
230
if tef_instance_dict is not None:
231
tef_clear = tef_instance_dict.clear
232
if tef_clear is not None:
236
213
class ExtendedTestResult(testtools.TextTestResult):
237
214
"""Accepts, reports and accumulates the results of running tests.
1017
989
# is addressed -- vila 20110219
1018
990
self.overrideAttr(config, '_expand_default_value', None)
1019
991
self._log_files = set()
1020
# Each key in the ``_counters`` dict holds a value for a different
1021
# counter. When the test ends, addDetail() should be used to output the
1022
# counter values. This happens in install_counter_hook().
1024
if 'config_stats' in selftest_debug_flags:
1025
self._install_config_stats_hooks()
1026
# Do not use i18n for tests (unless the test reverses this)
1029
993
def debug(self):
1030
994
# debug a frame up.
1032
# The sys preserved stdin/stdout should allow blackbox tests debugging
1033
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1034
).set_trace(sys._getframe().f_back)
996
pdb.Pdb().set_trace(sys._getframe().f_back)
1036
998
def discardDetail(self, name):
1037
999
"""Extend the addDetail, getDetails api so we can remove a detail.
1049
1011
if name in details:
1050
1012
del details[name]
1052
def install_counter_hook(self, hooks, name, counter_name=None):
1053
"""Install a counting hook.
1055
Any hook can be counted as long as it doesn't need to return a value.
1057
:param hooks: Where the hook should be installed.
1059
:param name: The hook name that will be counted.
1061
:param counter_name: The counter identifier in ``_counters``, defaults
1064
_counters = self._counters # Avoid closing over self
1065
if counter_name is None:
1067
if _counters.has_key(counter_name):
1068
raise AssertionError('%s is already used as a counter name'
1070
_counters[counter_name] = 0
1071
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1072
lambda: ['%d' % (_counters[counter_name],)]))
1073
def increment_counter(*args, **kwargs):
1074
_counters[counter_name] += 1
1075
label = 'count %s calls' % (counter_name,)
1076
hooks.install_named_hook(name, increment_counter, label)
1077
self.addCleanup(hooks.uninstall_named_hook, name, label)
1079
def _install_config_stats_hooks(self):
1080
"""Install config hooks to count hook calls.
1083
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1084
self.install_counter_hook(config.ConfigHooks, hook_name,
1085
'config.%s' % (hook_name,))
1087
# The OldConfigHooks are private and need special handling to protect
1088
# against recursive tests (tests that run other tests), so we just do
1089
# manually what registering them into _builtin_known_hooks will provide
1091
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1092
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1093
self.install_counter_hook(config.OldConfigHooks, hook_name,
1094
'old_config.%s' % (hook_name,))
1096
1014
def _clear_debug_flags(self):
1097
1015
"""Prevent externally set debug flags affecting tests.
1792
1707
self.addCleanup(osutils.set_or_unset_env, name, value)
1795
def recordCalls(self, obj, attr_name):
1796
"""Monkeypatch in a wrapper that will record calls.
1798
The monkeypatch is automatically removed when the test concludes.
1800
:param obj: The namespace holding the reference to be replaced;
1801
typically a module, class, or object.
1802
:param attr_name: A string for the name of the attribute to
1804
:returns: A list that will be extended with one item every time the
1805
function is called, with a tuple of (args, kwargs).
1809
def decorator(*args, **kwargs):
1810
calls.append((args, kwargs))
1811
return orig(*args, **kwargs)
1812
orig = self.overrideAttr(obj, attr_name, decorator)
1815
1710
def _cleanEnvironment(self):
1816
1711
for name, value in isolated_environ.iteritems():
1817
1712
self.overrideEnv(name, value)
1824
1719
self._preserved_lazy_hooks.clear()
1826
1721
def knownFailure(self, reason):
1827
"""Declare that this test fails for a known reason
1829
Tests that are known to fail should generally be using expectedFailure
1830
with an appropriate reverse assertion if a change could cause the test
1831
to start passing. Conversely if the test has no immediate prospect of
1832
succeeding then using skip is more suitable.
1834
When this method is called while an exception is being handled, that
1835
traceback will be used, otherwise a new exception will be thrown to
1836
provide one but won't be reported.
1838
self._add_reason(reason)
1840
exc_info = sys.exc_info()
1841
if exc_info != (None, None, None):
1842
self._report_traceback(exc_info)
1845
raise self.failureException(reason)
1846
except self.failureException:
1847
exc_info = sys.exc_info()
1848
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1849
raise testtools.testcase._ExpectedFailure(exc_info)
1722
"""This test has failed for some known reason."""
1723
raise KnownFailure(reason)
1853
1725
def _suppress_log(self):
1854
1726
"""Remove the log info from details."""
2403
2275
class TestCaseWithMemoryTransport(TestCase):
2404
2276
"""Common test class for tests that do not need disk resources.
2406
Tests that need disk resources should derive from TestCaseInTempDir
2407
orTestCaseWithTransport.
2278
Tests that need disk resources should derive from TestCaseWithTransport.
2409
2280
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2411
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2282
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2412
2283
a directory which does not exist. This serves to help ensure test isolation
2413
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2414
must exist. However, TestCaseWithMemoryTransport does not offer local file
2415
defaults for the transport in tests, nor does it obey the command line
2284
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2285
must exist. However, TestCaseWithMemoryTransport does not offer local
2286
file defaults for the transport in tests, nor does it obey the command line
2416
2287
override, so tests that accidentally write to the common directory should
2419
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2420
``.bzr`` directory that stops us ascending higher into the filesystem.
2290
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2291
a .bzr directory that stops us ascending higher into the filesystem.
2423
2294
TEST_ROOT = None
2583
2453
root = TestCaseWithMemoryTransport.TEST_ROOT
2584
# Make sure we get a readable and accessible home for .bzr.log
2585
# and/or config files, and not fallback to weird defaults (see
2586
# http://pad.lv/825027).
2587
self.assertIs(None, os.environ.get('BZR_HOME', None))
2588
os.environ['BZR_HOME'] = root
2589
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2590
del os.environ['BZR_HOME']
2591
# Hack for speed: remember the raw bytes of the dirstate file so that
2592
# we don't need to re-open the wt to check it hasn't changed.
2593
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2594
wt.control_transport.get_bytes('dirstate'))
2454
bzrdir.BzrDir.create_standalone_workingtree(root)
2596
2456
def _check_safety_net(self):
2597
2457
"""Check that the safety .bzr directory have not been touched.
2644
2504
def make_branch(self, relpath, format=None):
2645
2505
"""Create a branch on the transport at relpath."""
2646
2506
repo = self.make_repository(relpath, format=format)
2647
return repo.bzrdir.create_branch(append_revisions_only=False)
2649
def get_default_format(self):
2652
def resolve_format(self, format):
2653
"""Resolve an object to a ControlDir format object.
2655
The initial format object can either already be
2656
a ControlDirFormat, None (for the default format),
2657
or a string with the name of the control dir format.
2659
:param format: Object to resolve
2660
:return A ControlDirFormat instance
2663
format = self.get_default_format()
2664
if isinstance(format, basestring):
2665
format = bzrdir.format_registry.make_bzrdir(format)
2507
return repo.bzrdir.create_branch()
2668
2509
def make_bzrdir(self, relpath, format=None):
2673
2514
t = _mod_transport.get_transport(maybe_a_url)
2674
2515
if len(segments) > 1 and segments[-1] not in ('', '.'):
2675
2516
t.ensure_base()
2676
format = self.resolve_format(format)
2519
if isinstance(format, basestring):
2520
format = bzrdir.format_registry.make_bzrdir(format)
2677
2521
return format.initialize_on_transport(t)
2678
2522
except errors.UninitializableFormat:
2679
2523
raise TestSkipped("Format %s is not initializable." % format)
2681
def make_repository(self, relpath, shared=None, format=None):
2525
def make_repository(self, relpath, shared=False, format=None):
2682
2526
"""Create a repository on our default transport at relpath.
2684
2528
Note that relpath must be a relative path, not a full url.
2718
2562
def setUp(self):
2719
2563
super(TestCaseWithMemoryTransport, self).setUp()
2720
2564
# Ensure that ConnectedTransport doesn't leak sockets
2721
def get_transport_from_url_with_cleanup(*args, **kwargs):
2722
t = orig_get_transport_from_url(*args, **kwargs)
2565
def get_transport_with_cleanup(*args, **kwargs):
2566
t = orig_get_transport(*args, **kwargs)
2723
2567
if isinstance(t, _mod_transport.ConnectedTransport):
2724
2568
self.addCleanup(t.disconnect)
2727
orig_get_transport_from_url = self.overrideAttr(
2728
_mod_transport, 'get_transport_from_url',
2729
get_transport_from_url_with_cleanup)
2571
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2572
get_transport_with_cleanup)
2730
2573
self._make_test_root()
2731
2574
self.addCleanup(os.chdir, os.getcwdu())
2732
2575
self.makeAndChdirToTestDir()
3373
3203
class TestDecorator(TestUtil.TestSuite):
3374
3204
"""A decorator for TestCase/TestSuite objects.
3376
Contains rather than flattening suite passed on construction
3206
Usually, subclasses should override __iter__(used when flattening test
3207
suites), which we do to filter, reorder, parallelise and so on, run() and
3379
def __init__(self, suite=None):
3380
super(TestDecorator, self).__init__()
3381
if suite is not None:
3384
# Don't need subclass run method with suite emptying
3385
run = unittest.TestSuite.run
3211
def __init__(self, suite):
3212
TestUtil.TestSuite.__init__(self)
3215
def countTestCases(self):
3218
cases += test.countTestCases()
3225
def run(self, result):
3226
# Use iteration on self, not self._tests, to allow subclasses to hook
3229
if result.shouldStop:
3388
3235
class CountingDecorator(TestDecorator):
3399
3246
"""A decorator which excludes test matching an exclude pattern."""
3401
3248
def __init__(self, suite, exclude_pattern):
3402
super(ExcludeDecorator, self).__init__(
3403
exclude_tests_by_re(suite, exclude_pattern))
3249
TestDecorator.__init__(self, suite)
3250
self.exclude_pattern = exclude_pattern
3251
self.excluded = False
3255
return iter(self._tests)
3256
self.excluded = True
3257
suite = exclude_tests_by_re(self, self.exclude_pattern)
3259
self.addTests(suite)
3260
return iter(self._tests)
3406
3263
class FilterTestsDecorator(TestDecorator):
3407
3264
"""A decorator which filters tests to those matching a pattern."""
3409
3266
def __init__(self, suite, pattern):
3410
super(FilterTestsDecorator, self).__init__(
3411
filter_suite_by_re(suite, pattern))
3267
TestDecorator.__init__(self, suite)
3268
self.pattern = pattern
3269
self.filtered = False
3273
return iter(self._tests)
3274
self.filtered = True
3275
suite = filter_suite_by_re(self, self.pattern)
3277
self.addTests(suite)
3278
return iter(self._tests)
3414
3281
class RandomDecorator(TestDecorator):
3415
3282
"""A decorator which randomises the order of its tests."""
3417
3284
def __init__(self, suite, random_seed, stream):
3418
random_seed = self.actual_seed(random_seed)
3419
stream.write("Randomizing test order using seed %s\n\n" %
3285
TestDecorator.__init__(self, suite)
3286
self.random_seed = random_seed
3287
self.randomised = False
3288
self.stream = stream
3292
return iter(self._tests)
3293
self.randomised = True
3294
self.stream.write("Randomizing test order using seed %s\n\n" %
3295
(self.actual_seed()))
3421
3296
# Initialise the random number generator.
3422
random.seed(random_seed)
3423
super(RandomDecorator, self).__init__(randomize_suite(suite))
3297
random.seed(self.actual_seed())
3298
suite = randomize_suite(self)
3300
self.addTests(suite)
3301
return iter(self._tests)
3426
def actual_seed(seed):
3303
def actual_seed(self):
3304
if self.random_seed == "now":
3428
3305
# We convert the seed to a long to make it reuseable across
3429
3306
# invocations (because the user can reenter it).
3430
return long(time.time())
3307
self.random_seed = long(time.time())
3432
3309
# Convert the seed to a long if we can
3435
except (TypeError, ValueError):
3311
self.random_seed = long(self.random_seed)
3314
return self.random_seed
3440
3317
class TestFirstDecorator(TestDecorator):
3441
3318
"""A decorator which moves named tests to the front."""
3443
3320
def __init__(self, suite, pattern):
3444
super(TestFirstDecorator, self).__init__()
3445
self.addTests(split_suite_by_re(suite, pattern))
3321
TestDecorator.__init__(self, suite)
3322
self.pattern = pattern
3323
self.filtered = False
3327
return iter(self._tests)
3328
self.filtered = True
3329
suites = split_suite_by_re(self, self.pattern)
3331
self.addTests(suites)
3332
return iter(self._tests)
3448
3335
def partition_tests(suite, count):
3945
3823
'bzrlib.tests.test_email_message',
3946
3824
'bzrlib.tests.test_eol_filters',
3947
3825
'bzrlib.tests.test_errors',
3948
'bzrlib.tests.test_estimate_compressed_size',
3949
3826
'bzrlib.tests.test_export',
3950
3827
'bzrlib.tests.test_export_pot',
3951
3828
'bzrlib.tests.test_extract',
3952
'bzrlib.tests.test_features',
3953
3829
'bzrlib.tests.test_fetch',
3954
3830
'bzrlib.tests.test_fixtures',
3955
3831
'bzrlib.tests.test_fifo_cache',
3956
3832
'bzrlib.tests.test_filters',
3957
'bzrlib.tests.test_filter_tree',
3958
3833
'bzrlib.tests.test_ftp_transport',
3959
3834
'bzrlib.tests.test_foreign',
3960
3835
'bzrlib.tests.test_generate_docs',
4399
4272
% (os.path.basename(dirname), printable_e))
4275
class Feature(object):
4276
"""An operating system Feature."""
4279
self._available = None
4281
def available(self):
4282
"""Is the feature available?
4284
:return: True if the feature is available.
4286
if self._available is None:
4287
self._available = self._probe()
4288
return self._available
4291
"""Implement this method in concrete features.
4293
:return: True if the feature is available.
4295
raise NotImplementedError
4298
if getattr(self, 'feature_name', None):
4299
return self.feature_name()
4300
return self.__class__.__name__
4303
class _SymlinkFeature(Feature):
4306
return osutils.has_symlinks()
4308
def feature_name(self):
4311
SymlinkFeature = _SymlinkFeature()
4314
class _HardlinkFeature(Feature):
4317
return osutils.has_hardlinks()
4319
def feature_name(self):
4322
HardlinkFeature = _HardlinkFeature()
4325
class _OsFifoFeature(Feature):
4328
return getattr(os, 'mkfifo', None)
4330
def feature_name(self):
4331
return 'filesystem fifos'
4333
OsFifoFeature = _OsFifoFeature()
4336
class _UnicodeFilenameFeature(Feature):
4337
"""Does the filesystem support Unicode filenames?"""
4341
# Check for character combinations unlikely to be covered by any
4342
# single non-unicode encoding. We use the characters
4343
# - greek small letter alpha (U+03B1) and
4344
# - braille pattern dots-123456 (U+283F).
4345
os.stat(u'\u03b1\u283f')
4346
except UnicodeEncodeError:
4348
except (IOError, OSError):
4349
# The filesystem allows the Unicode filename but the file doesn't
4353
# The filesystem allows the Unicode filename and the file exists,
4357
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4360
class _CompatabilityThunkFeature(Feature):
4361
"""This feature is just a thunk to another feature.
4363
It issues a deprecation warning if it is accessed, to let you know that you
4364
should really use a different feature.
4367
def __init__(self, dep_version, module, name,
4368
replacement_name, replacement_module=None):
4369
super(_CompatabilityThunkFeature, self).__init__()
4370
self._module = module
4371
if replacement_module is None:
4372
replacement_module = module
4373
self._replacement_module = replacement_module
4375
self._replacement_name = replacement_name
4376
self._dep_version = dep_version
4377
self._feature = None
4380
if self._feature is None:
4381
depr_msg = self._dep_version % ('%s.%s'
4382
% (self._module, self._name))
4383
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4384
self._replacement_name)
4385
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4386
# Import the new feature and use it as a replacement for the
4388
self._feature = pyutils.get_named_object(
4389
self._replacement_module, self._replacement_name)
4393
return self._feature._probe()
4396
class ModuleAvailableFeature(Feature):
4397
"""This is a feature than describes a module we want to be available.
4399
Declare the name of the module in __init__(), and then after probing, the
4400
module will be available as 'self.module'.
4402
:ivar module: The module if it is available, else None.
4405
def __init__(self, module_name):
4406
super(ModuleAvailableFeature, self).__init__()
4407
self.module_name = module_name
4411
self._module = __import__(self.module_name, {}, {}, [''])
4418
if self.available(): # Make sure the probe has been done
4422
def feature_name(self):
4423
return self.module_name
4402
4426
def probe_unicode_in_user_encoding():
4403
4427
"""Try to encode several unicode strings to use in unicode-aware tests.
4404
4428
Return first successfull match.
4459
class _HTTPSServerFeature(Feature):
4460
"""Some tests want an https Server, check if one is available.
4462
Right now, the only way this is available is under python2.6 which provides
4473
def feature_name(self):
4474
return 'HTTPSServer'
4477
HTTPSServerFeature = _HTTPSServerFeature()
4480
class _UnicodeFilename(Feature):
4481
"""Does the filesystem support Unicode filenames?"""
4486
except UnicodeEncodeError:
4488
except (IOError, OSError):
4489
# The filesystem allows the Unicode filename but the file doesn't
4493
# The filesystem allows the Unicode filename and the file exists,
4497
UnicodeFilename = _UnicodeFilename()
4500
class _ByteStringNamedFilesystem(Feature):
4501
"""Is the filesystem based on bytes?"""
4504
if os.name == "posix":
4508
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4511
class _UTF8Filesystem(Feature):
4512
"""Is the filesystem UTF-8?"""
4515
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4519
UTF8Filesystem = _UTF8Filesystem()
4522
class _BreakinFeature(Feature):
4523
"""Does this platform support the breakin feature?"""
4526
from bzrlib import breakin
4527
if breakin.determine_signal() is None:
4529
if sys.platform == 'win32':
4530
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4531
# We trigger SIGBREAK via a Console api so we need ctypes to
4532
# access the function
4539
def feature_name(self):
4540
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4543
BreakinFeature = _BreakinFeature()
4546
class _CaseInsCasePresFilenameFeature(Feature):
4547
"""Is the file-system case insensitive, but case-preserving?"""
4550
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4552
# first check truly case-preserving for created files, then check
4553
# case insensitive when opening existing files.
4554
name = osutils.normpath(name)
4555
base, rel = osutils.split(name)
4556
found_rel = osutils.canonical_relpath(base, name)
4557
return (found_rel == rel
4558
and os.path.isfile(name.upper())
4559
and os.path.isfile(name.lower()))
4564
def feature_name(self):
4565
return "case-insensitive case-preserving filesystem"
4567
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4570
class _CaseInsensitiveFilesystemFeature(Feature):
4571
"""Check if underlying filesystem is case-insensitive but *not* case
4574
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4575
# more likely to be case preserving, so this case is rare.
4578
if CaseInsCasePresFilenameFeature.available():
4581
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4582
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4583
TestCaseWithMemoryTransport.TEST_ROOT = root
4585
root = TestCaseWithMemoryTransport.TEST_ROOT
4586
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4588
name_a = osutils.pathjoin(tdir, 'a')
4589
name_A = osutils.pathjoin(tdir, 'A')
4591
result = osutils.isdir(name_A)
4592
_rmtree_temp_dir(tdir)
4595
def feature_name(self):
4596
return 'case-insensitive filesystem'
4598
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4601
class _CaseSensitiveFilesystemFeature(Feature):
4604
if CaseInsCasePresFilenameFeature.available():
4606
elif CaseInsensitiveFilesystemFeature.available():
4611
def feature_name(self):
4612
return 'case-sensitive filesystem'
4614
# new coding style is for feature instances to be lowercase
4615
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4435
4618
# Only define SubUnitBzrRunner if subunit is available.
4437
4620
from subunit import TestProtocolClient
4438
4621
from subunit.test_results import AutoTimingTestResultDecorator
4439
4622
class SubUnitBzrProtocolClient(TestProtocolClient):
4441
def stopTest(self, test):
4442
super(SubUnitBzrProtocolClient, self).stopTest(test)
4443
_clear__type_equality_funcs(test)
4445
4624
def addSuccess(self, test, details=None):
4446
4625
# The subunit client always includes the details in the subunit
4447
4626
# stream, but we don't want to include it in ours.
4459
4638
except ImportError:
4463
@deprecated_function(deprecated_in((2, 5, 0)))
4464
def ModuleAvailableFeature(name):
4465
from bzrlib.tests import features
4466
return features.ModuleAvailableFeature(name)
4641
class _PosixPermissionsFeature(Feature):
4645
# create temporary file and check if specified perms are maintained.
4648
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4649
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4652
os.chmod(name, write_perms)
4654
read_perms = os.stat(name).st_mode & 0777
4656
return (write_perms == read_perms)
4658
return (os.name == 'posix') and has_perms()
4660
def feature_name(self):
4661
return 'POSIX permissions support'
4663
posix_permissions_feature = _PosixPermissionsFeature()