56
54
# nb: check this before importing anything else from within it
57
55
_testtools_version = getattr(testtools, '__version__', ())
58
if _testtools_version < (0, 9, 2):
59
raise ImportError("need at least testtools 0.9.2: %s is %r"
56
if _testtools_version < (0, 9, 5):
57
raise ImportError("need at least testtools 0.9.5: %s is %r"
60
58
% (testtools.__file__, _testtools_version))
61
59
from testtools import content
63
62
from bzrlib import (
66
commands as _mod_commands,
76
plugin as _mod_plugin,
78
83
transport as _mod_transport,
82
import bzrlib.commands
83
import bzrlib.timestamp
85
import bzrlib.inventory
86
import bzrlib.iterablefile
89
87
import bzrlib.lsprof
90
88
except ImportError:
91
89
# lsprof not available
93
from bzrlib.merge import merge_inner
96
from bzrlib.smart import client, request, server
98
from bzrlib import symbol_versioning
91
from bzrlib.smart import client, request
92
from bzrlib.transport import (
99
96
from bzrlib.symbol_versioning import (
100
DEPRECATED_PARAMETER,
101
97
deprecated_function,
107
from bzrlib.transport import (
111
from bzrlib.trace import mutter, note
112
100
from bzrlib.tests import (
117
106
from bzrlib.ui import NullProgressView
118
107
from bzrlib.ui.text import TextUIFactory
119
import bzrlib.version_info_formats.format_custom
120
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
108
from bzrlib.tests.features import _CompatabilityThunkFeature
122
110
# Mark this python module as being part of the implementation
123
111
# of unittest: this gives us better tracebacks where the last
135
123
SUBUNIT_SEEK_SET = 0
136
124
SUBUNIT_SEEK_CUR = 1
126
# These are intentionally brought into this namespace. That way plugins, etc
127
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
128
TestSuite = TestUtil.TestSuite
129
TestLoader = TestUtil.TestLoader
131
# Tests should run in a clean and clearly defined environment. The goal is to
132
# keep them isolated from the running environment as much as possible. The test
133
# framework ensures the variables defined below are set (or deleted if the
134
# value is None) before a test is run and reset to their original value after
135
# the test is run. Generally if some code depends on an environment variable,
136
# the tests should start without this variable in the environment. There are a
137
# few exceptions but you shouldn't violate this rule lightly.
141
'XDG_CONFIG_HOME': None,
142
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
143
# tests do check our impls match APPDATA
144
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
148
'BZREMAIL': None, # may still be present in the environment
149
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
150
'BZR_PROGRESS_BAR': None,
151
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
152
# as a base class instead of TestCaseInTempDir. Tests inheriting from
153
# TestCase should not use disk resources, BZR_LOG is one.
154
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
155
'BZR_PLUGIN_PATH': None,
156
'BZR_DISABLE_PLUGINS': None,
157
'BZR_PLUGINS_AT': None,
158
'BZR_CONCURRENCY': None,
159
# Make sure that any text ui tests are consistent regardless of
160
# the environment the test case is run in; you may want tests that
161
# test other combinations. 'dumb' is a reasonable guess for tests
162
# going to a pipe or a StringIO.
168
'SSH_AUTH_SOCK': None,
178
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
179
# least. If you do (care), please update this comment
183
'BZR_REMOTE_PATH': None,
184
# Generally speaking, we don't want apport reporting on crashes in
185
# the test environment unless we're specifically testing apport,
186
# so that it doesn't leak into the real system environment. We
187
# use an env var so it propagates to subprocesses.
188
'APPORT_DISABLE': '1',
192
def override_os_environ(test, env=None):
193
"""Modify os.environ keeping a copy.
195
:param test: A test instance
197
:param env: A dict containing variable definitions to be installed
200
env = isolated_environ
201
test._original_os_environ = dict([(var, value)
202
for var, value in os.environ.iteritems()])
203
for var, value in env.iteritems():
204
osutils.set_or_unset_env(var, value)
205
if var not in test._original_os_environ:
206
# The var is new, add it with a value of None, so
207
# restore_os_environ will delete it
208
test._original_os_environ[var] = None
211
def restore_os_environ(test):
212
"""Restore os.environ to its original state.
214
:param test: A test instance previously passed to override_os_environ.
216
for var, value in test._original_os_environ.iteritems():
217
# Restore the original value (or delete it if the value has been set to
218
# None in override_os_environ).
219
osutils.set_or_unset_env(var, value)
222
def _clear__type_equality_funcs(test):
223
"""Cleanup bound methods stored on TestCase instances
225
Clear the dict breaking a few (mostly) harmless cycles in the affected
226
unittests released with Python 2.6 and initial Python 2.7 versions.
228
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
229
shipped in Oneiric, an object with no clear method was used, hence the
230
extra complications, see bug 809048 for details.
232
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
233
if type_equality_funcs is not None:
234
tef_clear = getattr(type_equality_funcs, "clear", None)
235
if tef_clear is None:
236
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
237
if tef_instance_dict is not None:
238
tef_clear = tef_instance_dict.clear
239
if tef_clear is not None:
139
243
class ExtendedTestResult(testtools.TextTestResult):
140
244
"""Accepts, reports and accumulates the results of running tests.
275
392
what = re.sub(r'^bzrlib\.tests\.', '', what)
395
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
396
# multiple times in a row, because the handler is added for
397
# each test but the container list is shared between cases.
398
# See lp:498869 lp:625574 and lp:637725 for background.
399
def _record_traceback_from_test(self, exc_info):
400
"""Store the traceback from passed exc_info tuple till stopTest resets it"""
401
self._traceback_from_test = exc_info[2]
278
403
def startTest(self, test):
279
404
super(ExtendedTestResult, self).startTest(test)
280
405
if self.count == 0:
281
406
self.startTests()
282
408
self.report_test_start(test)
283
409
test.number = self.count
284
410
self._recordTestStartTime()
411
# Make testtools cases give us the real traceback on failure
412
addOnException = getattr(test, "addOnException", None)
413
if addOnException is not None:
414
addOnException(self._record_traceback_from_test)
415
# Only check for thread leaks on bzrlib derived test cases
416
if isinstance(test, TestCase):
417
test.addCleanup(self._check_leaked_threads, test)
419
def stopTest(self, test):
420
super(ExtendedTestResult, self).stopTest(test)
421
# Manually break cycles, means touching various private things but hey
422
getDetails = getattr(test, "getDetails", None)
423
if getDetails is not None:
425
_clear__type_equality_funcs(test)
426
self._traceback_from_test = None
286
428
def startTests(self):
288
if getattr(sys, 'frozen', None) is None:
289
bzr_path = osutils.realpath(sys.argv[0])
291
bzr_path = sys.executable
293
'bzr selftest: %s\n' % (bzr_path,))
296
bzrlib.__path__[0],))
298
' bzr-%s python-%s %s\n' % (
299
bzrlib.version_string,
300
bzrlib._format_version_tuple(sys.version_info),
301
platform.platform(aliased=1),
303
self.stream.write('\n')
429
self.report_tests_starting()
430
self._active_threads = threading.enumerate()
432
def _check_leaked_threads(self, test):
433
"""See if any threads have leaked since last call
435
A sample of live threads is stored in the _active_threads attribute,
436
when this method runs it compares the current live threads and any not
437
in the previous sample are treated as having leaked.
439
now_active_threads = set(threading.enumerate())
440
threads_leaked = now_active_threads.difference(self._active_threads)
442
self._report_thread_leak(test, threads_leaked, now_active_threads)
443
self._tests_leaking_threads_count += 1
444
if self._first_thread_leaker_id is None:
445
self._first_thread_leaker_id = test.id()
446
self._active_threads = now_active_threads
305
448
def _recordTestStartTime(self):
306
449
"""Record that a test has started."""
307
self._start_time = time.time()
309
def _cleanupLogFile(self, test):
310
# We can only do this if we have one of our TestCases, not if
312
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
313
if setKeepLogfile is not None:
450
self._start_datetime = self._now()
316
452
def addError(self, test, err):
317
453
"""Tell result that test finished with an error.
398
549
raise errors.BzrError("Unknown whence %r" % whence)
400
def report_cleaning_up(self):
551
def report_tests_starting(self):
552
"""Display information before the test run begins"""
553
if getattr(sys, 'frozen', None) is None:
554
bzr_path = osutils.realpath(sys.argv[0])
556
bzr_path = sys.executable
558
'bzr selftest: %s\n' % (bzr_path,))
561
bzrlib.__path__[0],))
563
' bzr-%s python-%s %s\n' % (
564
bzrlib.version_string,
565
bzrlib._format_version_tuple(sys.version_info),
566
platform.platform(aliased=1),
568
self.stream.write('\n')
570
def report_test_start(self, test):
571
"""Display information on the test just about to be run"""
573
def _report_thread_leak(self, test, leaked_threads, active_threads):
    """Write a one-line thread-leak notice about ``test`` to the stream.

    Nothing is written unless 'threads' is present in
    ``selftest_debug_flags``.

    :param test: The test being reported; its ``id()`` goes in the message.
    :param leaked_threads: Accepted but not used by this implementation.
    :param active_threads: Collection of currently active threads; only its
        length is reported.
    """
    # GZ 2010-09-09: A leak summary reported separately from the general
    #                thread debugging would be nice. Tests under subunit
    #                need something not using stream, perhaps adding a
    #                testtools details object would be fitting.
    if 'threads' not in selftest_debug_flags:
        return
    template = '%s is leaking, active is now %d\n'
    self.stream.write(template % (test.id(), len(active_threads)))
403
583
def startTestRun(self):
404
584
self.startTime = time.time()
824
1035
self._track_transports()
825
1036
self._track_locks()
826
1037
self._clear_debug_flags()
827
TestCase._active_threads = threading.activeCount()
828
self.addCleanup(self._check_leaked_threads)
1038
# Isolate global verbosity level, to make sure it's reproducible
1039
# between tests. We should get rid of this altogether: bug 656694. --
1041
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
1042
self._log_files = set()
1043
# Each key in the ``_counters`` dict holds a value for a different
1044
# counter. When the test ends, addDetail() should be used to output the
1045
# counter values. This happens in install_counter_hook().
1047
if 'config_stats' in selftest_debug_flags:
1048
self._install_config_stats_hooks()
1049
# Do not use i18n for tests (unless the test reverses this)
830
1052
def debug(self):
831
1053
# debug a frame up.
833
pdb.Pdb().set_trace(sys._getframe().f_back)
835
def _check_leaked_threads(self):
836
active = threading.activeCount()
837
leaked_threads = active - TestCase._active_threads
838
TestCase._active_threads = active
839
# If some tests make the number of threads *decrease*, we'll consider
840
# that they are just observing old threads dying, not aggressively killing
841
# random threads. So we don't report these tests as leaking. The risk
842
# is that we have false positives that way (the test see 2 threads
843
# going away but leak one) but it seems less likely than the actual
844
# false positives (the test see threads going away and does not leak).
845
if leaked_threads > 0:
846
if 'threads' in selftest_debug_flags:
847
print '%s is leaking, active is now %d' % (self.id(), active)
848
TestCase._leaking_threads_tests += 1
849
if TestCase._first_thread_leaker_id is None:
850
TestCase._first_thread_leaker_id = self.id()
1055
# The sys preserved stdin/stdout should allow blackbox tests debugging
1056
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1057
).set_trace(sys._getframe().f_back)
1059
def discardDetail(self, name):
1060
"""Extend the addDetail, getDetails api so we can remove a detail.
1062
eg. bzr always adds the 'log' detail at startup, but we don't want to
1063
include it for skipped, xfail, etc tests.
1065
It is safe to call this for a detail that doesn't exist, in case this
1066
gets called multiple times.
1068
# We cheat. details is stored in __details which means we shouldn't
1069
# touch it. but getDetails() returns the dict directly, so we can
1071
details = self.getDetails()
1075
def install_counter_hook(self, hooks, name, counter_name=None):
1076
"""Install a counting hook.
1078
Any hook can be counted as long as it doesn't need to return a value.
1080
:param hooks: Where the hook should be installed.
1082
:param name: The hook name that will be counted.
1084
:param counter_name: The counter identifier in ``_counters``, defaults
1087
_counters = self._counters # Avoid closing over self
1088
if counter_name is None:
1090
if _counters.has_key(counter_name):
1091
raise AssertionError('%s is already used as a counter name'
1093
_counters[counter_name] = 0
1094
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1095
lambda: ['%d' % (_counters[counter_name],)]))
1096
def increment_counter(*args, **kwargs):
1097
_counters[counter_name] += 1
1098
label = 'count %s calls' % (counter_name,)
1099
hooks.install_named_hook(name, increment_counter, label)
1100
self.addCleanup(hooks.uninstall_named_hook, name, label)
1102
def _install_config_stats_hooks(self):
1103
"""Install config hooks to count hook calls.
1106
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1107
self.install_counter_hook(config.ConfigHooks, hook_name,
1108
'config.%s' % (hook_name,))
1110
# The OldConfigHooks are private and need special handling to protect
1111
# against recursive tests (tests that run other tests), so we just do
1112
# manually what registering them into _builtin_known_hooks will provide
1114
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1115
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1116
self.install_counter_hook(config.OldConfigHooks, hook_name,
1117
'old_config.%s' % (hook_name,))
852
1119
def _clear_debug_flags(self):
853
1120
"""Prevent externally set debug flags affecting tests.
1321
1603
self.assertEqual(expected_docstring, obj.__doc__)
1605
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1323
1606
def failUnlessExists(self, path):
1607
return self.assertPathExists(path)
1609
def assertPathExists(self, path):
1324
1610
"""Fail unless path or paths, which may be abs or relative, exist."""
1325
1611
if not isinstance(path, basestring):
1327
self.failUnlessExists(p)
1613
self.assertPathExists(p)
1329
self.failUnless(osutils.lexists(path),path+" does not exist")
1615
self.assertTrue(osutils.lexists(path),
1616
path + " does not exist")
1618
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1331
1619
def failIfExists(self, path):
1620
return self.assertPathDoesNotExist(path)
1622
def assertPathDoesNotExist(self, path):
1332
1623
"""Fail if path or paths, which may be abs or relative, exist."""
1333
1624
if not isinstance(path, basestring):
1335
self.failIfExists(p)
1626
self.assertPathDoesNotExist(p)
1337
self.failIf(osutils.lexists(path),path+" exists")
1628
self.assertFalse(osutils.lexists(path),
1339
1631
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1340
1632
"""A helper for callDeprecated and applyDeprecated.
1452
1745
def _startLogFile(self):
1453
"""Send bzr and test log messages to a temporary file.
1455
The file is removed as the test is torn down.
1457
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1458
self._log_file = os.fdopen(fileno, 'w+')
1459
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1460
self._log_file_name = name
1746
"""Set up an in-memory target for bzr and testcase log messages"""
1747
pseudo_log_file = StringIO()
1748
def _get_log_contents_for_weird_testtools_api():
1749
return [pseudo_log_file.getvalue().decode(
1750
"utf-8", "replace").encode("utf-8")]
1751
self.addDetail("log", content.Content(content.ContentType("text",
1752
"plain", {"charset": "utf8"}),
1753
_get_log_contents_for_weird_testtools_api))
1754
self._log_file = pseudo_log_file
1755
self._log_memento = trace.push_log_file(self._log_file)
1461
1756
self.addCleanup(self._finishLogFile)
1463
1758
def _finishLogFile(self):
1464
"""Finished with the log file.
1466
Close the file and delete it, unless setKeepLogfile was called.
1468
if bzrlib.trace._trace_file:
1759
"""Flush and dereference the in-memory log for this testcase"""
1760
if trace._trace_file:
1469
1761
# flush the log file, to get all content
1470
bzrlib.trace._trace_file.flush()
1471
bzrlib.trace.pop_log_file(self._log_memento)
1472
# Cache the log result and delete the file on disk
1473
self._get_log(False)
1762
trace._trace_file.flush()
1763
trace.pop_log_file(self._log_memento)
1764
# The logging module now tracks references for cleanup so discard ours
1765
del self._log_memento
1475
1767
def thisFailsStrictLockCheck(self):
1476
1768
"""It is known that this test would fail with -Dstrict_locks.
1506
1793
:returns: The actual attr value.
1508
value = getattr(obj, attr_name)
1509
1795
# The actual value is captured by the call below
1510
self.addCleanup(setattr, obj, attr_name, value)
1796
value = getattr(obj, attr_name, _unitialized_attr)
1797
if value is _unitialized_attr:
1798
# When the test completes, the attribute should not exist, but if
1799
# we aren't setting a value, we don't need to do anything.
1800
if new is not _unitialized_attr:
1801
self.addCleanup(delattr, obj, attr_name)
1803
self.addCleanup(setattr, obj, attr_name, value)
1511
1804
if new is not _unitialized_attr:
1512
1805
setattr(obj, attr_name, new)
1808
def overrideEnv(self, name, new):
1809
"""Set an environment variable, and reset it after the test.
1811
:param name: The environment variable name.
1813
:param new: The value to set the variable to. If None, the
1814
variable is deleted from the environment.
1816
:returns: The actual variable value.
1818
value = osutils.set_or_unset_env(name, new)
1819
self.addCleanup(osutils.set_or_unset_env, name, value)
1822
def recordCalls(self, obj, attr_name):
1823
"""Monkeypatch in a wrapper that will record calls.
1825
The monkeypatch is automatically removed when the test concludes.
1827
:param obj: The namespace holding the reference to be replaced;
1828
typically a module, class, or object.
1829
:param attr_name: A string for the name of the attribute to
1831
:returns: A list that will be extended with one item every time the
1832
function is called, with a tuple of (args, kwargs).
1836
def decorator(*args, **kwargs):
1837
calls.append((args, kwargs))
1838
return orig(*args, **kwargs)
1839
orig = self.overrideAttr(obj, attr_name, decorator)
1515
1842
def _cleanEnvironment(self):
1517
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1518
'HOME': os.getcwd(),
1519
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1520
# tests do check our impls match APPDATA
1521
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1525
'BZREMAIL': None, # may still be present in the environment
1526
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1527
'BZR_PROGRESS_BAR': None,
1529
'BZR_PLUGIN_PATH': None,
1530
'BZR_DISABLE_PLUGINS': None,
1531
'BZR_PLUGINS_AT': None,
1532
'BZR_CONCURRENCY': None,
1533
# Make sure that any text ui tests are consistent regardless of
1534
# the environment the test case is run in; you may want tests that
1535
# test other combinations. 'dumb' is a reasonable guess for tests
1536
# going to a pipe or a StringIO.
1540
'BZR_COLUMNS': '80',
1542
'SSH_AUTH_SOCK': None,
1546
'https_proxy': None,
1547
'HTTPS_PROXY': None,
1552
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1553
# least. If you do (care), please update this comment
1557
'BZR_REMOTE_PATH': None,
1558
# Generally speaking, we don't want apport reporting on crashes in
1559
# the test environment unless we're specifically testing apport,
1560
# so that it doesn't leak into the real system environment. We
1561
# use an env var so it propagates to subprocesses.
1562
'APPORT_DISABLE': '1',
1565
self.addCleanup(self._restoreEnvironment)
1566
for name, value in new_env.iteritems():
1567
self._captureVar(name, value)
1569
def _captureVar(self, name, newvalue):
1570
"""Set an environment variable, and reset it when finished."""
1571
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1573
def _restoreEnvironment(self):
1574
for name, value in self._old_env.iteritems():
1575
osutils.set_or_unset_env(name, value)
1843
for name, value in isolated_environ.iteritems():
1844
self.overrideEnv(name, value)
1577
1846
def _restoreHooks(self):
    """Put back the hooks that were saved before the test ran.

    Every ``(name, hooks)`` pair recorded in ``self._preserved_hooks`` is
    set back on the object it was recorded for, and the saved lazy hooks
    become ``bzrlib.hooks._lazy_hooks`` again.
    """
    for klass, (name, hooks) in self._preserved_hooks.items():
        setattr(klass, name, hooks)
    self._preserved_hooks.clear()
    bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
    # Rebind instead of calling clear(): after the assignment above both
    # names refer to the same dict, so clearing it in place would also empty
    # the lazy hooks that have just been restored.
    self._preserved_lazy_hooks = {}
1581
1853
def knownFailure(self, reason):
1582
"""This test has failed for some known reason."""
1583
raise KnownFailure(reason)
1854
"""Declare that this test fails for a known reason
1856
Tests that are known to fail should generally be using expectedFailure
1857
with an appropriate reverse assertion if a change could cause the test
1858
to start passing. Conversely if the test has no immediate prospect of
1859
succeeding then using skip is more suitable.
1861
When this method is called while an exception is being handled, that
1862
traceback will be used, otherwise a new exception will be thrown to
1863
provide one but won't be reported.
1865
self._add_reason(reason)
1867
exc_info = sys.exc_info()
1868
if exc_info != (None, None, None):
1869
self._report_traceback(exc_info)
1872
raise self.failureException(reason)
1873
except self.failureException:
1874
exc_info = sys.exc_info()
1875
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1876
raise testtools.testcase._ExpectedFailure(exc_info)
1880
def _suppress_log(self):
1881
"""Remove the log info from details."""
1882
self.discardDetail('log')
1585
1884
def _do_skip(self, result, reason):
1885
self._suppress_log()
1586
1886
addSkip = getattr(result, 'addSkip', None)
1587
1887
if not callable(addSkip):
1588
1888
result.addSuccess(result)
1644
1967
self._benchtime += time.time() - start
1646
1969
def log(self, *args):
1649
def _get_log(self, keep_log_file=False):
1650
"""Internal helper to get the log from bzrlib.trace for this test.
1652
Please use self.getDetails, or self.get_log to access this in test case
1655
:param keep_log_file: When True, if the log is still a file on disk
1656
leave it as a file on disk. When False, if the log is still a file
1657
on disk, the log file is deleted and the log preserved as
1659
:return: A string containing the log.
1661
if self._log_contents is not None:
1663
self._log_contents.decode('utf8')
1664
except UnicodeDecodeError:
1665
unicodestr = self._log_contents.decode('utf8', 'replace')
1666
self._log_contents = unicodestr.encode('utf8')
1667
return self._log_contents
1669
if bzrlib.trace._trace_file:
1670
# flush the log file, to get all content
1671
bzrlib.trace._trace_file.flush()
1672
if self._log_file_name is not None:
1673
logfile = open(self._log_file_name)
1675
log_contents = logfile.read()
1679
log_contents.decode('utf8')
1680
except UnicodeDecodeError:
1681
unicodestr = log_contents.decode('utf8', 'replace')
1682
log_contents = unicodestr.encode('utf8')
1683
if not keep_log_file:
1685
max_close_attempts = 100
1686
first_close_error = None
1687
while close_attempts < max_close_attempts:
1690
self._log_file.close()
1691
except IOError, ioe:
1692
if ioe.errno is None:
1693
# No errno implies 'close() called during
1694
# concurrent operation on the same file object', so
1695
# retry. Probably a thread is trying to write to
1697
if first_close_error is None:
1698
first_close_error = ioe
1703
if close_attempts > 1:
1705
'Unable to close log file on first attempt, '
1706
'will retry: %s\n' % (first_close_error,))
1707
if close_attempts == max_close_attempts:
1709
'Unable to close log file after %d attempts.\n'
1710
% (max_close_attempts,))
1711
self._log_file = None
1712
# Permit multiple calls to get_log until we clean it up in
1714
self._log_contents = log_contents
1716
os.remove(self._log_file_name)
1718
if sys.platform == 'win32' and e.errno == errno.EACCES:
1719
sys.stderr.write(('Unable to delete log file '
1720
' %r\n' % self._log_file_name))
1723
self._log_file_name = None
1726
return "No log file content and no log file name."
1728
1972
def get_log(self):
1729
1973
"""Get a unicode string containing the log from bzrlib.trace.
1944
2189
variables. A value of None will unset the env variable.
1945
2190
The values must be strings. The change will only occur in the
1946
2191
child, so you don't need to fix the environment after running.
1947
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
2192
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2193
doesn't support signalling subprocesses.
1949
2194
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2195
:param stderr: file to use for the subprocess's stderr. Valid values
2196
are those valid for the stderr argument of `subprocess.Popen`.
2197
Default value is ``subprocess.PIPE``.
1951
2199
:returns: Popen object for the started process.
1953
2201
if skip_if_plan_to_signal:
1954
if not getattr(os, 'kill', None):
1955
raise TestSkipped("os.kill not available.")
2202
if os.name != "posix":
2203
raise TestSkipped("Sending signals not supported")
1957
2205
if env_changes is None:
1958
2206
env_changes = {}
2207
# Because $HOME is set to a tempdir for the context of a test, modules
2208
# installed in the user dir will not be found unless $PYTHONUSERBASE
2209
# gets set to the computed directory of this parent process.
2210
if site.USER_BASE is not None:
2211
env_changes["PYTHONUSERBASE"] = site.USER_BASE
1961
2214
def cleanup_environment():
2254
def _add_subprocess_log(self, log_file_path):
    """Record a subprocess log file path in ``self._log_files``.

    The first path recorded also registers ``_subprocess_log_cleanup`` as a
    cleanup for this test.

    :param log_file_path: Path of the log file to remember.
    """
    if not self._log_files:
        # Register an addCleanup func. We do this on the first call to
        # _add_subprocess_log rather than in TestCase.setUp so that this
        # addCleanup is registered after any cleanups for tempdirs that
        # subclasses might create, which will probably remove the log file.
        self.addCleanup(self._subprocess_log_cleanup)
    # self._log_files is a set, so a log file that is reused is only
    # recorded once.
    self._log_files.add(log_file_path)
2266
def _subprocess_log_cleanup(self):
2267
for count, log_file_path in enumerate(self._log_files):
2268
# We use buffer_now=True to avoid holding the file open beyond
2269
# the life of this function, which might interfere with e.g.
2270
# cleaning tempdirs on Windows.
2271
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2272
#detail_content = content.content_from_file(
2273
# log_file_path, buffer_now=True)
2274
with open(log_file_path, 'rb') as log_file:
2275
log_file_bytes = log_file.read()
2276
detail_content = content.Content(content.ContentType("text",
2277
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2278
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
1998
2281
def _popen(self, *args, **kwargs):
1999
2282
"""Place a call to Popen.
2037
2320
if retcode is not None and retcode != process.returncode:
2038
2321
if process_args is None:
2039
2322
process_args = "(unknown args)"
2040
mutter('Output of bzr %s:\n%s', process_args, out)
2041
mutter('Error for bzr %s:\n%s', process_args, err)
2323
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2324
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2042
2325
self.fail('Command bzr %s failed with retcode %s != %s'
2043
2326
% (process_args, retcode, process.returncode))
2044
2327
return [out, err]
2046
def check_inventory_shape(self, inv, shape):
2047
"""Compare an inventory to a list of expected names.
2329
def check_tree_shape(self, tree, shape):
2330
"""Compare a tree to a list of expected names.
2049
2332
Fail if they are not precisely equal.
2052
2335
shape = list(shape) # copy
2053
for path, ie in inv.entries():
2336
for path, ie in tree.iter_entries_by_dir():
2054
2337
name = path.replace('\\', '/')
2055
2338
if ie.kind == 'directory':
2056
2339
name = name + '/'
2341
pass # ignore root entry
2058
2343
shape.remove(name)
2060
2345
extras.append(name)
2150
2437
class TestCaseWithMemoryTransport(TestCase):
2151
2438
"""Common test class for tests that do not need disk resources.
2153
Tests that need disk resources should derive from TestCaseWithTransport.
2440
Tests that need disk resources should derive from TestCaseInTempDir
2441
or TestCaseWithTransport.
2155
2443
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2157
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2445
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2158
2446
a directory which does not exist. This serves to help ensure test isolation
2159
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2160
must exist. However, TestCaseWithMemoryTransport does not offer local
2161
file defaults for the transport in tests, nor does it obey the command line
2447
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2448
must exist. However, TestCaseWithMemoryTransport does not offer local file
2449
defaults for the transport in tests, nor does it obey the command line
2162
2450
override, so tests that accidentally write to the common directory should
2165
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2166
a .bzr directory that stops us ascending higher into the filesystem.
2453
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2454
``.bzr`` directory that stops us ascending higher into the filesystem.
2169
2457
TEST_ROOT = None
2179
2467
self.transport_readonly_server = None
2180
2468
self.__vfs_server = None
2471
super(TestCaseWithMemoryTransport, self).setUp()
2473
def _add_disconnect_cleanup(transport):
2474
"""Schedule disconnection of given transport at test cleanup
2476
This needs to happen for all connected transports or leaks occur.
2478
Note reconnections may mean we call disconnect multiple times per
2479
transport which is suboptimal but seems harmless.
2481
self.addCleanup(transport.disconnect)
2483
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2484
_add_disconnect_cleanup, None)
2486
self._make_test_root()
2487
self.addCleanup(os.chdir, os.getcwdu())
2488
self.makeAndChdirToTestDir()
2489
self.overrideEnvironmentForTesting()
2490
self.__readonly_server = None
2491
self.__server = None
2492
self.reduceLockdirTimeout()
2493
# Each test may use its own config files even if the local config files
2494
# don't actually exist. They'll rightly fail if they try to create them
2496
self.overrideAttr(config, '_shared_stores', {})
2182
2498
def get_transport(self, relpath=None):
2183
2499
"""Return a writeable transport.
2376
2706
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2377
2707
self.permit_dir(self.test_dir)
2379
def make_branch(self, relpath, format=None):
2709
def make_branch(self, relpath, format=None, name=None):
2380
2710
"""Create a branch on the transport at relpath."""
2381
2711
repo = self.make_repository(relpath, format=format)
2382
return repo.bzrdir.create_branch()
2712
return repo.bzrdir.create_branch(append_revisions_only=False,
2715
def get_default_format(self):
2718
def resolve_format(self, format):
2719
"""Resolve an object to a ControlDir format object.
2721
The initial format object can either already be
2722
a ControlDirFormat, None (for the default format),
2723
or a string with the name of the control dir format.
2725
:param format: Object to resolve
2726
:return: A ControlDirFormat instance
2729
format = self.get_default_format()
2730
if isinstance(format, basestring):
2731
format = controldir.format_registry.make_bzrdir(format)
2384
2734
def make_bzrdir(self, relpath, format=None):
2431
2778
test_home_dir = self.test_home_dir
2432
2779
if isinstance(test_home_dir, unicode):
2433
2780
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2434
os.environ['HOME'] = test_home_dir
2435
os.environ['BZR_HOME'] = test_home_dir
2438
super(TestCaseWithMemoryTransport, self).setUp()
2439
# Ensure that ConnectedTransport doesn't leak sockets
2440
def get_transport_with_cleanup(*args, **kwargs):
2441
t = orig_get_transport(*args, **kwargs)
2442
if isinstance(t, _mod_transport.ConnectedTransport):
2443
self.addCleanup(t.disconnect)
2446
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2447
get_transport_with_cleanup)
2448
self._make_test_root()
2449
self.addCleanup(os.chdir, os.getcwdu())
2450
self.makeAndChdirToTestDir()
2451
self.overrideEnvironmentForTesting()
2452
self.__readonly_server = None
2453
self.__server = None
2454
self.reduceLockdirTimeout()
2781
self.overrideEnv('HOME', test_home_dir)
2782
self.overrideEnv('BZR_HOME', test_home_dir)
2456
2784
def setup_smart_server_with_call_log(self):
2457
2785
"""Sets up a smart server as the transport server with a call log."""
2458
2786
self.transport_server = test_server.SmartTCPServer_for_testing
2787
self.hpss_connections = []
2459
2788
self.hpss_calls = []
2460
2789
import traceback
2461
2790
# Skip the current stack down to the caller of
3118
3452
"""A decorator which excludes tests matching an exclude pattern."""
3120
3454
def __init__(self, suite, exclude_pattern):
3121
TestDecorator.__init__(self, suite)
3122
self.exclude_pattern = exclude_pattern
3123
self.excluded = False
3127
return iter(self._tests)
3128
self.excluded = True
3129
suite = exclude_tests_by_re(self, self.exclude_pattern)
3131
self.addTests(suite)
3132
return iter(self._tests)
3455
super(ExcludeDecorator, self).__init__(
3456
exclude_tests_by_re(suite, exclude_pattern))
3135
3459
class FilterTestsDecorator(TestDecorator):
3136
3460
"""A decorator which filters tests to those matching a pattern."""
3138
3462
def __init__(self, suite, pattern):
3139
TestDecorator.__init__(self, suite)
3140
self.pattern = pattern
3141
self.filtered = False
3145
return iter(self._tests)
3146
self.filtered = True
3147
suite = filter_suite_by_re(self, self.pattern)
3149
self.addTests(suite)
3150
return iter(self._tests)
3463
super(FilterTestsDecorator, self).__init__(
3464
filter_suite_by_re(suite, pattern))
3153
3467
class RandomDecorator(TestDecorator):
3154
3468
"""A decorator which randomises the order of its tests."""
3156
3470
def __init__(self, suite, random_seed, stream):
3157
TestDecorator.__init__(self, suite)
3158
self.random_seed = random_seed
3159
self.randomised = False
3160
self.stream = stream
3164
return iter(self._tests)
3165
self.randomised = True
3166
self.stream.write("Randomizing test order using seed %s\n\n" %
3167
(self.actual_seed()))
3471
random_seed = self.actual_seed(random_seed)
3472
stream.write("Randomizing test order using seed %s\n\n" %
3168
3474
# Initialise the random number generator.
3169
random.seed(self.actual_seed())
3170
suite = randomize_suite(self)
3172
self.addTests(suite)
3173
return iter(self._tests)
3475
random.seed(random_seed)
3476
super(RandomDecorator, self).__init__(randomize_suite(suite))
3175
def actual_seed(self):
3176
if self.random_seed == "now":
3479
def actual_seed(seed):
3177
3481
# We convert the seed to a long to make it reusable across
3178
3482
# invocations (because the user can reenter it).
3179
self.random_seed = long(time.time())
3483
return long(time.time())
3181
3485
# Convert the seed to a long if we can
3183
self.random_seed = long(self.random_seed)
3488
except (TypeError, ValueError):
3186
return self.random_seed
3189
3493
class TestFirstDecorator(TestDecorator):
3190
3494
"""A decorator which moves named tests to the front."""
3192
3496
def __init__(self, suite, pattern):
3193
TestDecorator.__init__(self, suite)
3194
self.pattern = pattern
3195
self.filtered = False
3199
return iter(self._tests)
3200
self.filtered = True
3201
suites = split_suite_by_re(self, self.pattern)
3203
self.addTests(suites)
3204
return iter(self._tests)
3497
super(TestFirstDecorator, self).__init__()
3498
self.addTests(split_suite_by_re(suite, pattern))
3207
3501
def partition_tests(suite, count):
3252
3546
ProtocolTestCase.run(self, result)
3254
os.waitpid(self.pid, 0)
3548
pid, status = os.waitpid(self.pid, 0)
3549
# GZ 2011-10-18: If status is nonzero, should report to the result
3550
# that something went wrong.
3256
3552
test_blocks = partition_tests(suite, concurrency)
3553
# Clear the tests from the original suite so it doesn't keep them alive
3554
suite._tests[:] = []
3257
3555
for process_tests in test_blocks:
3258
process_suite = TestUtil.TestSuite()
3259
process_suite.addTests(process_tests)
3556
process_suite = TestUtil.TestSuite(process_tests)
3557
# Also clear each split list so new suite has only reference
3558
process_tests[:] = []
3260
3559
c2pread, c2pwrite = os.pipe()
3261
3560
pid = os.fork()
3263
workaround_zealous_crypto_random()
3563
stream = os.fdopen(c2pwrite, 'wb', 1)
3564
workaround_zealous_crypto_random()
3265
3565
os.close(c2pread)
3266
3566
# Leave stderr and stdout open so we can see test noise
3267
3567
# Close stdin so that the child goes away if it decides to
3268
3568
# read from stdin (otherwise it's a roulette to see what
3269
3569
# child actually gets keystrokes for pdb etc).
3270
3570
sys.stdin.close()
3272
stream = os.fdopen(c2pwrite, 'wb', 1)
3273
3571
subunit_result = AutoTimingTestResultDecorator(
3274
TestProtocolClient(stream))
3572
SubUnitBzrProtocolClient(stream))
3275
3573
process_suite.run(subunit_result)
3575
# Try and report traceback on stream, but exit with error even
3576
# if stream couldn't be created or something else goes wrong.
3577
# The traceback is formatted to a string and written in one go
3578
# to avoid interleaving lines from multiple failing children.
3580
stream.write(traceback.format_exc())
3279
3585
os.close(c2pwrite)
3280
3586
stream = os.fdopen(c2pread, 'rb', 1)
3344
class ForwardingResult(unittest.TestResult):
3346
def __init__(self, target):
3347
unittest.TestResult.__init__(self)
3348
self.result = target
3350
def startTest(self, test):
3351
self.result.startTest(test)
3353
def stopTest(self, test):
3354
self.result.stopTest(test)
3356
def startTestRun(self):
3357
self.result.startTestRun()
3359
def stopTestRun(self):
3360
self.result.stopTestRun()
3362
def addSkip(self, test, reason):
3363
self.result.addSkip(test, reason)
3365
def addSuccess(self, test):
3366
self.result.addSuccess(test)
3368
def addError(self, test, err):
3369
self.result.addError(test, err)
3371
def addFailure(self, test, err):
3372
self.result.addFailure(test, err)
3373
ForwardingResult = testtools.ExtendedToOriginalDecorator
3376
class ProfileResult(ForwardingResult):
3650
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3377
3651
"""Generate profiling data for all activity between start and success.
3379
3653
The profile data is appended to the test's _benchcalls attribute and can
3712
3991
'bzrlib.tests.test_commit_merge',
3713
3992
'bzrlib.tests.test_config',
3714
3993
'bzrlib.tests.test_conflicts',
3994
'bzrlib.tests.test_controldir',
3715
3995
'bzrlib.tests.test_counted_lock',
3716
3996
'bzrlib.tests.test_crash',
3717
3997
'bzrlib.tests.test_decorators',
3718
3998
'bzrlib.tests.test_delta',
3719
3999
'bzrlib.tests.test_debug',
3720
'bzrlib.tests.test_deprecated_graph',
3721
4000
'bzrlib.tests.test_diff',
3722
4001
'bzrlib.tests.test_directory_service',
3723
4002
'bzrlib.tests.test_dirstate',
3724
4003
'bzrlib.tests.test_email_message',
3725
4004
'bzrlib.tests.test_eol_filters',
3726
4005
'bzrlib.tests.test_errors',
4006
'bzrlib.tests.test_estimate_compressed_size',
3727
4007
'bzrlib.tests.test_export',
4008
'bzrlib.tests.test_export_pot',
3728
4009
'bzrlib.tests.test_extract',
4010
'bzrlib.tests.test_features',
3729
4011
'bzrlib.tests.test_fetch',
3730
4012
'bzrlib.tests.test_fixtures',
3731
4013
'bzrlib.tests.test_fifo_cache',
3732
4014
'bzrlib.tests.test_filters',
4015
'bzrlib.tests.test_filter_tree',
3733
4016
'bzrlib.tests.test_ftp_transport',
3734
4017
'bzrlib.tests.test_foreign',
3735
4018
'bzrlib.tests.test_generate_docs',
4141
4460
% (os.path.basename(dirname), printable_e))
4144
class Feature(object):
4145
"""An operating system Feature."""
4148
self._available = None
4150
def available(self):
4151
"""Is the feature available?
4153
:return: True if the feature is available.
4155
if self._available is None:
4156
self._available = self._probe()
4157
return self._available
4160
"""Implement this method in concrete features.
4162
:return: True if the feature is available.
4164
raise NotImplementedError
4167
if getattr(self, 'feature_name', None):
4168
return self.feature_name()
4169
return self.__class__.__name__
4172
class _SymlinkFeature(Feature):
4175
return osutils.has_symlinks()
4177
def feature_name(self):
4180
SymlinkFeature = _SymlinkFeature()
4183
class _HardlinkFeature(Feature):
4186
return osutils.has_hardlinks()
4188
def feature_name(self):
4191
HardlinkFeature = _HardlinkFeature()
4194
class _OsFifoFeature(Feature):
4197
return getattr(os, 'mkfifo', None)
4199
def feature_name(self):
4200
return 'filesystem fifos'
4202
OsFifoFeature = _OsFifoFeature()
4205
class _UnicodeFilenameFeature(Feature):
4206
"""Does the filesystem support Unicode filenames?"""
4210
# Check for character combinations unlikely to be covered by any
4211
# single non-unicode encoding. We use the characters
4212
# - greek small letter alpha (U+03B1) and
4213
# - braille pattern dots-123456 (U+283F).
4214
os.stat(u'\u03b1\u283f')
4215
except UnicodeEncodeError:
4217
except (IOError, OSError):
4218
# The filesystem allows the Unicode filename but the file doesn't
4222
# The filesystem allows the Unicode filename and the file exists,
4226
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4229
class _CompatabilityThunkFeature(Feature):
4230
"""This feature is just a thunk to another feature.
4232
It issues a deprecation warning if it is accessed, to let you know that you
4233
should really use a different feature.
4236
def __init__(self, dep_version, module, name,
4237
replacement_name, replacement_module=None):
4238
super(_CompatabilityThunkFeature, self).__init__()
4239
self._module = module
4240
if replacement_module is None:
4241
replacement_module = module
4242
self._replacement_module = replacement_module
4244
self._replacement_name = replacement_name
4245
self._dep_version = dep_version
4246
self._feature = None
4249
if self._feature is None:
4250
depr_msg = self._dep_version % ('%s.%s'
4251
% (self._module, self._name))
4252
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4253
self._replacement_name)
4254
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4255
# Import the new feature and use it as a replacement for the
4257
mod = __import__(self._replacement_module, {}, {},
4258
[self._replacement_name])
4259
self._feature = getattr(mod, self._replacement_name)
4263
return self._feature._probe()
4266
class ModuleAvailableFeature(Feature):
4267
"""This is a feature that describes a module we want to be available.
4269
Declare the name of the module in __init__(), and then after probing, the
4270
module will be available as 'self.module'.
4272
:ivar module: The module if it is available, else None.
4275
def __init__(self, module_name):
4276
super(ModuleAvailableFeature, self).__init__()
4277
self.module_name = module_name
4281
self._module = __import__(self.module_name, {}, {}, [''])
4288
if self.available(): # Make sure the probe has been done
4292
def feature_name(self):
4293
return self.module_name
4296
# This is kept here for compatibility, it is recommended to use
4297
# 'bzrlib.tests.features.paramiko' instead
4298
ParamikoFeature = _CompatabilityThunkFeature(
4299
deprecated_in((2,1,0)),
4300
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4303
4463
def probe_unicode_in_user_encoding():
4304
4464
"""Try to encode several unicode strings to use in unicode-aware tests.
4305
4465
Return first successful match.
4336
class _HTTPSServerFeature(Feature):
4337
"""Some tests want an https Server, check if one is available.
4339
Right now, the only way this is available is under python2.6 which provides
4350
def feature_name(self):
4351
return 'HTTPSServer'
4354
HTTPSServerFeature = _HTTPSServerFeature()
4357
class _UnicodeFilename(Feature):
4358
"""Does the filesystem support Unicode filenames?"""
4363
except UnicodeEncodeError:
4365
except (IOError, OSError):
4366
# The filesystem allows the Unicode filename but the file doesn't
4370
# The filesystem allows the Unicode filename and the file exists,
4374
UnicodeFilename = _UnicodeFilename()
4377
class _ByteStringNamedFilesystem(Feature):
4378
"""Is the filesystem based on bytes?"""
4381
if os.name == "posix":
4385
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4388
class _UTF8Filesystem(Feature):
4389
"""Is the filesystem UTF-8?"""
4392
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4396
UTF8Filesystem = _UTF8Filesystem()
4399
class _BreakinFeature(Feature):
4400
"""Does this platform support the breakin feature?"""
4403
from bzrlib import breakin
4404
if breakin.determine_signal() is None:
4406
if sys.platform == 'win32':
4407
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4408
# We trigger SIGBREAK via a Console api so we need ctypes to
4409
# access the function
4416
def feature_name(self):
4417
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4420
BreakinFeature = _BreakinFeature()
4423
class _CaseInsCasePresFilenameFeature(Feature):
4424
"""Is the file-system case insensitive, but case-preserving?"""
4427
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4429
# first check truly case-preserving for created files, then check
4430
# case insensitive when opening existing files.
4431
name = osutils.normpath(name)
4432
base, rel = osutils.split(name)
4433
found_rel = osutils.canonical_relpath(base, name)
4434
return (found_rel == rel
4435
and os.path.isfile(name.upper())
4436
and os.path.isfile(name.lower()))
4441
def feature_name(self):
4442
return "case-insensitive case-preserving filesystem"
4444
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4447
class _CaseInsensitiveFilesystemFeature(Feature):
4448
"""Check if underlying filesystem is case-insensitive but *not* case
4451
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4452
# more likely to be case preserving, so this case is rare.
4455
if CaseInsCasePresFilenameFeature.available():
4458
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4459
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4460
TestCaseWithMemoryTransport.TEST_ROOT = root
4462
root = TestCaseWithMemoryTransport.TEST_ROOT
4463
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4465
name_a = osutils.pathjoin(tdir, 'a')
4466
name_A = osutils.pathjoin(tdir, 'A')
4468
result = osutils.isdir(name_A)
4469
_rmtree_temp_dir(tdir)
4472
def feature_name(self):
4473
return 'case-insensitive filesystem'
4475
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4478
class _CaseSensitiveFilesystemFeature(Feature):
4481
if CaseInsCasePresFilenameFeature.available():
4483
elif CaseInsensitiveFilesystemFeature.available():
4488
def feature_name(self):
4489
return 'case-sensitive filesystem'
4491
# new coding style is for feature instances to be lowercase
4492
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4495
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4496
SubUnitFeature = _CompatabilityThunkFeature(
4497
deprecated_in((2,1,0)),
4498
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4499
4496
# Only define SubUnitBzrRunner if subunit is available.
4501
4498
from subunit import TestProtocolClient
4502
4499
from subunit.test_results import AutoTimingTestResultDecorator
4500
class SubUnitBzrProtocolClient(TestProtocolClient):
4502
def stopTest(self, test):
4503
super(SubUnitBzrProtocolClient, self).stopTest(test)
4504
_clear__type_equality_funcs(test)
4506
def addSuccess(self, test, details=None):
4507
# The subunit client always includes the details in the subunit
4508
# stream, but we don't want to include it in ours.
4509
if details is not None and 'log' in details:
4511
return super(SubUnitBzrProtocolClient, self).addSuccess(
4503
4514
class SubUnitBzrRunner(TextTestRunner):
4504
4515
def run(self, test):
4505
4516
result = AutoTimingTestResultDecorator(
4506
TestProtocolClient(self.stream))
4517
SubUnitBzrProtocolClient(self.stream))
4507
4518
test.run(result)
4509
4520
except ImportError:
4512
class _PosixPermissionsFeature(Feature):
4516
# create temporary file and check if specified perms are maintained.
4519
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4520
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4523
os.chmod(name, write_perms)
4525
read_perms = os.stat(name).st_mode & 0777
4527
return (write_perms == read_perms)
4529
return (os.name == 'posix') and has_perms()
4531
def feature_name(self):
4532
return 'POSIX permissions support'
4534
posix_permissions_feature = _PosixPermissionsFeature()
4524
# API compatibility for old plugins; see bug 892622.
4527
'HTTPServerFeature',
4528
'ModuleAvailableFeature',
4529
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4530
'OsFifoFeature', 'UnicodeFilenameFeature',
4531
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4532
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4533
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4534
'posix_permissions_feature',
4536
globals()[name] = _CompatabilityThunkFeature(
4537
symbol_versioning.deprecated_in((2, 5, 0)),
4538
'bzrlib.tests', name,
4539
name, 'bzrlib.tests.features')
4542
for (old_name, new_name) in [
4543
('UnicodeFilename', 'UnicodeFilenameFeature'),
4545
globals()[name] = _CompatabilityThunkFeature(
4546
symbol_versioning.deprecated_in((2, 5, 0)),
4547
'bzrlib.tests', old_name,
4548
new_name, 'bzrlib.tests.features')