379
377
if isinstance(test, TestCase):
380
378
test.addCleanup(self._check_leaked_threads, test)
382
def stopTest(self, test):
383
super(ExtendedTestResult, self).stopTest(test)
384
# Manually break cycles, means touching various private things but hey
385
getDetails = getattr(test, "getDetails", None)
386
if getDetails is not None:
388
# Clear _type_equality_funcs to try to stop TestCase instances
389
# from wasting memory. 'clear' is not available in all Python
390
# versions (bug 809048)
391
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
392
if type_equality_funcs is not None:
393
tef_clear = getattr(type_equality_funcs, "clear", None)
394
if tef_clear is None:
395
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
if tef_instance_dict is not None:
397
tef_clear = tef_instance_dict.clear
398
if tef_clear is not None:
400
self._traceback_from_test = None
402
380
def startTests(self):
403
381
self.report_tests_starting()
404
382
self._active_threads = threading.enumerate()
384
def stopTest(self, test):
385
self._traceback_from_test = None
406
387
def _check_leaked_threads(self, test):
407
388
"""See if any threads have leaked since last call
468
449
self.known_failure_count += 1
469
450
self.report_known_failure(test, err)
471
def addUnexpectedSuccess(self, test, details=None):
472
"""Tell result the test unexpectedly passed, counting as a failure
474
When the minimum version of testtools required becomes 0.9.8 this
475
can be updated to use the new handling there.
477
super(ExtendedTestResult, self).addFailure(test, details=details)
478
self.failure_count += 1
479
self.report_unexpected_success(test,
480
"".join(details["reason"].iter_text()))
484
452
def addNotSupported(self, test, feature):
485
453
"""The test will not be run because of a missing feature.
709
670
% (self._testTimeString(test),
710
671
self._error_summary(err)))
712
def report_unexpected_success(self, test, reason):
713
self.stream.write(' FAIL %s\n%s: %s\n'
714
% (self._testTimeString(test),
715
"Unexpected success. Should have failed",
718
673
def report_success(self, test):
719
674
self.stream.write(' OK %s\n' % self._testTimeString(test))
720
675
for bench_called, stats in getattr(test, '_benchcalls', []):
995
954
# between tests. We should get rid of this altogether: bug 656694. --
997
956
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
998
# Isolate config option expansion until its default value for bzrlib is
999
# settled on or a the FIXME associated with _get_expand_default_value
1000
# is addressed -- vila 20110219
1001
self.overrideAttr(config, '_expand_default_value', None)
1002
self._log_files = set()
1003
# Each key in the ``_counters`` dict holds a value for a different
1004
# counter. When the test ends, addDetail() should be used to output the
1005
# counter values. This happens in install_counter_hook().
1007
if 'config_stats' in selftest_debug_flags:
1008
self._install_config_stats_hooks()
1009
# Do not use i18n for tests (unless the test reverses this)
1012
958
def debug(self):
1013
959
# debug a frame up.
1015
# The sys preserved stdin/stdout should allow blackbox tests debugging
1016
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1017
).set_trace(sys._getframe().f_back)
961
pdb.Pdb().set_trace(sys._getframe().f_back)
1019
963
def discardDetail(self, name):
1020
964
"""Extend the addDetail, getDetails api so we can remove a detail.
1032
976
if name in details:
1033
977
del details[name]
1035
def install_counter_hook(self, hooks, name, counter_name=None):
1036
"""Install a counting hook.
1038
Any hook can be counted as long as it doesn't need to return a value.
1040
:param hooks: Where the hook should be installed.
1042
:param name: The hook name that will be counted.
1044
:param counter_name: The counter identifier in ``_counters``, defaults
1047
_counters = self._counters # Avoid closing over self
1048
if counter_name is None:
1050
if _counters.has_key(counter_name):
1051
raise AssertionError('%s is already used as a counter name'
1053
_counters[counter_name] = 0
1054
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1055
lambda: ['%d' % (_counters[counter_name],)]))
1056
def increment_counter(*args, **kwargs):
1057
_counters[counter_name] += 1
1058
label = 'count %s calls' % (counter_name,)
1059
hooks.install_named_hook(name, increment_counter, label)
1060
self.addCleanup(hooks.uninstall_named_hook, name, label)
1062
def _install_config_stats_hooks(self):
1063
"""Install config hooks to count hook calls.
1066
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1067
self.install_counter_hook(config.ConfigHooks, hook_name,
1068
'config.%s' % (hook_name,))
1070
# The OldConfigHooks are private and need special handling to protect
1071
# against recursive tests (tests that run other tests), so we just do
1072
# manually what registering them into _builtin_known_hooks will provide
1074
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1075
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1076
self.install_counter_hook(config.OldConfigHooks, hook_name,
1077
'old_config.%s' % (hook_name,))
1079
979
def _clear_debug_flags(self):
1080
980
"""Prevent externally set debug flags affecting tests.
1135
1033
# break some locks on purpose and should be taken into account by
1136
1034
# considering that breaking a lock is just a dirty way of releasing it.
1137
1035
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1139
'Different number of acquired and '
1140
'released or broken locks.\n'
1144
(acquired_locks, released_locks, broken_locks))
1036
message = ('Different number of acquired and '
1037
'released or broken locks. (%s, %s + %s)' %
1038
(acquired_locks, released_locks, broken_locks))
1145
1039
if not self._lock_check_thorough:
1146
1040
# Rather than fail, just warn
1147
1041
print "Broken test %s: %s" % (self, message)
1367
1261
'st_mtime did not match')
1368
1262
self.assertEqual(expected.st_ctime, actual.st_ctime,
1369
1263
'st_ctime did not match')
1370
if sys.platform == 'win32':
1264
if sys.platform != 'win32':
1371
1265
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1372
1266
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1373
# odd. We just force it to always be 0 to avoid any problems.
1374
self.assertEqual(0, expected.st_dev)
1375
self.assertEqual(0, actual.st_dev)
1376
self.assertEqual(0, expected.st_ino)
1377
self.assertEqual(0, actual.st_ino)
1267
# odd. Regardless we shouldn't actually try to assert anything
1268
# about their values
1379
1269
self.assertEqual(expected.st_dev, actual.st_dev,
1380
1270
'st_dev did not match')
1381
1271
self.assertEqual(expected.st_ino, actual.st_ino,
1390
1280
length, len(obj_with_len), obj_with_len))
1392
1282
def assertLogsError(self, exception_class, func, *args, **kwargs):
1393
"""Assert that `func(*args, **kwargs)` quietly logs a specific error.
1283
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1396
1286
orig_log_exception_quietly = trace.log_exception_quietly
1399
1289
orig_log_exception_quietly()
1400
captured.append(sys.exc_info()[1])
1290
captured.append(sys.exc_info())
1401
1291
trace.log_exception_quietly = capture
1402
1292
func(*args, **kwargs)
1404
1294
trace.log_exception_quietly = orig_log_exception_quietly
1405
1295
self.assertLength(1, captured)
1296
err = captured[0][1]
1407
1297
self.assertIsInstance(err, exception_class)
1561
1451
self.assertEqual(expected_docstring, obj.__doc__)
1563
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1564
1453
def failUnlessExists(self, path):
1565
return self.assertPathExists(path)
1567
def assertPathExists(self, path):
1568
1454
"""Fail unless path or paths, which may be abs or relative, exist."""
1569
1455
if not isinstance(path, basestring):
1571
self.assertPathExists(p)
1457
self.failUnlessExists(p)
1573
self.assertTrue(osutils.lexists(path),
1574
path + " does not exist")
1459
self.failUnless(osutils.lexists(path),path+" does not exist")
1576
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1577
1461
def failIfExists(self, path):
1578
return self.assertPathDoesNotExist(path)
1580
def assertPathDoesNotExist(self, path):
1581
1462
"""Fail if path or paths, which may be abs or relative, exist."""
1582
1463
if not isinstance(path, basestring):
1584
self.assertPathDoesNotExist(p)
1465
self.failIfExists(p)
1586
self.assertFalse(osutils.lexists(path),
1467
self.failIf(osutils.lexists(path),path+" exists")
1589
1469
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1590
1470
"""A helper for callDeprecated and applyDeprecated.
1706
1585
The file is removed as the test is torn down.
1708
pseudo_log_file = StringIO()
1709
def _get_log_contents_for_weird_testtools_api():
1710
return [pseudo_log_file.getvalue().decode(
1711
"utf-8", "replace").encode("utf-8")]
1712
self.addDetail("log", content.Content(content.ContentType("text",
1713
"plain", {"charset": "utf8"}),
1714
_get_log_contents_for_weird_testtools_api))
1715
self._log_file = pseudo_log_file
1587
self._log_file = StringIO()
1716
1588
self._log_memento = trace.push_log_file(self._log_file)
1717
1589
self.addCleanup(self._finishLogFile)
1719
1591
def _finishLogFile(self):
1720
1592
"""Finished with the log file.
1722
Close the file and delete it.
1594
Close the file and delete it, unless setKeepLogfile was called.
1724
1596
if trace._trace_file:
1725
1597
# flush the log file, to get all content
1726
1598
trace._trace_file.flush()
1727
1599
trace.pop_log_file(self._log_memento)
1600
# Cache the log result and delete the file on disk
1601
self._get_log(False)
1729
1603
def thisFailsStrictLockCheck(self):
1730
1604
"""It is known that this test would fail with -Dstrict_locks.
1775
1646
self.addCleanup(osutils.set_or_unset_env, name, value)
1778
def recordCalls(self, obj, attr_name):
1779
"""Monkeypatch in a wrapper that will record calls.
1781
The monkeypatch is automatically removed when the test concludes.
1783
:param obj: The namespace holding the reference to be replaced;
1784
typically a module, class, or object.
1785
:param attr_name: A string for the name of the attribute to
1787
:returns: A list that will be extended with one item every time the
1788
function is called, with a tuple of (args, kwargs).
1792
def decorator(*args, **kwargs):
1793
calls.append((args, kwargs))
1794
return orig(*args, **kwargs)
1795
orig = self.overrideAttr(obj, attr_name, decorator)
1798
1649
def _cleanEnvironment(self):
1799
1650
for name, value in isolated_environ.iteritems():
1800
1651
self.overrideEnv(name, value)
1802
1653
def _restoreHooks(self):
1803
1654
for klass, (name, hooks) in self._preserved_hooks.items():
1804
1655
setattr(klass, name, hooks)
1805
self._preserved_hooks.clear()
1806
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1807
self._preserved_lazy_hooks.clear()
1809
1657
def knownFailure(self, reason):
1810
"""Declare that this test fails for a known reason
1812
Tests that are known to fail should generally be using expectedFailure
1813
with an appropriate reverse assertion if a change could cause the test
1814
to start passing. Conversely if the test has no immediate prospect of
1815
succeeding then using skip is more suitable.
1817
When this method is called while an exception is being handled, that
1818
traceback will be used, otherwise a new exception will be thrown to
1819
provide one but won't be reported.
1821
self._add_reason(reason)
1823
exc_info = sys.exc_info()
1824
if exc_info != (None, None, None):
1825
self._report_traceback(exc_info)
1828
raise self.failureException(reason)
1829
except self.failureException:
1830
exc_info = sys.exc_info()
1831
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1832
raise testtools.testcase._ExpectedFailure(exc_info)
1658
"""This test has failed for some known reason."""
1659
raise KnownFailure(reason)
1836
1661
def _suppress_log(self):
1837
1662
"""Remove the log info from details."""
1925
1750
def log(self, *args):
1926
1751
trace.mutter(*args)
1753
def _get_log(self, keep_log_file=False):
1754
"""Internal helper to get the log from bzrlib.trace for this test.
1756
Please use self.getDetails, or self.get_log to access this in test case
1759
:param keep_log_file: When True, if the log is still a file on disk
1760
leave it as a file on disk. When False, if the log is still a file
1761
on disk, the log file is deleted and the log preserved as
1763
:return: A string containing the log.
1765
if self._log_contents is not None:
1767
self._log_contents.decode('utf8')
1768
except UnicodeDecodeError:
1769
unicodestr = self._log_contents.decode('utf8', 'replace')
1770
self._log_contents = unicodestr.encode('utf8')
1771
return self._log_contents
1772
if self._log_file is not None:
1773
log_contents = self._log_file.getvalue()
1775
log_contents.decode('utf8')
1776
except UnicodeDecodeError:
1777
unicodestr = log_contents.decode('utf8', 'replace')
1778
log_contents = unicodestr.encode('utf8')
1779
if not keep_log_file:
1780
self._log_file = None
1781
# Permit multiple calls to get_log until we clean it up in
1783
self._log_contents = log_contents
1786
return "No log file content."
1928
1788
def get_log(self):
1929
1789
"""Get a unicode string containing the log from bzrlib.trace.
2205
def _add_subprocess_log(self, log_file_path):
2206
if len(self._log_files) == 0:
2207
# Register an addCleanup func. We do this on the first call to
2208
# _add_subprocess_log rather than in TestCase.setUp so that this
2209
# addCleanup is registered after any cleanups for tempdirs that
2210
# subclasses might create, which will probably remove the log file
2212
self.addCleanup(self._subprocess_log_cleanup)
2213
# self._log_files is a set, so if a log file is reused we won't grab it
2215
self._log_files.add(log_file_path)
2217
def _subprocess_log_cleanup(self):
2218
for count, log_file_path in enumerate(self._log_files):
2219
# We use buffer_now=True to avoid holding the file open beyond
2220
# the life of this function, which might interfere with e.g.
2221
# cleaning tempdirs on Windows.
2222
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2223
#detail_content = content.content_from_file(
2224
# log_file_path, buffer_now=True)
2225
with open(log_file_path, 'rb') as log_file:
2226
log_file_bytes = log_file.read()
2227
detail_content = content.Content(content.ContentType("text",
2228
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2229
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2232
2059
def _popen(self, *args, **kwargs):
2233
2060
"""Place a call to Popen.
2277
2104
% (process_args, retcode, process.returncode))
2278
2105
return [out, err]
2280
def check_tree_shape(self, tree, shape):
2281
"""Compare a tree to a list of expected names.
2107
def check_inventory_shape(self, inv, shape):
2108
"""Compare an inventory to a list of expected names.
2283
2110
Fail if they are not precisely equal.
2286
2113
shape = list(shape) # copy
2287
for path, ie in tree.iter_entries_by_dir():
2114
for path, ie in inv.entries():
2288
2115
name = path.replace('\\', '/')
2289
2116
if ie.kind == 'directory':
2290
2117
name = name + '/'
2292
pass # ignore root entry
2294
2119
shape.remove(name)
2296
2121
extras.append(name)
2386
2211
class TestCaseWithMemoryTransport(TestCase):
2387
2212
"""Common test class for tests that do not need disk resources.
2389
Tests that need disk resources should derive from TestCaseInTempDir
2390
orTestCaseWithTransport.
2214
Tests that need disk resources should derive from TestCaseWithTransport.
2392
2216
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2394
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2218
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2395
2219
a directory which does not exist. This serves to help ensure test isolation
2396
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2397
must exist. However, TestCaseWithMemoryTransport does not offer local file
2398
defaults for the transport in tests, nor does it obey the command line
2220
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2221
must exist. However, TestCaseWithMemoryTransport does not offer local
2222
file defaults for the transport in tests, nor does it obey the command line
2399
2223
override, so tests that accidentally write to the common directory should
2402
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2403
``.bzr`` directory that stops us ascending higher into the filesystem.
2226
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2227
a .bzr directory that stops us ascending higher into the filesystem.
2406
2230
TEST_ROOT = None
2566
2389
root = TestCaseWithMemoryTransport.TEST_ROOT
2568
# Make sure we get a readable and accessible home for .bzr.log
2569
# and/or config files, and not fallback to weird defaults (see
2570
# http://pad.lv/825027).
2571
self.assertIs(None, os.environ.get('BZR_HOME', None))
2572
os.environ['BZR_HOME'] = root
2573
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2574
del os.environ['BZR_HOME']
2575
except Exception, e:
2576
self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2577
# Hack for speed: remember the raw bytes of the dirstate file so that
2578
# we don't need to re-open the wt to check it hasn't changed.
2579
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2580
wt.control_transport.get_bytes('dirstate'))
2390
bzrdir.BzrDir.create_standalone_workingtree(root)
2582
2392
def _check_safety_net(self):
2583
2393
"""Check that the safety .bzr directory have not been touched.
2630
2440
def make_branch(self, relpath, format=None):
2631
2441
"""Create a branch on the transport at relpath."""
2632
2442
repo = self.make_repository(relpath, format=format)
2633
return repo.bzrdir.create_branch(append_revisions_only=False)
2635
def resolve_format(self, format):
2636
"""Resolve an object to a ControlDir format object.
2638
The initial format object can either already be
2639
a ControlDirFormat, None (for the default format),
2640
or a string with the name of the control dir format.
2642
:param format: Object to resolve
2643
:return A ControlDirFormat instance
2647
if isinstance(format, basestring):
2648
format = bzrdir.format_registry.make_bzrdir(format)
2651
def resolve_format(self, format):
2652
"""Resolve an object to a ControlDir format object.
2654
The initial format object can either already be
2655
a ControlDirFormat, None (for the default format),
2656
or a string with the name of the control dir format.
2658
:param format: Object to resolve
2659
:return A ControlDirFormat instance
2663
if isinstance(format, basestring):
2664
format = bzrdir.format_registry.make_bzrdir(format)
2443
return repo.bzrdir.create_branch()
2667
2445
def make_bzrdir(self, relpath, format=None):
2672
2450
t = _mod_transport.get_transport(maybe_a_url)
2673
2451
if len(segments) > 1 and segments[-1] not in ('', '.'):
2674
2452
t.ensure_base()
2675
format = self.resolve_format(format)
2455
if isinstance(format, basestring):
2456
format = bzrdir.format_registry.make_bzrdir(format)
2676
2457
return format.initialize_on_transport(t)
2677
2458
except errors.UninitializableFormat:
2678
2459
raise TestSkipped("Format %s is not initializable." % format)
2680
def make_repository(self, relpath, shared=None, format=None):
2461
def make_repository(self, relpath, shared=False, format=None):
2681
2462
"""Create a repository on our default transport at relpath.
2683
2464
Note that relpath must be a relative path, not a full url.
2717
2498
def setUp(self):
2718
2499
super(TestCaseWithMemoryTransport, self).setUp()
2719
2500
# Ensure that ConnectedTransport doesn't leak sockets
2720
def get_transport_from_url_with_cleanup(*args, **kwargs):
2721
t = orig_get_transport_from_url(*args, **kwargs)
2501
def get_transport_with_cleanup(*args, **kwargs):
2502
t = orig_get_transport(*args, **kwargs)
2722
2503
if isinstance(t, _mod_transport.ConnectedTransport):
2723
2504
self.addCleanup(t.disconnect)
2726
orig_get_transport_from_url = self.overrideAttr(
2727
_mod_transport, 'get_transport_from_url',
2728
get_transport_from_url_with_cleanup)
2507
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2508
get_transport_with_cleanup)
2729
2509
self._make_test_root()
2730
2510
self.addCleanup(os.chdir, os.getcwdu())
2731
2511
self.makeAndChdirToTestDir()
3975
3746
'bzrlib.tests.test_commit_merge',
3976
3747
'bzrlib.tests.test_config',
3977
3748
'bzrlib.tests.test_conflicts',
3978
'bzrlib.tests.test_controldir',
3979
3749
'bzrlib.tests.test_counted_lock',
3980
3750
'bzrlib.tests.test_crash',
3981
3751
'bzrlib.tests.test_decorators',
3982
3752
'bzrlib.tests.test_delta',
3983
3753
'bzrlib.tests.test_debug',
3754
'bzrlib.tests.test_deprecated_graph',
3984
3755
'bzrlib.tests.test_diff',
3985
3756
'bzrlib.tests.test_directory_service',
3986
3757
'bzrlib.tests.test_dirstate',
3987
3758
'bzrlib.tests.test_email_message',
3988
3759
'bzrlib.tests.test_eol_filters',
3989
3760
'bzrlib.tests.test_errors',
3990
'bzrlib.tests.test_estimate_compressed_size',
3991
3761
'bzrlib.tests.test_export',
3992
'bzrlib.tests.test_export_pot',
3993
3762
'bzrlib.tests.test_extract',
3994
'bzrlib.tests.test_features',
3995
3763
'bzrlib.tests.test_fetch',
3996
3764
'bzrlib.tests.test_fixtures',
3997
3765
'bzrlib.tests.test_fifo_cache',
3998
3766
'bzrlib.tests.test_filters',
3999
'bzrlib.tests.test_filter_tree',
4000
3767
'bzrlib.tests.test_ftp_transport',
4001
3768
'bzrlib.tests.test_foreign',
4002
3769
'bzrlib.tests.test_generate_docs',
4441
4205
% (os.path.basename(dirname), printable_e))
4208
class Feature(object):
4209
"""An operating system Feature."""
4212
self._available = None
4214
def available(self):
4215
"""Is the feature available?
4217
:return: True if the feature is available.
4219
if self._available is None:
4220
self._available = self._probe()
4221
return self._available
4224
"""Implement this method in concrete features.
4226
:return: True if the feature is available.
4228
raise NotImplementedError
4231
if getattr(self, 'feature_name', None):
4232
return self.feature_name()
4233
return self.__class__.__name__
4236
class _SymlinkFeature(Feature):
4239
return osutils.has_symlinks()
4241
def feature_name(self):
4244
SymlinkFeature = _SymlinkFeature()
4247
class _HardlinkFeature(Feature):
4250
return osutils.has_hardlinks()
4252
def feature_name(self):
4255
HardlinkFeature = _HardlinkFeature()
4258
class _OsFifoFeature(Feature):
4261
return getattr(os, 'mkfifo', None)
4263
def feature_name(self):
4264
return 'filesystem fifos'
4266
OsFifoFeature = _OsFifoFeature()
4269
class _UnicodeFilenameFeature(Feature):
4270
"""Does the filesystem support Unicode filenames?"""
4274
# Check for character combinations unlikely to be covered by any
4275
# single non-unicode encoding. We use the characters
4276
# - greek small letter alpha (U+03B1) and
4277
# - braille pattern dots-123456 (U+283F).
4278
os.stat(u'\u03b1\u283f')
4279
except UnicodeEncodeError:
4281
except (IOError, OSError):
4282
# The filesystem allows the Unicode filename but the file doesn't
4286
# The filesystem allows the Unicode filename and the file exists,
4290
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4293
class _CompatabilityThunkFeature(Feature):
4294
"""This feature is just a thunk to another feature.
4296
It issues a deprecation warning if it is accessed, to let you know that you
4297
should really use a different feature.
4300
def __init__(self, dep_version, module, name,
4301
replacement_name, replacement_module=None):
4302
super(_CompatabilityThunkFeature, self).__init__()
4303
self._module = module
4304
if replacement_module is None:
4305
replacement_module = module
4306
self._replacement_module = replacement_module
4308
self._replacement_name = replacement_name
4309
self._dep_version = dep_version
4310
self._feature = None
4313
if self._feature is None:
4314
depr_msg = self._dep_version % ('%s.%s'
4315
% (self._module, self._name))
4316
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4317
self._replacement_name)
4318
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4319
# Import the new feature and use it as a replacement for the
4321
self._feature = pyutils.get_named_object(
4322
self._replacement_module, self._replacement_name)
4326
return self._feature._probe()
4329
class ModuleAvailableFeature(Feature):
4330
"""This is a feature than describes a module we want to be available.
4332
Declare the name of the module in __init__(), and then after probing, the
4333
module will be available as 'self.module'.
4335
:ivar module: The module if it is available, else None.
4338
def __init__(self, module_name):
4339
super(ModuleAvailableFeature, self).__init__()
4340
self.module_name = module_name
4344
self._module = __import__(self.module_name, {}, {}, [''])
4351
if self.available(): # Make sure the probe has been done
4355
def feature_name(self):
4356
return self.module_name
4444
4359
def probe_unicode_in_user_encoding():
4445
4360
"""Try to encode several unicode strings to use in unicode-aware tests.
4446
4361
Return first successfull match.
4392
class _HTTPSServerFeature(Feature):
4393
"""Some tests want an https Server, check if one is available.
4395
Right now, the only way this is available is under python2.6 which provides
4406
def feature_name(self):
4407
return 'HTTPSServer'
4410
HTTPSServerFeature = _HTTPSServerFeature()
4413
class _UnicodeFilename(Feature):
4414
"""Does the filesystem support Unicode filenames?"""
4419
except UnicodeEncodeError:
4421
except (IOError, OSError):
4422
# The filesystem allows the Unicode filename but the file doesn't
4426
# The filesystem allows the Unicode filename and the file exists,
4430
UnicodeFilename = _UnicodeFilename()
4433
class _ByteStringNamedFilesystem(Feature):
4434
"""Is the filesystem based on bytes?"""
4437
if os.name == "posix":
4441
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4444
class _UTF8Filesystem(Feature):
4445
"""Is the filesystem UTF-8?"""
4448
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4452
UTF8Filesystem = _UTF8Filesystem()
4455
class _BreakinFeature(Feature):
4456
"""Does this platform support the breakin feature?"""
4459
from bzrlib import breakin
4460
if breakin.determine_signal() is None:
4462
if sys.platform == 'win32':
4463
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4464
# We trigger SIGBREAK via a Console api so we need ctypes to
4465
# access the function
4472
def feature_name(self):
4473
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4476
BreakinFeature = _BreakinFeature()
4479
class _CaseInsCasePresFilenameFeature(Feature):
4480
"""Is the file-system case insensitive, but case-preserving?"""
4483
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4485
# first check truly case-preserving for created files, then check
4486
# case insensitive when opening existing files.
4487
name = osutils.normpath(name)
4488
base, rel = osutils.split(name)
4489
found_rel = osutils.canonical_relpath(base, name)
4490
return (found_rel == rel
4491
and os.path.isfile(name.upper())
4492
and os.path.isfile(name.lower()))
4497
def feature_name(self):
4498
return "case-insensitive case-preserving filesystem"
4500
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4503
class _CaseInsensitiveFilesystemFeature(Feature):
4504
"""Check if underlying filesystem is case-insensitive but *not* case
4507
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4508
# more likely to be case preserving, so this case is rare.
4511
if CaseInsCasePresFilenameFeature.available():
4514
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4515
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4516
TestCaseWithMemoryTransport.TEST_ROOT = root
4518
root = TestCaseWithMemoryTransport.TEST_ROOT
4519
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4521
name_a = osutils.pathjoin(tdir, 'a')
4522
name_A = osutils.pathjoin(tdir, 'A')
4524
result = osutils.isdir(name_A)
4525
_rmtree_temp_dir(tdir)
4528
def feature_name(self):
4529
return 'case-insensitive filesystem'
4531
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4534
class _CaseSensitiveFilesystemFeature(Feature):
4537
if CaseInsCasePresFilenameFeature.available():
4539
elif CaseInsensitiveFilesystemFeature.available():
4544
def feature_name(self):
4545
return 'case-sensitive filesystem'
4547
# new coding style is for feature instances to be lowercase
4548
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4477
4551
# Only define SubUnitBzrRunner if subunit is available.
4479
4553
from subunit import TestProtocolClient
4497
4571
except ImportError:
4501
@deprecated_function(deprecated_in((2, 5, 0)))
4502
def ModuleAvailableFeature(name):
4503
from bzrlib.tests import features
4504
return features.ModuleAvailableFeature(name)
4574
class _PosixPermissionsFeature(Feature):
4578
# create temporary file and check if specified perms are maintained.
4581
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4582
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4585
os.chmod(name, write_perms)
4587
read_perms = os.stat(name).st_mode & 0777
4589
return (write_perms == read_perms)
4591
return (os.name == 'posix') and has_perms()
4593
def feature_name(self):
4594
return 'POSIX permissions support'
4596
posix_permissions_feature = _PosixPermissionsFeature()