53
56
# nb: check this before importing anything else from within it
54
57
_testtools_version = getattr(testtools, '__version__', ())
55
if _testtools_version < (0, 9, 5):
56
raise ImportError("need at least testtools 0.9.5: %s is %r"
58
if _testtools_version < (0, 9, 2):
59
raise ImportError("need at least testtools 0.9.2: %s is %r"
57
60
% (testtools.__file__, _testtools_version))
58
61
from testtools import content
61
63
from bzrlib import (
65
commands as _mod_commands,
75
plugin as _mod_plugin,
82
78
transport as _mod_transport,
82
import bzrlib.commands
83
import bzrlib.timestamp
85
import bzrlib.inventory
86
import bzrlib.iterablefile
86
89
import bzrlib.lsprof
87
90
except ImportError:
88
91
# lsprof not available
90
from bzrlib.smart import client, request
93
from bzrlib.merge import merge_inner
96
from bzrlib.smart import client, request, server
98
from bzrlib import symbol_versioning
99
from bzrlib.symbol_versioning import (
100
DEPRECATED_PARAMETER,
91
107
from bzrlib.transport import (
95
from bzrlib.symbol_versioning import (
111
from bzrlib.trace import mutter, note
99
112
from bzrlib.tests import (
105
117
from bzrlib.ui import NullProgressView
106
118
from bzrlib.ui.text import TextUIFactory
107
from bzrlib.tests.features import _CompatabilityThunkFeature
119
import bzrlib.version_info_formats.format_custom
120
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
109
122
# Mark this python module as being part of the implementation
110
123
# of unittest: this gives us better tracebacks where the last
122
135
SUBUNIT_SEEK_SET = 0
123
136
SUBUNIT_SEEK_CUR = 1
125
# These are intentionally brought into this namespace. That way plugins, etc
126
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
127
TestSuite = TestUtil.TestSuite
128
TestLoader = TestUtil.TestLoader
130
# Tests should run in a clean and clearly defined environment. The goal is to
131
# keep them isolated from the running environment as much as possible. The test
132
# framework ensures the variables defined below are set (or deleted if the
133
# value is None) before a test is run and reset to their original value after
134
# the test is run. Generally if some code depends on an environment variable,
135
# the tests should start without this variable in the environment. There are a
136
# few exceptions but you shouldn't violate this rule lightly.
140
'XDG_CONFIG_HOME': None,
141
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
142
# tests do check our impls match APPDATA
143
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
147
'BZREMAIL': None, # may still be present in the environment
148
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
149
'BZR_PROGRESS_BAR': None,
150
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
151
# as a base class instead of TestCaseInTempDir. Tests inheriting from
152
# TestCase should not use disk resources, BZR_LOG is one.
153
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
154
'BZR_PLUGIN_PATH': None,
155
'BZR_DISABLE_PLUGINS': None,
156
'BZR_PLUGINS_AT': None,
157
'BZR_CONCURRENCY': None,
158
# Make sure that any text ui tests are consistent regardless of
159
# the environment the test case is run in; you may want tests that
160
# test other combinations. 'dumb' is a reasonable guess for tests
161
# going to a pipe or a StringIO.
167
'SSH_AUTH_SOCK': None,
177
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
178
# least. If you do (care), please update this comment
182
'BZR_REMOTE_PATH': None,
183
# Generally speaking, we don't want apport reporting on crashes in
184
# the test environment unless we're specifically testing apport,
185
# so that it doesn't leak into the real system environment. We
186
# use an env var so it propagates to subprocesses.
187
'APPORT_DISABLE': '1',
191
def override_os_environ(test, env=None):
192
"""Modify os.environ keeping a copy.
194
:param test: A test instance
196
:param env: A dict containing variable definitions to be installed
199
env = isolated_environ
200
test._original_os_environ = dict([(var, value)
201
for var, value in os.environ.iteritems()])
202
for var, value in env.iteritems():
203
osutils.set_or_unset_env(var, value)
204
if var not in test._original_os_environ:
205
# The var is new, add it with a value of None, so
206
# restore_os_environ will delete it
207
test._original_os_environ[var] = None
210
def restore_os_environ(test):
211
"""Restore os.environ to its original state.
213
:param test: A test instance previously passed to override_os_environ.
215
for var, value in test._original_os_environ.iteritems():
216
# Restore the original value (or delete it if the value has been set to
217
# None in override_os_environ).
218
osutils.set_or_unset_env(var, value)
221
def _clear__type_equality_funcs(test):
222
"""Cleanup bound methods stored on TestCase instances
224
Clear the dict breaking a few (mostly) harmless cycles in the affected
225
unittests released with Python 2.6 and initial Python 2.7 versions.
227
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
228
shipped in Oneiric, an object with no clear method was used, hence the
229
extra complications, see bug 809048 for details.
231
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
232
if type_equality_funcs is not None:
233
tef_clear = getattr(type_equality_funcs, "clear", None)
234
if tef_clear is None:
235
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
236
if tef_instance_dict is not None:
237
tef_clear = tef_instance_dict.clear
238
if tef_clear is not None:
242
139
class ExtendedTestResult(testtools.TextTestResult):
243
140
"""Accepts, reports and accumulates the results of running tests.
382
275
what = re.sub(r'^bzrlib\.tests\.', '', what)
385
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
386
# multiple times in a row, because the handler is added for
387
# each test but the container list is shared between cases.
388
# See lp:498869 lp:625574 and lp:637725 for background.
389
def _record_traceback_from_test(self, exc_info):
390
"""Store the traceback from passed exc_info tuple till"""
391
self._traceback_from_test = exc_info[2]
393
278
def startTest(self, test):
394
279
super(ExtendedTestResult, self).startTest(test)
395
280
if self.count == 0:
396
281
self.startTests()
398
282
self.report_test_start(test)
399
283
test.number = self.count
400
284
self._recordTestStartTime()
401
# Make testtools cases give us the real traceback on failure
402
addOnException = getattr(test, "addOnException", None)
403
if addOnException is not None:
404
addOnException(self._record_traceback_from_test)
405
# Only check for thread leaks on bzrlib derived test cases
406
if isinstance(test, TestCase):
407
test.addCleanup(self._check_leaked_threads, test)
409
def stopTest(self, test):
410
super(ExtendedTestResult, self).stopTest(test)
411
# Manually break cycles, means touching various private things but hey
412
getDetails = getattr(test, "getDetails", None)
413
if getDetails is not None:
415
_clear__type_equality_funcs(test)
416
self._traceback_from_test = None
418
286
def startTests(self):
419
self.report_tests_starting()
420
self._active_threads = threading.enumerate()
422
def _check_leaked_threads(self, test):
423
"""See if any threads have leaked since last call
425
A sample of live threads is stored in the _active_threads attribute,
426
when this method runs it compares the current live threads and any not
427
in the previous sample are treated as having leaked.
429
now_active_threads = set(threading.enumerate())
430
threads_leaked = now_active_threads.difference(self._active_threads)
432
self._report_thread_leak(test, threads_leaked, now_active_threads)
433
self._tests_leaking_threads_count += 1
434
if self._first_thread_leaker_id is None:
435
self._first_thread_leaker_id = test.id()
436
self._active_threads = now_active_threads
288
if getattr(sys, 'frozen', None) is None:
289
bzr_path = osutils.realpath(sys.argv[0])
291
bzr_path = sys.executable
293
'bzr selftest: %s\n' % (bzr_path,))
296
bzrlib.__path__[0],))
298
' bzr-%s python-%s %s\n' % (
299
bzrlib.version_string,
300
bzrlib._format_version_tuple(sys.version_info),
301
platform.platform(aliased=1),
303
self.stream.write('\n')
438
305
def _recordTestStartTime(self):
439
306
"""Record that a test has started."""
440
self._start_datetime = self._now()
307
self._start_time = time.time()
309
def _cleanupLogFile(self, test):
310
# We can only do this if we have one of our TestCases, not if
312
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
313
if setKeepLogfile is not None:
442
316
def addError(self, test, err):
443
317
"""Tell result that test finished with an error.
539
398
raise errors.BzrError("Unknown whence %r" % whence)
541
def report_tests_starting(self):
542
"""Display information before the test run begins"""
543
if getattr(sys, 'frozen', None) is None:
544
bzr_path = osutils.realpath(sys.argv[0])
546
bzr_path = sys.executable
548
'bzr selftest: %s\n' % (bzr_path,))
551
bzrlib.__path__[0],))
553
' bzr-%s python-%s %s\n' % (
554
bzrlib.version_string,
555
bzrlib._format_version_tuple(sys.version_info),
556
platform.platform(aliased=1),
558
self.stream.write('\n')
560
def report_test_start(self, test):
561
"""Display information on the test just about to be run"""
563
def _report_thread_leak(self, test, leaked_threads, active_threads):
564
"""Display information on a test that leaked one or more threads"""
565
# GZ 2010-09-09: A leak summary reported separately from the general
566
# thread debugging would be nice. Tests under subunit
567
# need something not using stream, perhaps adding a
568
# testtools details object would be fitting.
569
if 'threads' in selftest_debug_flags:
570
self.stream.write('%s is leaking, active is now %d\n' %
571
(test.id(), len(active_threads)))
400
def report_cleaning_up(self):
573
403
def startTestRun(self):
574
404
self.startTime = time.time()
1025
824
self._track_transports()
1026
825
self._track_locks()
1027
826
self._clear_debug_flags()
1028
# Isolate global verbosity level, to make sure it's reproducible
1029
# between tests. We should get rid of this altogether: bug 656694. --
1031
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
1032
self._log_files = set()
1033
# Each key in the ``_counters`` dict holds a value for a different
1034
# counter. When the test ends, addDetail() should be used to output the
1035
# counter values. This happens in install_counter_hook().
1037
if 'config_stats' in selftest_debug_flags:
1038
self._install_config_stats_hooks()
1039
# Do not use i18n for tests (unless the test reverses this)
827
TestCase._active_threads = threading.activeCount()
828
self.addCleanup(self._check_leaked_threads)
1042
830
def debug(self):
1043
831
# debug a frame up.
1045
# The sys preserved stdin/stdout should allow blackbox tests debugging
1046
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1047
).set_trace(sys._getframe().f_back)
1049
def discardDetail(self, name):
1050
"""Extend the addDetail, getDetails api so we can remove a detail.
1052
eg. bzr always adds the 'log' detail at startup, but we don't want to
1053
include it for skipped, xfail, etc tests.
1055
It is safe to call this for a detail that doesn't exist, in case this
1056
gets called multiple times.
1058
# We cheat. details is stored in __details which means we shouldn't
1059
# touch it. but getDetails() returns the dict directly, so we can
1061
details = self.getDetails()
1065
def install_counter_hook(self, hooks, name, counter_name=None):
1066
"""Install a counting hook.
1068
Any hook can be counted as long as it doesn't need to return a value.
1070
:param hooks: Where the hook should be installed.
1072
:param name: The hook name that will be counted.
1074
:param counter_name: The counter identifier in ``_counters``, defaults
1077
_counters = self._counters # Avoid closing over self
1078
if counter_name is None:
1080
if _counters.has_key(counter_name):
1081
raise AssertionError('%s is already used as a counter name'
1083
_counters[counter_name] = 0
1084
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1085
lambda: ['%d' % (_counters[counter_name],)]))
1086
def increment_counter(*args, **kwargs):
1087
_counters[counter_name] += 1
1088
label = 'count %s calls' % (counter_name,)
1089
hooks.install_named_hook(name, increment_counter, label)
1090
self.addCleanup(hooks.uninstall_named_hook, name, label)
1092
def _install_config_stats_hooks(self):
1093
"""Install config hooks to count hook calls.
1096
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1097
self.install_counter_hook(config.ConfigHooks, hook_name,
1098
'config.%s' % (hook_name,))
1100
# The OldConfigHooks are private and need special handling to protect
1101
# against recursive tests (tests that run other tests), so we just do
1102
# manually what registering them into _builtin_known_hooks will provide
1104
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1105
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1106
self.install_counter_hook(config.OldConfigHooks, hook_name,
1107
'old_config.%s' % (hook_name,))
833
pdb.Pdb().set_trace(sys._getframe().f_back)
835
def _check_leaked_threads(self):
836
active = threading.activeCount()
837
leaked_threads = active - TestCase._active_threads
838
TestCase._active_threads = active
839
# If some tests make the number of threads *decrease*, we'll consider
840
# that they are just observing old threads dying, not aggressively killing
841
# random threads. So we don't report these tests as leaking. The risk
842
# is that we have false positives that way (the test see 2 threads
843
# going away but leak one) but it seems less likely than the actual
844
# false positives (the test see threads going away and does not leak).
845
if leaked_threads > 0:
846
if 'threads' in selftest_debug_flags:
847
print '%s is leaking, active is now %d' % (self.id(), active)
848
TestCase._leaking_threads_tests += 1
849
if TestCase._first_thread_leaker_id is None:
850
TestCase._first_thread_leaker_id = self.id()
1109
852
def _clear_debug_flags(self):
1110
853
"""Prevent externally set debug flags affecting tests.
1591
1321
self.assertEqual(expected_docstring, obj.__doc__)
1593
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1594
1323
def failUnlessExists(self, path):
1595
return self.assertPathExists(path)
1597
def assertPathExists(self, path):
1598
1324
"""Fail unless path or paths, which may be abs or relative, exist."""
1599
1325
if not isinstance(path, basestring):
1601
self.assertPathExists(p)
1327
self.failUnlessExists(p)
1603
self.assertTrue(osutils.lexists(path),
1604
path + " does not exist")
1329
self.failUnless(osutils.lexists(path),path+" does not exist")
1606
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1607
1331
def failIfExists(self, path):
1608
return self.assertPathDoesNotExist(path)
1610
def assertPathDoesNotExist(self, path):
1611
1332
"""Fail if path or paths, which may be abs or relative, exist."""
1612
1333
if not isinstance(path, basestring):
1614
self.assertPathDoesNotExist(p)
1335
self.failIfExists(p)
1616
self.assertFalse(osutils.lexists(path),
1337
self.failIf(osutils.lexists(path),path+" exists")
1619
1339
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1620
1340
"""A helper for callDeprecated and applyDeprecated.
1733
1452
def _startLogFile(self):
1734
"""Setup a in-memory target for bzr and testcase log messages"""
1735
pseudo_log_file = StringIO()
1736
def _get_log_contents_for_weird_testtools_api():
1737
return [pseudo_log_file.getvalue().decode(
1738
"utf-8", "replace").encode("utf-8")]
1739
self.addDetail("log", content.Content(content.ContentType("text",
1740
"plain", {"charset": "utf8"}),
1741
_get_log_contents_for_weird_testtools_api))
1742
self._log_file = pseudo_log_file
1743
self._log_memento = trace.push_log_file(self._log_file)
1453
"""Send bzr and test log messages to a temporary file.
1455
The file is removed as the test is torn down.
1457
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1458
self._log_file = os.fdopen(fileno, 'w+')
1459
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1460
self._log_file_name = name
1744
1461
self.addCleanup(self._finishLogFile)
1746
1463
def _finishLogFile(self):
1747
"""Flush and dereference the in-memory log for this testcase"""
1748
if trace._trace_file:
1464
"""Finished with the log file.
1466
Close the file and delete it, unless setKeepLogfile was called.
1468
if bzrlib.trace._trace_file:
1749
1469
# flush the log file, to get all content
1750
trace._trace_file.flush()
1751
trace.pop_log_file(self._log_memento)
1752
# The logging module now tracks references for cleanup so discard ours
1753
del self._log_memento
1470
bzrlib.trace._trace_file.flush()
1471
bzrlib.trace.pop_log_file(self._log_memento)
1472
# Cache the log result and delete the file on disk
1473
self._get_log(False)
1755
1475
def thisFailsStrictLockCheck(self):
1756
1476
"""It is known that this test would fail with -Dstrict_locks.
1781
1506
:returns: The actual attr value.
1508
value = getattr(obj, attr_name)
1783
1509
# The actual value is captured by the call below
1784
value = getattr(obj, attr_name, _unitialized_attr)
1785
if value is _unitialized_attr:
1786
# When the test completes, the attribute should not exist, but if
1787
# we aren't setting a value, we don't need to do anything.
1788
if new is not _unitialized_attr:
1789
self.addCleanup(delattr, obj, attr_name)
1791
self.addCleanup(setattr, obj, attr_name, value)
1510
self.addCleanup(setattr, obj, attr_name, value)
1792
1511
if new is not _unitialized_attr:
1793
1512
setattr(obj, attr_name, new)
1796
def overrideEnv(self, name, new):
1797
"""Set an environment variable, and reset it after the test.
1799
:param name: The environment variable name.
1801
:param new: The value to set the variable to. If None, the
1802
variable is deleted from the environment.
1804
:returns: The actual variable value.
1806
value = osutils.set_or_unset_env(name, new)
1807
self.addCleanup(osutils.set_or_unset_env, name, value)
1810
def recordCalls(self, obj, attr_name):
1811
"""Monkeypatch in a wrapper that will record calls.
1813
The monkeypatch is automatically removed when the test concludes.
1815
:param obj: The namespace holding the reference to be replaced;
1816
typically a module, class, or object.
1817
:param attr_name: A string for the name of the attribute to
1819
:returns: A list that will be extended with one item every time the
1820
function is called, with a tuple of (args, kwargs).
1824
def decorator(*args, **kwargs):
1825
calls.append((args, kwargs))
1826
return orig(*args, **kwargs)
1827
orig = self.overrideAttr(obj, attr_name, decorator)
1830
1515
def _cleanEnvironment(self):
1831
for name, value in isolated_environ.iteritems():
1832
self.overrideEnv(name, value)
1517
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1518
'HOME': os.getcwd(),
1519
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1520
# tests do check our impls match APPDATA
1521
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1525
'BZREMAIL': None, # may still be present in the environment
1526
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1527
'BZR_PROGRESS_BAR': None,
1529
'BZR_PLUGIN_PATH': None,
1530
'BZR_DISABLE_PLUGINS': None,
1531
'BZR_PLUGINS_AT': None,
1532
'BZR_CONCURRENCY': None,
1533
# Make sure that any text ui tests are consistent regardless of
1534
# the environment the test case is run in; you may want tests that
1535
# test other combinations. 'dumb' is a reasonable guess for tests
1536
# going to a pipe or a StringIO.
1540
'BZR_COLUMNS': '80',
1542
'SSH_AUTH_SOCK': None,
1546
'https_proxy': None,
1547
'HTTPS_PROXY': None,
1552
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1553
# least. If you do (care), please update this comment
1557
'BZR_REMOTE_PATH': None,
1558
# Generally speaking, we don't want apport reporting on crashes in
1559
# the test environment unless we're specifically testing apport,
1560
# so that it doesn't leak into the real system environment. We
1561
# use an env var so it propagates to subprocesses.
1562
'APPORT_DISABLE': '1',
1565
self.addCleanup(self._restoreEnvironment)
1566
for name, value in new_env.iteritems():
1567
self._captureVar(name, value)
1569
def _captureVar(self, name, newvalue):
1570
"""Set an environment variable, and reset it when finished."""
1571
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1573
def _restoreEnvironment(self):
1574
for name, value in self._old_env.iteritems():
1575
osutils.set_or_unset_env(name, value)
1834
1577
def _restoreHooks(self):
1835
1578
for klass, (name, hooks) in self._preserved_hooks.items():
1836
1579
setattr(klass, name, hooks)
1837
self._preserved_hooks.clear()
1838
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1839
self._preserved_lazy_hooks.clear()
1841
1581
def knownFailure(self, reason):
1842
"""Declare that this test fails for a known reason
1844
Tests that are known to fail should generally be using expectedFailure
1845
with an appropriate reverse assertion if a change could cause the test
1846
to start passing. Conversely if the test has no immediate prospect of
1847
succeeding then using skip is more suitable.
1849
When this method is called while an exception is being handled, that
1850
traceback will be used, otherwise a new exception will be thrown to
1851
provide one but won't be reported.
1853
self._add_reason(reason)
1855
exc_info = sys.exc_info()
1856
if exc_info != (None, None, None):
1857
self._report_traceback(exc_info)
1860
raise self.failureException(reason)
1861
except self.failureException:
1862
exc_info = sys.exc_info()
1863
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1864
raise testtools.testcase._ExpectedFailure(exc_info)
1868
def _suppress_log(self):
1869
"""Remove the log info from details."""
1870
self.discardDetail('log')
1582
"""This test has failed for some known reason."""
1583
raise KnownFailure(reason)
1872
1585
def _do_skip(self, result, reason):
1873
self._suppress_log()
1874
1586
addSkip = getattr(result, 'addSkip', None)
1875
1587
if not callable(addSkip):
1876
1588
result.addSuccess(result)
1955
1644
self._benchtime += time.time() - start
1957
1646
def log(self, *args):
1649
def _get_log(self, keep_log_file=False):
1650
"""Internal helper to get the log from bzrlib.trace for this test.
1652
Please use self.getDetails, or self.get_log to access this in test case
1655
:param keep_log_file: When True, if the log is still a file on disk
1656
leave it as a file on disk. When False, if the log is still a file
1657
on disk, the log file is deleted and the log preserved as
1659
:return: A string containing the log.
1661
if self._log_contents is not None:
1663
self._log_contents.decode('utf8')
1664
except UnicodeDecodeError:
1665
unicodestr = self._log_contents.decode('utf8', 'replace')
1666
self._log_contents = unicodestr.encode('utf8')
1667
return self._log_contents
1669
if bzrlib.trace._trace_file:
1670
# flush the log file, to get all content
1671
bzrlib.trace._trace_file.flush()
1672
if self._log_file_name is not None:
1673
logfile = open(self._log_file_name)
1675
log_contents = logfile.read()
1679
log_contents.decode('utf8')
1680
except UnicodeDecodeError:
1681
unicodestr = log_contents.decode('utf8', 'replace')
1682
log_contents = unicodestr.encode('utf8')
1683
if not keep_log_file:
1685
max_close_attempts = 100
1686
first_close_error = None
1687
while close_attempts < max_close_attempts:
1690
self._log_file.close()
1691
except IOError, ioe:
1692
if ioe.errno is None:
1693
# No errno implies 'close() called during
1694
# concurrent operation on the same file object', so
1695
# retry. Probably a thread is trying to write to
1697
if first_close_error is None:
1698
first_close_error = ioe
1703
if close_attempts > 1:
1705
'Unable to close log file on first attempt, '
1706
'will retry: %s\n' % (first_close_error,))
1707
if close_attempts == max_close_attempts:
1709
'Unable to close log file after %d attempts.\n'
1710
% (max_close_attempts,))
1711
self._log_file = None
1712
# Permit multiple calls to get_log until we clean it up in
1714
self._log_contents = log_contents
1716
os.remove(self._log_file_name)
1718
if sys.platform == 'win32' and e.errno == errno.EACCES:
1719
sys.stderr.write(('Unable to delete log file '
1720
' %r\n' % self._log_file_name))
1723
self._log_file_name = None
1726
return "No log file content and no log file name."
1960
1728
def get_log(self):
1961
1729
"""Get a unicode string containing the log from bzrlib.trace.
2177
1944
variables. A value of None will unset the env variable.
2178
1945
The values must be strings. The change will only occur in the
2179
1946
child, so you don't need to fix the environment after running.
2180
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2181
doesn't support signalling subprocesses.
1947
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
2182
1949
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2183
:param stderr: file to use for the subprocess's stderr. Valid values
2184
are those valid for the stderr argument of `subprocess.Popen`.
2185
Default value is ``subprocess.PIPE``.
2187
1951
:returns: Popen object for the started process.
2189
1953
if skip_if_plan_to_signal:
2190
if os.name != "posix":
2191
raise TestSkipped("Sending signals not supported")
1954
if not getattr(os, 'kill', None):
1955
raise TestSkipped("os.kill not available.")
2193
1957
if env_changes is None:
2194
1958
env_changes = {}
2195
# Because $HOME is set to a tempdir for the context of a test, modules
2196
# installed in the user dir will not be found unless $PYTHONUSERBASE
2197
# gets set to the computed directory of this parent process.
2198
if site.USER_BASE is not None:
2199
env_changes["PYTHONUSERBASE"] = site.USER_BASE
2202
1961
def cleanup_environment():
2242
def _add_subprocess_log(self, log_file_path):
2243
if len(self._log_files) == 0:
2244
# Register an addCleanup func. We do this on the first call to
2245
# _add_subprocess_log rather than in TestCase.setUp so that this
2246
# addCleanup is registered after any cleanups for tempdirs that
2247
# subclasses might create, which will probably remove the log file
2249
self.addCleanup(self._subprocess_log_cleanup)
2250
# self._log_files is a set, so if a log file is reused we won't grab it
2252
self._log_files.add(log_file_path)
2254
def _subprocess_log_cleanup(self):
2255
for count, log_file_path in enumerate(self._log_files):
2256
# We use buffer_now=True to avoid holding the file open beyond
2257
# the life of this function, which might interfere with e.g.
2258
# cleaning tempdirs on Windows.
2259
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2260
#detail_content = content.content_from_file(
2261
# log_file_path, buffer_now=True)
2262
with open(log_file_path, 'rb') as log_file:
2263
log_file_bytes = log_file.read()
2264
detail_content = content.Content(content.ContentType("text",
2265
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2266
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2269
1998
def _popen(self, *args, **kwargs):
2270
1999
"""Place a call to Popen.
2308
2037
if retcode is not None and retcode != process.returncode:
2309
2038
if process_args is None:
2310
2039
process_args = "(unknown args)"
2311
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2312
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2040
mutter('Output of bzr %s:\n%s', process_args, out)
2041
mutter('Error for bzr %s:\n%s', process_args, err)
2313
2042
self.fail('Command bzr %s failed with retcode %s != %s'
2314
2043
% (process_args, retcode, process.returncode))
2315
2044
return [out, err]
2317
def check_tree_shape(self, tree, shape):
2318
"""Compare a tree to a list of expected names.
2046
def check_inventory_shape(self, inv, shape):
2047
"""Compare an inventory to a list of expected names.
2320
2049
Fail if they are not precisely equal.
2323
2052
shape = list(shape) # copy
2324
for path, ie in tree.iter_entries_by_dir():
2053
for path, ie in inv.entries():
2325
2054
name = path.replace('\\', '/')
2326
2055
if ie.kind == 'directory':
2327
2056
name = name + '/'
2329
pass # ignore root entry
2331
2058
shape.remove(name)
2333
2060
extras.append(name)
2425
2150
class TestCaseWithMemoryTransport(TestCase):
2426
2151
"""Common test class for tests that do not need disk resources.
2428
Tests that need disk resources should derive from TestCaseInTempDir
2429
or TestCaseWithTransport.
2153
Tests that need disk resources should derive from TestCaseWithTransport.
2431
2155
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2433
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2157
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2434
2158
a directory which does not exist. This serves to help ensure test isolation
2435
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2436
must exist. However, TestCaseWithMemoryTransport does not offer local file
2437
defaults for the transport in tests, nor does it obey the command line
2159
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2160
must exist. However, TestCaseWithMemoryTransport does not offer local
2161
file defaults for the transport in tests, nor does it obey the command line
2438
2162
override, so tests that accidentally write to the common directory should
2441
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2442
``.bzr`` directory that stops us ascending higher into the filesystem.
2165
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2166
a .bzr directory that stops us ascending higher into the filesystem.
2445
2169
TEST_ROOT = None
2455
2179
self.transport_readonly_server = None
2456
2180
self.__vfs_server = None
2459
super(TestCaseWithMemoryTransport, self).setUp()
2461
def _add_disconnect_cleanup(transport):
2462
"""Schedule disconnection of given transport at test cleanup
2464
This needs to happen for all connected transports or leaks occur.
2466
Note reconnections may mean we call disconnect multiple times per
2467
transport which is suboptimal but seems harmless.
2469
self.addCleanup(transport.disconnect)
2471
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2472
_add_disconnect_cleanup, None)
2474
self._make_test_root()
2475
self.addCleanup(os.chdir, os.getcwdu())
2476
self.makeAndChdirToTestDir()
2477
self.overrideEnvironmentForTesting()
2478
self.__readonly_server = None
2479
self.__server = None
2480
self.reduceLockdirTimeout()
2481
# Each test may use its own config files even if the local config files
2482
# don't actually exist. They'll rightly fail if they try to create them
2484
self.overrideAttr(config, '_shared_stores', {})
2486
2182
def get_transport(self, relpath=None):
2487
2183
"""Return a writeable transport.
2694
2376
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2695
2377
self.permit_dir(self.test_dir)
2697
def make_branch(self, relpath, format=None, name=None):
2379
def make_branch(self, relpath, format=None):
2698
2380
"""Create a branch on the transport at relpath."""
2699
2381
repo = self.make_repository(relpath, format=format)
2700
return repo.bzrdir.create_branch(append_revisions_only=False,
2703
def get_default_format(self):
2706
def resolve_format(self, format):
2707
"""Resolve an object to a ControlDir format object.
2709
The initial format object can either already be
2710
a ControlDirFormat, None (for the default format),
2711
or a string with the name of the control dir format.
2713
:param format: Object to resolve
2714
:return A ControlDirFormat instance
2717
format = self.get_default_format()
2718
if isinstance(format, basestring):
2719
format = controldir.format_registry.make_bzrdir(format)
2382
return repo.bzrdir.create_branch()
2722
2384
def make_bzrdir(self, relpath, format=None):
2766
2431
test_home_dir = self.test_home_dir
2767
2432
if isinstance(test_home_dir, unicode):
2768
2433
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2769
self.overrideEnv('HOME', test_home_dir)
2770
self.overrideEnv('BZR_HOME', test_home_dir)
2434
os.environ['HOME'] = test_home_dir
2435
os.environ['BZR_HOME'] = test_home_dir
2438
super(TestCaseWithMemoryTransport, self).setUp()
2439
# Ensure that ConnectedTransport doesn't leak sockets
2440
def get_transport_with_cleanup(*args, **kwargs):
2441
t = orig_get_transport(*args, **kwargs)
2442
if isinstance(t, _mod_transport.ConnectedTransport):
2443
self.addCleanup(t.disconnect)
2446
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2447
get_transport_with_cleanup)
2448
self._make_test_root()
2449
self.addCleanup(os.chdir, os.getcwdu())
2450
self.makeAndChdirToTestDir()
2451
self.overrideEnvironmentForTesting()
2452
self.__readonly_server = None
2453
self.__server = None
2454
self.reduceLockdirTimeout()
2772
2456
def setup_smart_server_with_call_log(self):
2773
2457
"""Sets up a smart server as the transport server with a call log."""
2774
2458
self.transport_server = test_server.SmartTCPServer_for_testing
2775
self.hpss_connections = []
2776
2459
self.hpss_calls = []
2777
2460
import traceback
2778
2461
# Skip the current stack down to the caller of
3440
3118
"""A decorator which excludes test matching an exclude pattern."""
3442
3120
def __init__(self, suite, exclude_pattern):
3443
super(ExcludeDecorator, self).__init__(
3444
exclude_tests_by_re(suite, exclude_pattern))
3121
TestDecorator.__init__(self, suite)
3122
self.exclude_pattern = exclude_pattern
3123
self.excluded = False
3127
return iter(self._tests)
3128
self.excluded = True
3129
suite = exclude_tests_by_re(self, self.exclude_pattern)
3131
self.addTests(suite)
3132
return iter(self._tests)
3447
3135
class FilterTestsDecorator(TestDecorator):
3448
3136
"""A decorator which filters tests to those matching a pattern."""
3450
3138
def __init__(self, suite, pattern):
3451
super(FilterTestsDecorator, self).__init__(
3452
filter_suite_by_re(suite, pattern))
3139
TestDecorator.__init__(self, suite)
3140
self.pattern = pattern
3141
self.filtered = False
3145
return iter(self._tests)
3146
self.filtered = True
3147
suite = filter_suite_by_re(self, self.pattern)
3149
self.addTests(suite)
3150
return iter(self._tests)
3455
3153
class RandomDecorator(TestDecorator):
3456
3154
"""A decorator which randomises the order of its tests."""
3458
3156
def __init__(self, suite, random_seed, stream):
3459
random_seed = self.actual_seed(random_seed)
3460
stream.write("Randomizing test order using seed %s\n\n" %
3157
TestDecorator.__init__(self, suite)
3158
self.random_seed = random_seed
3159
self.randomised = False
3160
self.stream = stream
3164
return iter(self._tests)
3165
self.randomised = True
3166
self.stream.write("Randomizing test order using seed %s\n\n" %
3167
(self.actual_seed()))
3462
3168
# Initialise the random number generator.
3463
random.seed(random_seed)
3464
super(RandomDecorator, self).__init__(randomize_suite(suite))
3169
random.seed(self.actual_seed())
3170
suite = randomize_suite(self)
3172
self.addTests(suite)
3173
return iter(self._tests)
3467
def actual_seed(seed):
3175
def actual_seed(self):
3176
if self.random_seed == "now":
3469
3177
# We convert the seed to a long to make it reuseable across
3470
3178
# invocations (because the user can reenter it).
3471
return long(time.time())
3179
self.random_seed = long(time.time())
3473
3181
# Convert the seed to a long if we can
3476
except (TypeError, ValueError):
3183
self.random_seed = long(self.random_seed)
3186
return self.random_seed
3481
3189
class TestFirstDecorator(TestDecorator):
3482
3190
"""A decorator which moves named tests to the front."""
3484
3192
def __init__(self, suite, pattern):
3485
super(TestFirstDecorator, self).__init__()
3486
self.addTests(split_suite_by_re(suite, pattern))
3193
TestDecorator.__init__(self, suite)
3194
self.pattern = pattern
3195
self.filtered = False
3199
return iter(self._tests)
3200
self.filtered = True
3201
suites = split_suite_by_re(self, self.pattern)
3203
self.addTests(suites)
3204
return iter(self._tests)
3489
3207
def partition_tests(suite, count):
3534
3252
ProtocolTestCase.run(self, result)
3536
pid, status = os.waitpid(self.pid, 0)
3537
# GZ 2011-10-18: If status is nonzero, should report to the result
3538
# that something went wrong.
3254
os.waitpid(self.pid, 0)
3540
3256
test_blocks = partition_tests(suite, concurrency)
3541
# Clear the tests from the original suite so it doesn't keep them alive
3542
suite._tests[:] = []
3543
3257
for process_tests in test_blocks:
3544
process_suite = TestUtil.TestSuite(process_tests)
3545
# Also clear each split list so new suite has only reference
3546
process_tests[:] = []
3258
process_suite = TestUtil.TestSuite()
3259
process_suite.addTests(process_tests)
3547
3260
c2pread, c2pwrite = os.pipe()
3548
3261
pid = os.fork()
3263
workaround_zealous_crypto_random()
3551
stream = os.fdopen(c2pwrite, 'wb', 1)
3552
workaround_zealous_crypto_random()
3553
3265
os.close(c2pread)
3554
3266
# Leave stderr and stdout open so we can see test noise
3555
3267
# Close stdin so that the child goes away if it decides to
3556
3268
# read from stdin (otherwise its a roulette to see what
3557
3269
# child actually gets keystrokes for pdb etc).
3558
3270
sys.stdin.close()
3272
stream = os.fdopen(c2pwrite, 'wb', 1)
3559
3273
subunit_result = AutoTimingTestResultDecorator(
3560
SubUnitBzrProtocolClient(stream))
3274
TestProtocolClient(stream))
3561
3275
process_suite.run(subunit_result)
3563
# Try and report traceback on stream, but exit with error even
3564
# if stream couldn't be created or something else goes wrong.
3565
# The traceback is formatted to a string and written in one go
3566
# to avoid interleaving lines from multiple failing children.
3568
stream.write(traceback.format_exc())
3573
3279
os.close(c2pwrite)
3574
3280
stream = os.fdopen(c2pread, 'rb', 1)
3638
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3344
class ForwardingResult(unittest.TestResult):
3346
def __init__(self, target):
3347
unittest.TestResult.__init__(self)
3348
self.result = target
3350
def startTest(self, test):
3351
self.result.startTest(test)
3353
def stopTest(self, test):
3354
self.result.stopTest(test)
3356
def startTestRun(self):
3357
self.result.startTestRun()
3359
def stopTestRun(self):
3360
self.result.stopTestRun()
3362
def addSkip(self, test, reason):
3363
self.result.addSkip(test, reason)
3365
def addSuccess(self, test):
3366
self.result.addSuccess(test)
3368
def addError(self, test, err):
3369
self.result.addError(test, err)
3371
def addFailure(self, test, err):
3372
self.result.addFailure(test, err)
3373
ForwardingResult = testtools.ExtendedToOriginalDecorator
3376
class ProfileResult(ForwardingResult):
3639
3377
"""Generate profiling data for all activity between start and success.
3641
3379
The profile data is appended to the test's _benchcalls attribute and can
3979
3712
'bzrlib.tests.test_commit_merge',
3980
3713
'bzrlib.tests.test_config',
3981
3714
'bzrlib.tests.test_conflicts',
3982
'bzrlib.tests.test_controldir',
3983
3715
'bzrlib.tests.test_counted_lock',
3984
3716
'bzrlib.tests.test_crash',
3985
3717
'bzrlib.tests.test_decorators',
3986
3718
'bzrlib.tests.test_delta',
3987
3719
'bzrlib.tests.test_debug',
3720
'bzrlib.tests.test_deprecated_graph',
3988
3721
'bzrlib.tests.test_diff',
3989
3722
'bzrlib.tests.test_directory_service',
3990
3723
'bzrlib.tests.test_dirstate',
3991
3724
'bzrlib.tests.test_email_message',
3992
3725
'bzrlib.tests.test_eol_filters',
3993
3726
'bzrlib.tests.test_errors',
3994
'bzrlib.tests.test_estimate_compressed_size',
3995
3727
'bzrlib.tests.test_export',
3996
'bzrlib.tests.test_export_pot',
3997
3728
'bzrlib.tests.test_extract',
3998
'bzrlib.tests.test_features',
3999
3729
'bzrlib.tests.test_fetch',
4000
3730
'bzrlib.tests.test_fixtures',
4001
3731
'bzrlib.tests.test_fifo_cache',
4002
3732
'bzrlib.tests.test_filters',
4003
'bzrlib.tests.test_filter_tree',
4004
3733
'bzrlib.tests.test_ftp_transport',
4005
3734
'bzrlib.tests.test_foreign',
4006
3735
'bzrlib.tests.test_generate_docs',
4448
4141
% (os.path.basename(dirname), printable_e))
4144
class Feature(object):
4145
"""An operating system Feature."""
4148
self._available = None
4150
def available(self):
4151
"""Is the feature available?
4153
:return: True if the feature is available.
4155
if self._available is None:
4156
self._available = self._probe()
4157
return self._available
4160
"""Implement this method in concrete features.
4162
:return: True if the feature is available.
4164
raise NotImplementedError
4167
if getattr(self, 'feature_name', None):
4168
return self.feature_name()
4169
return self.__class__.__name__
4172
class _SymlinkFeature(Feature):
4175
return osutils.has_symlinks()
4177
def feature_name(self):
4180
SymlinkFeature = _SymlinkFeature()
4183
class _HardlinkFeature(Feature):
4186
return osutils.has_hardlinks()
4188
def feature_name(self):
4191
HardlinkFeature = _HardlinkFeature()
4194
class _OsFifoFeature(Feature):
4197
return getattr(os, 'mkfifo', None)
4199
def feature_name(self):
4200
return 'filesystem fifos'
4202
OsFifoFeature = _OsFifoFeature()
4205
class _UnicodeFilenameFeature(Feature):
4206
"""Does the filesystem support Unicode filenames?"""
4210
# Check for character combinations unlikely to be covered by any
4211
# single non-unicode encoding. We use the characters
4212
# - greek small letter alpha (U+03B1) and
4213
# - braille pattern dots-123456 (U+283F).
4214
os.stat(u'\u03b1\u283f')
4215
except UnicodeEncodeError:
4217
except (IOError, OSError):
4218
# The filesystem allows the Unicode filename but the file doesn't
4222
# The filesystem allows the Unicode filename and the file exists,
4226
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4229
class _CompatabilityThunkFeature(Feature):
4230
"""This feature is just a thunk to another feature.
4232
It issues a deprecation warning if it is accessed, to let you know that you
4233
should really use a different feature.
4236
def __init__(self, dep_version, module, name,
4237
replacement_name, replacement_module=None):
4238
super(_CompatabilityThunkFeature, self).__init__()
4239
self._module = module
4240
if replacement_module is None:
4241
replacement_module = module
4242
self._replacement_module = replacement_module
4244
self._replacement_name = replacement_name
4245
self._dep_version = dep_version
4246
self._feature = None
4249
if self._feature is None:
4250
depr_msg = self._dep_version % ('%s.%s'
4251
% (self._module, self._name))
4252
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4253
self._replacement_name)
4254
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4255
# Import the new feature and use it as a replacement for the
4257
mod = __import__(self._replacement_module, {}, {},
4258
[self._replacement_name])
4259
self._feature = getattr(mod, self._replacement_name)
4263
return self._feature._probe()
4266
class ModuleAvailableFeature(Feature):
4267
"""This is a feature than describes a module we want to be available.
4269
Declare the name of the module in __init__(), and then after probing, the
4270
module will be available as 'self.module'.
4272
:ivar module: The module if it is available, else None.
4275
def __init__(self, module_name):
4276
super(ModuleAvailableFeature, self).__init__()
4277
self.module_name = module_name
4281
self._module = __import__(self.module_name, {}, {}, [''])
4288
if self.available(): # Make sure the probe has been done
4292
def feature_name(self):
4293
return self.module_name
4296
# This is kept here for compatibility, it is recommended to use
4297
# 'bzrlib.tests.feature.paramiko' instead
4298
ParamikoFeature = _CompatabilityThunkFeature(
4299
deprecated_in((2,1,0)),
4300
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4451
4303
def probe_unicode_in_user_encoding():
4452
4304
"""Try to encode several unicode strings to use in unicode-aware tests.
4453
4305
Return first successfull match.
4336
class _HTTPSServerFeature(Feature):
4337
"""Some tests want an https Server, check if one is available.
4339
Right now, the only way this is available is under python2.6 which provides
4350
def feature_name(self):
4351
return 'HTTPSServer'
4354
HTTPSServerFeature = _HTTPSServerFeature()
4357
class _UnicodeFilename(Feature):
4358
"""Does the filesystem support Unicode filenames?"""
4363
except UnicodeEncodeError:
4365
except (IOError, OSError):
4366
# The filesystem allows the Unicode filename but the file doesn't
4370
# The filesystem allows the Unicode filename and the file exists,
4374
UnicodeFilename = _UnicodeFilename()
4377
class _ByteStringNamedFilesystem(Feature):
4378
"""Is the filesystem based on bytes?"""
4381
if os.name == "posix":
4385
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4388
class _UTF8Filesystem(Feature):
4389
"""Is the filesystem UTF-8?"""
4392
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4396
UTF8Filesystem = _UTF8Filesystem()
4399
class _BreakinFeature(Feature):
4400
"""Does this platform support the breakin feature?"""
4403
from bzrlib import breakin
4404
if breakin.determine_signal() is None:
4406
if sys.platform == 'win32':
4407
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4408
# We trigger SIGBREAK via a Console api so we need ctypes to
4409
# access the function
4416
def feature_name(self):
4417
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4420
BreakinFeature = _BreakinFeature()
4423
class _CaseInsCasePresFilenameFeature(Feature):
4424
"""Is the file-system case insensitive, but case-preserving?"""
4427
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4429
# first check truly case-preserving for created files, then check
4430
# case insensitive when opening existing files.
4431
name = osutils.normpath(name)
4432
base, rel = osutils.split(name)
4433
found_rel = osutils.canonical_relpath(base, name)
4434
return (found_rel == rel
4435
and os.path.isfile(name.upper())
4436
and os.path.isfile(name.lower()))
4441
def feature_name(self):
4442
return "case-insensitive case-preserving filesystem"
4444
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4447
class _CaseInsensitiveFilesystemFeature(Feature):
4448
"""Check if underlying filesystem is case-insensitive but *not* case
4451
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4452
# more likely to be case preserving, so this case is rare.
4455
if CaseInsCasePresFilenameFeature.available():
4458
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4459
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4460
TestCaseWithMemoryTransport.TEST_ROOT = root
4462
root = TestCaseWithMemoryTransport.TEST_ROOT
4463
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4465
name_a = osutils.pathjoin(tdir, 'a')
4466
name_A = osutils.pathjoin(tdir, 'A')
4468
result = osutils.isdir(name_A)
4469
_rmtree_temp_dir(tdir)
4472
def feature_name(self):
4473
return 'case-insensitive filesystem'
4475
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4478
class _CaseSensitiveFilesystemFeature(Feature):
4481
if CaseInsCasePresFilenameFeature.available():
4483
elif CaseInsensitiveFilesystemFeature.available():
4488
def feature_name(self):
4489
return 'case-sensitive filesystem'
4491
# new coding style is for feature instances to be lowercase
4492
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4495
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4496
SubUnitFeature = _CompatabilityThunkFeature(
4497
deprecated_in((2,1,0)),
4498
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4484
4499
# Only define SubUnitBzrRunner if subunit is available.
4486
4501
from subunit import TestProtocolClient
4487
4502
from subunit.test_results import AutoTimingTestResultDecorator
4488
class SubUnitBzrProtocolClient(TestProtocolClient):
4490
def stopTest(self, test):
4491
super(SubUnitBzrProtocolClient, self).stopTest(test)
4492
_clear__type_equality_funcs(test)
4494
def addSuccess(self, test, details=None):
4495
# The subunit client always includes the details in the subunit
4496
# stream, but we don't want to include it in ours.
4497
if details is not None and 'log' in details:
4499
return super(SubUnitBzrProtocolClient, self).addSuccess(
4502
4503
class SubUnitBzrRunner(TextTestRunner):
4503
4504
def run(self, test):
4504
4505
result = AutoTimingTestResultDecorator(
4505
SubUnitBzrProtocolClient(self.stream))
4506
TestProtocolClient(self.stream))
4506
4507
test.run(result)
4508
4509
except ImportError:
4512
# API compatibility for old plugins; see bug 892622.
4515
'HTTPServerFeature',
4516
'ModuleAvailableFeature',
4517
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4518
'OsFifoFeature', 'UnicodeFilenameFeature',
4519
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4520
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4521
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4522
'posix_permissions_feature',
4524
globals()[name] = _CompatabilityThunkFeature(
4525
symbol_versioning.deprecated_in((2, 5, 0)),
4526
'bzrlib.tests', name,
4527
name, 'bzrlib.tests.features')
4530
for (old_name, new_name) in [
4531
('UnicodeFilename', 'UnicodeFilenameFeature'),
4533
globals()[name] = _CompatabilityThunkFeature(
4534
symbol_versioning.deprecated_in((2, 5, 0)),
4535
'bzrlib.tests', old_name,
4536
new_name, 'bzrlib.tests.features')
4512
class _PosixPermissionsFeature(Feature):
4516
# create temporary file and check if specified perms are maintained.
4519
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4520
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4523
os.chmod(name, write_perms)
4525
read_perms = os.stat(name).st_mode & 0777
4527
return (write_perms == read_perms)
4529
return (os.name == 'posix') and has_perms()
4531
def feature_name(self):
4532
return 'POSIX permissions support'
4534
posix_permissions_feature = _PosixPermissionsFeature()