379
377
if isinstance(test, TestCase):
380
378
test.addCleanup(self._check_leaked_threads, test)
382
def stopTest(self, test):
383
super(ExtendedTestResult, self).stopTest(test)
384
# Manually break cycles, means touching various private things but hey
385
getDetails = getattr(test, "getDetails", None)
386
if getDetails is not None:
388
# Clear _type_equality_funcs to try to stop TestCase instances
389
# from wasting memory. 'clear' is not available in all Python
390
# versions (bug 809048)
391
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
392
if type_equality_funcs is not None:
393
tef_clear = getattr(type_equality_funcs, "clear", None)
394
if tef_clear is None:
395
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
if tef_instance_dict is not None:
397
tef_clear = tef_instance_dict.clear
398
if tef_clear is not None:
400
self._traceback_from_test = None
402
380
def startTests(self):
403
381
self.report_tests_starting()
404
382
self._active_threads = threading.enumerate()
384
def stopTest(self, test):
385
self._traceback_from_test = None
406
387
def _check_leaked_threads(self, test):
407
388
"""See if any threads have leaked since last call
468
449
self.known_failure_count += 1
469
450
self.report_known_failure(test, err)
471
def addUnexpectedSuccess(self, test, details=None):
472
"""Tell result the test unexpectedly passed, counting as a failure
474
When the minimum version of testtools required becomes 0.9.8 this
475
can be updated to use the new handling there.
477
super(ExtendedTestResult, self).addFailure(test, details=details)
478
self.failure_count += 1
479
self.report_unexpected_success(test,
480
"".join(details["reason"].iter_text()))
484
452
def addNotSupported(self, test, feature):
485
453
"""The test will not be run because of a missing feature.
709
670
% (self._testTimeString(test),
710
671
self._error_summary(err)))
712
def report_unexpected_success(self, test, reason):
713
self.stream.write(' FAIL %s\n%s: %s\n'
714
% (self._testTimeString(test),
715
"Unexpected success. Should have failed",
718
673
def report_success(self, test):
719
674
self.stream.write(' OK %s\n' % self._testTimeString(test))
720
675
for bench_called, stats in getattr(test, '_benchcalls', []):
999
958
# settled on or a the FIXME associated with _get_expand_default_value
1000
959
# is addressed -- vila 20110219
1001
960
self.overrideAttr(config, '_expand_default_value', None)
1002
self._log_files = set()
1003
# Each key in the ``_counters`` dict holds a value for a different
1004
# counter. When the test ends, addDetail() should be used to output the
1005
# counter values. This happens in install_counter_hook().
1007
if 'config_stats' in selftest_debug_flags:
1008
self._install_config_stats_hooks()
1009
# Do not use i18n for tests (unless the test reverses this)
1012
962
def debug(self):
1013
963
# debug a frame up.
1015
# The sys preserved stdin/stdout should allow blackbox tests debugging
1016
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1017
).set_trace(sys._getframe().f_back)
965
pdb.Pdb().set_trace(sys._getframe().f_back)
1019
967
def discardDetail(self, name):
1020
968
"""Extend the addDetail, getDetails api so we can remove a detail.
1032
980
if name in details:
1033
981
del details[name]
1035
def install_counter_hook(self, hooks, name, counter_name=None):
1036
"""Install a counting hook.
1038
Any hook can be counted as long as it doesn't need to return a value.
1040
:param hooks: Where the hook should be installed.
1042
:param name: The hook name that will be counted.
1044
:param counter_name: The counter identifier in ``_counters``, defaults
1047
_counters = self._counters # Avoid closing over self
1048
if counter_name is None:
1050
if _counters.has_key(counter_name):
1051
raise AssertionError('%s is already used as a counter name'
1053
_counters[counter_name] = 0
1054
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1055
lambda: ['%d' % (_counters[counter_name],)]))
1056
def increment_counter(*args, **kwargs):
1057
_counters[counter_name] += 1
1058
label = 'count %s calls' % (counter_name,)
1059
hooks.install_named_hook(name, increment_counter, label)
1060
self.addCleanup(hooks.uninstall_named_hook, name, label)
1062
def _install_config_stats_hooks(self):
1063
"""Install config hooks to count hook calls.
1066
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1067
self.install_counter_hook(config.ConfigHooks, hook_name,
1068
'config.%s' % (hook_name,))
1070
# The OldConfigHooks are private and need special handling to protect
1071
# against recursive tests (tests that run other tests), so we just do
1072
# manually what registering them into _builtin_known_hooks will provide
1074
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1075
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1076
self.install_counter_hook(config.OldConfigHooks, hook_name,
1077
'old_config.%s' % (hook_name,))
1079
983
def _clear_debug_flags(self):
1080
984
"""Prevent externally set debug flags affecting tests.
1135
1037
# break some locks on purpose and should be taken into account by
1136
1038
# considering that breaking a lock is just a dirty way of releasing it.
1137
1039
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1139
'Different number of acquired and '
1140
'released or broken locks.\n'
1144
(acquired_locks, released_locks, broken_locks))
1040
message = ('Different number of acquired and '
1041
'released or broken locks. (%s, %s + %s)' %
1042
(acquired_locks, released_locks, broken_locks))
1145
1043
if not self._lock_check_thorough:
1146
1044
# Rather than fail, just warn
1147
1045
print "Broken test %s: %s" % (self, message)
1367
1265
'st_mtime did not match')
1368
1266
self.assertEqual(expected.st_ctime, actual.st_ctime,
1369
1267
'st_ctime did not match')
1370
if sys.platform == 'win32':
1268
if sys.platform != 'win32':
1371
1269
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1372
1270
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1373
# odd. We just force it to always be 0 to avoid any problems.
1374
self.assertEqual(0, expected.st_dev)
1375
self.assertEqual(0, actual.st_dev)
1376
self.assertEqual(0, expected.st_ino)
1377
self.assertEqual(0, actual.st_ino)
1271
# odd. Regardless we shouldn't actually try to assert anything
1272
# about their values
1379
1273
self.assertEqual(expected.st_dev, actual.st_dev,
1380
1274
'st_dev did not match')
1381
1275
self.assertEqual(expected.st_ino, actual.st_ino,
1390
1284
length, len(obj_with_len), obj_with_len))
1392
1286
def assertLogsError(self, exception_class, func, *args, **kwargs):
1393
"""Assert that `func(*args, **kwargs)` quietly logs a specific error.
1287
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1396
1290
orig_log_exception_quietly = trace.log_exception_quietly
1399
1293
orig_log_exception_quietly()
1400
captured.append(sys.exc_info()[1])
1294
captured.append(sys.exc_info())
1401
1295
trace.log_exception_quietly = capture
1402
1296
func(*args, **kwargs)
1404
1298
trace.log_exception_quietly = orig_log_exception_quietly
1405
1299
self.assertLength(1, captured)
1300
err = captured[0][1]
1407
1301
self.assertIsInstance(err, exception_class)
1561
1455
self.assertEqual(expected_docstring, obj.__doc__)
1563
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1564
1457
def failUnlessExists(self, path):
1565
return self.assertPathExists(path)
1567
def assertPathExists(self, path):
1568
1458
"""Fail unless path or paths, which may be abs or relative, exist."""
1569
1459
if not isinstance(path, basestring):
1571
self.assertPathExists(p)
1461
self.failUnlessExists(p)
1573
self.assertTrue(osutils.lexists(path),
1574
path + " does not exist")
1463
self.failUnless(osutils.lexists(path),path+" does not exist")
1576
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1577
1465
def failIfExists(self, path):
1578
return self.assertPathDoesNotExist(path)
1580
def assertPathDoesNotExist(self, path):
1581
1466
"""Fail if path or paths, which may be abs or relative, exist."""
1582
1467
if not isinstance(path, basestring):
1584
self.assertPathDoesNotExist(p)
1469
self.failIfExists(p)
1586
self.assertFalse(osutils.lexists(path),
1471
self.failIf(osutils.lexists(path),path+" exists")
1589
1473
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1590
1474
"""A helper for callDeprecated and applyDeprecated.
1706
1589
The file is removed as the test is torn down.
1708
pseudo_log_file = StringIO()
1709
def _get_log_contents_for_weird_testtools_api():
1710
return [pseudo_log_file.getvalue().decode(
1711
"utf-8", "replace").encode("utf-8")]
1712
self.addDetail("log", content.Content(content.ContentType("text",
1713
"plain", {"charset": "utf8"}),
1714
_get_log_contents_for_weird_testtools_api))
1715
self._log_file = pseudo_log_file
1591
self._log_file = StringIO()
1716
1592
self._log_memento = trace.push_log_file(self._log_file)
1717
1593
self.addCleanup(self._finishLogFile)
1719
1595
def _finishLogFile(self):
1720
1596
"""Finished with the log file.
1722
Close the file and delete it.
1598
Close the file and delete it, unless setKeepLogfile was called.
1724
1600
if trace._trace_file:
1725
1601
# flush the log file, to get all content
1726
1602
trace._trace_file.flush()
1727
1603
trace.pop_log_file(self._log_memento)
1604
# Cache the log result and delete the file on disk
1605
self._get_log(False)
1729
1607
def thisFailsStrictLockCheck(self):
1730
1608
"""It is known that this test would fail with -Dstrict_locks.
1775
1650
self.addCleanup(osutils.set_or_unset_env, name, value)
1778
def recordCalls(self, obj, attr_name):
1779
"""Monkeypatch in a wrapper that will record calls.
1781
The monkeypatch is automatically removed when the test concludes.
1783
:param obj: The namespace holding the reference to be replaced;
1784
typically a module, class, or object.
1785
:param attr_name: A string for the name of the attribute to
1787
:returns: A list that will be extended with one item every time the
1788
function is called, with a tuple of (args, kwargs).
1792
def decorator(*args, **kwargs):
1793
calls.append((args, kwargs))
1794
return orig(*args, **kwargs)
1795
orig = self.overrideAttr(obj, attr_name, decorator)
1798
1653
def _cleanEnvironment(self):
1799
1654
for name, value in isolated_environ.iteritems():
1800
1655
self.overrideEnv(name, value)
1802
1657
def _restoreHooks(self):
1803
1658
for klass, (name, hooks) in self._preserved_hooks.items():
1804
1659
setattr(klass, name, hooks)
1805
self._preserved_hooks.clear()
1806
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1807
self._preserved_lazy_hooks.clear()
1809
1661
def knownFailure(self, reason):
1810
"""Declare that this test fails for a known reason
1812
Tests that are known to fail should generally be using expectedFailure
1813
with an appropriate reverse assertion if a change could cause the test
1814
to start passing. Conversely if the test has no immediate prospect of
1815
succeeding then using skip is more suitable.
1817
When this method is called while an exception is being handled, that
1818
traceback will be used, otherwise a new exception will be thrown to
1819
provide one but won't be reported.
1821
self._add_reason(reason)
1823
exc_info = sys.exc_info()
1824
if exc_info != (None, None, None):
1825
self._report_traceback(exc_info)
1828
raise self.failureException(reason)
1829
except self.failureException:
1830
exc_info = sys.exc_info()
1831
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1832
raise testtools.testcase._ExpectedFailure(exc_info)
1662
"""This test has failed for some known reason."""
1663
raise KnownFailure(reason)
1836
1665
def _suppress_log(self):
1837
1666
"""Remove the log info from details."""
1925
1754
def log(self, *args):
1926
1755
trace.mutter(*args)
1757
def _get_log(self, keep_log_file=False):
1758
"""Internal helper to get the log from bzrlib.trace for this test.
1760
Please use self.getDetails, or self.get_log to access this in test case
1763
:param keep_log_file: When True, if the log is still a file on disk
1764
leave it as a file on disk. When False, if the log is still a file
1765
on disk, the log file is deleted and the log preserved as
1767
:return: A string containing the log.
1769
if self._log_contents is not None:
1771
self._log_contents.decode('utf8')
1772
except UnicodeDecodeError:
1773
unicodestr = self._log_contents.decode('utf8', 'replace')
1774
self._log_contents = unicodestr.encode('utf8')
1775
return self._log_contents
1776
if self._log_file is not None:
1777
log_contents = self._log_file.getvalue()
1779
log_contents.decode('utf8')
1780
except UnicodeDecodeError:
1781
unicodestr = log_contents.decode('utf8', 'replace')
1782
log_contents = unicodestr.encode('utf8')
1783
if not keep_log_file:
1784
self._log_file = None
1785
# Permit multiple calls to get_log until we clean it up in
1787
self._log_contents = log_contents
1790
return "No log file content."
1928
1792
def get_log(self):
1929
1793
"""Get a unicode string containing the log from bzrlib.trace.
2205
def _add_subprocess_log(self, log_file_path):
2206
if len(self._log_files) == 0:
2207
# Register an addCleanup func. We do this on the first call to
2208
# _add_subprocess_log rather than in TestCase.setUp so that this
2209
# addCleanup is registered after any cleanups for tempdirs that
2210
# subclasses might create, which will probably remove the log file
2212
self.addCleanup(self._subprocess_log_cleanup)
2213
# self._log_files is a set, so if a log file is reused we won't grab it
2215
self._log_files.add(log_file_path)
2217
def _subprocess_log_cleanup(self):
2218
for count, log_file_path in enumerate(self._log_files):
2219
# We use buffer_now=True to avoid holding the file open beyond
2220
# the life of this function, which might interfere with e.g.
2221
# cleaning tempdirs on Windows.
2222
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2223
#detail_content = content.content_from_file(
2224
# log_file_path, buffer_now=True)
2225
with open(log_file_path, 'rb') as log_file:
2226
log_file_bytes = log_file.read()
2227
detail_content = content.Content(content.ContentType("text",
2228
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2229
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2232
2063
def _popen(self, *args, **kwargs):
2233
2064
"""Place a call to Popen.
2277
2108
% (process_args, retcode, process.returncode))
2278
2109
return [out, err]
2280
def check_tree_shape(self, tree, shape):
2281
"""Compare a tree to a list of expected names.
2111
def check_inventory_shape(self, inv, shape):
2112
"""Compare an inventory to a list of expected names.
2283
2114
Fail if they are not precisely equal.
2286
2117
shape = list(shape) # copy
2287
for path, ie in tree.iter_entries_by_dir():
2118
for path, ie in inv.entries():
2288
2119
name = path.replace('\\', '/')
2289
2120
if ie.kind == 'directory':
2290
2121
name = name + '/'
2292
pass # ignore root entry
2294
2123
shape.remove(name)
2296
2125
extras.append(name)
2386
2215
class TestCaseWithMemoryTransport(TestCase):
2387
2216
"""Common test class for tests that do not need disk resources.
2389
Tests that need disk resources should derive from TestCaseInTempDir
2390
orTestCaseWithTransport.
2218
Tests that need disk resources should derive from TestCaseWithTransport.
2392
2220
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2394
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2222
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2395
2223
a directory which does not exist. This serves to help ensure test isolation
2396
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2397
must exist. However, TestCaseWithMemoryTransport does not offer local file
2398
defaults for the transport in tests, nor does it obey the command line
2224
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2225
must exist. However, TestCaseWithMemoryTransport does not offer local
2226
file defaults for the transport in tests, nor does it obey the command line
2399
2227
override, so tests that accidentally write to the common directory should
2402
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2403
``.bzr`` directory that stops us ascending higher into the filesystem.
2230
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2231
a .bzr directory that stops us ascending higher into the filesystem.
2406
2234
TEST_ROOT = None
2566
2393
root = TestCaseWithMemoryTransport.TEST_ROOT
2568
# Make sure we get a readable and accessible home for .bzr.log
2569
# and/or config files, and not fallback to weird defaults (see
2570
# http://pad.lv/825027).
2571
self.assertIs(None, os.environ.get('BZR_HOME', None))
2572
os.environ['BZR_HOME'] = root
2573
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2574
del os.environ['BZR_HOME']
2575
except Exception, e:
2576
self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2577
# Hack for speed: remember the raw bytes of the dirstate file so that
2578
# we don't need to re-open the wt to check it hasn't changed.
2579
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2580
wt.control_transport.get_bytes('dirstate'))
2394
bzrdir.BzrDir.create_standalone_workingtree(root)
2582
2396
def _check_safety_net(self):
2583
2397
"""Check that the safety .bzr directory have not been touched.
2630
2444
def make_branch(self, relpath, format=None):
2631
2445
"""Create a branch on the transport at relpath."""
2632
2446
repo = self.make_repository(relpath, format=format)
2633
return repo.bzrdir.create_branch(append_revisions_only=False)
2635
def resolve_format(self, format):
2636
"""Resolve an object to a ControlDir format object.
2638
The initial format object can either already be
2639
a ControlDirFormat, None (for the default format),
2640
or a string with the name of the control dir format.
2642
:param format: Object to resolve
2643
:return A ControlDirFormat instance
2647
if isinstance(format, basestring):
2648
format = bzrdir.format_registry.make_bzrdir(format)
2651
def resolve_format(self, format):
2652
"""Resolve an object to a ControlDir format object.
2654
The initial format object can either already be
2655
a ControlDirFormat, None (for the default format),
2656
or a string with the name of the control dir format.
2658
:param format: Object to resolve
2659
:return A ControlDirFormat instance
2663
if isinstance(format, basestring):
2664
format = bzrdir.format_registry.make_bzrdir(format)
2447
return repo.bzrdir.create_branch()
2667
2449
def make_bzrdir(self, relpath, format=None):
2672
2454
t = _mod_transport.get_transport(maybe_a_url)
2673
2455
if len(segments) > 1 and segments[-1] not in ('', '.'):
2674
2456
t.ensure_base()
2675
format = self.resolve_format(format)
2459
if isinstance(format, basestring):
2460
format = bzrdir.format_registry.make_bzrdir(format)
2676
2461
return format.initialize_on_transport(t)
2677
2462
except errors.UninitializableFormat:
2678
2463
raise TestSkipped("Format %s is not initializable." % format)
2680
def make_repository(self, relpath, shared=None, format=None):
2465
def make_repository(self, relpath, shared=False, format=None):
2681
2466
"""Create a repository on our default transport at relpath.
2683
2468
Note that relpath must be a relative path, not a full url.
2717
2502
def setUp(self):
2718
2503
super(TestCaseWithMemoryTransport, self).setUp()
2719
2504
# Ensure that ConnectedTransport doesn't leak sockets
2720
def get_transport_from_url_with_cleanup(*args, **kwargs):
2721
t = orig_get_transport_from_url(*args, **kwargs)
2505
def get_transport_with_cleanup(*args, **kwargs):
2506
t = orig_get_transport(*args, **kwargs)
2722
2507
if isinstance(t, _mod_transport.ConnectedTransport):
2723
2508
self.addCleanup(t.disconnect)
2726
orig_get_transport_from_url = self.overrideAttr(
2727
_mod_transport, 'get_transport_from_url',
2728
get_transport_from_url_with_cleanup)
2511
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2512
get_transport_with_cleanup)
2729
2513
self._make_test_root()
2730
2514
self.addCleanup(os.chdir, os.getcwdu())
2731
2515
self.makeAndChdirToTestDir()
3981
3756
'bzrlib.tests.test_decorators',
3982
3757
'bzrlib.tests.test_delta',
3983
3758
'bzrlib.tests.test_debug',
3759
'bzrlib.tests.test_deprecated_graph',
3984
3760
'bzrlib.tests.test_diff',
3985
3761
'bzrlib.tests.test_directory_service',
3986
3762
'bzrlib.tests.test_dirstate',
3987
3763
'bzrlib.tests.test_email_message',
3988
3764
'bzrlib.tests.test_eol_filters',
3989
3765
'bzrlib.tests.test_errors',
3990
'bzrlib.tests.test_estimate_compressed_size',
3991
3766
'bzrlib.tests.test_export',
3992
'bzrlib.tests.test_export_pot',
3993
3767
'bzrlib.tests.test_extract',
3994
'bzrlib.tests.test_features',
3995
3768
'bzrlib.tests.test_fetch',
3996
3769
'bzrlib.tests.test_fixtures',
3997
3770
'bzrlib.tests.test_fifo_cache',
3998
3771
'bzrlib.tests.test_filters',
3999
'bzrlib.tests.test_filter_tree',
4000
3772
'bzrlib.tests.test_ftp_transport',
4001
3773
'bzrlib.tests.test_foreign',
4002
3774
'bzrlib.tests.test_generate_docs',
4441
4210
% (os.path.basename(dirname), printable_e))
4213
class Feature(object):
4214
"""An operating system Feature."""
4217
self._available = None
4219
def available(self):
4220
"""Is the feature available?
4222
:return: True if the feature is available.
4224
if self._available is None:
4225
self._available = self._probe()
4226
return self._available
4229
"""Implement this method in concrete features.
4231
:return: True if the feature is available.
4233
raise NotImplementedError
4236
if getattr(self, 'feature_name', None):
4237
return self.feature_name()
4238
return self.__class__.__name__
4241
class _SymlinkFeature(Feature):
4244
return osutils.has_symlinks()
4246
def feature_name(self):
4249
SymlinkFeature = _SymlinkFeature()
4252
class _HardlinkFeature(Feature):
4255
return osutils.has_hardlinks()
4257
def feature_name(self):
4260
HardlinkFeature = _HardlinkFeature()
4263
class _OsFifoFeature(Feature):
4266
return getattr(os, 'mkfifo', None)
4268
def feature_name(self):
4269
return 'filesystem fifos'
4271
OsFifoFeature = _OsFifoFeature()
4274
class _UnicodeFilenameFeature(Feature):
4275
"""Does the filesystem support Unicode filenames?"""
4279
# Check for character combinations unlikely to be covered by any
4280
# single non-unicode encoding. We use the characters
4281
# - greek small letter alpha (U+03B1) and
4282
# - braille pattern dots-123456 (U+283F).
4283
os.stat(u'\u03b1\u283f')
4284
except UnicodeEncodeError:
4286
except (IOError, OSError):
4287
# The filesystem allows the Unicode filename but the file doesn't
4291
# The filesystem allows the Unicode filename and the file exists,
4295
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4298
class _CompatabilityThunkFeature(Feature):
4299
"""This feature is just a thunk to another feature.
4301
It issues a deprecation warning if it is accessed, to let you know that you
4302
should really use a different feature.
4305
def __init__(self, dep_version, module, name,
4306
replacement_name, replacement_module=None):
4307
super(_CompatabilityThunkFeature, self).__init__()
4308
self._module = module
4309
if replacement_module is None:
4310
replacement_module = module
4311
self._replacement_module = replacement_module
4313
self._replacement_name = replacement_name
4314
self._dep_version = dep_version
4315
self._feature = None
4318
if self._feature is None:
4319
depr_msg = self._dep_version % ('%s.%s'
4320
% (self._module, self._name))
4321
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4322
self._replacement_name)
4323
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4324
# Import the new feature and use it as a replacement for the
4326
self._feature = pyutils.get_named_object(
4327
self._replacement_module, self._replacement_name)
4331
return self._feature._probe()
4334
class ModuleAvailableFeature(Feature):
4335
"""This is a feature than describes a module we want to be available.
4337
Declare the name of the module in __init__(), and then after probing, the
4338
module will be available as 'self.module'.
4340
:ivar module: The module if it is available, else None.
4343
def __init__(self, module_name):
4344
super(ModuleAvailableFeature, self).__init__()
4345
self.module_name = module_name
4349
self._module = __import__(self.module_name, {}, {}, [''])
4356
if self.available(): # Make sure the probe has been done
4360
def feature_name(self):
4361
return self.module_name
4444
4364
def probe_unicode_in_user_encoding():
4445
4365
"""Try to encode several unicode strings to use in unicode-aware tests.
4446
4366
Return first successfull match.
4397
class _HTTPSServerFeature(Feature):
4398
"""Some tests want an https Server, check if one is available.
4400
Right now, the only way this is available is under python2.6 which provides
4411
def feature_name(self):
4412
return 'HTTPSServer'
4415
HTTPSServerFeature = _HTTPSServerFeature()
4418
class _UnicodeFilename(Feature):
4419
"""Does the filesystem support Unicode filenames?"""
4424
except UnicodeEncodeError:
4426
except (IOError, OSError):
4427
# The filesystem allows the Unicode filename but the file doesn't
4431
# The filesystem allows the Unicode filename and the file exists,
4435
UnicodeFilename = _UnicodeFilename()
4438
class _ByteStringNamedFilesystem(Feature):
4439
"""Is the filesystem based on bytes?"""
4442
if os.name == "posix":
4446
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4449
class _UTF8Filesystem(Feature):
4450
"""Is the filesystem UTF-8?"""
4453
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4457
UTF8Filesystem = _UTF8Filesystem()
4460
class _BreakinFeature(Feature):
4461
"""Does this platform support the breakin feature?"""
4464
from bzrlib import breakin
4465
if breakin.determine_signal() is None:
4467
if sys.platform == 'win32':
4468
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4469
# We trigger SIGBREAK via a Console api so we need ctypes to
4470
# access the function
4477
def feature_name(self):
4478
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4481
BreakinFeature = _BreakinFeature()
4484
class _CaseInsCasePresFilenameFeature(Feature):
4485
"""Is the file-system case insensitive, but case-preserving?"""
4488
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4490
# first check truly case-preserving for created files, then check
4491
# case insensitive when opening existing files.
4492
name = osutils.normpath(name)
4493
base, rel = osutils.split(name)
4494
found_rel = osutils.canonical_relpath(base, name)
4495
return (found_rel == rel
4496
and os.path.isfile(name.upper())
4497
and os.path.isfile(name.lower()))
4502
def feature_name(self):
4503
return "case-insensitive case-preserving filesystem"
4505
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4508
class _CaseInsensitiveFilesystemFeature(Feature):
4509
"""Check if underlying filesystem is case-insensitive but *not* case
4512
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4513
# more likely to be case preserving, so this case is rare.
4516
if CaseInsCasePresFilenameFeature.available():
4519
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4520
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4521
TestCaseWithMemoryTransport.TEST_ROOT = root
4523
root = TestCaseWithMemoryTransport.TEST_ROOT
4524
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4526
name_a = osutils.pathjoin(tdir, 'a')
4527
name_A = osutils.pathjoin(tdir, 'A')
4529
result = osutils.isdir(name_A)
4530
_rmtree_temp_dir(tdir)
4533
def feature_name(self):
4534
return 'case-insensitive filesystem'
4536
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4539
class _CaseSensitiveFilesystemFeature(Feature):
4542
if CaseInsCasePresFilenameFeature.available():
4544
elif CaseInsensitiveFilesystemFeature.available():
4549
def feature_name(self):
4550
return 'case-sensitive filesystem'
4552
# new coding style is for feature instances to be lowercase
4553
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4477
4556
# Only define SubUnitBzrRunner if subunit is available.
4479
4558
from subunit import TestProtocolClient
4497
4576
except ImportError:
4501
@deprecated_function(deprecated_in((2, 5, 0)))
4502
def ModuleAvailableFeature(name):
4503
from bzrlib.tests import features
4504
return features.ModuleAvailableFeature(name)
4579
class _PosixPermissionsFeature(Feature):
4583
# create temporary file and check if specified perms are maintained.
4586
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4587
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4590
os.chmod(name, write_perms)
4592
read_perms = os.stat(name).st_mode & 0777
4594
return (write_perms == read_perms)
4596
return (os.name == 'posix') and has_perms()
4598
def feature_name(self):
4599
return 'POSIX permissions support'
4601
posix_permissions_feature = _PosixPermissionsFeature()