377
378
if isinstance(test, TestCase):
378
379
test.addCleanup(self._check_leaked_threads, test)
381
def stopTest(self, test):
382
super(ExtendedTestResult, self).stopTest(test)
383
# Manually break cycles, means touching various private things but hey
384
getDetails = getattr(test, "getDetails", None)
385
if getDetails is not None:
387
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
388
if type_equality_funcs is not None:
389
type_equality_funcs.clear()
390
self._traceback_from_test = None
380
392
def startTests(self):
381
393
self.report_tests_starting()
382
394
self._active_threads = threading.enumerate()
384
def stopTest(self, test):
385
self._traceback_from_test = None
387
396
def _check_leaked_threads(self, test):
388
397
"""See if any threads have leaked since last call
1006
1018
if name in details:
1007
1019
del details[name]
1021
def install_counter_hook(self, hooks, name, counter_name=None):
1022
"""Install a counting hook.
1024
Any hook can be counted as long as it doesn't need to return a value.
1026
:param hooks: Where the hook should be installed.
1028
:param name: The hook name that will be counted.
1030
:param counter_name: The counter identifier in ``_counters``, defaults
1033
_counters = self._counters # Avoid closing over self
1034
if counter_name is None:
1036
if _counters.has_key(counter_name):
1037
raise AssertionError('%s is already used as a counter name'
1039
_counters[counter_name] = 0
1040
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1041
lambda: ['%d' % (_counters[counter_name],)]))
1042
def increment_counter(*args, **kwargs):
1043
_counters[counter_name] += 1
1044
label = 'count %s calls' % (counter_name,)
1045
hooks.install_named_hook(name, increment_counter, label)
1046
self.addCleanup(hooks.uninstall_named_hook, name, label)
1048
def _install_config_stats_hooks(self):
1049
"""Install config hooks to count hook calls.
1052
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1053
self.install_counter_hook(config.ConfigHooks, hook_name,
1054
'config.%s' % (hook_name,))
1056
# The OldConfigHooks are private and need special handling to protect
1057
# against recursive tests (tests that run other tests), so we just do
1058
# manually what registering them into _builtin_known_hooks will provide
1060
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1061
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1062
self.install_counter_hook(config.OldConfigHooks, hook_name,
1063
'old_config.%s' % (hook_name,))
1009
1065
def _clear_debug_flags(self):
1010
1066
"""Prevent externally set debug flags affecting tests.
1065
1121
# break some locks on purpose and should be taken into account by
1066
1122
# considering that breaking a lock is just a dirty way of releasing it.
1067
1123
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1068
message = ('Different number of acquired and '
1069
'released or broken locks. (%s, %s + %s)' %
1070
(acquired_locks, released_locks, broken_locks))
1125
'Different number of acquired and '
1126
'released or broken locks.\n'
1130
(acquired_locks, released_locks, broken_locks))
1071
1131
if not self._lock_check_thorough:
1072
1132
# Rather than fail, just warn
1073
1133
print "Broken test %s: %s" % (self, message)
1631
1692
The file is removed as the test is torn down.
1633
self._log_file = StringIO()
1694
pseudo_log_file = StringIO()
1695
def _get_log_contents_for_weird_testtools_api():
1696
return [pseudo_log_file.getvalue().decode(
1697
"utf-8", "replace").encode("utf-8")]
1698
self.addDetail("log", content.Content(content.ContentType("text",
1699
"plain", {"charset": "utf8"}),
1700
_get_log_contents_for_weird_testtools_api))
1701
self._log_file = pseudo_log_file
1634
1702
self._log_memento = trace.push_log_file(self._log_file)
1635
1703
self.addCleanup(self._finishLogFile)
1637
1705
def _finishLogFile(self):
1638
1706
"""Finished with the log file.
1640
Close the file and delete it, unless setKeepLogfile was called.
1708
Close the file and delete it.
1642
1710
if trace._trace_file:
1643
1711
# flush the log file, to get all content
1644
1712
trace._trace_file.flush()
1645
1713
trace.pop_log_file(self._log_memento)
1646
# Cache the log result and delete the file on disk
1647
self._get_log(False)
1649
1715
def thisFailsStrictLockCheck(self):
1650
1716
"""It is known that this test would fail with -Dstrict_locks.
1692
1761
self.addCleanup(osutils.set_or_unset_env, name, value)
1764
def recordCalls(self, obj, attr_name):
1765
"""Monkeypatch in a wrapper that will record calls.
1767
The monkeypatch is automatically removed when the test concludes.
1769
:param obj: The namespace holding the reference to be replaced;
1770
typically a module, class, or object.
1771
:param attr_name: A string for the name of the attribute to
1773
:returns: A list that will be extended with one item every time the
1774
function is called, with a tuple of (args, kwargs).
1778
def decorator(*args, **kwargs):
1779
calls.append((args, kwargs))
1780
return orig(*args, **kwargs)
1781
orig = self.overrideAttr(obj, attr_name, decorator)
1695
1784
def _cleanEnvironment(self):
1696
1785
for name, value in isolated_environ.iteritems():
1697
1786
self.overrideEnv(name, value)
1699
1788
def _restoreHooks(self):
1700
1789
for klass, (name, hooks) in self._preserved_hooks.items():
1701
1790
setattr(klass, name, hooks)
1702
hooks._lazy_hooks = self._preserved_lazy_hooks
1791
self._preserved_hooks.clear()
1792
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1793
self._preserved_lazy_hooks.clear()
1704
1795
def knownFailure(self, reason):
1705
"""This test has failed for some known reason."""
1706
raise KnownFailure(reason)
1796
"""Declare that this test fails for a known reason
1798
Tests that are known to fail should generally be using expectedFailure
1799
with an appropriate reverse assertion if a change could cause the test
1800
to start passing. Conversely if the test has no immediate prospect of
1801
succeeding then using skip is more suitable.
1803
When this method is called while an exception is being handled, that
1804
traceback will be used, otherwise a new exception will be thrown to
1805
provide one but won't be reported.
1807
self._add_reason(reason)
1809
exc_info = sys.exc_info()
1810
if exc_info != (None, None, None):
1811
self._report_traceback(exc_info)
1814
raise self.failureException(reason)
1815
except self.failureException:
1816
exc_info = sys.exc_info()
1817
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1818
raise testtools.testcase._ExpectedFailure(exc_info)
1708
1822
def _suppress_log(self):
1709
1823
"""Remove the log info from details."""
1797
1911
def log(self, *args):
1798
1912
trace.mutter(*args)
1800
def _get_log(self, keep_log_file=False):
1801
"""Internal helper to get the log from bzrlib.trace for this test.
1803
Please use self.getDetails, or self.get_log to access this in test case
1806
:param keep_log_file: When True, if the log is still a file on disk
1807
leave it as a file on disk. When False, if the log is still a file
1808
on disk, the log file is deleted and the log preserved as
1810
:return: A string containing the log.
1812
if self._log_contents is not None:
1814
self._log_contents.decode('utf8')
1815
except UnicodeDecodeError:
1816
unicodestr = self._log_contents.decode('utf8', 'replace')
1817
self._log_contents = unicodestr.encode('utf8')
1818
return self._log_contents
1819
if self._log_file is not None:
1820
log_contents = self._log_file.getvalue()
1822
log_contents.decode('utf8')
1823
except UnicodeDecodeError:
1824
unicodestr = log_contents.decode('utf8', 'replace')
1825
log_contents = unicodestr.encode('utf8')
1826
if not keep_log_file:
1827
self._log_file = None
1828
# Permit multiple calls to get_log until we clean it up in
1830
self._log_contents = log_contents
1833
return "No log file content."
1835
1914
def get_log(self):
1836
1915
"""Get a unicode string containing the log from bzrlib.trace.
2191
def _add_subprocess_log(self, log_file_path):
2192
if len(self._log_files) == 0:
2193
# Register an addCleanup func. We do this on the first call to
2194
# _add_subprocess_log rather than in TestCase.setUp so that this
2195
# addCleanup is registered after any cleanups for tempdirs that
2196
# subclasses might create, which will probably remove the log file
2198
self.addCleanup(self._subprocess_log_cleanup)
2199
# self._log_files is a set, so if a log file is reused we won't grab it
2201
self._log_files.add(log_file_path)
2203
def _subprocess_log_cleanup(self):
2204
for count, log_file_path in enumerate(self._log_files):
2205
# We use buffer_now=True to avoid holding the file open beyond
2206
# the life of this function, which might interfere with e.g.
2207
# cleaning tempdirs on Windows.
2208
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2209
#detail_content = content.content_from_file(
2210
# log_file_path, buffer_now=True)
2211
with open(log_file_path, 'rb') as log_file:
2212
log_file_bytes = log_file.read()
2213
detail_content = content.Content(content.ContentType("text",
2214
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2215
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2106
2218
def _popen(self, *args, **kwargs):
2107
2219
"""Place a call to Popen.
2260
2372
class TestCaseWithMemoryTransport(TestCase):
2261
2373
"""Common test class for tests that do not need disk resources.
2263
Tests that need disk resources should derive from TestCaseWithTransport.
2375
Tests that need disk resources should derive from TestCaseInTempDir
2376
orTestCaseWithTransport.
2265
2378
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2267
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2380
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2268
2381
a directory which does not exist. This serves to help ensure test isolation
2269
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2270
must exist. However, TestCaseWithMemoryTransport does not offer local
2271
file defaults for the transport in tests, nor does it obey the command line
2382
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2383
must exist. However, TestCaseWithMemoryTransport does not offer local file
2384
defaults for the transport in tests, nor does it obey the command line
2272
2385
override, so tests that accidentally write to the common directory should
2275
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2276
a .bzr directory that stops us ascending higher into the filesystem.
2388
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2389
``.bzr`` directory that stops us ascending higher into the filesystem.
2279
2392
TEST_ROOT = None
2547
2664
def setUp(self):
2548
2665
super(TestCaseWithMemoryTransport, self).setUp()
2549
2666
# Ensure that ConnectedTransport doesn't leak sockets
2550
def get_transport_with_cleanup(*args, **kwargs):
2551
t = orig_get_transport(*args, **kwargs)
2667
def get_transport_from_url_with_cleanup(*args, **kwargs):
2668
t = orig_get_transport_from_url(*args, **kwargs)
2552
2669
if isinstance(t, _mod_transport.ConnectedTransport):
2553
2670
self.addCleanup(t.disconnect)
2556
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2557
get_transport_with_cleanup)
2673
orig_get_transport_from_url = self.overrideAttr(
2674
_mod_transport, 'get_transport_from_url',
2675
get_transport_from_url_with_cleanup)
2558
2676
self._make_test_root()
2559
2677
self.addCleanup(os.chdir, os.getcwdu())
2560
2678
self.makeAndChdirToTestDir()
3811
3937
'bzrlib.tests.test_export',
3812
3938
'bzrlib.tests.test_export_pot',
3813
3939
'bzrlib.tests.test_extract',
3940
'bzrlib.tests.test_features',
3814
3941
'bzrlib.tests.test_fetch',
3815
3942
'bzrlib.tests.test_fixtures',
3816
3943
'bzrlib.tests.test_fifo_cache',
3817
3944
'bzrlib.tests.test_filters',
3945
'bzrlib.tests.test_filter_tree',
3818
3946
'bzrlib.tests.test_ftp_transport',
3819
3947
'bzrlib.tests.test_foreign',
3820
3948
'bzrlib.tests.test_generate_docs',
4256
4386
% (os.path.basename(dirname), printable_e))
4259
class Feature(object):
4260
"""An operating system Feature."""
4263
self._available = None
4265
def available(self):
4266
"""Is the feature available?
4268
:return: True if the feature is available.
4270
if self._available is None:
4271
self._available = self._probe()
4272
return self._available
4275
"""Implement this method in concrete features.
4277
:return: True if the feature is available.
4279
raise NotImplementedError
4282
if getattr(self, 'feature_name', None):
4283
return self.feature_name()
4284
return self.__class__.__name__
4287
class _SymlinkFeature(Feature):
4290
return osutils.has_symlinks()
4292
def feature_name(self):
4295
SymlinkFeature = _SymlinkFeature()
4298
class _HardlinkFeature(Feature):
4301
return osutils.has_hardlinks()
4303
def feature_name(self):
4306
HardlinkFeature = _HardlinkFeature()
4309
class _OsFifoFeature(Feature):
4312
return getattr(os, 'mkfifo', None)
4314
def feature_name(self):
4315
return 'filesystem fifos'
4317
OsFifoFeature = _OsFifoFeature()
4320
class _UnicodeFilenameFeature(Feature):
4321
"""Does the filesystem support Unicode filenames?"""
4325
# Check for character combinations unlikely to be covered by any
4326
# single non-unicode encoding. We use the characters
4327
# - greek small letter alpha (U+03B1) and
4328
# - braille pattern dots-123456 (U+283F).
4329
os.stat(u'\u03b1\u283f')
4330
except UnicodeEncodeError:
4332
except (IOError, OSError):
4333
# The filesystem allows the Unicode filename but the file doesn't
4337
# The filesystem allows the Unicode filename and the file exists,
4341
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4344
class _CompatabilityThunkFeature(Feature):
4345
"""This feature is just a thunk to another feature.
4347
It issues a deprecation warning if it is accessed, to let you know that you
4348
should really use a different feature.
4351
def __init__(self, dep_version, module, name,
4352
replacement_name, replacement_module=None):
4353
super(_CompatabilityThunkFeature, self).__init__()
4354
self._module = module
4355
if replacement_module is None:
4356
replacement_module = module
4357
self._replacement_module = replacement_module
4359
self._replacement_name = replacement_name
4360
self._dep_version = dep_version
4361
self._feature = None
4364
if self._feature is None:
4365
depr_msg = self._dep_version % ('%s.%s'
4366
% (self._module, self._name))
4367
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4368
self._replacement_name)
4369
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4370
# Import the new feature and use it as a replacement for the
4372
self._feature = pyutils.get_named_object(
4373
self._replacement_module, self._replacement_name)
4377
return self._feature._probe()
4380
class ModuleAvailableFeature(Feature):
4381
"""This is a feature than describes a module we want to be available.
4383
Declare the name of the module in __init__(), and then after probing, the
4384
module will be available as 'self.module'.
4386
:ivar module: The module if it is available, else None.
4389
def __init__(self, module_name):
4390
super(ModuleAvailableFeature, self).__init__()
4391
self.module_name = module_name
4395
self._module = __import__(self.module_name, {}, {}, [''])
4402
if self.available(): # Make sure the probe has been done
4406
def feature_name(self):
4407
return self.module_name
4410
4389
def probe_unicode_in_user_encoding():
4411
4390
"""Try to encode several unicode strings to use in unicode-aware tests.
4412
4391
Return first successfull match.
4443
class _HTTPSServerFeature(Feature):
4444
"""Some tests want an https Server, check if one is available.
4446
Right now, the only way this is available is under python2.6 which provides
4457
def feature_name(self):
4458
return 'HTTPSServer'
4461
HTTPSServerFeature = _HTTPSServerFeature()
4464
class _UnicodeFilename(Feature):
4465
"""Does the filesystem support Unicode filenames?"""
4470
except UnicodeEncodeError:
4472
except (IOError, OSError):
4473
# The filesystem allows the Unicode filename but the file doesn't
4477
# The filesystem allows the Unicode filename and the file exists,
4481
UnicodeFilename = _UnicodeFilename()
4484
class _ByteStringNamedFilesystem(Feature):
4485
"""Is the filesystem based on bytes?"""
4488
if os.name == "posix":
4492
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4495
class _UTF8Filesystem(Feature):
4496
"""Is the filesystem UTF-8?"""
4499
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4503
UTF8Filesystem = _UTF8Filesystem()
4506
class _BreakinFeature(Feature):
4507
"""Does this platform support the breakin feature?"""
4510
from bzrlib import breakin
4511
if breakin.determine_signal() is None:
4513
if sys.platform == 'win32':
4514
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4515
# We trigger SIGBREAK via a Console api so we need ctypes to
4516
# access the function
4523
def feature_name(self):
4524
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4527
BreakinFeature = _BreakinFeature()
4530
class _CaseInsCasePresFilenameFeature(Feature):
4531
"""Is the file-system case insensitive, but case-preserving?"""
4534
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4536
# first check truly case-preserving for created files, then check
4537
# case insensitive when opening existing files.
4538
name = osutils.normpath(name)
4539
base, rel = osutils.split(name)
4540
found_rel = osutils.canonical_relpath(base, name)
4541
return (found_rel == rel
4542
and os.path.isfile(name.upper())
4543
and os.path.isfile(name.lower()))
4548
def feature_name(self):
4549
return "case-insensitive case-preserving filesystem"
4551
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4554
class _CaseInsensitiveFilesystemFeature(Feature):
4555
"""Check if underlying filesystem is case-insensitive but *not* case
4558
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4559
# more likely to be case preserving, so this case is rare.
4562
if CaseInsCasePresFilenameFeature.available():
4565
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4566
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4567
TestCaseWithMemoryTransport.TEST_ROOT = root
4569
root = TestCaseWithMemoryTransport.TEST_ROOT
4570
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4572
name_a = osutils.pathjoin(tdir, 'a')
4573
name_A = osutils.pathjoin(tdir, 'A')
4575
result = osutils.isdir(name_A)
4576
_rmtree_temp_dir(tdir)
4579
def feature_name(self):
4580
return 'case-insensitive filesystem'
4582
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4585
class _CaseSensitiveFilesystemFeature(Feature):
4588
if CaseInsCasePresFilenameFeature.available():
4590
elif CaseInsensitiveFilesystemFeature.available():
4595
def feature_name(self):
4596
return 'case-sensitive filesystem'
4598
# new coding style is for feature instances to be lowercase
4599
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4602
4422
# Only define SubUnitBzrRunner if subunit is available.
4604
4424
from subunit import TestProtocolClient
4622
4442
except ImportError:
4625
class _PosixPermissionsFeature(Feature):
4629
# create temporary file and check if specified perms are maintained.
4632
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4633
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4636
os.chmod(name, write_perms)
4638
read_perms = os.stat(name).st_mode & 0777
4640
return (write_perms == read_perms)
4642
return (os.name == 'posix') and has_perms()
4644
def feature_name(self):
4645
return 'POSIX permissions support'
4647
posix_permissions_feature = _PosixPermissionsFeature()
4446
@deprecated_function(deprecated_in((2, 5, 0)))
4447
def ModuleAvailableFeature(name):
4448
from bzrlib.tests import features
4449
return features.ModuleAvailableFeature(name)