383
385
getDetails = getattr(test, "getDetails", None)
384
386
if getDetails is not None:
385
387
getDetails().clear()
388
# Clear _type_equality_funcs to try to stop TestCase instances
389
# from wasting memory. 'clear' is not available in all Python
390
# versions (bug 809048)
386
391
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
387
392
if type_equality_funcs is not None:
388
type_equality_funcs.clear()
393
tef_clear = getattr(type_equality_funcs, "clear", None)
394
if tef_clear is None:
395
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
if tef_instance_dict is not None:
397
tef_clear = tef_instance_dict.clear
398
if tef_clear is not None:
389
400
self._traceback_from_test = None
391
402
def startTests(self):
992
1003
# settled on or a the FIXME associated with _get_expand_default_value
993
1004
# is addressed -- vila 20110219
994
1005
self.overrideAttr(config, '_expand_default_value', None)
1006
self._log_files = set()
1007
# Each key in the ``_counters`` dict holds a value for a different
1008
# counter. When the test ends, addDetail() should be used to output the
1009
# counter values. This happens in install_counter_hook().
1011
if 'config_stats' in selftest_debug_flags:
1012
self._install_config_stats_hooks()
1013
# Do not use i18n for tests (unless the test reverses this)
996
1016
def debug(self):
997
1017
# debug a frame up.
999
pdb.Pdb().set_trace(sys._getframe().f_back)
1019
# The sys preserved stdin/stdout should allow blackbox tests debugging
1020
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1021
).set_trace(sys._getframe().f_back)
1001
1023
def discardDetail(self, name):
1002
1024
"""Extend the addDetail, getDetails api so we can remove a detail.
1014
1036
if name in details:
1015
1037
del details[name]
1039
def install_counter_hook(self, hooks, name, counter_name=None):
1040
"""Install a counting hook.
1042
Any hook can be counted as long as it doesn't need to return a value.
1044
:param hooks: Where the hook should be installed.
1046
:param name: The hook name that will be counted.
1048
:param counter_name: The counter identifier in ``_counters``, defaults
1051
_counters = self._counters # Avoid closing over self
1052
if counter_name is None:
1054
if _counters.has_key(counter_name):
1055
raise AssertionError('%s is already used as a counter name'
1057
_counters[counter_name] = 0
1058
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1059
lambda: ['%d' % (_counters[counter_name],)]))
1060
def increment_counter(*args, **kwargs):
1061
_counters[counter_name] += 1
1062
label = 'count %s calls' % (counter_name,)
1063
hooks.install_named_hook(name, increment_counter, label)
1064
self.addCleanup(hooks.uninstall_named_hook, name, label)
1066
def _install_config_stats_hooks(self):
1067
"""Install config hooks to count hook calls.
1070
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1071
self.install_counter_hook(config.ConfigHooks, hook_name,
1072
'config.%s' % (hook_name,))
1074
# The OldConfigHooks are private and need special handling to protect
1075
# against recursive tests (tests that run other tests), so we just do
1076
# manually what registering them into _builtin_known_hooks will provide
1078
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1079
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1080
self.install_counter_hook(config.OldConfigHooks, hook_name,
1081
'old_config.%s' % (hook_name,))
1017
1083
def _clear_debug_flags(self):
1018
1084
"""Prevent externally set debug flags affecting tests.
1073
1139
# break some locks on purpose and should be taken into account by
1074
1140
# considering that breaking a lock is just a dirty way of releasing it.
1075
1141
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1076
message = ('Different number of acquired and '
1077
'released or broken locks. (%s, %s + %s)' %
1078
(acquired_locks, released_locks, broken_locks))
1143
'Different number of acquired and '
1144
'released or broken locks.\n'
1148
(acquired_locks, released_locks, broken_locks))
1079
1149
if not self._lock_check_thorough:
1080
1150
# Rather than fail, just warn
1081
1151
print "Broken test %s: %s" % (self, message)
1705
1779
self.addCleanup(osutils.set_or_unset_env, name, value)
1782
def recordCalls(self, obj, attr_name):
1783
"""Monkeypatch in a wrapper that will record calls.
1785
The monkeypatch is automatically removed when the test concludes.
1787
:param obj: The namespace holding the reference to be replaced;
1788
typically a module, class, or object.
1789
:param attr_name: A string for the name of the attribute to
1791
:returns: A list that will be extended with one item every time the
1792
function is called, with a tuple of (args, kwargs).
1796
def decorator(*args, **kwargs):
1797
calls.append((args, kwargs))
1798
return orig(*args, **kwargs)
1799
orig = self.overrideAttr(obj, attr_name, decorator)
1708
1802
def _cleanEnvironment(self):
1709
1803
for name, value in isolated_environ.iteritems():
1710
1804
self.overrideEnv(name, value)
1717
1811
self._preserved_lazy_hooks.clear()
1719
1813
def knownFailure(self, reason):
1720
"""This test has failed for some known reason."""
1721
raise KnownFailure(reason)
1814
"""Declare that this test fails for a known reason
1816
Tests that are known to fail should generally be using expectedFailure
1817
with an appropriate reverse assertion if a change could cause the test
1818
to start passing. Conversely if the test has no immediate prospect of
1819
succeeding then using skip is more suitable.
1821
When this method is called while an exception is being handled, that
1822
traceback will be used, otherwise a new exception will be thrown to
1823
provide one but won't be reported.
1825
self._add_reason(reason)
1827
exc_info = sys.exc_info()
1828
if exc_info != (None, None, None):
1829
self._report_traceback(exc_info)
1832
raise self.failureException(reason)
1833
except self.failureException:
1834
exc_info = sys.exc_info()
1835
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1836
raise testtools.testcase._ExpectedFailure(exc_info)
1723
1840
def _suppress_log(self):
1724
1841
"""Remove the log info from details."""
2209
def _add_subprocess_log(self, log_file_path):
2210
if len(self._log_files) == 0:
2211
# Register an addCleanup func. We do this on the first call to
2212
# _add_subprocess_log rather than in TestCase.setUp so that this
2213
# addCleanup is registered after any cleanups for tempdirs that
2214
# subclasses might create, which will probably remove the log file
2216
self.addCleanup(self._subprocess_log_cleanup)
2217
# self._log_files is a set, so if a log file is reused we won't grab it
2219
self._log_files.add(log_file_path)
2221
def _subprocess_log_cleanup(self):
2222
for count, log_file_path in enumerate(self._log_files):
2223
# We use buffer_now=True to avoid holding the file open beyond
2224
# the life of this function, which might interfere with e.g.
2225
# cleaning tempdirs on Windows.
2226
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2227
#detail_content = content.content_from_file(
2228
# log_file_path, buffer_now=True)
2229
with open(log_file_path, 'rb') as log_file:
2230
log_file_bytes = log_file.read()
2231
detail_content = content.Content(content.ContentType("text",
2232
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2233
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2086
2236
def _popen(self, *args, **kwargs):
2087
2237
"""Place a call to Popen.
2240
2390
class TestCaseWithMemoryTransport(TestCase):
2241
2391
"""Common test class for tests that do not need disk resources.
2243
Tests that need disk resources should derive from TestCaseWithTransport.
2393
Tests that need disk resources should derive from TestCaseInTempDir
2394
orTestCaseWithTransport.
2245
2396
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2247
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2398
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2248
2399
a directory which does not exist. This serves to help ensure test isolation
2249
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2250
must exist. However, TestCaseWithMemoryTransport does not offer local
2251
file defaults for the transport in tests, nor does it obey the command line
2400
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2401
must exist. However, TestCaseWithMemoryTransport does not offer local file
2402
defaults for the transport in tests, nor does it obey the command line
2252
2403
override, so tests that accidentally write to the common directory should
2255
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2256
a .bzr directory that stops us ascending higher into the filesystem.
2406
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2407
``.bzr`` directory that stops us ascending higher into the filesystem.
2259
2410
TEST_ROOT = None
2418
2570
root = TestCaseWithMemoryTransport.TEST_ROOT
2419
bzrdir.BzrDir.create_standalone_workingtree(root)
2572
# Make sure we get a readable and accessible home for .bzr.log
2573
# and/or config files, and not fallback to weird defaults (see
2574
# http://pad.lv/825027).
2575
self.assertIs(None, os.environ.get('BZR_HOME', None))
2576
os.environ['BZR_HOME'] = root
2577
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2578
del os.environ['BZR_HOME']
2579
except Exception, e:
2580
self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2581
# Hack for speed: remember the raw bytes of the dirstate file so that
2582
# we don't need to re-open the wt to check it hasn't changed.
2583
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2584
wt.control_transport.get_bytes('dirstate'))
2421
2586
def _check_safety_net(self):
2422
2587
"""Check that the safety .bzr directory have not been touched.
2469
2634
def make_branch(self, relpath, format=None):
2470
2635
"""Create a branch on the transport at relpath."""
2471
2636
repo = self.make_repository(relpath, format=format)
2472
return repo.bzrdir.create_branch()
2637
return repo.bzrdir.create_branch(append_revisions_only=False)
2639
def resolve_format(self, format):
2640
"""Resolve an object to a ControlDir format object.
2642
The initial format object can either already be
2643
a ControlDirFormat, None (for the default format),
2644
or a string with the name of the control dir format.
2646
:param format: Object to resolve
2647
:return A ControlDirFormat instance
2651
if isinstance(format, basestring):
2652
format = bzrdir.format_registry.make_bzrdir(format)
2655
def resolve_format(self, format):
2656
"""Resolve an object to a ControlDir format object.
2658
The initial format object can either already be
2659
a ControlDirFormat, None (for the default format),
2660
or a string with the name of the control dir format.
2662
:param format: Object to resolve
2663
:return A ControlDirFormat instance
2667
if isinstance(format, basestring):
2668
format = bzrdir.format_registry.make_bzrdir(format)
2474
2671
def make_bzrdir(self, relpath, format=None):
2527
2721
def setUp(self):
2528
2722
super(TestCaseWithMemoryTransport, self).setUp()
2529
2723
# Ensure that ConnectedTransport doesn't leak sockets
2530
def get_transport_with_cleanup(*args, **kwargs):
2531
t = orig_get_transport(*args, **kwargs)
2724
def get_transport_from_url_with_cleanup(*args, **kwargs):
2725
t = orig_get_transport_from_url(*args, **kwargs)
2532
2726
if isinstance(t, _mod_transport.ConnectedTransport):
2533
2727
self.addCleanup(t.disconnect)
2536
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2537
get_transport_with_cleanup)
2730
orig_get_transport_from_url = self.overrideAttr(
2731
_mod_transport, 'get_transport_from_url',
2732
get_transport_from_url_with_cleanup)
2538
2733
self._make_test_root()
2539
2734
self.addCleanup(os.chdir, os.getcwdu())
2540
2735
self.makeAndChdirToTestDir()
3742
3944
'bzrlib.tests.test_email_message',
3743
3945
'bzrlib.tests.test_eol_filters',
3744
3946
'bzrlib.tests.test_errors',
3947
'bzrlib.tests.test_estimate_compressed_size',
3745
3948
'bzrlib.tests.test_export',
3746
3949
'bzrlib.tests.test_export_pot',
3747
3950
'bzrlib.tests.test_extract',
3951
'bzrlib.tests.test_features',
3748
3952
'bzrlib.tests.test_fetch',
3749
3953
'bzrlib.tests.test_fixtures',
3750
3954
'bzrlib.tests.test_fifo_cache',
3751
3955
'bzrlib.tests.test_filters',
3956
'bzrlib.tests.test_filter_tree',
3752
3957
'bzrlib.tests.test_ftp_transport',
3753
3958
'bzrlib.tests.test_foreign',
3754
3959
'bzrlib.tests.test_generate_docs',
4190
4397
% (os.path.basename(dirname), printable_e))
4193
class Feature(object):
4194
"""An operating system Feature."""
4197
self._available = None
4199
def available(self):
4200
"""Is the feature available?
4202
:return: True if the feature is available.
4204
if self._available is None:
4205
self._available = self._probe()
4206
return self._available
4209
"""Implement this method in concrete features.
4211
:return: True if the feature is available.
4213
raise NotImplementedError
4216
if getattr(self, 'feature_name', None):
4217
return self.feature_name()
4218
return self.__class__.__name__
4221
class _SymlinkFeature(Feature):
4224
return osutils.has_symlinks()
4226
def feature_name(self):
4229
SymlinkFeature = _SymlinkFeature()
4232
class _HardlinkFeature(Feature):
4235
return osutils.has_hardlinks()
4237
def feature_name(self):
4240
HardlinkFeature = _HardlinkFeature()
4243
class _OsFifoFeature(Feature):
4246
return getattr(os, 'mkfifo', None)
4248
def feature_name(self):
4249
return 'filesystem fifos'
4251
OsFifoFeature = _OsFifoFeature()
4254
class _UnicodeFilenameFeature(Feature):
4255
"""Does the filesystem support Unicode filenames?"""
4259
# Check for character combinations unlikely to be covered by any
4260
# single non-unicode encoding. We use the characters
4261
# - greek small letter alpha (U+03B1) and
4262
# - braille pattern dots-123456 (U+283F).
4263
os.stat(u'\u03b1\u283f')
4264
except UnicodeEncodeError:
4266
except (IOError, OSError):
4267
# The filesystem allows the Unicode filename but the file doesn't
4271
# The filesystem allows the Unicode filename and the file exists,
4275
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4278
class _CompatabilityThunkFeature(Feature):
4279
"""This feature is just a thunk to another feature.
4281
It issues a deprecation warning if it is accessed, to let you know that you
4282
should really use a different feature.
4285
def __init__(self, dep_version, module, name,
4286
replacement_name, replacement_module=None):
4287
super(_CompatabilityThunkFeature, self).__init__()
4288
self._module = module
4289
if replacement_module is None:
4290
replacement_module = module
4291
self._replacement_module = replacement_module
4293
self._replacement_name = replacement_name
4294
self._dep_version = dep_version
4295
self._feature = None
4298
if self._feature is None:
4299
depr_msg = self._dep_version % ('%s.%s'
4300
% (self._module, self._name))
4301
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4302
self._replacement_name)
4303
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4304
# Import the new feature and use it as a replacement for the
4306
self._feature = pyutils.get_named_object(
4307
self._replacement_module, self._replacement_name)
4311
return self._feature._probe()
4314
class ModuleAvailableFeature(Feature):
4315
"""This is a feature than describes a module we want to be available.
4317
Declare the name of the module in __init__(), and then after probing, the
4318
module will be available as 'self.module'.
4320
:ivar module: The module if it is available, else None.
4323
def __init__(self, module_name):
4324
super(ModuleAvailableFeature, self).__init__()
4325
self.module_name = module_name
4329
self._module = __import__(self.module_name, {}, {}, [''])
4336
if self.available(): # Make sure the probe has been done
4340
def feature_name(self):
4341
return self.module_name
4344
4400
def probe_unicode_in_user_encoding():
4345
4401
"""Try to encode several unicode strings to use in unicode-aware tests.
4346
4402
Return first successfull match.
4377
class _HTTPSServerFeature(Feature):
4378
"""Some tests want an https Server, check if one is available.
4380
Right now, the only way this is available is under python2.6 which provides
4391
def feature_name(self):
4392
return 'HTTPSServer'
4395
HTTPSServerFeature = _HTTPSServerFeature()
4398
class _UnicodeFilename(Feature):
4399
"""Does the filesystem support Unicode filenames?"""
4404
except UnicodeEncodeError:
4406
except (IOError, OSError):
4407
# The filesystem allows the Unicode filename but the file doesn't
4411
# The filesystem allows the Unicode filename and the file exists,
4415
UnicodeFilename = _UnicodeFilename()
4418
class _ByteStringNamedFilesystem(Feature):
4419
"""Is the filesystem based on bytes?"""
4422
if os.name == "posix":
4426
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4429
class _UTF8Filesystem(Feature):
4430
"""Is the filesystem UTF-8?"""
4433
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4437
UTF8Filesystem = _UTF8Filesystem()
4440
class _BreakinFeature(Feature):
4441
"""Does this platform support the breakin feature?"""
4444
from bzrlib import breakin
4445
if breakin.determine_signal() is None:
4447
if sys.platform == 'win32':
4448
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4449
# We trigger SIGBREAK via a Console api so we need ctypes to
4450
# access the function
4457
def feature_name(self):
4458
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4461
BreakinFeature = _BreakinFeature()
4464
class _CaseInsCasePresFilenameFeature(Feature):
4465
"""Is the file-system case insensitive, but case-preserving?"""
4468
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4470
# first check truly case-preserving for created files, then check
4471
# case insensitive when opening existing files.
4472
name = osutils.normpath(name)
4473
base, rel = osutils.split(name)
4474
found_rel = osutils.canonical_relpath(base, name)
4475
return (found_rel == rel
4476
and os.path.isfile(name.upper())
4477
and os.path.isfile(name.lower()))
4482
def feature_name(self):
4483
return "case-insensitive case-preserving filesystem"
4485
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4488
class _CaseInsensitiveFilesystemFeature(Feature):
4489
"""Check if underlying filesystem is case-insensitive but *not* case
4492
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4493
# more likely to be case preserving, so this case is rare.
4496
if CaseInsCasePresFilenameFeature.available():
4499
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4500
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4501
TestCaseWithMemoryTransport.TEST_ROOT = root
4503
root = TestCaseWithMemoryTransport.TEST_ROOT
4504
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4506
name_a = osutils.pathjoin(tdir, 'a')
4507
name_A = osutils.pathjoin(tdir, 'A')
4509
result = osutils.isdir(name_A)
4510
_rmtree_temp_dir(tdir)
4513
def feature_name(self):
4514
return 'case-insensitive filesystem'
4516
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4519
class _CaseSensitiveFilesystemFeature(Feature):
4522
if CaseInsCasePresFilenameFeature.available():
4524
elif CaseInsensitiveFilesystemFeature.available():
4529
def feature_name(self):
4530
return 'case-sensitive filesystem'
4532
# new coding style is for feature instances to be lowercase
4533
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4536
4433
# Only define SubUnitBzrRunner if subunit is available.
4538
4435
from subunit import TestProtocolClient
4564
4461
except ImportError:
4567
class _PosixPermissionsFeature(Feature):
4571
# create temporary file and check if specified perms are maintained.
4574
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4575
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4578
os.chmod(name, write_perms)
4580
read_perms = os.stat(name).st_mode & 0777
4582
return (write_perms == read_perms)
4584
return (os.name == 'posix') and has_perms()
4586
def feature_name(self):
4587
return 'POSIX permissions support'
4589
posix_permissions_feature = _PosixPermissionsFeature()
4465
@deprecated_function(deprecated_in((2, 5, 0)))
4466
def ModuleAvailableFeature(name):
4467
from bzrlib.tests import features
4468
return features.ModuleAvailableFeature(name)