56
54
# nb: check this before importing anything else from within it
57
55
_testtools_version = getattr(testtools, '__version__', ())
58
if _testtools_version < (0, 9, 2):
59
raise ImportError("need at least testtools 0.9.2: %s is %r"
56
if _testtools_version < (0, 9, 5):
57
raise ImportError("need at least testtools 0.9.5: %s is %r"
60
58
% (testtools.__file__, _testtools_version))
61
59
from testtools import content
63
62
from bzrlib import (
66
commands as _mod_commands,
76
plugin as _mod_plugin,
78
83
transport as _mod_transport,
82
import bzrlib.commands
83
import bzrlib.timestamp
85
import bzrlib.inventory
86
import bzrlib.iterablefile
89
87
import bzrlib.lsprof
90
88
except ImportError:
91
89
# lsprof not available
93
from bzrlib.merge import merge_inner
96
from bzrlib.smart import client, request, server
98
from bzrlib import symbol_versioning
91
from bzrlib.smart import client, request
92
from bzrlib.transport import (
99
96
from bzrlib.symbol_versioning import (
100
DEPRECATED_PARAMETER,
101
97
deprecated_function,
107
from bzrlib.transport import (
111
from bzrlib.trace import mutter, note
112
100
from bzrlib.tests import (
117
106
from bzrlib.ui import NullProgressView
118
107
from bzrlib.ui.text import TextUIFactory
119
import bzrlib.version_info_formats.format_custom
120
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
108
from bzrlib.tests.features import _CompatabilityThunkFeature
122
110
# Mark this python module as being part of the implementation
123
111
# of unittest: this gives us better tracebacks where the last
135
123
SUBUNIT_SEEK_SET = 0
136
124
SUBUNIT_SEEK_CUR = 1
126
# These are intentionally brought into this namespace. That way plugins, etc
127
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
128
TestSuite = TestUtil.TestSuite
129
TestLoader = TestUtil.TestLoader
131
# Tests should run in a clean and clearly defined environment. The goal is to
132
# keep them isolated from the running environment as much as possible. The test
133
# framework ensures the variables defined below are set (or deleted if the
134
# value is None) before a test is run and reset to their original value after
135
# the test is run. Generally if some code depends on an environment variable,
136
# the tests should start without this variable in the environment. There are a
137
# few exceptions but you shouldn't violate this rule lightly.
141
'XDG_CONFIG_HOME': None,
142
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
143
# tests do check our impls match APPDATA
144
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
148
'BZREMAIL': None, # may still be present in the environment
149
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
150
'BZR_PROGRESS_BAR': None,
151
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
152
# as a base class instead of TestCaseInTempDir. Tests inheriting from
153
# TestCase should not use disk resources, BZR_LOG is one.
154
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
155
'BZR_PLUGIN_PATH': None,
156
'BZR_DISABLE_PLUGINS': None,
157
'BZR_PLUGINS_AT': None,
158
'BZR_CONCURRENCY': None,
159
# Make sure that any text ui tests are consistent regardless of
160
# the environment the test case is run in; you may want tests that
161
# test other combinations. 'dumb' is a reasonable guess for tests
162
# going to a pipe or a StringIO.
168
'SSH_AUTH_SOCK': None,
178
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
179
# least. If you do (care), please update this comment
183
'BZR_REMOTE_PATH': None,
184
# Generally speaking, we don't want apport reporting on crashes in
185
# the test environment unless we're specifically testing apport,
186
# so that it doesn't leak into the real system environment. We
187
# use an env var so it propagates to subprocesses.
188
'APPORT_DISABLE': '1',
192
def override_os_environ(test, env=None):
193
"""Modify os.environ keeping a copy.
195
:param test: A test instance
197
:param env: A dict containing variable definitions to be installed
200
env = isolated_environ
201
test._original_os_environ = dict([(var, value)
202
for var, value in os.environ.iteritems()])
203
for var, value in env.iteritems():
204
osutils.set_or_unset_env(var, value)
205
if var not in test._original_os_environ:
206
# The var is new, add it with a value of None, so
207
# restore_os_environ will delete it
208
test._original_os_environ[var] = None
211
def restore_os_environ(test):
212
"""Restore os.environ to its original state.
214
:param test: A test instance previously passed to override_os_environ.
216
for var, value in test._original_os_environ.iteritems():
217
# Restore the original value (or delete it if the value has been set to
218
# None in override_os_environ).
219
osutils.set_or_unset_env(var, value)
222
def _clear__type_equality_funcs(test):
223
"""Cleanup bound methods stored on TestCase instances
225
Clear the dict breaking a few (mostly) harmless cycles in the affected
226
unittests released with Python 2.6 and initial Python 2.7 versions.
228
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
229
shipped in Oneiric, an object with no clear method was used, hence the
230
extra complications, see bug 809048 for details.
232
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
233
if type_equality_funcs is not None:
234
tef_clear = getattr(type_equality_funcs, "clear", None)
235
if tef_clear is None:
236
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
237
if tef_instance_dict is not None:
238
tef_clear = tef_instance_dict.clear
239
if tef_clear is not None:
139
243
class ExtendedTestResult(testtools.TextTestResult):
140
244
"""Accepts, reports and accumulates the results of running tests.
275
392
what = re.sub(r'^bzrlib\.tests\.', '', what)
395
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
396
# multiple times in a row, because the handler is added for
397
# each test but the container list is shared between cases.
398
# See lp:498869 lp:625574 and lp:637725 for background.
399
def _record_traceback_from_test(self, exc_info):
400
"""Store the traceback from the passed exc_info tuple until stopTest clears it"""
401
self._traceback_from_test = exc_info[2]
278
403
def startTest(self, test):
279
404
super(ExtendedTestResult, self).startTest(test)
280
405
if self.count == 0:
281
406
self.startTests()
282
408
self.report_test_start(test)
283
409
test.number = self.count
284
410
self._recordTestStartTime()
411
# Make testtools cases give us the real traceback on failure
412
addOnException = getattr(test, "addOnException", None)
413
if addOnException is not None:
414
addOnException(self._record_traceback_from_test)
415
# Only check for thread leaks on bzrlib derived test cases
416
if isinstance(test, TestCase):
417
test.addCleanup(self._check_leaked_threads, test)
419
def stopTest(self, test):
420
super(ExtendedTestResult, self).stopTest(test)
421
# Manually break cycles, means touching various private things but hey
422
getDetails = getattr(test, "getDetails", None)
423
if getDetails is not None:
425
_clear__type_equality_funcs(test)
426
self._traceback_from_test = None
286
428
def startTests(self):
288
if getattr(sys, 'frozen', None) is None:
289
bzr_path = osutils.realpath(sys.argv[0])
291
bzr_path = sys.executable
293
'bzr selftest: %s\n' % (bzr_path,))
296
bzrlib.__path__[0],))
298
' bzr-%s python-%s %s\n' % (
299
bzrlib.version_string,
300
bzrlib._format_version_tuple(sys.version_info),
301
platform.platform(aliased=1),
303
self.stream.write('\n')
429
self.report_tests_starting()
430
self._active_threads = threading.enumerate()
432
def _check_leaked_threads(self, test):
433
"""See if any threads have leaked since last call
435
A sample of live threads is stored in the _active_threads attribute,
436
when this method runs it compares the current live threads and any not
437
in the previous sample are treated as having leaked.
439
now_active_threads = set(threading.enumerate())
440
threads_leaked = now_active_threads.difference(self._active_threads)
442
self._report_thread_leak(test, threads_leaked, now_active_threads)
443
self._tests_leaking_threads_count += 1
444
if self._first_thread_leaker_id is None:
445
self._first_thread_leaker_id = test.id()
446
self._active_threads = now_active_threads
305
448
def _recordTestStartTime(self):
306
449
"""Record that a test has started."""
307
self._start_time = time.time()
309
def _cleanupLogFile(self, test):
310
# We can only do this if we have one of our TestCases, not if
312
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
313
if setKeepLogfile is not None:
450
self._start_datetime = self._now()
316
452
def addError(self, test, err):
317
453
"""Tell result that test finished with an error.
398
549
raise errors.BzrError("Unknown whence %r" % whence)
400
def report_cleaning_up(self):
551
def report_tests_starting(self):
552
"""Display information before the test run begins"""
553
if getattr(sys, 'frozen', None) is None:
554
bzr_path = osutils.realpath(sys.argv[0])
556
bzr_path = sys.executable
558
'bzr selftest: %s\n' % (bzr_path,))
561
bzrlib.__path__[0],))
563
' bzr-%s python-%s %s\n' % (
564
bzrlib.version_string,
565
bzrlib._format_version_tuple(sys.version_info),
566
platform.platform(aliased=1),
568
self.stream.write('\n')
570
def report_test_start(self, test):
571
"""Display information on the test just about to be run"""
573
def _report_thread_leak(self, test, leaked_threads, active_threads):
574
"""Display information on a test that leaked one or more threads"""
575
# GZ 2010-09-09: A leak summary reported separately from the general
576
# thread debugging would be nice. Tests under subunit
577
# need something not using stream, perhaps adding a
578
# testtools details object would be fitting.
579
if 'threads' in selftest_debug_flags:
580
self.stream.write('%s is leaking, active is now %d\n' %
581
(test.id(), len(active_threads)))
403
583
def startTestRun(self):
404
584
self.startTime = time.time()
824
1035
self._track_transports()
825
1036
self._track_locks()
826
1037
self._clear_debug_flags()
827
TestCase._active_threads = threading.activeCount()
828
self.addCleanup(self._check_leaked_threads)
1038
# Isolate global verbosity level, to make sure it's reproducible
1039
# between tests. We should get rid of this altogether: bug 656694. --
1041
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
1042
self._log_files = set()
1043
# Each key in the ``_counters`` dict holds a value for a different
1044
# counter. When the test ends, addDetail() should be used to output the
1045
# counter values. This happens in install_counter_hook().
1047
if 'config_stats' in selftest_debug_flags:
1048
self._install_config_stats_hooks()
1049
# Do not use i18n for tests (unless the test reverses this)
830
1052
def debug(self):
831
1053
# debug a frame up.
833
pdb.Pdb().set_trace(sys._getframe().f_back)
835
def _check_leaked_threads(self):
836
active = threading.activeCount()
837
leaked_threads = active - TestCase._active_threads
838
TestCase._active_threads = active
839
# If some tests make the number of threads *decrease*, we'll consider
840
# that they are just observing old threads dying, not aggressively killing
841
# random threads. So we don't report these tests as leaking. The risk
842
# is that we have false positives that way (the test sees 2 threads
843
# going away but leaks one) but it seems less likely than the actual
844
# false positives (the test sees threads going away and does not leak).
845
if leaked_threads > 0:
846
if 'threads' in selftest_debug_flags:
847
print '%s is leaking, active is now %d' % (self.id(), active)
848
TestCase._leaking_threads_tests += 1
849
if TestCase._first_thread_leaker_id is None:
850
TestCase._first_thread_leaker_id = self.id()
1055
# The sys preserved stdin/stdout should allow blackbox tests debugging
1056
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1057
).set_trace(sys._getframe().f_back)
1059
def discardDetail(self, name):
1060
"""Extend the addDetail, getDetails api so we can remove a detail.
1062
eg. bzr always adds the 'log' detail at startup, but we don't want to
1063
include it for skipped, xfail, etc tests.
1065
It is safe to call this for a detail that doesn't exist, in case this
1066
gets called multiple times.
1068
# We cheat. details is stored in __details which means we shouldn't
1069
# touch it, but getDetails() returns the dict directly, so we can
1071
details = self.getDetails()
1075
def install_counter_hook(self, hooks, name, counter_name=None):
1076
"""Install a counting hook.
1078
Any hook can be counted as long as it doesn't need to return a value.
1080
:param hooks: Where the hook should be installed.
1082
:param name: The hook name that will be counted.
1084
:param counter_name: The counter identifier in ``_counters``, defaults
1087
_counters = self._counters # Avoid closing over self
1088
if counter_name is None:
1090
if _counters.has_key(counter_name):
1091
raise AssertionError('%s is already used as a counter name'
1093
_counters[counter_name] = 0
1094
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1095
lambda: ['%d' % (_counters[counter_name],)]))
1096
def increment_counter(*args, **kwargs):
1097
_counters[counter_name] += 1
1098
label = 'count %s calls' % (counter_name,)
1099
hooks.install_named_hook(name, increment_counter, label)
1100
self.addCleanup(hooks.uninstall_named_hook, name, label)
1102
def _install_config_stats_hooks(self):
1103
"""Install config hooks to count hook calls.
1106
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1107
self.install_counter_hook(config.ConfigHooks, hook_name,
1108
'config.%s' % (hook_name,))
1110
# The OldConfigHooks are private and need special handling to protect
1111
# against recursive tests (tests that run other tests), so we just do
1112
# manually what registering them into _builtin_known_hooks will provide
1114
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1115
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1116
self.install_counter_hook(config.OldConfigHooks, hook_name,
1117
'old_config.%s' % (hook_name,))
852
1119
def _clear_debug_flags(self):
853
1120
"""Prevent externally set debug flags affecting tests.
1321
1601
self.assertEqual(expected_docstring, obj.__doc__)
1603
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1323
1604
def failUnlessExists(self, path):
1605
return self.assertPathExists(path)
1607
def assertPathExists(self, path):
1324
1608
"""Fail unless path or paths, which may be abs or relative, exist."""
1325
1609
if not isinstance(path, basestring):
1327
self.failUnlessExists(p)
1611
self.assertPathExists(p)
1329
self.failUnless(osutils.lexists(path),path+" does not exist")
1613
self.assertTrue(osutils.lexists(path),
1614
path + " does not exist")
1616
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1331
1617
def failIfExists(self, path):
1618
return self.assertPathDoesNotExist(path)
1620
def assertPathDoesNotExist(self, path):
1332
1621
"""Fail if path or paths, which may be abs or relative, exist."""
1333
1622
if not isinstance(path, basestring):
1335
self.failIfExists(p)
1624
self.assertPathDoesNotExist(p)
1337
self.failIf(osutils.lexists(path),path+" exists")
1626
self.assertFalse(osutils.lexists(path),
1339
1629
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1340
1630
"""A helper for callDeprecated and applyDeprecated.
1452
1743
def _startLogFile(self):
1453
"""Send bzr and test log messages to a temporary file.
1455
The file is removed as the test is torn down.
1457
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1458
self._log_file = os.fdopen(fileno, 'w+')
1459
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1460
self._log_file_name = name
1744
"""Set up an in-memory target for bzr and testcase log messages"""
1745
pseudo_log_file = StringIO()
1746
def _get_log_contents_for_weird_testtools_api():
1747
return [pseudo_log_file.getvalue().decode(
1748
"utf-8", "replace").encode("utf-8")]
1749
self.addDetail("log", content.Content(content.ContentType("text",
1750
"plain", {"charset": "utf8"}),
1751
_get_log_contents_for_weird_testtools_api))
1752
self._log_file = pseudo_log_file
1753
self._log_memento = trace.push_log_file(self._log_file)
1461
1754
self.addCleanup(self._finishLogFile)
1463
1756
def _finishLogFile(self):
1464
"""Finished with the log file.
1466
Close the file and delete it, unless setKeepLogfile was called.
1468
if bzrlib.trace._trace_file:
1757
"""Flush and dereference the in-memory log for this testcase"""
1758
if trace._trace_file:
1469
1759
# flush the log file, to get all content
1470
bzrlib.trace._trace_file.flush()
1471
bzrlib.trace.pop_log_file(self._log_memento)
1472
# Cache the log result and delete the file on disk
1473
self._get_log(False)
1760
trace._trace_file.flush()
1761
trace.pop_log_file(self._log_memento)
1762
# The logging module now tracks references for cleanup so discard ours
1763
del self._log_memento
1475
1765
def thisFailsStrictLockCheck(self):
1476
1766
"""It is known that this test would fail with -Dstrict_locks.
1506
1791
:returns: The actual attr value.
1508
value = getattr(obj, attr_name)
1509
1793
# The actual value is captured by the call below
1510
self.addCleanup(setattr, obj, attr_name, value)
1794
value = getattr(obj, attr_name, _unitialized_attr)
1795
if value is _unitialized_attr:
1796
# When the test completes, the attribute should not exist, but if
1797
# we aren't setting a value, we don't need to do anything.
1798
if new is not _unitialized_attr:
1799
self.addCleanup(delattr, obj, attr_name)
1801
self.addCleanup(setattr, obj, attr_name, value)
1511
1802
if new is not _unitialized_attr:
1512
1803
setattr(obj, attr_name, new)
1806
def overrideEnv(self, name, new):
1807
"""Set an environment variable, and reset it after the test.
1809
:param name: The environment variable name.
1811
:param new: The value to set the variable to. If None, the
1812
variable is deleted from the environment.
1814
:returns: The actual variable value.
1816
value = osutils.set_or_unset_env(name, new)
1817
self.addCleanup(osutils.set_or_unset_env, name, value)
1820
def recordCalls(self, obj, attr_name):
1821
"""Monkeypatch in a wrapper that will record calls.
1823
The monkeypatch is automatically removed when the test concludes.
1825
:param obj: The namespace holding the reference to be replaced;
1826
typically a module, class, or object.
1827
:param attr_name: A string for the name of the attribute to
1829
:returns: A list that will be extended with one item every time the
1830
function is called, with a tuple of (args, kwargs).
1834
def decorator(*args, **kwargs):
1835
calls.append((args, kwargs))
1836
return orig(*args, **kwargs)
1837
orig = self.overrideAttr(obj, attr_name, decorator)
1515
1840
def _cleanEnvironment(self):
1517
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1518
'HOME': os.getcwd(),
1519
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1520
# tests do check our impls match APPDATA
1521
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1525
'BZREMAIL': None, # may still be present in the environment
1526
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1527
'BZR_PROGRESS_BAR': None,
1529
'BZR_PLUGIN_PATH': None,
1530
'BZR_DISABLE_PLUGINS': None,
1531
'BZR_PLUGINS_AT': None,
1532
'BZR_CONCURRENCY': None,
1533
# Make sure that any text ui tests are consistent regardless of
1534
# the environment the test case is run in; you may want tests that
1535
# test other combinations. 'dumb' is a reasonable guess for tests
1536
# going to a pipe or a StringIO.
1540
'BZR_COLUMNS': '80',
1542
'SSH_AUTH_SOCK': None,
1546
'https_proxy': None,
1547
'HTTPS_PROXY': None,
1552
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1553
# least. If you do (care), please update this comment
1557
'BZR_REMOTE_PATH': None,
1558
# Generally speaking, we don't want apport reporting on crashes in
1559
# the test environment unless we're specifically testing apport,
1560
# so that it doesn't leak into the real system environment. We
1561
# use an env var so it propagates to subprocesses.
1562
'APPORT_DISABLE': '1',
1565
self.addCleanup(self._restoreEnvironment)
1566
for name, value in new_env.iteritems():
1567
self._captureVar(name, value)
1569
def _captureVar(self, name, newvalue):
1570
"""Set an environment variable, and reset it when finished."""
1571
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1573
def _restoreEnvironment(self):
1574
for name, value in self._old_env.iteritems():
1575
osutils.set_or_unset_env(name, value)
1841
for name, value in isolated_environ.iteritems():
1842
self.overrideEnv(name, value)
1577
1844
def _restoreHooks(self):
1578
1845
for klass, (name, hooks) in self._preserved_hooks.items():
1579
1846
setattr(klass, name, hooks)
1847
self._preserved_hooks.clear()
1848
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1849
self._preserved_lazy_hooks.clear()
1581
1851
def knownFailure(self, reason):
1582
"""This test has failed for some known reason."""
1583
raise KnownFailure(reason)
1852
"""Declare that this test fails for a known reason
1854
Tests that are known to fail should generally be using expectedFailure
1855
with an appropriate reverse assertion if a change could cause the test
1856
to start passing. Conversely if the test has no immediate prospect of
1857
succeeding then using skip is more suitable.
1859
When this method is called while an exception is being handled, that
1860
traceback will be used, otherwise a new exception will be thrown to
1861
provide one but won't be reported.
1863
self._add_reason(reason)
1865
exc_info = sys.exc_info()
1866
if exc_info != (None, None, None):
1867
self._report_traceback(exc_info)
1870
raise self.failureException(reason)
1871
except self.failureException:
1872
exc_info = sys.exc_info()
1873
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1874
raise testtools.testcase._ExpectedFailure(exc_info)
1878
def _suppress_log(self):
1879
"""Remove the log info from details."""
1880
self.discardDetail('log')
1585
1882
def _do_skip(self, result, reason):
1883
self._suppress_log()
1586
1884
addSkip = getattr(result, 'addSkip', None)
1587
1885
if not callable(addSkip):
1588
1886
result.addSuccess(result)
1644
1965
self._benchtime += time.time() - start
1646
1967
def log(self, *args):
1649
def _get_log(self, keep_log_file=False):
1650
"""Internal helper to get the log from bzrlib.trace for this test.
1652
Please use self.getDetails, or self.get_log to access this in test case
1655
:param keep_log_file: When True, if the log is still a file on disk
1656
leave it as a file on disk. When False, if the log is still a file
1657
on disk, the log file is deleted and the log preserved as
1659
:return: A string containing the log.
1661
if self._log_contents is not None:
1663
self._log_contents.decode('utf8')
1664
except UnicodeDecodeError:
1665
unicodestr = self._log_contents.decode('utf8', 'replace')
1666
self._log_contents = unicodestr.encode('utf8')
1667
return self._log_contents
1669
if bzrlib.trace._trace_file:
1670
# flush the log file, to get all content
1671
bzrlib.trace._trace_file.flush()
1672
if self._log_file_name is not None:
1673
logfile = open(self._log_file_name)
1675
log_contents = logfile.read()
1679
log_contents.decode('utf8')
1680
except UnicodeDecodeError:
1681
unicodestr = log_contents.decode('utf8', 'replace')
1682
log_contents = unicodestr.encode('utf8')
1683
if not keep_log_file:
1685
max_close_attempts = 100
1686
first_close_error = None
1687
while close_attempts < max_close_attempts:
1690
self._log_file.close()
1691
except IOError, ioe:
1692
if ioe.errno is None:
1693
# No errno implies 'close() called during
1694
# concurrent operation on the same file object', so
1695
# retry. Probably a thread is trying to write to
1697
if first_close_error is None:
1698
first_close_error = ioe
1703
if close_attempts > 1:
1705
'Unable to close log file on first attempt, '
1706
'will retry: %s\n' % (first_close_error,))
1707
if close_attempts == max_close_attempts:
1709
'Unable to close log file after %d attempts.\n'
1710
% (max_close_attempts,))
1711
self._log_file = None
1712
# Permit multiple calls to get_log until we clean it up in
1714
self._log_contents = log_contents
1716
os.remove(self._log_file_name)
1718
if sys.platform == 'win32' and e.errno == errno.EACCES:
1719
sys.stderr.write(('Unable to delete log file '
1720
' %r\n' % self._log_file_name))
1723
self._log_file_name = None
1726
return "No log file content and no log file name."
1728
1970
def get_log(self):
1729
1971
"""Get a unicode string containing the log from bzrlib.trace.
1944
2187
variables. A value of None will unset the env variable.
1945
2188
The values must be strings. The change will only occur in the
1946
2189
child, so you don't need to fix the environment after running.
1947
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
2190
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2191
doesn't support signalling subprocesses.
1949
2192
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2193
:param stderr: file to use for the subprocess's stderr. Valid values
2194
are those valid for the stderr argument of `subprocess.Popen`.
2195
Default value is ``subprocess.PIPE``.
1951
2197
:returns: Popen object for the started process.
1953
2199
if skip_if_plan_to_signal:
1954
if not getattr(os, 'kill', None):
1955
raise TestSkipped("os.kill not available.")
2200
if os.name != "posix":
2201
raise TestSkipped("Sending signals not supported")
1957
2203
if env_changes is None:
1958
2204
env_changes = {}
2205
# Because $HOME is set to a tempdir for the context of a test, modules
2206
# installed in the user dir will not be found unless $PYTHONUSERBASE
2207
# gets set to the computed directory of this parent process.
2208
if site.USER_BASE is not None:
2209
env_changes["PYTHONUSERBASE"] = site.USER_BASE
1961
2212
def cleanup_environment():
2252
def _add_subprocess_log(self, log_file_path):
2253
if len(self._log_files) == 0:
2254
# Register an addCleanup func. We do this on the first call to
2255
# _add_subprocess_log rather than in TestCase.setUp so that this
2256
# addCleanup is registered after any cleanups for tempdirs that
2257
# subclasses might create, which will probably remove the log file
2259
self.addCleanup(self._subprocess_log_cleanup)
2260
# self._log_files is a set, so if a log file is reused we won't grab it
2262
self._log_files.add(log_file_path)
2264
def _subprocess_log_cleanup(self):
2265
for count, log_file_path in enumerate(self._log_files):
2266
# We use buffer_now=True to avoid holding the file open beyond
2267
# the life of this function, which might interfere with e.g.
2268
# cleaning tempdirs on Windows.
2269
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2270
#detail_content = content.content_from_file(
2271
# log_file_path, buffer_now=True)
2272
with open(log_file_path, 'rb') as log_file:
2273
log_file_bytes = log_file.read()
2274
detail_content = content.Content(content.ContentType("text",
2275
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2276
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
1998
2279
def _popen(self, *args, **kwargs):
1999
2280
"""Place a call to Popen.
2037
2318
if retcode is not None and retcode != process.returncode:
2038
2319
if process_args is None:
2039
2320
process_args = "(unknown args)"
2040
mutter('Output of bzr %s:\n%s', process_args, out)
2041
mutter('Error for bzr %s:\n%s', process_args, err)
2321
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2322
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2042
2323
self.fail('Command bzr %s failed with retcode %s != %s'
2043
2324
% (process_args, retcode, process.returncode))
2044
2325
return [out, err]
2046
def check_inventory_shape(self, inv, shape):
2047
"""Compare an inventory to a list of expected names.
2327
def check_tree_shape(self, tree, shape):
2328
"""Compare a tree to a list of expected names.
2049
2330
Fail if they are not precisely equal.
2052
2333
shape = list(shape) # copy
2053
for path, ie in inv.entries():
2334
for path, ie in tree.iter_entries_by_dir():
2054
2335
name = path.replace('\\', '/')
2055
2336
if ie.kind == 'directory':
2056
2337
name = name + '/'
2339
pass # ignore root entry
2058
2341
shape.remove(name)
2060
2343
extras.append(name)
2150
2435
class TestCaseWithMemoryTransport(TestCase):
2151
2436
"""Common test class for tests that do not need disk resources.
2153
Tests that need disk resources should derive from TestCaseWithTransport.
2438
Tests that need disk resources should derive from TestCaseInTempDir
2439
or TestCaseWithTransport.
2155
2441
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2157
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2443
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2158
2444
a directory which does not exist. This serves to help ensure test isolation
2159
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2160
must exist. However, TestCaseWithMemoryTransport does not offer local
2161
file defaults for the transport in tests, nor does it obey the command line
2445
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2446
must exist. However, TestCaseWithMemoryTransport does not offer local file
2447
defaults for the transport in tests, nor does it obey the command line
2162
2448
override, so tests that accidentally write to the common directory should
2165
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2166
a .bzr directory that stops us ascending higher into the filesystem.
2451
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2452
``.bzr`` directory that stops us ascending higher into the filesystem.
2169
2455
TEST_ROOT = None
2179
2465
self.transport_readonly_server = None
2180
2466
self.__vfs_server = None
2469
super(TestCaseWithMemoryTransport, self).setUp()
2471
def _add_disconnect_cleanup(transport):
2472
"""Schedule disconnection of given transport at test cleanup
2474
This needs to happen for all connected transports or leaks occur.
2476
Note reconnections may mean we call disconnect multiple times per
2477
transport which is suboptimal but seems harmless.
2479
self.addCleanup(transport.disconnect)
2481
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2482
_add_disconnect_cleanup, None)
2484
self._make_test_root()
2485
self.addCleanup(os.chdir, os.getcwdu())
2486
self.makeAndChdirToTestDir()
2487
self.overrideEnvironmentForTesting()
2488
self.__readonly_server = None
2489
self.__server = None
2490
self.reduceLockdirTimeout()
2491
# Each test may use its own config files even if the local config files
2492
# don't actually exist. They'll rightly fail if they try to create them
2494
self.overrideAttr(config, '_shared_stores', {})
2182
2496
def get_transport(self, relpath=None):
2183
2497
"""Return a writeable transport.
2376
2704
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2377
2705
self.permit_dir(self.test_dir)
2379
def make_branch(self, relpath, format=None):
2707
def make_branch(self, relpath, format=None, name=None):
2380
2708
"""Create a branch on the transport at relpath."""
2381
2709
repo = self.make_repository(relpath, format=format)
2382
return repo.bzrdir.create_branch()
2710
return repo.bzrdir.create_branch(append_revisions_only=False,
2713
def get_default_format(self):
2716
def resolve_format(self, format):
2717
"""Resolve an object to a ControlDir format object.
2719
The initial format object can either already be
2720
a ControlDirFormat, None (for the default format),
2721
or a string with the name of the control dir format.
2723
:param format: Object to resolve
2724
:return: A ControlDirFormat instance
2727
format = self.get_default_format()
2728
if isinstance(format, basestring):
2729
format = controldir.format_registry.make_bzrdir(format)
2384
2732
def make_bzrdir(self, relpath, format=None):
2431
2776
test_home_dir = self.test_home_dir
2432
2777
if isinstance(test_home_dir, unicode):
2433
2778
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2434
os.environ['HOME'] = test_home_dir
2435
os.environ['BZR_HOME'] = test_home_dir
2438
super(TestCaseWithMemoryTransport, self).setUp()
2439
# Ensure that ConnectedTransport doesn't leak sockets
2440
def get_transport_with_cleanup(*args, **kwargs):
2441
t = orig_get_transport(*args, **kwargs)
2442
if isinstance(t, _mod_transport.ConnectedTransport):
2443
self.addCleanup(t.disconnect)
2446
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2447
get_transport_with_cleanup)
2448
self._make_test_root()
2449
self.addCleanup(os.chdir, os.getcwdu())
2450
self.makeAndChdirToTestDir()
2451
self.overrideEnvironmentForTesting()
2452
self.__readonly_server = None
2453
self.__server = None
2454
self.reduceLockdirTimeout()
2779
self.overrideEnv('HOME', test_home_dir)
2780
self.overrideEnv('BZR_HOME', test_home_dir)
2456
2782
def setup_smart_server_with_call_log(self):
2457
2783
"""Sets up a smart server as the transport server with a call log."""
2458
2784
self.transport_server = test_server.SmartTCPServer_for_testing
2785
self.hpss_connections = []
2459
2786
self.hpss_calls = []
2460
2787
import traceback
2461
2788
# Skip the current stack down to the caller of
3118
3450
"""A decorator which excludes test matching an exclude pattern."""
3120
3452
def __init__(self, suite, exclude_pattern):
3121
TestDecorator.__init__(self, suite)
3122
self.exclude_pattern = exclude_pattern
3123
self.excluded = False
3127
return iter(self._tests)
3128
self.excluded = True
3129
suite = exclude_tests_by_re(self, self.exclude_pattern)
3131
self.addTests(suite)
3132
return iter(self._tests)
3453
super(ExcludeDecorator, self).__init__(
3454
exclude_tests_by_re(suite, exclude_pattern))
3135
3457
class FilterTestsDecorator(TestDecorator):
3136
3458
"""A decorator which filters tests to those matching a pattern."""
3138
3460
def __init__(self, suite, pattern):
3139
TestDecorator.__init__(self, suite)
3140
self.pattern = pattern
3141
self.filtered = False
3145
return iter(self._tests)
3146
self.filtered = True
3147
suite = filter_suite_by_re(self, self.pattern)
3149
self.addTests(suite)
3150
return iter(self._tests)
3461
super(FilterTestsDecorator, self).__init__(
3462
filter_suite_by_re(suite, pattern))
3153
3465
class RandomDecorator(TestDecorator):
3154
3466
"""A decorator which randomises the order of its tests."""
3156
3468
def __init__(self, suite, random_seed, stream):
3157
TestDecorator.__init__(self, suite)
3158
self.random_seed = random_seed
3159
self.randomised = False
3160
self.stream = stream
3164
return iter(self._tests)
3165
self.randomised = True
3166
self.stream.write("Randomizing test order using seed %s\n\n" %
3167
(self.actual_seed()))
3469
random_seed = self.actual_seed(random_seed)
3470
stream.write("Randomizing test order using seed %s\n\n" %
3168
3472
# Initialise the random number generator.
3169
random.seed(self.actual_seed())
3170
suite = randomize_suite(self)
3172
self.addTests(suite)
3173
return iter(self._tests)
3473
random.seed(random_seed)
3474
super(RandomDecorator, self).__init__(randomize_suite(suite))
3175
def actual_seed(self):
3176
if self.random_seed == "now":
3477
def actual_seed(seed):
3177
3479
# We convert the seed to a long to make it reusable across
3178
3480
# invocations (because the user can reenter it).
3179
self.random_seed = long(time.time())
3481
return long(time.time())
3181
3483
# Convert the seed to a long if we can
3183
self.random_seed = long(self.random_seed)
3486
except (TypeError, ValueError):
3186
return self.random_seed
3189
3491
class TestFirstDecorator(TestDecorator):
3190
3492
"""A decorator which moves named tests to the front."""
3192
3494
def __init__(self, suite, pattern):
3193
TestDecorator.__init__(self, suite)
3194
self.pattern = pattern
3195
self.filtered = False
3199
return iter(self._tests)
3200
self.filtered = True
3201
suites = split_suite_by_re(self, self.pattern)
3203
self.addTests(suites)
3204
return iter(self._tests)
3495
super(TestFirstDecorator, self).__init__()
3496
self.addTests(split_suite_by_re(suite, pattern))
3207
3499
def partition_tests(suite, count):
3252
3544
ProtocolTestCase.run(self, result)
3254
os.waitpid(self.pid, 0)
3546
pid, status = os.waitpid(self.pid, 0)
3547
# GZ 2011-10-18: If status is nonzero, should report to the result
3548
# that something went wrong.
3256
3550
test_blocks = partition_tests(suite, concurrency)
3551
# Clear the tests from the original suite so it doesn't keep them alive
3552
suite._tests[:] = []
3257
3553
for process_tests in test_blocks:
3258
process_suite = TestUtil.TestSuite()
3259
process_suite.addTests(process_tests)
3554
process_suite = TestUtil.TestSuite(process_tests)
3555
# Also clear each split list so new suite has only reference
3556
process_tests[:] = []
3260
3557
c2pread, c2pwrite = os.pipe()
3261
3558
pid = os.fork()
3263
workaround_zealous_crypto_random()
3561
stream = os.fdopen(c2pwrite, 'wb', 1)
3562
workaround_zealous_crypto_random()
3265
3563
os.close(c2pread)
3266
3564
# Leave stderr and stdout open so we can see test noise
3267
3565
# Close stdin so that the child goes away if it decides to
3268
3566
# read from stdin (otherwise it's a roulette to see what
3269
3567
# child actually gets keystrokes for pdb etc).
3270
3568
sys.stdin.close()
3272
stream = os.fdopen(c2pwrite, 'wb', 1)
3273
3569
subunit_result = AutoTimingTestResultDecorator(
3274
TestProtocolClient(stream))
3570
SubUnitBzrProtocolClient(stream))
3275
3571
process_suite.run(subunit_result)
3573
# Try and report traceback on stream, but exit with error even
3574
# if stream couldn't be created or something else goes wrong.
3575
# The traceback is formatted to a string and written in one go
3576
# to avoid interleaving lines from multiple failing children.
3578
stream.write(traceback.format_exc())
3279
3583
os.close(c2pwrite)
3280
3584
stream = os.fdopen(c2pread, 'rb', 1)
3344
class ForwardingResult(unittest.TestResult):
3346
def __init__(self, target):
3347
unittest.TestResult.__init__(self)
3348
self.result = target
3350
def startTest(self, test):
3351
self.result.startTest(test)
3353
def stopTest(self, test):
3354
self.result.stopTest(test)
3356
def startTestRun(self):
3357
self.result.startTestRun()
3359
def stopTestRun(self):
3360
self.result.stopTestRun()
3362
def addSkip(self, test, reason):
3363
self.result.addSkip(test, reason)
3365
def addSuccess(self, test):
3366
self.result.addSuccess(test)
3368
def addError(self, test, err):
3369
self.result.addError(test, err)
3371
def addFailure(self, test, err):
3372
self.result.addFailure(test, err)
3373
ForwardingResult = testtools.ExtendedToOriginalDecorator
3376
class ProfileResult(ForwardingResult):
3648
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3377
3649
"""Generate profiling data for all activity between start and success.
3379
3651
The profile data is appended to the test's _benchcalls attribute and can
3712
3989
'bzrlib.tests.test_commit_merge',
3713
3990
'bzrlib.tests.test_config',
3714
3991
'bzrlib.tests.test_conflicts',
3992
'bzrlib.tests.test_controldir',
3715
3993
'bzrlib.tests.test_counted_lock',
3716
3994
'bzrlib.tests.test_crash',
3717
3995
'bzrlib.tests.test_decorators',
3718
3996
'bzrlib.tests.test_delta',
3719
3997
'bzrlib.tests.test_debug',
3720
'bzrlib.tests.test_deprecated_graph',
3721
3998
'bzrlib.tests.test_diff',
3722
3999
'bzrlib.tests.test_directory_service',
3723
4000
'bzrlib.tests.test_dirstate',
3724
4001
'bzrlib.tests.test_email_message',
3725
4002
'bzrlib.tests.test_eol_filters',
3726
4003
'bzrlib.tests.test_errors',
4004
'bzrlib.tests.test_estimate_compressed_size',
3727
4005
'bzrlib.tests.test_export',
4006
'bzrlib.tests.test_export_pot',
3728
4007
'bzrlib.tests.test_extract',
4008
'bzrlib.tests.test_features',
3729
4009
'bzrlib.tests.test_fetch',
3730
4010
'bzrlib.tests.test_fixtures',
3731
4011
'bzrlib.tests.test_fifo_cache',
3732
4012
'bzrlib.tests.test_filters',
4013
'bzrlib.tests.test_filter_tree',
3733
4014
'bzrlib.tests.test_ftp_transport',
3734
4015
'bzrlib.tests.test_foreign',
3735
4016
'bzrlib.tests.test_generate_docs',
4141
4458
% (os.path.basename(dirname), printable_e))
4144
class Feature(object):
4145
"""An operating system Feature."""
4148
self._available = None
4150
def available(self):
4151
"""Is the feature available?
4153
:return: True if the feature is available.
4155
if self._available is None:
4156
self._available = self._probe()
4157
return self._available
4160
"""Implement this method in concrete features.
4162
:return: True if the feature is available.
4164
raise NotImplementedError
4167
if getattr(self, 'feature_name', None):
4168
return self.feature_name()
4169
return self.__class__.__name__
4172
class _SymlinkFeature(Feature):
4175
return osutils.has_symlinks()
4177
def feature_name(self):
4180
SymlinkFeature = _SymlinkFeature()
4183
class _HardlinkFeature(Feature):
4186
return osutils.has_hardlinks()
4188
def feature_name(self):
4191
HardlinkFeature = _HardlinkFeature()
4194
class _OsFifoFeature(Feature):
4197
return getattr(os, 'mkfifo', None)
4199
def feature_name(self):
4200
return 'filesystem fifos'
4202
OsFifoFeature = _OsFifoFeature()
4205
class _UnicodeFilenameFeature(Feature):
4206
"""Does the filesystem support Unicode filenames?"""
4210
# Check for character combinations unlikely to be covered by any
4211
# single non-unicode encoding. We use the characters
4212
# - greek small letter alpha (U+03B1) and
4213
# - braille pattern dots-123456 (U+283F).
4214
os.stat(u'\u03b1\u283f')
4215
except UnicodeEncodeError:
4217
except (IOError, OSError):
4218
# The filesystem allows the Unicode filename but the file doesn't
4222
# The filesystem allows the Unicode filename and the file exists,
4226
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4229
class _CompatabilityThunkFeature(Feature):
4230
"""This feature is just a thunk to another feature.
4232
It issues a deprecation warning if it is accessed, to let you know that you
4233
should really use a different feature.
4236
def __init__(self, dep_version, module, name,
4237
replacement_name, replacement_module=None):
4238
super(_CompatabilityThunkFeature, self).__init__()
4239
self._module = module
4240
if replacement_module is None:
4241
replacement_module = module
4242
self._replacement_module = replacement_module
4244
self._replacement_name = replacement_name
4245
self._dep_version = dep_version
4246
self._feature = None
4249
if self._feature is None:
4250
depr_msg = self._dep_version % ('%s.%s'
4251
% (self._module, self._name))
4252
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4253
self._replacement_name)
4254
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4255
# Import the new feature and use it as a replacement for the
4257
mod = __import__(self._replacement_module, {}, {},
4258
[self._replacement_name])
4259
self._feature = getattr(mod, self._replacement_name)
4263
return self._feature._probe()
4266
class ModuleAvailableFeature(Feature):
4267
"""This is a feature than describes a module we want to be available.
4269
Declare the name of the module in __init__(), and then after probing, the
4270
module will be available as 'self.module'.
4272
:ivar module: The module if it is available, else None.
4275
def __init__(self, module_name):
4276
super(ModuleAvailableFeature, self).__init__()
4277
self.module_name = module_name
4281
self._module = __import__(self.module_name, {}, {}, [''])
4288
if self.available(): # Make sure the probe has been done
4292
def feature_name(self):
4293
return self.module_name
4296
# This is kept here for compatibility, it is recommended to use
4297
# 'bzrlib.tests.features.paramiko' instead
4298
ParamikoFeature = _CompatabilityThunkFeature(
4299
deprecated_in((2,1,0)),
4300
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4303
4461
def probe_unicode_in_user_encoding():
4304
4462
"""Try to encode several unicode strings to use in unicode-aware tests.
4305
4463
Return first successful match.
4336
class _HTTPSServerFeature(Feature):
4337
"""Some tests want an https Server, check if one is available.
4339
Right now, the only way this is available is under python2.6 which provides
4350
def feature_name(self):
4351
return 'HTTPSServer'
4354
HTTPSServerFeature = _HTTPSServerFeature()
4357
class _UnicodeFilename(Feature):
4358
"""Does the filesystem support Unicode filenames?"""
4363
except UnicodeEncodeError:
4365
except (IOError, OSError):
4366
# The filesystem allows the Unicode filename but the file doesn't
4370
# The filesystem allows the Unicode filename and the file exists,
4374
UnicodeFilename = _UnicodeFilename()
4377
class _ByteStringNamedFilesystem(Feature):
4378
"""Is the filesystem based on bytes?"""
4381
if os.name == "posix":
4385
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4388
class _UTF8Filesystem(Feature):
4389
"""Is the filesystem UTF-8?"""
4392
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4396
UTF8Filesystem = _UTF8Filesystem()
4399
class _BreakinFeature(Feature):
4400
"""Does this platform support the breakin feature?"""
4403
from bzrlib import breakin
4404
if breakin.determine_signal() is None:
4406
if sys.platform == 'win32':
4407
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4408
# We trigger SIGBREAK via a Console api so we need ctypes to
4409
# access the function
4416
def feature_name(self):
4417
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4420
BreakinFeature = _BreakinFeature()
4423
class _CaseInsCasePresFilenameFeature(Feature):
4424
"""Is the file-system case insensitive, but case-preserving?"""
4427
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4429
# first check truly case-preserving for created files, then check
4430
# case insensitive when opening existing files.
4431
name = osutils.normpath(name)
4432
base, rel = osutils.split(name)
4433
found_rel = osutils.canonical_relpath(base, name)
4434
return (found_rel == rel
4435
and os.path.isfile(name.upper())
4436
and os.path.isfile(name.lower()))
4441
def feature_name(self):
4442
return "case-insensitive case-preserving filesystem"
4444
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4447
class _CaseInsensitiveFilesystemFeature(Feature):
4448
"""Check if underlying filesystem is case-insensitive but *not* case
4451
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4452
# more likely to be case preserving, so this case is rare.
4455
if CaseInsCasePresFilenameFeature.available():
4458
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4459
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4460
TestCaseWithMemoryTransport.TEST_ROOT = root
4462
root = TestCaseWithMemoryTransport.TEST_ROOT
4463
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4465
name_a = osutils.pathjoin(tdir, 'a')
4466
name_A = osutils.pathjoin(tdir, 'A')
4468
result = osutils.isdir(name_A)
4469
_rmtree_temp_dir(tdir)
4472
def feature_name(self):
4473
return 'case-insensitive filesystem'
4475
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4478
class _CaseSensitiveFilesystemFeature(Feature):
4481
if CaseInsCasePresFilenameFeature.available():
4483
elif CaseInsensitiveFilesystemFeature.available():
4488
def feature_name(self):
4489
return 'case-sensitive filesystem'
4491
# new coding style is for feature instances to be lowercase
4492
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4495
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4496
SubUnitFeature = _CompatabilityThunkFeature(
4497
deprecated_in((2,1,0)),
4498
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4499
4494
# Only define SubUnitBzrRunner if subunit is available.
4501
4496
from subunit import TestProtocolClient
4502
4497
from subunit.test_results import AutoTimingTestResultDecorator
4498
class SubUnitBzrProtocolClient(TestProtocolClient):
4500
def stopTest(self, test):
4501
super(SubUnitBzrProtocolClient, self).stopTest(test)
4502
_clear__type_equality_funcs(test)
4504
def addSuccess(self, test, details=None):
4505
# The subunit client always includes the details in the subunit
4506
# stream, but we don't want to include it in ours.
4507
if details is not None and 'log' in details:
4509
return super(SubUnitBzrProtocolClient, self).addSuccess(
4503
4512
class SubUnitBzrRunner(TextTestRunner):
4504
4513
def run(self, test):
4505
4514
result = AutoTimingTestResultDecorator(
4506
TestProtocolClient(self.stream))
4515
SubUnitBzrProtocolClient(self.stream))
4507
4516
test.run(result)
4509
4518
except ImportError:
4512
class _PosixPermissionsFeature(Feature):
4516
# create temporary file and check if specified perms are maintained.
4519
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4520
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4523
os.chmod(name, write_perms)
4525
read_perms = os.stat(name).st_mode & 0777
4527
return (write_perms == read_perms)
4529
return (os.name == 'posix') and has_perms()
4531
def feature_name(self):
4532
return 'POSIX permissions support'
4534
posix_permissions_feature = _PosixPermissionsFeature()
4522
# API compatibility for old plugins; see bug 892622.
4525
'HTTPServerFeature',
4526
'ModuleAvailableFeature',
4527
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4528
'OsFifoFeature', 'UnicodeFilenameFeature',
4529
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4530
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4531
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4532
'posix_permissions_feature',
4534
globals()[name] = _CompatabilityThunkFeature(
4535
symbol_versioning.deprecated_in((2, 5, 0)),
4536
'bzrlib.tests', name,
4537
name, 'bzrlib.tests.features')
4540
# API compatibility for features whose name changed when they moved to
# bzrlib.tests.features. Each pair is (name formerly exported from
# bzrlib.tests, name of the replacement in bzrlib.tests.features).
for (old_name, new_name) in [
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
    ]:
    # Bind the deprecation thunk under old_name. Keying on any other
    # module-level variable (such as a loop variable left over from earlier
    # code) would rebind an unrelated global and leave old_name undefined.
    globals()[old_name] = _CompatabilityThunkFeature(
        symbol_versioning.deprecated_in((2, 5, 0)),
        'bzrlib.tests', old_name,
        new_name, 'bzrlib.tests.features')