385
383
getDetails = getattr(test, "getDetails", None)
386
384
if getDetails is not None:
387
385
getDetails().clear()
388
# Clear _type_equality_funcs to try to stop TestCase instances
389
# from wasting memory. 'clear' is not available in all Python
390
# versions (bug 809048)
391
386
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
392
387
if type_equality_funcs is not None:
393
tef_clear = getattr(type_equality_funcs, "clear", None)
394
if tef_clear is None:
395
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
if tef_instance_dict is not None:
397
tef_clear = tef_instance_dict.clear
398
if tef_clear is not None:
388
type_equality_funcs.clear()
400
389
self._traceback_from_test = None
402
391
def startTests(self):
999
988
# settled on or a the FIXME associated with _get_expand_default_value
1000
989
# is addressed -- vila 20110219
1001
990
self.overrideAttr(config, '_expand_default_value', None)
1002
self._log_files = set()
1003
# Each key in the ``_counters`` dict holds a value for a different
1004
# counter. When the test ends, addDetail() should be used to output the
1005
# counter values. This happens in install_counter_hook().
1007
if 'config_stats' in selftest_debug_flags:
1008
self._install_config_stats_hooks()
1009
# Do not use i18n for tests (unless the test reverses this)
1012
992
def debug(self):
1013
993
# debug a frame up.
1015
# The sys preserved stdin/stdout should allow blackbox tests debugging
1016
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1017
).set_trace(sys._getframe().f_back)
995
pdb.Pdb().set_trace(sys._getframe().f_back)
1019
997
def discardDetail(self, name):
1020
998
"""Extend the addDetail, getDetails api so we can remove a detail.
1032
1010
if name in details:
1033
1011
del details[name]
1035
def install_counter_hook(self, hooks, name, counter_name=None):
1036
"""Install a counting hook.
1038
Any hook can be counted as long as it doesn't need to return a value.
1040
:param hooks: Where the hook should be installed.
1042
:param name: The hook name that will be counted.
1044
:param counter_name: The counter identifier in ``_counters``, defaults
1047
_counters = self._counters # Avoid closing over self
1048
if counter_name is None:
1050
if _counters.has_key(counter_name):
1051
raise AssertionError('%s is already used as a counter name'
1053
_counters[counter_name] = 0
1054
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1055
lambda: ['%d' % (_counters[counter_name],)]))
1056
def increment_counter(*args, **kwargs):
1057
_counters[counter_name] += 1
1058
label = 'count %s calls' % (counter_name,)
1059
hooks.install_named_hook(name, increment_counter, label)
1060
self.addCleanup(hooks.uninstall_named_hook, name, label)
1062
def _install_config_stats_hooks(self):
1063
"""Install config hooks to count hook calls.
1066
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1067
self.install_counter_hook(config.ConfigHooks, hook_name,
1068
'config.%s' % (hook_name,))
1070
# The OldConfigHooks are private and need special handling to protect
1071
# against recursive tests (tests that run other tests), so we just do
1072
# manually what registering them into _builtin_known_hooks will provide
1074
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1075
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1076
self.install_counter_hook(config.OldConfigHooks, hook_name,
1077
'old_config.%s' % (hook_name,))
1079
1013
def _clear_debug_flags(self):
1080
1014
"""Prevent externally set debug flags affecting tests.
1135
1069
# break some locks on purpose and should be taken into account by
1136
1070
# considering that breaking a lock is just a dirty way of releasing it.
1137
1071
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1139
'Different number of acquired and '
1140
'released or broken locks.\n'
1144
(acquired_locks, released_locks, broken_locks))
1072
message = ('Different number of acquired and '
1073
'released or broken locks. (%s, %s + %s)' %
1074
(acquired_locks, released_locks, broken_locks))
1145
1075
if not self._lock_check_thorough:
1146
1076
# Rather than fail, just warn
1147
1077
print "Broken test %s: %s" % (self, message)
1775
1702
self.addCleanup(osutils.set_or_unset_env, name, value)
1778
def recordCalls(self, obj, attr_name):
1779
"""Monkeypatch in a wrapper that will record calls.
1781
The monkeypatch is automatically removed when the test concludes.
1783
:param obj: The namespace holding the reference to be replaced;
1784
typically a module, class, or object.
1785
:param attr_name: A string for the name of the attribute to
1787
:returns: A list that will be extended with one item every time the
1788
function is called, with a tuple of (args, kwargs).
1792
def decorator(*args, **kwargs):
1793
calls.append((args, kwargs))
1794
return orig(*args, **kwargs)
1795
orig = self.overrideAttr(obj, attr_name, decorator)
1798
1705
def _cleanEnvironment(self):
1799
1706
for name, value in isolated_environ.iteritems():
1800
1707
self.overrideEnv(name, value)
1807
1714
self._preserved_lazy_hooks.clear()
1809
1716
def knownFailure(self, reason):
1810
"""Declare that this test fails for a known reason
1812
Tests that are known to fail should generally be using expectedFailure
1813
with an appropriate reverse assertion if a change could cause the test
1814
to start passing. Conversely if the test has no immediate prospect of
1815
succeeding then using skip is more suitable.
1817
When this method is called while an exception is being handled, that
1818
traceback will be used, otherwise a new exception will be thrown to
1819
provide one but won't be reported.
1821
self._add_reason(reason)
1823
exc_info = sys.exc_info()
1824
if exc_info != (None, None, None):
1825
self._report_traceback(exc_info)
1828
raise self.failureException(reason)
1829
except self.failureException:
1830
exc_info = sys.exc_info()
1831
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1832
raise testtools.testcase._ExpectedFailure(exc_info)
1717
"""This test has failed for some known reason."""
1718
raise KnownFailure(reason)
1836
1720
def _suppress_log(self):
1837
1721
"""Remove the log info from details."""
2205
def _add_subprocess_log(self, log_file_path):
2206
if len(self._log_files) == 0:
2207
# Register an addCleanup func. We do this on the first call to
2208
# _add_subprocess_log rather than in TestCase.setUp so that this
2209
# addCleanup is registered after any cleanups for tempdirs that
2210
# subclasses might create, which will probably remove the log file
2212
self.addCleanup(self._subprocess_log_cleanup)
2213
# self._log_files is a set, so if a log file is reused we won't grab it
2215
self._log_files.add(log_file_path)
2217
def _subprocess_log_cleanup(self):
2218
for count, log_file_path in enumerate(self._log_files):
2219
# We use buffer_now=True to avoid holding the file open beyond
2220
# the life of this function, which might interfere with e.g.
2221
# cleaning tempdirs on Windows.
2222
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2223
#detail_content = content.content_from_file(
2224
# log_file_path, buffer_now=True)
2225
with open(log_file_path, 'rb') as log_file:
2226
log_file_bytes = log_file.read()
2227
detail_content = content.Content(content.ContentType("text",
2228
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2229
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2232
2086
def _popen(self, *args, **kwargs):
2233
2087
"""Place a call to Popen.
2386
2240
class TestCaseWithMemoryTransport(TestCase):
2387
2241
"""Common test class for tests that do not need disk resources.
2389
Tests that need disk resources should derive from TestCaseInTempDir
2390
orTestCaseWithTransport.
2243
Tests that need disk resources should derive from TestCaseWithTransport.
2392
2245
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2394
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2247
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2395
2248
a directory which does not exist. This serves to help ensure test isolation
2396
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2397
must exist. However, TestCaseWithMemoryTransport does not offer local file
2398
defaults for the transport in tests, nor does it obey the command line
2249
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2250
must exist. However, TestCaseWithMemoryTransport does not offer local
2251
file defaults for the transport in tests, nor does it obey the command line
2399
2252
override, so tests that accidentally write to the common directory should
2402
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2403
``.bzr`` directory that stops us ascending higher into the filesystem.
2255
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2256
a .bzr directory that stops us ascending higher into the filesystem.
2406
2259
TEST_ROOT = None
2566
2418
root = TestCaseWithMemoryTransport.TEST_ROOT
2568
# Make sure we get a readable and accessible home for .bzr.log
2569
# and/or config files, and not fallback to weird defaults (see
2570
# http://pad.lv/825027).
2571
self.assertIs(None, os.environ.get('BZR_HOME', None))
2572
os.environ['BZR_HOME'] = root
2573
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2574
del os.environ['BZR_HOME']
2575
except Exception, e:
2576
self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2577
# Hack for speed: remember the raw bytes of the dirstate file so that
2578
# we don't need to re-open the wt to check it hasn't changed.
2579
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2580
wt.control_transport.get_bytes('dirstate'))
2419
bzrdir.BzrDir.create_standalone_workingtree(root)
2582
2421
def _check_safety_net(self):
2583
2422
"""Check that the safety .bzr directory have not been touched.
2630
2469
def make_branch(self, relpath, format=None):
2631
2470
"""Create a branch on the transport at relpath."""
2632
2471
repo = self.make_repository(relpath, format=format)
2633
return repo.bzrdir.create_branch(append_revisions_only=False)
2635
def resolve_format(self, format):
2636
"""Resolve an object to a ControlDir format object.
2638
The initial format object can either already be
2639
a ControlDirFormat, None (for the default format),
2640
or a string with the name of the control dir format.
2642
:param format: Object to resolve
2643
:return A ControlDirFormat instance
2647
if isinstance(format, basestring):
2648
format = bzrdir.format_registry.make_bzrdir(format)
2651
def resolve_format(self, format):
2652
"""Resolve an object to a ControlDir format object.
2654
The initial format object can either already be
2655
a ControlDirFormat, None (for the default format),
2656
or a string with the name of the control dir format.
2658
:param format: Object to resolve
2659
:return A ControlDirFormat instance
2663
if isinstance(format, basestring):
2664
format = bzrdir.format_registry.make_bzrdir(format)
2472
return repo.bzrdir.create_branch()
2667
2474
def make_bzrdir(self, relpath, format=None):
2672
2479
t = _mod_transport.get_transport(maybe_a_url)
2673
2480
if len(segments) > 1 and segments[-1] not in ('', '.'):
2674
2481
t.ensure_base()
2675
format = self.resolve_format(format)
2484
if isinstance(format, basestring):
2485
format = bzrdir.format_registry.make_bzrdir(format)
2676
2486
return format.initialize_on_transport(t)
2677
2487
except errors.UninitializableFormat:
2678
2488
raise TestSkipped("Format %s is not initializable." % format)
2680
def make_repository(self, relpath, shared=None, format=None):
2490
def make_repository(self, relpath, shared=False, format=None):
2681
2491
"""Create a repository on our default transport at relpath.
2683
2493
Note that relpath must be a relative path, not a full url.
2717
2527
def setUp(self):
2718
2528
super(TestCaseWithMemoryTransport, self).setUp()
2719
2529
# Ensure that ConnectedTransport doesn't leak sockets
2720
def get_transport_from_url_with_cleanup(*args, **kwargs):
2721
t = orig_get_transport_from_url(*args, **kwargs)
2530
def get_transport_with_cleanup(*args, **kwargs):
2531
t = orig_get_transport(*args, **kwargs)
2722
2532
if isinstance(t, _mod_transport.ConnectedTransport):
2723
2533
self.addCleanup(t.disconnect)
2726
orig_get_transport_from_url = self.overrideAttr(
2727
_mod_transport, 'get_transport_from_url',
2728
get_transport_from_url_with_cleanup)
2536
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2537
get_transport_with_cleanup)
2729
2538
self._make_test_root()
2730
2539
self.addCleanup(os.chdir, os.getcwdu())
2731
2540
self.makeAndChdirToTestDir()
3987
3788
'bzrlib.tests.test_email_message',
3988
3789
'bzrlib.tests.test_eol_filters',
3989
3790
'bzrlib.tests.test_errors',
3990
'bzrlib.tests.test_estimate_compressed_size',
3991
3791
'bzrlib.tests.test_export',
3992
3792
'bzrlib.tests.test_export_pot',
3993
3793
'bzrlib.tests.test_extract',
3994
'bzrlib.tests.test_features',
3995
3794
'bzrlib.tests.test_fetch',
3996
3795
'bzrlib.tests.test_fixtures',
3997
3796
'bzrlib.tests.test_fifo_cache',
3998
3797
'bzrlib.tests.test_filters',
3999
'bzrlib.tests.test_filter_tree',
4000
3798
'bzrlib.tests.test_ftp_transport',
4001
3799
'bzrlib.tests.test_foreign',
4002
3800
'bzrlib.tests.test_generate_docs',
4441
4237
% (os.path.basename(dirname), printable_e))
4240
class Feature(object):
4241
"""An operating system Feature."""
4244
self._available = None
4246
def available(self):
4247
"""Is the feature available?
4249
:return: True if the feature is available.
4251
if self._available is None:
4252
self._available = self._probe()
4253
return self._available
4256
"""Implement this method in concrete features.
4258
:return: True if the feature is available.
4260
raise NotImplementedError
4263
if getattr(self, 'feature_name', None):
4264
return self.feature_name()
4265
return self.__class__.__name__
4268
class _SymlinkFeature(Feature):
4271
return osutils.has_symlinks()
4273
def feature_name(self):
4276
SymlinkFeature = _SymlinkFeature()
4279
class _HardlinkFeature(Feature):
4282
return osutils.has_hardlinks()
4284
def feature_name(self):
4287
HardlinkFeature = _HardlinkFeature()
4290
class _OsFifoFeature(Feature):
4293
return getattr(os, 'mkfifo', None)
4295
def feature_name(self):
4296
return 'filesystem fifos'
4298
OsFifoFeature = _OsFifoFeature()
4301
class _UnicodeFilenameFeature(Feature):
4302
"""Does the filesystem support Unicode filenames?"""
4306
# Check for character combinations unlikely to be covered by any
4307
# single non-unicode encoding. We use the characters
4308
# - greek small letter alpha (U+03B1) and
4309
# - braille pattern dots-123456 (U+283F).
4310
os.stat(u'\u03b1\u283f')
4311
except UnicodeEncodeError:
4313
except (IOError, OSError):
4314
# The filesystem allows the Unicode filename but the file doesn't
4318
# The filesystem allows the Unicode filename and the file exists,
4322
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4325
class _CompatabilityThunkFeature(Feature):
4326
"""This feature is just a thunk to another feature.
4328
It issues a deprecation warning if it is accessed, to let you know that you
4329
should really use a different feature.
4332
def __init__(self, dep_version, module, name,
4333
replacement_name, replacement_module=None):
4334
super(_CompatabilityThunkFeature, self).__init__()
4335
self._module = module
4336
if replacement_module is None:
4337
replacement_module = module
4338
self._replacement_module = replacement_module
4340
self._replacement_name = replacement_name
4341
self._dep_version = dep_version
4342
self._feature = None
4345
if self._feature is None:
4346
depr_msg = self._dep_version % ('%s.%s'
4347
% (self._module, self._name))
4348
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4349
self._replacement_name)
4350
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4351
# Import the new feature and use it as a replacement for the
4353
self._feature = pyutils.get_named_object(
4354
self._replacement_module, self._replacement_name)
4358
return self._feature._probe()
4361
class ModuleAvailableFeature(Feature):
4362
"""This is a feature than describes a module we want to be available.
4364
Declare the name of the module in __init__(), and then after probing, the
4365
module will be available as 'self.module'.
4367
:ivar module: The module if it is available, else None.
4370
def __init__(self, module_name):
4371
super(ModuleAvailableFeature, self).__init__()
4372
self.module_name = module_name
4376
self._module = __import__(self.module_name, {}, {}, [''])
4383
if self.available(): # Make sure the probe has been done
4387
def feature_name(self):
4388
return self.module_name
4444
4391
def probe_unicode_in_user_encoding():
4445
4392
"""Try to encode several unicode strings to use in unicode-aware tests.
4446
4393
Return first successfull match.
4424
class _HTTPSServerFeature(Feature):
4425
"""Some tests want an https Server, check if one is available.
4427
Right now, the only way this is available is under python2.6 which provides
4438
def feature_name(self):
4439
return 'HTTPSServer'
4442
HTTPSServerFeature = _HTTPSServerFeature()
4445
class _UnicodeFilename(Feature):
4446
"""Does the filesystem support Unicode filenames?"""
4451
except UnicodeEncodeError:
4453
except (IOError, OSError):
4454
# The filesystem allows the Unicode filename but the file doesn't
4458
# The filesystem allows the Unicode filename and the file exists,
4462
UnicodeFilename = _UnicodeFilename()
4465
class _ByteStringNamedFilesystem(Feature):
4466
"""Is the filesystem based on bytes?"""
4469
if os.name == "posix":
4473
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4476
class _UTF8Filesystem(Feature):
4477
"""Is the filesystem UTF-8?"""
4480
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4484
UTF8Filesystem = _UTF8Filesystem()
4487
class _BreakinFeature(Feature):
4488
"""Does this platform support the breakin feature?"""
4491
from bzrlib import breakin
4492
if breakin.determine_signal() is None:
4494
if sys.platform == 'win32':
4495
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4496
# We trigger SIGBREAK via a Console api so we need ctypes to
4497
# access the function
4504
def feature_name(self):
4505
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4508
BreakinFeature = _BreakinFeature()
4511
class _CaseInsCasePresFilenameFeature(Feature):
4512
"""Is the file-system case insensitive, but case-preserving?"""
4515
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4517
# first check truly case-preserving for created files, then check
4518
# case insensitive when opening existing files.
4519
name = osutils.normpath(name)
4520
base, rel = osutils.split(name)
4521
found_rel = osutils.canonical_relpath(base, name)
4522
return (found_rel == rel
4523
and os.path.isfile(name.upper())
4524
and os.path.isfile(name.lower()))
4529
def feature_name(self):
4530
return "case-insensitive case-preserving filesystem"
4532
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4535
class _CaseInsensitiveFilesystemFeature(Feature):
4536
"""Check if underlying filesystem is case-insensitive but *not* case
4539
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4540
# more likely to be case preserving, so this case is rare.
4543
if CaseInsCasePresFilenameFeature.available():
4546
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4547
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4548
TestCaseWithMemoryTransport.TEST_ROOT = root
4550
root = TestCaseWithMemoryTransport.TEST_ROOT
4551
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4553
name_a = osutils.pathjoin(tdir, 'a')
4554
name_A = osutils.pathjoin(tdir, 'A')
4556
result = osutils.isdir(name_A)
4557
_rmtree_temp_dir(tdir)
4560
def feature_name(self):
4561
return 'case-insensitive filesystem'
4563
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4566
class _CaseSensitiveFilesystemFeature(Feature):
4569
if CaseInsCasePresFilenameFeature.available():
4571
elif CaseInsensitiveFilesystemFeature.available():
4576
def feature_name(self):
4577
return 'case-sensitive filesystem'
4579
# new coding style is for feature instances to be lowercase
4580
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4477
4583
# Only define SubUnitBzrRunner if subunit is available.
4479
4585
from subunit import TestProtocolClient
4497
4603
except ImportError:
4501
@deprecated_function(deprecated_in((2, 5, 0)))
4502
def ModuleAvailableFeature(name):
4503
from bzrlib.tests import features
4504
return features.ModuleAvailableFeature(name)
4606
class _PosixPermissionsFeature(Feature):
4610
# create temporary file and check if specified perms are maintained.
4613
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4614
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4617
os.chmod(name, write_perms)
4619
read_perms = os.stat(name).st_mode & 0777
4621
return (write_perms == read_perms)
4623
return (os.name == 'posix') and has_perms()
4625
def feature_name(self):
4626
return 'POSIX permissions support'
4628
posix_permissions_feature = _PosixPermissionsFeature()