1018
1010
if name in details:
1019
1011
del details[name]
1021
def install_counter_hook(self, hooks, name, counter_name=None):
1022
"""Install a counting hook.
1024
Any hook can be counted as long as it doesn't need to return a value.
1026
:param hooks: Where the hook should be installed.
1028
:param name: The hook name that will be counted.
1030
:param counter_name: The counter identifier in ``_counters``, defaults
1033
_counters = self._counters # Avoid closing over self
1034
if counter_name is None:
1036
if _counters.has_key(counter_name):
1037
raise AssertionError('%s is already used as a counter name'
1039
_counters[counter_name] = 0
1040
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1041
lambda: ['%d' % (_counters[counter_name],)]))
1042
def increment_counter(*args, **kwargs):
1043
_counters[counter_name] += 1
1044
label = 'count %s calls' % (counter_name,)
1045
hooks.install_named_hook(name, increment_counter, label)
1046
self.addCleanup(hooks.uninstall_named_hook, name, label)
1048
def _install_config_stats_hooks(self):
1049
"""Install config hooks to count hook calls.
1052
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1053
self.install_counter_hook(config.ConfigHooks, hook_name,
1054
'config.%s' % (hook_name,))
1056
# The OldConfigHooks are private and need special handling to protect
1057
# against recursive tests (tests that run other tests), so we just do
1058
# manually what registering them into _builtin_known_hooks will provide
1060
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1061
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1062
self.install_counter_hook(config.OldConfigHooks, hook_name,
1063
'old_config.%s' % (hook_name,))
1065
1013
def _clear_debug_flags(self):
1066
1014
"""Prevent externally set debug flags affecting tests.
1121
1069
# break some locks on purpose and should be taken into account by
1122
1070
# considering that breaking a lock is just a dirty way of releasing it.
1123
1071
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1125
'Different number of acquired and '
1126
'released or broken locks.\n'
1130
(acquired_locks, released_locks, broken_locks))
1072
message = ('Different number of acquired and '
1073
'released or broken locks. (%s, %s + %s)' %
1074
(acquired_locks, released_locks, broken_locks))
1131
1075
if not self._lock_check_thorough:
1132
1076
# Rather than fail, just warn
1133
1077
print "Broken test %s: %s" % (self, message)
1761
1701
self.addCleanup(osutils.set_or_unset_env, name, value)
1764
def recordCalls(self, obj, attr_name):
1765
"""Monkeypatch in a wrapper that will record calls.
1767
The monkeypatch is automatically removed when the test concludes.
1769
:param obj: The namespace holding the reference to be replaced;
1770
typically a module, class, or object.
1771
:param attr_name: A string for the name of the attribute to
1773
:returns: A list that will be extended with one item every time the
1774
function is called, with a tuple of (args, kwargs).
1778
def decorator(*args, **kwargs):
1779
calls.append((args, kwargs))
1780
return orig(*args, **kwargs)
1781
orig = self.overrideAttr(obj, attr_name, decorator)
1784
1704
def _cleanEnvironment(self):
1785
1705
for name, value in isolated_environ.iteritems():
1786
1706
self.overrideEnv(name, value)
1793
1713
self._preserved_lazy_hooks.clear()
1795
1715
def knownFailure(self, reason):
1796
"""Declare that this test fails for a known reason
1798
Tests that are known to fail should generally be using expectedFailure
1799
with an appropriate reverse assertion if a change could cause the test
1800
to start passing. Conversely if the test has no immediate prospect of
1801
succeeding then using skip is more suitable.
1803
When this method is called while an exception is being handled, that
1804
traceback will be used, otherwise a new exception will be thrown to
1805
provide one but won't be reported.
1807
self._add_reason(reason)
1809
exc_info = sys.exc_info()
1810
if exc_info != (None, None, None):
1811
self._report_traceback(exc_info)
1814
raise self.failureException(reason)
1815
except self.failureException:
1816
exc_info = sys.exc_info()
1817
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1818
raise testtools.testcase._ExpectedFailure(exc_info)
1716
"""This test has failed for some known reason."""
1717
raise KnownFailure(reason)
1822
1719
def _suppress_log(self):
1823
1720
"""Remove the log info from details."""
2191
def _add_subprocess_log(self, log_file_path):
2192
if len(self._log_files) == 0:
2193
# Register an addCleanup func. We do this on the first call to
2194
# _add_subprocess_log rather than in TestCase.setUp so that this
2195
# addCleanup is registered after any cleanups for tempdirs that
2196
# subclasses might create, which will probably remove the log file
2198
self.addCleanup(self._subprocess_log_cleanup)
2199
# self._log_files is a set, so if a log file is reused we won't grab it
2201
self._log_files.add(log_file_path)
2203
def _subprocess_log_cleanup(self):
2204
for count, log_file_path in enumerate(self._log_files):
2205
# We use buffer_now=True to avoid holding the file open beyond
2206
# the life of this function, which might interfere with e.g.
2207
# cleaning tempdirs on Windows.
2208
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2209
#detail_content = content.content_from_file(
2210
# log_file_path, buffer_now=True)
2211
with open(log_file_path, 'rb') as log_file:
2212
log_file_bytes = log_file.read()
2213
detail_content = content.Content(content.ContentType("text",
2214
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2215
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2218
2085
def _popen(self, *args, **kwargs):
2219
2086
"""Place a call to Popen.
2372
2239
class TestCaseWithMemoryTransport(TestCase):
2373
2240
"""Common test class for tests that do not need disk resources.
2375
Tests that need disk resources should derive from TestCaseInTempDir
2376
orTestCaseWithTransport.
2242
Tests that need disk resources should derive from TestCaseWithTransport.
2378
2244
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2380
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2246
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2381
2247
a directory which does not exist. This serves to help ensure test isolation
2382
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2383
must exist. However, TestCaseWithMemoryTransport does not offer local file
2384
defaults for the transport in tests, nor does it obey the command line
2248
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2249
must exist. However, TestCaseWithMemoryTransport does not offer local
2250
file defaults for the transport in tests, nor does it obey the command line
2385
2251
override, so tests that accidentally write to the common directory should
2388
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2389
``.bzr`` directory that stops us ascending higher into the filesystem.
2254
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2255
a .bzr directory that stops us ascending higher into the filesystem.
2392
2258
TEST_ROOT = None
2664
2526
def setUp(self):
2665
2527
super(TestCaseWithMemoryTransport, self).setUp()
2666
2528
# Ensure that ConnectedTransport doesn't leak sockets
2667
def get_transport_from_url_with_cleanup(*args, **kwargs):
2668
t = orig_get_transport_from_url(*args, **kwargs)
2529
def get_transport_with_cleanup(*args, **kwargs):
2530
t = orig_get_transport(*args, **kwargs)
2669
2531
if isinstance(t, _mod_transport.ConnectedTransport):
2670
2532
self.addCleanup(t.disconnect)
2673
orig_get_transport_from_url = self.overrideAttr(
2674
_mod_transport, 'get_transport_from_url',
2675
get_transport_from_url_with_cleanup)
2535
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2536
get_transport_with_cleanup)
2676
2537
self._make_test_root()
2677
2538
self.addCleanup(os.chdir, os.getcwdu())
2678
2539
self.makeAndChdirToTestDir()
3937
3790
'bzrlib.tests.test_export',
3938
3791
'bzrlib.tests.test_export_pot',
3939
3792
'bzrlib.tests.test_extract',
3940
'bzrlib.tests.test_features',
3941
3793
'bzrlib.tests.test_fetch',
3942
3794
'bzrlib.tests.test_fixtures',
3943
3795
'bzrlib.tests.test_fifo_cache',
3944
3796
'bzrlib.tests.test_filters',
3945
'bzrlib.tests.test_filter_tree',
3946
3797
'bzrlib.tests.test_ftp_transport',
3947
3798
'bzrlib.tests.test_foreign',
3948
3799
'bzrlib.tests.test_generate_docs',
4386
4236
% (os.path.basename(dirname), printable_e))
4239
class Feature(object):
4240
"""An operating system Feature."""
4243
self._available = None
4245
def available(self):
4246
"""Is the feature available?
4248
:return: True if the feature is available.
4250
if self._available is None:
4251
self._available = self._probe()
4252
return self._available
4255
"""Implement this method in concrete features.
4257
:return: True if the feature is available.
4259
raise NotImplementedError
4262
if getattr(self, 'feature_name', None):
4263
return self.feature_name()
4264
return self.__class__.__name__
4267
class _SymlinkFeature(Feature):
4270
return osutils.has_symlinks()
4272
def feature_name(self):
4275
SymlinkFeature = _SymlinkFeature()
4278
class _HardlinkFeature(Feature):
4281
return osutils.has_hardlinks()
4283
def feature_name(self):
4286
HardlinkFeature = _HardlinkFeature()
4289
class _OsFifoFeature(Feature):
4292
return getattr(os, 'mkfifo', None)
4294
def feature_name(self):
4295
return 'filesystem fifos'
4297
OsFifoFeature = _OsFifoFeature()
4300
class _UnicodeFilenameFeature(Feature):
4301
"""Does the filesystem support Unicode filenames?"""
4305
# Check for character combinations unlikely to be covered by any
4306
# single non-unicode encoding. We use the characters
4307
# - greek small letter alpha (U+03B1) and
4308
# - braille pattern dots-123456 (U+283F).
4309
os.stat(u'\u03b1\u283f')
4310
except UnicodeEncodeError:
4312
except (IOError, OSError):
4313
# The filesystem allows the Unicode filename but the file doesn't
4317
# The filesystem allows the Unicode filename and the file exists,
4321
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4324
class _CompatabilityThunkFeature(Feature):
4325
"""This feature is just a thunk to another feature.
4327
It issues a deprecation warning if it is accessed, to let you know that you
4328
should really use a different feature.
4331
def __init__(self, dep_version, module, name,
4332
replacement_name, replacement_module=None):
4333
super(_CompatabilityThunkFeature, self).__init__()
4334
self._module = module
4335
if replacement_module is None:
4336
replacement_module = module
4337
self._replacement_module = replacement_module
4339
self._replacement_name = replacement_name
4340
self._dep_version = dep_version
4341
self._feature = None
4344
if self._feature is None:
4345
depr_msg = self._dep_version % ('%s.%s'
4346
% (self._module, self._name))
4347
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4348
self._replacement_name)
4349
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4350
# Import the new feature and use it as a replacement for the
4352
self._feature = pyutils.get_named_object(
4353
self._replacement_module, self._replacement_name)
4357
return self._feature._probe()
4360
class ModuleAvailableFeature(Feature):
4361
"""This is a feature than describes a module we want to be available.
4363
Declare the name of the module in __init__(), and then after probing, the
4364
module will be available as 'self.module'.
4366
:ivar module: The module if it is available, else None.
4369
def __init__(self, module_name):
4370
super(ModuleAvailableFeature, self).__init__()
4371
self.module_name = module_name
4375
self._module = __import__(self.module_name, {}, {}, [''])
4382
if self.available(): # Make sure the probe has been done
4386
def feature_name(self):
4387
return self.module_name
4389
4390
def probe_unicode_in_user_encoding():
4390
4391
"""Try to encode several unicode strings to use in unicode-aware tests.
4391
4392
Return first successfull match.
4423
class _HTTPSServerFeature(Feature):
4424
"""Some tests want an https Server, check if one is available.
4426
Right now, the only way this is available is under python2.6 which provides
4437
def feature_name(self):
4438
return 'HTTPSServer'
4441
HTTPSServerFeature = _HTTPSServerFeature()
4444
class _UnicodeFilename(Feature):
4445
"""Does the filesystem support Unicode filenames?"""
4450
except UnicodeEncodeError:
4452
except (IOError, OSError):
4453
# The filesystem allows the Unicode filename but the file doesn't
4457
# The filesystem allows the Unicode filename and the file exists,
4461
UnicodeFilename = _UnicodeFilename()
4464
class _ByteStringNamedFilesystem(Feature):
4465
"""Is the filesystem based on bytes?"""
4468
if os.name == "posix":
4472
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4475
class _UTF8Filesystem(Feature):
4476
"""Is the filesystem UTF-8?"""
4479
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4483
UTF8Filesystem = _UTF8Filesystem()
4486
class _BreakinFeature(Feature):
4487
"""Does this platform support the breakin feature?"""
4490
from bzrlib import breakin
4491
if breakin.determine_signal() is None:
4493
if sys.platform == 'win32':
4494
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4495
# We trigger SIGBREAK via a Console api so we need ctypes to
4496
# access the function
4503
def feature_name(self):
4504
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4507
BreakinFeature = _BreakinFeature()
4510
class _CaseInsCasePresFilenameFeature(Feature):
4511
"""Is the file-system case insensitive, but case-preserving?"""
4514
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4516
# first check truly case-preserving for created files, then check
4517
# case insensitive when opening existing files.
4518
name = osutils.normpath(name)
4519
base, rel = osutils.split(name)
4520
found_rel = osutils.canonical_relpath(base, name)
4521
return (found_rel == rel
4522
and os.path.isfile(name.upper())
4523
and os.path.isfile(name.lower()))
4528
def feature_name(self):
4529
return "case-insensitive case-preserving filesystem"
4531
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4534
class _CaseInsensitiveFilesystemFeature(Feature):
4535
"""Check if underlying filesystem is case-insensitive but *not* case
4538
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4539
# more likely to be case preserving, so this case is rare.
4542
if CaseInsCasePresFilenameFeature.available():
4545
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4546
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4547
TestCaseWithMemoryTransport.TEST_ROOT = root
4549
root = TestCaseWithMemoryTransport.TEST_ROOT
4550
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4552
name_a = osutils.pathjoin(tdir, 'a')
4553
name_A = osutils.pathjoin(tdir, 'A')
4555
result = osutils.isdir(name_A)
4556
_rmtree_temp_dir(tdir)
4559
def feature_name(self):
4560
return 'case-insensitive filesystem'
4562
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4565
class _CaseSensitiveFilesystemFeature(Feature):
4568
if CaseInsCasePresFilenameFeature.available():
4570
elif CaseInsensitiveFilesystemFeature.available():
4575
def feature_name(self):
4576
return 'case-sensitive filesystem'
4578
# new coding style is for feature instances to be lowercase
4579
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4422
4582
# Only define SubUnitBzrRunner if subunit is available.
4424
4584
from subunit import TestProtocolClient
4442
4602
except ImportError:
4446
@deprecated_function(deprecated_in((2, 5, 0)))
4447
def ModuleAvailableFeature(name):
4448
from bzrlib.tests import features
4449
return features.ModuleAvailableFeature(name)
4605
class _PosixPermissionsFeature(Feature):
4609
# create temporary file and check if specified perms are maintained.
4612
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4613
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4616
os.chmod(name, write_perms)
4618
read_perms = os.stat(name).st_mode & 0777
4620
return (write_perms == read_perms)
4622
return (os.name == 'posix') and has_perms()
4624
def feature_name(self):
4625
return 'POSIX permissions support'
4627
posix_permissions_feature = _PosixPermissionsFeature()