56
51
# nb: check this before importing anything else from within it
57
52
_testtools_version = getattr(testtools, '__version__', ())
58
if _testtools_version < (0, 9, 2):
59
raise ImportError("need at least testtools 0.9.2: %s is %r"
53
if _testtools_version < (0, 9, 5):
54
raise ImportError("need at least testtools 0.9.5: %s is %r"
60
55
% (testtools.__file__, _testtools_version))
61
56
from testtools import content
63
59
from bzrlib import (
63
commands as _mod_commands,
73
plugin as _mod_plugin,
78
80
transport as _mod_transport,
82
import bzrlib.commands
83
import bzrlib.timestamp
85
import bzrlib.inventory
86
import bzrlib.iterablefile
89
84
import bzrlib.lsprof
90
85
except ImportError:
91
86
# lsprof not available
93
from bzrlib.merge import merge_inner
96
from bzrlib.smart import client, request, server
98
from bzrlib import symbol_versioning
88
from bzrlib.smart import client, request
89
from bzrlib.transport import (
99
93
from bzrlib.symbol_versioning import (
100
DEPRECATED_PARAMETER,
101
94
deprecated_function,
107
from bzrlib.transport import (
111
from bzrlib.trace import mutter, note
112
97
from bzrlib.tests import (
117
103
from bzrlib.ui import NullProgressView
118
104
from bzrlib.ui.text import TextUIFactory
119
import bzrlib.version_info_formats.format_custom
120
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
105
from bzrlib.tests.features import _CompatabilityThunkFeature
122
107
# Mark this python module as being part of the implementation
123
108
# of unittest: this gives us better tracebacks where the last
140
125
TestSuite = TestUtil.TestSuite
141
126
TestLoader = TestUtil.TestLoader
128
# Tests should run in a clean and clearly defined environment. The goal is to
129
# keep them isolated from the running environment as much as possible. The test
130
# framework ensures the variables defined below are set (or deleted if the
131
# value is None) before a test is run and reset to their original value after
132
# the test is run. Generally if some code depends on an environment variable,
133
# the tests should start without this variable in the environment. There are a
134
# few exceptions but you shouldn't violate this rule lightly.
138
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
139
# tests do check our impls match APPDATA
140
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
144
'BZREMAIL': None, # may still be present in the environment
145
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
146
'BZR_PROGRESS_BAR': None,
147
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
148
# as a base class instead of TestCaseInTempDir. Tests inheriting from
149
# TestCase should not use disk resources, BZR_LOG is one.
150
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
151
'BZR_PLUGIN_PATH': None,
152
'BZR_DISABLE_PLUGINS': None,
153
'BZR_PLUGINS_AT': None,
154
'BZR_CONCURRENCY': None,
155
# Make sure that any text ui tests are consistent regardless of
156
# the environment the test case is run in; you may want tests that
157
# test other combinations. 'dumb' is a reasonable guess for tests
158
# going to a pipe or a StringIO.
164
'SSH_AUTH_SOCK': None,
174
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
175
# least. If you do (care), please update this comment
179
'BZR_REMOTE_PATH': None,
180
# Generally speaking, we don't want apport reporting on crashes in
181
# the test environment unless we're specifically testing apport,
182
# so that it doesn't leak into the real system environment. We
183
# use an env var so it propagates to subprocesses.
184
'APPORT_DISABLE': '1',
188
def override_os_environ(test, env=None):
189
"""Modify os.environ keeping a copy.
191
:param test: A test instance
193
:param env: A dict containing variable definitions to be installed
196
env = isolated_environ
197
test._original_os_environ = dict([(var, value)
198
for var, value in os.environ.iteritems()])
199
for var, value in env.iteritems():
200
osutils.set_or_unset_env(var, value)
201
if var not in test._original_os_environ:
202
# The var is new, add it with a value of None, so
203
# restore_os_environ will delete it
204
test._original_os_environ[var] = None
207
def restore_os_environ(test):
208
"""Restore os.environ to its original state.
210
:param test: A test instance previously passed to override_os_environ.
212
for var, value in test._original_os_environ.iteritems():
213
# Restore the original value (or delete it if the value has been set to
214
# None in override_os_environ).
215
osutils.set_or_unset_env(var, value)
218
def _clear__type_equality_funcs(test):
219
"""Cleanup bound methods stored on TestCase instances
221
Clear the dict breaking a few (mostly) harmless cycles in the affected
222
unittests released with Python 2.6 and initial Python 2.7 versions.
224
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
225
shipped in Oneiric, an object with no clear method was used, hence the
226
extra complications, see bug 809048 for details.
228
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
229
if type_equality_funcs is not None:
230
tef_clear = getattr(type_equality_funcs, "clear", None)
231
if tef_clear is None:
232
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
233
if tef_instance_dict is not None:
234
tef_clear = tef_instance_dict.clear
235
if tef_clear is not None:
143
239
class ExtendedTestResult(testtools.TextTestResult):
144
240
"""Accepts, reports and accumulates the results of running tests.
289
395
self.report_test_start(test)
290
396
test.number = self.count
291
397
self._recordTestStartTime()
292
# Only check for thread leaks if the test case supports cleanups
293
addCleanup = getattr(test, "addCleanup", None)
294
if addCleanup is not None:
295
addCleanup(self._check_leaked_threads, test)
398
# Make testtools cases give us the real traceback on failure
399
addOnException = getattr(test, "addOnException", None)
400
if addOnException is not None:
401
addOnException(self._record_traceback_from_test)
402
# Only check for thread leaks on bzrlib derived test cases
403
if isinstance(test, TestCase):
404
test.addCleanup(self._check_leaked_threads, test)
406
def stopTest(self, test):
407
super(ExtendedTestResult, self).stopTest(test)
408
# Manually break cycles, means touching various private things but hey
409
getDetails = getattr(test, "getDetails", None)
410
if getDetails is not None:
412
_clear__type_equality_funcs(test)
413
self._traceback_from_test = None
297
415
def startTests(self):
298
416
self.report_tests_starting()
828
999
super(TestCase, self).setUp()
1001
timeout = config.GlobalStack().get('selftest.timeout')
1003
timeout_fixture = fixtures.TimeoutFixture(timeout)
1004
timeout_fixture.setUp()
1005
self.addCleanup(timeout_fixture.cleanUp)
829
1007
for feature in getattr(self, '_test_needs_features', []):
830
1008
self.requireFeature(feature)
831
self._log_contents = None
832
self.addDetail("log", content.Content(content.ContentType("text",
833
"plain", {"charset": "utf8"}),
834
lambda:[self._get_log(keep_log_file=True)]))
835
1009
self._cleanEnvironment()
1011
if bzrlib.global_state is not None:
1012
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1013
config.CommandLineStore())
836
1015
self._silenceUI()
837
1016
self._startLogFile()
838
1017
self._benchcalls = []
841
1020
self._track_transports()
842
1021
self._track_locks()
843
1022
self._clear_debug_flags()
1023
# Isolate global verbosity level, to make sure it's reproducible
1024
# between tests. We should get rid of this altogether: bug 656694. --
1026
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
1027
# Isolate config option expansion until its default value for bzrlib is
1028
# settled on or the FIXME associated with _get_expand_default_value
1029
# is addressed -- vila 20110219
1030
self.overrideAttr(config, '_expand_default_value', None)
1031
self._log_files = set()
1032
# Each key in the ``_counters`` dict holds a value for a different
1033
# counter. When the test ends, addDetail() should be used to output the
1034
# counter values. This happens in install_counter_hook().
1036
if 'config_stats' in selftest_debug_flags:
1037
self._install_config_stats_hooks()
1038
# Do not use i18n for tests (unless the test reverses this)
845
1041
def debug(self):
846
1042
# debug a frame up.
848
pdb.Pdb().set_trace(sys._getframe().f_back)
1044
# The sys preserved stdin/stdout should allow blackbox tests debugging
1045
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1046
).set_trace(sys._getframe().f_back)
850
1048
def discardDetail(self, name):
851
1049
"""Extend the addDetail, getDetails api so we can remove a detail.
863
1061
if name in details:
864
1062
del details[name]
1064
def install_counter_hook(self, hooks, name, counter_name=None):
1065
"""Install a counting hook.
1067
Any hook can be counted as long as it doesn't need to return a value.
1069
:param hooks: Where the hook should be installed.
1071
:param name: The hook name that will be counted.
1073
:param counter_name: The counter identifier in ``_counters``, defaults
1076
_counters = self._counters # Avoid closing over self
1077
if counter_name is None:
1079
if _counters.has_key(counter_name):
1080
raise AssertionError('%s is already used as a counter name'
1082
_counters[counter_name] = 0
1083
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1084
lambda: ['%d' % (_counters[counter_name],)]))
1085
def increment_counter(*args, **kwargs):
1086
_counters[counter_name] += 1
1087
label = 'count %s calls' % (counter_name,)
1088
hooks.install_named_hook(name, increment_counter, label)
1089
self.addCleanup(hooks.uninstall_named_hook, name, label)
1091
def _install_config_stats_hooks(self):
1092
"""Install config hooks to count hook calls.
1095
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1096
self.install_counter_hook(config.ConfigHooks, hook_name,
1097
'config.%s' % (hook_name,))
1099
# The OldConfigHooks are private and need special handling to protect
1100
# against recursive tests (tests that run other tests), so we just do
1101
# manually what registering them into _builtin_known_hooks will provide
1103
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1104
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1105
self.install_counter_hook(config.OldConfigHooks, hook_name,
1106
'old_config.%s' % (hook_name,))
866
1108
def _clear_debug_flags(self):
867
1109
"""Prevent externally set debug flags affecting tests.
1148
1396
'st_mtime did not match')
1149
1397
self.assertEqual(expected.st_ctime, actual.st_ctime,
1150
1398
'st_ctime did not match')
1151
if sys.platform != 'win32':
1399
if sys.platform == 'win32':
1152
1400
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1153
1401
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1154
# odd. Regardless we shouldn't actually try to assert anything
1155
# about their values
1402
# odd. We just force it to always be 0 to avoid any problems.
1403
self.assertEqual(0, expected.st_dev)
1404
self.assertEqual(0, actual.st_dev)
1405
self.assertEqual(0, expected.st_ino)
1406
self.assertEqual(0, actual.st_ino)
1156
1408
self.assertEqual(expected.st_dev, actual.st_dev,
1157
1409
'st_dev did not match')
1158
1410
self.assertEqual(expected.st_ino, actual.st_ino,
1167
1419
length, len(obj_with_len), obj_with_len))
1169
1421
def assertLogsError(self, exception_class, func, *args, **kwargs):
1170
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1422
"""Assert that `func(*args, **kwargs)` quietly logs a specific error.
1172
from bzrlib import trace
1174
1425
orig_log_exception_quietly = trace.log_exception_quietly
1177
1428
orig_log_exception_quietly()
1178
captured.append(sys.exc_info())
1429
captured.append(sys.exc_info()[1])
1179
1430
trace.log_exception_quietly = capture
1180
1431
func(*args, **kwargs)
1182
1433
trace.log_exception_quietly = orig_log_exception_quietly
1183
1434
self.assertLength(1, captured)
1184
err = captured[0][1]
1185
1436
self.assertIsInstance(err, exception_class)
1335
1590
self.assertEqual(expected_docstring, obj.__doc__)
1592
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1337
1593
def failUnlessExists(self, path):
1594
return self.assertPathExists(path)
1596
def assertPathExists(self, path):
1338
1597
"""Fail unless path or paths, which may be abs or relative, exist."""
1339
1598
if not isinstance(path, basestring):
1341
self.failUnlessExists(p)
1600
self.assertPathExists(p)
1343
self.failUnless(osutils.lexists(path),path+" does not exist")
1602
self.assertTrue(osutils.lexists(path),
1603
path + " does not exist")
1605
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1345
1606
def failIfExists(self, path):
1607
return self.assertPathDoesNotExist(path)
1609
def assertPathDoesNotExist(self, path):
1346
1610
"""Fail if path or paths, which may be abs or relative, exist."""
1347
1611
if not isinstance(path, basestring):
1349
self.failIfExists(p)
1613
self.assertPathDoesNotExist(p)
1351
self.failIf(osutils.lexists(path),path+" exists")
1615
self.assertFalse(osutils.lexists(path),
1353
1618
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1354
1619
"""A helper for callDeprecated and applyDeprecated.
1466
1732
def _startLogFile(self):
1467
"""Send bzr and test log messages to a temporary file.
1469
The file is removed as the test is torn down.
1471
self._log_file = StringIO()
1472
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1733
"""Set up an in-memory target for bzr and testcase log messages"""
1734
pseudo_log_file = StringIO()
1735
def _get_log_contents_for_weird_testtools_api():
1736
return [pseudo_log_file.getvalue().decode(
1737
"utf-8", "replace").encode("utf-8")]
1738
self.addDetail("log", content.Content(content.ContentType("text",
1739
"plain", {"charset": "utf8"}),
1740
_get_log_contents_for_weird_testtools_api))
1741
self._log_file = pseudo_log_file
1742
self._log_memento = trace.push_log_file(self._log_file)
1473
1743
self.addCleanup(self._finishLogFile)
1475
1745
def _finishLogFile(self):
1476
"""Finished with the log file.
1478
Close the file and delete it, unless setKeepLogfile was called.
1480
if bzrlib.trace._trace_file:
1746
"""Flush and dereference the in-memory log for this testcase"""
1747
if trace._trace_file:
1481
1748
# flush the log file, to get all content
1482
bzrlib.trace._trace_file.flush()
1483
bzrlib.trace.pop_log_file(self._log_memento)
1484
# Cache the log result and delete the file on disk
1485
self._get_log(False)
1749
trace._trace_file.flush()
1750
trace.pop_log_file(self._log_memento)
1751
# The logging module now tracks references for cleanup so discard ours
1752
del self._log_memento
1487
1754
def thisFailsStrictLockCheck(self):
1488
1755
"""It is known that this test would fail with -Dstrict_locks.
1516
1786
setattr(obj, attr_name, new)
1789
def overrideEnv(self, name, new):
1790
"""Set an environment variable, and reset it after the test.
1792
:param name: The environment variable name.
1794
:param new: The value to set the variable to. If None, the
1795
variable is deleted from the environment.
1797
:returns: The actual variable value.
1799
value = osutils.set_or_unset_env(name, new)
1800
self.addCleanup(osutils.set_or_unset_env, name, value)
1803
def recordCalls(self, obj, attr_name):
1804
"""Monkeypatch in a wrapper that will record calls.
1806
The monkeypatch is automatically removed when the test concludes.
1808
:param obj: The namespace holding the reference to be replaced;
1809
typically a module, class, or object.
1810
:param attr_name: A string for the name of the attribute to
1812
:returns: A list that will be extended with one item every time the
1813
function is called, with a tuple of (args, kwargs).
1817
def decorator(*args, **kwargs):
1818
calls.append((args, kwargs))
1819
return orig(*args, **kwargs)
1820
orig = self.overrideAttr(obj, attr_name, decorator)
1519
1823
def _cleanEnvironment(self):
1521
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1522
'HOME': os.getcwd(),
1523
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1524
# tests do check our impls match APPDATA
1525
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1529
'BZREMAIL': None, # may still be present in the environment
1530
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1531
'BZR_PROGRESS_BAR': None,
1533
'BZR_PLUGIN_PATH': None,
1534
'BZR_DISABLE_PLUGINS': None,
1535
'BZR_PLUGINS_AT': None,
1536
'BZR_CONCURRENCY': None,
1537
# Make sure that any text ui tests are consistent regardless of
1538
# the environment the test case is run in; you may want tests that
1539
# test other combinations. 'dumb' is a reasonable guess for tests
1540
# going to a pipe or a StringIO.
1544
'BZR_COLUMNS': '80',
1546
'SSH_AUTH_SOCK': None,
1550
'https_proxy': None,
1551
'HTTPS_PROXY': None,
1556
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1557
# least. If you do (care), please update this comment
1561
'BZR_REMOTE_PATH': None,
1562
# Generally speaking, we don't want apport reporting on crashes in
1563
# the test environment unless we're specifically testing apport,
1564
# so that it doesn't leak into the real system environment. We
1565
# use an env var so it propagates to subprocesses.
1566
'APPORT_DISABLE': '1',
1569
self.addCleanup(self._restoreEnvironment)
1570
for name, value in new_env.iteritems():
1571
self._captureVar(name, value)
1573
def _captureVar(self, name, newvalue):
1574
"""Set an environment variable, and reset it when finished."""
1575
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1577
def _restoreEnvironment(self):
1578
for name, value in self._old_env.iteritems():
1579
osutils.set_or_unset_env(name, value)
1824
for name, value in isolated_environ.iteritems():
1825
self.overrideEnv(name, value)
1581
1827
def _restoreHooks(self):
1582
1828
for klass, (name, hooks) in self._preserved_hooks.items():
1583
1829
setattr(klass, name, hooks)
1830
self._preserved_hooks.clear()
1831
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1832
self._preserved_lazy_hooks.clear()
1585
1834
def knownFailure(self, reason):
1586
"""This test has failed for some known reason."""
1587
raise KnownFailure(reason)
1835
"""Declare that this test fails for a known reason
1837
Tests that are known to fail should generally be using expectedFailure
1838
with an appropriate reverse assertion if a change could cause the test
1839
to start passing. Conversely if the test has no immediate prospect of
1840
succeeding then using skip is more suitable.
1842
When this method is called while an exception is being handled, that
1843
traceback will be used, otherwise a new exception will be thrown to
1844
provide one but won't be reported.
1846
self._add_reason(reason)
1848
exc_info = sys.exc_info()
1849
if exc_info != (None, None, None):
1850
self._report_traceback(exc_info)
1853
raise self.failureException(reason)
1854
except self.failureException:
1855
exc_info = sys.exc_info()
1856
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1857
raise testtools.testcase._ExpectedFailure(exc_info)
1589
1861
def _suppress_log(self):
1590
1862
"""Remove the log info from details."""
1676
1948
self._benchtime += time.time() - start
1678
1950
def log(self, *args):
1681
def _get_log(self, keep_log_file=False):
1682
"""Internal helper to get the log from bzrlib.trace for this test.
1684
Please use self.getDetails, or self.get_log to access this in test case
1687
:param keep_log_file: When True, if the log is still a file on disk
1688
leave it as a file on disk. When False, if the log is still a file
1689
on disk, the log file is deleted and the log preserved as
1691
:return: A string containing the log.
1693
if self._log_contents is not None:
1695
self._log_contents.decode('utf8')
1696
except UnicodeDecodeError:
1697
unicodestr = self._log_contents.decode('utf8', 'replace')
1698
self._log_contents = unicodestr.encode('utf8')
1699
return self._log_contents
1700
if self._log_file is not None:
1701
log_contents = self._log_file.getvalue()
1703
log_contents.decode('utf8')
1704
except UnicodeDecodeError:
1705
unicodestr = log_contents.decode('utf8', 'replace')
1706
log_contents = unicodestr.encode('utf8')
1707
if not keep_log_file:
1708
self._log_file = None
1709
# Permit multiple calls to get_log until we clean it up in
1711
self._log_contents = log_contents
1714
return "No log file content."
1716
1953
def get_log(self):
1717
1954
"""Get a unicode string containing the log from bzrlib.trace.
2235
def _add_subprocess_log(self, log_file_path):
2236
if len(self._log_files) == 0:
2237
# Register an addCleanup func. We do this on the first call to
2238
# _add_subprocess_log rather than in TestCase.setUp so that this
2239
# addCleanup is registered after any cleanups for tempdirs that
2240
# subclasses might create, which will probably remove the log file
2242
self.addCleanup(self._subprocess_log_cleanup)
2243
# self._log_files is a set, so if a log file is reused we won't grab it
2245
self._log_files.add(log_file_path)
2247
def _subprocess_log_cleanup(self):
2248
for count, log_file_path in enumerate(self._log_files):
2249
# We use buffer_now=True to avoid holding the file open beyond
2250
# the life of this function, which might interfere with e.g.
2251
# cleaning tempdirs on Windows.
2252
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2253
#detail_content = content.content_from_file(
2254
# log_file_path, buffer_now=True)
2255
with open(log_file_path, 'rb') as log_file:
2256
log_file_bytes = log_file.read()
2257
detail_content = content.Content(content.ContentType("text",
2258
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2259
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
1986
2262
def _popen(self, *args, **kwargs):
1987
2263
"""Place a call to Popen.
2025
2301
if retcode is not None and retcode != process.returncode:
2026
2302
if process_args is None:
2027
2303
process_args = "(unknown args)"
2028
mutter('Output of bzr %s:\n%s', process_args, out)
2029
mutter('Error for bzr %s:\n%s', process_args, err)
2304
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2305
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2030
2306
self.fail('Command bzr %s failed with retcode %s != %s'
2031
2307
% (process_args, retcode, process.returncode))
2032
2308
return [out, err]
2034
def check_inventory_shape(self, inv, shape):
2035
"""Compare an inventory to a list of expected names.
2310
def check_tree_shape(self, tree, shape):
2311
"""Compare a tree to a list of expected names.
2037
2313
Fail if they are not precisely equal.
2040
2316
shape = list(shape) # copy
2041
for path, ie in inv.entries():
2317
for path, ie in tree.iter_entries_by_dir():
2042
2318
name = path.replace('\\', '/')
2043
2319
if ie.kind == 'directory':
2044
2320
name = name + '/'
2322
pass # ignore root entry
2046
2324
shape.remove(name)
2048
2326
extras.append(name)
2138
2418
class TestCaseWithMemoryTransport(TestCase):
2139
2419
"""Common test class for tests that do not need disk resources.
2141
Tests that need disk resources should derive from TestCaseWithTransport.
2421
Tests that need disk resources should derive from TestCaseInTempDir
2422
or TestCaseWithTransport.
2143
2424
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2145
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2426
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2146
2427
a directory which does not exist. This serves to help ensure test isolation
2147
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2148
must exist. However, TestCaseWithMemoryTransport does not offer local
2149
file defaults for the transport in tests, nor does it obey the command line
2428
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2429
must exist. However, TestCaseWithMemoryTransport does not offer local file
2430
defaults for the transport in tests, nor does it obey the command line
2150
2431
override, so tests that accidentally write to the common directory should
2153
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2154
a .bzr directory that stops us ascending higher into the filesystem.
2434
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2435
``.bzr`` directory that stops us ascending higher into the filesystem.
2157
2438
TEST_ROOT = None
2367
2659
def make_branch(self, relpath, format=None):
2368
2660
"""Create a branch on the transport at relpath."""
2369
2661
repo = self.make_repository(relpath, format=format)
2370
return repo.bzrdir.create_branch()
2662
return repo.bzrdir.create_branch(append_revisions_only=False)
2664
def get_default_format(self):
2667
def resolve_format(self, format):
2668
"""Resolve an object to a ControlDir format object.
2670
The initial format object can either already be
2671
a ControlDirFormat, None (for the default format),
2672
or a string with the name of the control dir format.
2674
:param format: Object to resolve
2675
:return: A ControlDirFormat instance
2678
format = self.get_default_format()
2679
if isinstance(format, basestring):
2680
format = bzrdir.format_registry.make_bzrdir(format)
2372
2683
def make_bzrdir(self, relpath, format=None):
3071
3392
class TestDecorator(TestUtil.TestSuite):
3072
3393
"""A decorator for TestCase/TestSuite objects.
3074
Usually, subclasses should override __iter__(used when flattening test
3075
suites), which we do to filter, reorder, parallelise and so on, run() and
3395
Contains rather than flattening suite passed on construction
3079
def __init__(self, suite):
3080
TestUtil.TestSuite.__init__(self)
3083
def countTestCases(self):
3086
cases += test.countTestCases()
3093
def run(self, result):
3094
# Use iteration on self, not self._tests, to allow subclasses to hook
3097
if result.shouldStop:
3398
def __init__(self, suite=None):
3399
super(TestDecorator, self).__init__()
3400
if suite is not None:
3403
# Don't need subclass run method with suite emptying
3404
run = unittest.TestSuite.run
3103
3407
class CountingDecorator(TestDecorator):
3114
3418
"""A decorator which excludes tests matching an exclude pattern."""
3116
3420
def __init__(self, suite, exclude_pattern):
3117
TestDecorator.__init__(self, suite)
3118
self.exclude_pattern = exclude_pattern
3119
self.excluded = False
3123
return iter(self._tests)
3124
self.excluded = True
3125
suite = exclude_tests_by_re(self, self.exclude_pattern)
3127
self.addTests(suite)
3128
return iter(self._tests)
3421
super(ExcludeDecorator, self).__init__(
3422
exclude_tests_by_re(suite, exclude_pattern))
3131
3425
class FilterTestsDecorator(TestDecorator):
3132
3426
"""A decorator which filters tests to those matching a pattern."""
3134
3428
def __init__(self, suite, pattern):
3135
TestDecorator.__init__(self, suite)
3136
self.pattern = pattern
3137
self.filtered = False
3141
return iter(self._tests)
3142
self.filtered = True
3143
suite = filter_suite_by_re(self, self.pattern)
3145
self.addTests(suite)
3146
return iter(self._tests)
3429
super(FilterTestsDecorator, self).__init__(
3430
filter_suite_by_re(suite, pattern))
3149
3433
class RandomDecorator(TestDecorator):
3150
3434
"""A decorator which randomises the order of its tests."""
3152
3436
def __init__(self, suite, random_seed, stream):
3153
TestDecorator.__init__(self, suite)
3154
self.random_seed = random_seed
3155
self.randomised = False
3156
self.stream = stream
3160
return iter(self._tests)
3161
self.randomised = True
3162
self.stream.write("Randomizing test order using seed %s\n\n" %
3163
(self.actual_seed()))
3437
random_seed = self.actual_seed(random_seed)
3438
stream.write("Randomizing test order using seed %s\n\n" %
3164
3440
# Initialise the random number generator.
3165
random.seed(self.actual_seed())
3166
suite = randomize_suite(self)
3168
self.addTests(suite)
3169
return iter(self._tests)
3441
random.seed(random_seed)
3442
super(RandomDecorator, self).__init__(randomize_suite(suite))
3171
def actual_seed(self):
3172
if self.random_seed == "now":
3445
def actual_seed(seed):
3173
3447
# We convert the seed to a long to make it reusable across
3174
3448
# invocations (because the user can reenter it).
3175
self.random_seed = long(time.time())
3449
return long(time.time())
3177
3451
# Convert the seed to a long if we can
3179
self.random_seed = long(self.random_seed)
3454
except (TypeError, ValueError):
3182
return self.random_seed
3185
3459
class TestFirstDecorator(TestDecorator):
3186
3460
"""A decorator which moves named tests to the front."""
3188
3462
def __init__(self, suite, pattern):
3189
TestDecorator.__init__(self, suite)
3190
self.pattern = pattern
3191
self.filtered = False
3195
return iter(self._tests)
3196
self.filtered = True
3197
suites = split_suite_by_re(self, self.pattern)
3199
self.addTests(suites)
3200
return iter(self._tests)
3463
super(TestFirstDecorator, self).__init__()
3464
self.addTests(split_suite_by_re(suite, pattern))
3203
3467
def partition_tests(suite, count):
3248
3512
ProtocolTestCase.run(self, result)
3250
os.waitpid(self.pid, 0)
3514
pid, status = os.waitpid(self.pid, 0)
3515
# GZ 2011-10-18: If status is nonzero, should report to the result
3516
# that something went wrong.
3252
3518
test_blocks = partition_tests(suite, concurrency)
3519
# Clear the tests from the original suite so it doesn't keep them alive
3520
suite._tests[:] = []
3253
3521
for process_tests in test_blocks:
3254
process_suite = TestUtil.TestSuite()
3255
process_suite.addTests(process_tests)
3522
process_suite = TestUtil.TestSuite(process_tests)
3523
# Also clear each split list so new suite has only reference
3524
process_tests[:] = []
3256
3525
c2pread, c2pwrite = os.pipe()
3257
3526
pid = os.fork()
3259
workaround_zealous_crypto_random()
3529
stream = os.fdopen(c2pwrite, 'wb', 1)
3530
workaround_zealous_crypto_random()
3261
3531
os.close(c2pread)
3262
3532
# Leave stderr and stdout open so we can see test noise
3263
3533
# Close stdin so that the child goes away if it decides to
3264
3534
# read from stdin (otherwise it's a roulette to see what
3265
3535
# child actually gets keystrokes for pdb etc).
3266
3536
sys.stdin.close()
3268
stream = os.fdopen(c2pwrite, 'wb', 1)
3269
3537
subunit_result = AutoTimingTestResultDecorator(
3270
TestProtocolClient(stream))
3538
SubUnitBzrProtocolClient(stream))
3271
3539
process_suite.run(subunit_result)
3541
# Try and report traceback on stream, but exit with error even
3542
# if stream couldn't be created or something else goes wrong.
3543
# The traceback is formatted to a string and written in one go
3544
# to avoid interleaving lines from multiple failing children.
3546
stream.write(traceback.format_exc())
3275
3551
os.close(c2pwrite)
3276
3552
stream = os.fdopen(c2pread, 'rb', 1)
3340
class ForwardingResult(unittest.TestResult):
3342
def __init__(self, target):
3343
unittest.TestResult.__init__(self)
3344
self.result = target
3346
def startTest(self, test):
3347
self.result.startTest(test)
3349
def stopTest(self, test):
3350
self.result.stopTest(test)
3352
def startTestRun(self):
3353
self.result.startTestRun()
3355
def stopTestRun(self):
3356
self.result.stopTestRun()
3358
def addSkip(self, test, reason):
3359
self.result.addSkip(test, reason)
3361
def addSuccess(self, test):
3362
self.result.addSuccess(test)
3364
def addError(self, test, err):
3365
self.result.addError(test, err)
3367
def addFailure(self, test, err):
3368
self.result.addFailure(test, err)
3369
ForwardingResult = testtools.ExtendedToOriginalDecorator
3372
class ProfileResult(ForwardingResult):
3616
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3373
3617
"""Generate profiling data for all activity between start and success.
3375
3619
The profile data is appended to the test's _benchcalls attribute and can
3709
3958
'bzrlib.tests.test_commit_merge',
3710
3959
'bzrlib.tests.test_config',
3711
3960
'bzrlib.tests.test_conflicts',
3961
'bzrlib.tests.test_controldir',
3712
3962
'bzrlib.tests.test_counted_lock',
3713
3963
'bzrlib.tests.test_crash',
3714
3964
'bzrlib.tests.test_decorators',
3715
3965
'bzrlib.tests.test_delta',
3716
3966
'bzrlib.tests.test_debug',
3717
'bzrlib.tests.test_deprecated_graph',
3718
3967
'bzrlib.tests.test_diff',
3719
3968
'bzrlib.tests.test_directory_service',
3720
3969
'bzrlib.tests.test_dirstate',
3721
3970
'bzrlib.tests.test_email_message',
3722
3971
'bzrlib.tests.test_eol_filters',
3723
3972
'bzrlib.tests.test_errors',
3973
'bzrlib.tests.test_estimate_compressed_size',
3724
3974
'bzrlib.tests.test_export',
3975
'bzrlib.tests.test_export_pot',
3725
3976
'bzrlib.tests.test_extract',
3977
'bzrlib.tests.test_features',
3726
3978
'bzrlib.tests.test_fetch',
3727
3979
'bzrlib.tests.test_fixtures',
3728
3980
'bzrlib.tests.test_fifo_cache',
3729
3981
'bzrlib.tests.test_filters',
3982
'bzrlib.tests.test_filter_tree',
3730
3983
'bzrlib.tests.test_ftp_transport',
3731
3984
'bzrlib.tests.test_foreign',
3732
3985
'bzrlib.tests.test_generate_docs',
3969
4229
# Some tests mentioned in the list are not in the test suite. The
3970
4230
# list may be out of date, report to the tester.
3971
4231
for id in not_found:
3972
bzrlib.trace.warning('"%s" not found in the test suite', id)
4232
trace.warning('"%s" not found in the test suite', id)
3973
4233
for id in duplicates:
3974
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
4234
trace.warning('"%s" is used as an id by several tests', id)
3979
def multiply_scenarios(scenarios_left, scenarios_right):
4239
def multiply_scenarios(*scenarios):
    """Combine two or more iterables of scenarios into one compound list.

    Each argument is copied into a list before it is used, so scenario
    generators and other one-shot iterators may be passed.

    :returns: The result of folding _multiply_two_scenarios over the
        arguments, left to right.
    """
    materialized = [list(group) for group in scenarios]
    return reduce(_multiply_two_scenarios, materialized)
4251
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3980
4252
"""Multiply two sets of scenarios.
3982
4254
:returns: the cartesian product of the two sets of scenarios, that is
4152
4425
% (os.path.basename(dirname), printable_e))
4155
class Feature(object):
4156
"""An operating system Feature."""
4159
self._available = None
4161
def available(self):
4162
"""Is the feature available?
4164
:return: True if the feature is available.
4166
if self._available is None:
4167
self._available = self._probe()
4168
return self._available
4171
"""Implement this method in concrete features.
4173
:return: True if the feature is available.
4175
raise NotImplementedError
4178
if getattr(self, 'feature_name', None):
4179
return self.feature_name()
4180
return self.__class__.__name__
4183
class _SymlinkFeature(Feature):
4186
return osutils.has_symlinks()
4188
def feature_name(self):
4191
SymlinkFeature = _SymlinkFeature()
4194
class _HardlinkFeature(Feature):
4197
return osutils.has_hardlinks()
4199
def feature_name(self):
4202
HardlinkFeature = _HardlinkFeature()
4205
class _OsFifoFeature(Feature):
4208
return getattr(os, 'mkfifo', None)
4210
def feature_name(self):
4211
return 'filesystem fifos'
4213
OsFifoFeature = _OsFifoFeature()
4216
class _UnicodeFilenameFeature(Feature):
4217
"""Does the filesystem support Unicode filenames?"""
4221
# Check for character combinations unlikely to be covered by any
4222
# single non-unicode encoding. We use the characters
4223
# - greek small letter alpha (U+03B1) and
4224
# - braille pattern dots-123456 (U+283F).
4225
os.stat(u'\u03b1\u283f')
4226
except UnicodeEncodeError:
4228
except (IOError, OSError):
4229
# The filesystem allows the Unicode filename but the file doesn't
4233
# The filesystem allows the Unicode filename and the file exists,
4237
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4240
class _CompatabilityThunkFeature(Feature):
4241
"""This feature is just a thunk to another feature.
4243
It issues a deprecation warning if it is accessed, to let you know that you
4244
should really use a different feature.
4247
def __init__(self, dep_version, module, name,
4248
replacement_name, replacement_module=None):
4249
super(_CompatabilityThunkFeature, self).__init__()
4250
self._module = module
4251
if replacement_module is None:
4252
replacement_module = module
4253
self._replacement_module = replacement_module
4255
self._replacement_name = replacement_name
4256
self._dep_version = dep_version
4257
self._feature = None
4260
if self._feature is None:
4261
depr_msg = self._dep_version % ('%s.%s'
4262
% (self._module, self._name))
4263
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4264
self._replacement_name)
4265
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4266
# Import the new feature and use it as a replacement for the
4268
self._feature = pyutils.get_named_object(
4269
self._replacement_module, self._replacement_name)
4273
return self._feature._probe()
4276
class ModuleAvailableFeature(Feature):
4277
"""This is a feature than describes a module we want to be available.
4279
Declare the name of the module in __init__(), and then after probing, the
4280
module will be available as 'self.module'.
4282
:ivar module: The module if it is available, else None.
4285
def __init__(self, module_name):
4286
super(ModuleAvailableFeature, self).__init__()
4287
self.module_name = module_name
4291
self._module = __import__(self.module_name, {}, {}, [''])
4298
if self.available(): # Make sure the probe has been done
4302
def feature_name(self):
4303
return self.module_name
4306
# This is kept here for compatibility, it is recommended to use
4307
# 'bzrlib.tests.features.paramiko' instead
4308
ParamikoFeature = _CompatabilityThunkFeature(
    deprecated_in((2, 1, 0)),   # dep_version: formats the deprecation text
    'bzrlib.tests.features',    # module; also used as the replacement module
    'ParamikoFeature',          # name being deprecated
    'paramiko',                 # replacement_name looked up on first use
    )
4313
4428
def probe_unicode_in_user_encoding():
4314
4429
"""Try to encode several unicode strings to use in unicode-aware tests.
4315
4430
Return first successful match.
4346
class _HTTPSServerFeature(Feature):
4347
"""Some tests want an https Server, check if one is available.
4349
Right now, the only way this is available is under python2.6 which provides
4360
def feature_name(self):
4361
return 'HTTPSServer'
4364
HTTPSServerFeature = _HTTPSServerFeature()
4367
class _UnicodeFilename(Feature):
4368
"""Does the filesystem support Unicode filenames?"""
4373
except UnicodeEncodeError:
4375
except (IOError, OSError):
4376
# The filesystem allows the Unicode filename but the file doesn't
4380
# The filesystem allows the Unicode filename and the file exists,
4384
UnicodeFilename = _UnicodeFilename()
4387
class _ByteStringNamedFilesystem(Feature):
4388
"""Is the filesystem based on bytes?"""
4391
if os.name == "posix":
4395
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4398
class _UTF8Filesystem(Feature):
4399
"""Is the filesystem UTF-8?"""
4402
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4406
UTF8Filesystem = _UTF8Filesystem()
4409
class _BreakinFeature(Feature):
4410
"""Does this platform support the breakin feature?"""
4413
from bzrlib import breakin
4414
if breakin.determine_signal() is None:
4416
if sys.platform == 'win32':
4417
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4418
# We trigger SIGBREAK via a Console api so we need ctypes to
4419
# access the function
4426
def feature_name(self):
4427
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4430
BreakinFeature = _BreakinFeature()
4433
class _CaseInsCasePresFilenameFeature(Feature):
4434
"""Is the file-system case insensitive, but case-preserving?"""
4437
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4439
# first check truly case-preserving for created files, then check
4440
# case insensitive when opening existing files.
4441
name = osutils.normpath(name)
4442
base, rel = osutils.split(name)
4443
found_rel = osutils.canonical_relpath(base, name)
4444
return (found_rel == rel
4445
and os.path.isfile(name.upper())
4446
and os.path.isfile(name.lower()))
4451
def feature_name(self):
4452
return "case-insensitive case-preserving filesystem"
4454
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4457
class _CaseInsensitiveFilesystemFeature(Feature):
4458
"""Check if underlying filesystem is case-insensitive but *not* case
4461
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4462
# more likely to be case preserving, so this case is rare.
4465
if CaseInsCasePresFilenameFeature.available():
4468
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4469
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4470
TestCaseWithMemoryTransport.TEST_ROOT = root
4472
root = TestCaseWithMemoryTransport.TEST_ROOT
4473
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4475
name_a = osutils.pathjoin(tdir, 'a')
4476
name_A = osutils.pathjoin(tdir, 'A')
4478
result = osutils.isdir(name_A)
4479
_rmtree_temp_dir(tdir)
4482
def feature_name(self):
4483
return 'case-insensitive filesystem'
4485
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4488
class _CaseSensitiveFilesystemFeature(Feature):
4491
if CaseInsCasePresFilenameFeature.available():
4493
elif CaseInsensitiveFilesystemFeature.available():
4498
def feature_name(self):
4499
return 'case-sensitive filesystem'
4501
# new coding style is for feature instances to be lowercase
4502
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4505
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4506
SubUnitFeature = _CompatabilityThunkFeature(
    deprecated_in((2, 1, 0)),   # dep_version: formats the deprecation text
    'bzrlib.tests.features',    # module; also used as the replacement module
    'SubUnitFeature',           # name being deprecated
    'subunit',                  # replacement_name looked up on first use
    )
4509
4461
# Only define SubUnitBzrRunner if subunit is available.
4511
4463
from subunit import TestProtocolClient
4512
4464
from subunit.test_results import AutoTimingTestResultDecorator
4513
4465
class SubUnitBzrProtocolClient(TestProtocolClient):
4467
def stopTest(self, test):
4468
super(SubUnitBzrProtocolClient, self).stopTest(test)
4469
_clear__type_equality_funcs(test)
4515
4471
def addSuccess(self, test, details=None):
4516
4472
# The subunit client always includes the details in the subunit
4517
4473
# stream, but we don't want to include it in ours.
4529
4485
except ImportError:
4532
class _PosixPermissionsFeature(Feature):
4536
# create temporary file and check if specified perms are maintained.
4539
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4540
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4543
os.chmod(name, write_perms)
4545
read_perms = os.stat(name).st_mode & 0777
4547
return (write_perms == read_perms)
4549
return (os.name == 'posix') and has_perms()
4551
def feature_name(self):
4552
return 'POSIX permissions support'
4554
posix_permissions_feature = _PosixPermissionsFeature()
4489
# API compatibility for old plugins; see bug 892622.
4492
'HTTPServerFeature',
4493
'ModuleAvailableFeature',
4494
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4495
'OsFifoFeature', 'UnicodeFilenameFeature',
4496
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4497
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4498
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4499
'posix_permissions_feature',
4501
globals()[name] = _CompatabilityThunkFeature(
4502
symbol_versioning.deprecated_in((2, 5, 0)),
4503
'bzrlib.tests', name,
4504
name, 'bzrlib.tests.features')
4507
# Old names whose replacement in bzrlib.tests.features is spelled
# differently.  Each old name is published from this module as a
# _CompatabilityThunkFeature carrying a 2.5.0 deprecation and pointing at
# the new name.
for (old_name, new_name) in [
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
    ]:
    # Bind under old_name.  'name' is only a leftover from the earlier
    # compatibility loop; assigning to globals()[name] here would overwrite
    # that loop's last entry and leave old_name undefined.
    globals()[old_name] = _CompatabilityThunkFeature(
        symbol_versioning.deprecated_in((2, 5, 0)),
        'bzrlib.tests', old_name,
        new_name, 'bzrlib.tests.features')