56
50
# nb: check this before importing anything else from within it
57
51
_testtools_version = getattr(testtools, '__version__', ())
58
if _testtools_version < (0, 9, 2):
59
raise ImportError("need at least testtools 0.9.2: %s is %r"
52
if _testtools_version < (0, 9, 5):
53
raise ImportError("need at least testtools 0.9.5: %s is %r"
60
54
% (testtools.__file__, _testtools_version))
61
55
from testtools import content
63
58
from bzrlib import (
62
commands as _mod_commands,
72
plugin as _mod_plugin,
77
79
transport as _mod_transport,
81
import bzrlib.commands
82
import bzrlib.timestamp
84
import bzrlib.inventory
85
import bzrlib.iterablefile
88
83
import bzrlib.lsprof
89
84
except ImportError:
90
85
# lsprof not available
92
from bzrlib.merge import merge_inner
95
from bzrlib.smart import client, request, server
97
from bzrlib import symbol_versioning
87
from bzrlib.smart import client, request
88
from bzrlib.transport import (
98
92
from bzrlib.symbol_versioning import (
100
93
deprecated_function,
106
from bzrlib.transport import (
110
from bzrlib.trace import mutter, note
111
96
from bzrlib.tests import (
139
122
TestSuite = TestUtil.TestSuite
140
123
TestLoader = TestUtil.TestLoader
125
# Tests should run in a clean and clearly defined environment. The goal is to
126
# keep them isolated from the running environment as much as possible. The test
127
# framework ensures the variables defined below are set (or deleted if the
128
# value is None) before a test is run and reset to their original value after
129
# the test is run. Generally if some code depends on an environment variable,
130
# the tests should start without this variable in the environment. There are a
131
# few exceptions but you shouldn't violate this rule lightly.
135
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
136
# tests do check our impls match APPDATA
137
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
141
'BZREMAIL': None, # may still be present in the environment
142
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
143
'BZR_PROGRESS_BAR': None,
144
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
145
# as a base class instead of TestCaseInTempDir. Tests inheriting from
146
# TestCase should not use disk resources, BZR_LOG is one.
147
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
148
'BZR_PLUGIN_PATH': None,
149
'BZR_DISABLE_PLUGINS': None,
150
'BZR_PLUGINS_AT': None,
151
'BZR_CONCURRENCY': None,
152
# Make sure that any text ui tests are consistent regardless of
153
# the environment the test case is run in; you may want tests that
154
# test other combinations. 'dumb' is a reasonable guess for tests
155
# going to a pipe or a StringIO.
161
'SSH_AUTH_SOCK': None,
171
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
172
# least. If you do (care), please update this comment
176
'BZR_REMOTE_PATH': None,
177
# Generally speaking, we don't want apport reporting on crashes in
178
# the test environment unless we're specifically testing apport,
179
# so that it doesn't leak into the real system environment. We
180
# use an env var so it propagates to subprocesses.
181
'APPORT_DISABLE': '1',
185
def override_os_environ(test, env=None):
186
"""Modify os.environ keeping a copy.
188
:param test: A test instance
190
:param env: A dict containing variable definitions to be installed
193
env = isolated_environ
194
test._original_os_environ = dict([(var, value)
195
for var, value in os.environ.iteritems()])
196
for var, value in env.iteritems():
197
osutils.set_or_unset_env(var, value)
198
if var not in test._original_os_environ:
199
# The var is new, add it with a value of None, so
200
# restore_os_environ will delete it
201
test._original_os_environ[var] = None
204
def restore_os_environ(test):
205
"""Restore os.environ to its original state.
207
:param test: A test instance previously passed to override_os_environ.
209
for var, value in test._original_os_environ.iteritems():
210
# Restore the original value (or delete it if the value has been set to
211
# None in override_os_environ).
212
osutils.set_or_unset_env(var, value)
142
215
class ExtendedTestResult(testtools.TextTestResult):
143
216
"""Accepts, reports and accumulates the results of running tests.
288
371
self.report_test_start(test)
289
372
test.number = self.count
290
373
self._recordTestStartTime()
291
# Only check for thread leaks if the test case supports cleanups
292
addCleanup = getattr(test, "addCleanup", None)
293
if addCleanup is not None:
294
addCleanup(self._check_leaked_threads, test)
374
# Make testtools cases give us the real traceback on failure
375
addOnException = getattr(test, "addOnException", None)
376
if addOnException is not None:
377
addOnException(self._record_traceback_from_test)
378
# Only check for thread leaks on bzrlib derived test cases
379
if isinstance(test, TestCase):
380
test.addCleanup(self._check_leaked_threads, test)
382
def stopTest(self, test):
383
super(ExtendedTestResult, self).stopTest(test)
384
# Manually break cycles, means touching various private things but hey
385
getDetails = getattr(test, "getDetails", None)
386
if getDetails is not None:
388
# Clear _type_equality_funcs to try to stop TestCase instances
389
# from wasting memory. 'clear' is not available in all Python
390
# versions (bug 809048)
391
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
392
if type_equality_funcs is not None:
393
tef_clear = getattr(type_equality_funcs, "clear", None)
394
if tef_clear is None:
395
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
if tef_instance_dict is not None:
397
tef_clear = tef_instance_dict.clear
398
if tef_clear is not None:
400
self._traceback_from_test = None
296
402
def startTests(self):
297
403
self.report_tests_starting()
840
991
self._track_transports()
841
992
self._track_locks()
842
993
self._clear_debug_flags()
994
# Isolate global verbosity level, to make sure it's reproducible
995
# between tests. We should get rid of this altogether: bug 656694. --
997
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
998
# Isolate config option expansion until its default value for bzrlib is
999
# settled on or the FIXME associated with _get_expand_default_value
1000
# is addressed -- vila 20110219
1001
self.overrideAttr(config, '_expand_default_value', None)
1002
self._log_files = set()
1003
# Each key in the ``_counters`` dict holds a value for a different
1004
# counter. When the test ends, addDetail() should be used to output the
1005
# counter values. This happens in install_counter_hook().
1007
if 'config_stats' in selftest_debug_flags:
1008
self._install_config_stats_hooks()
1009
# Do not use i18n for tests (unless the test reverses this)
844
1012
def debug(self):
845
1013
# debug a frame up.
847
pdb.Pdb().set_trace(sys._getframe().f_back)
1015
# The sys preserved stdin/stdout should allow blackbox tests debugging
1016
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1017
).set_trace(sys._getframe().f_back)
849
1019
def discardDetail(self, name):
850
1020
"""Extend the addDetail, getDetails api so we can remove a detail.
862
1032
if name in details:
863
1033
del details[name]
1035
def install_counter_hook(self, hooks, name, counter_name=None):
1036
"""Install a counting hook.
1038
Any hook can be counted as long as it doesn't need to return a value.
1040
:param hooks: Where the hook should be installed.
1042
:param name: The hook name that will be counted.
1044
:param counter_name: The counter identifier in ``_counters``, defaults
1047
_counters = self._counters # Avoid closing over self
1048
if counter_name is None:
1050
if _counters.has_key(counter_name):
1051
raise AssertionError('%s is already used as a counter name'
1053
_counters[counter_name] = 0
1054
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1055
lambda: ['%d' % (_counters[counter_name],)]))
1056
def increment_counter(*args, **kwargs):
1057
_counters[counter_name] += 1
1058
label = 'count %s calls' % (counter_name,)
1059
hooks.install_named_hook(name, increment_counter, label)
1060
self.addCleanup(hooks.uninstall_named_hook, name, label)
1062
def _install_config_stats_hooks(self):
1063
"""Install config hooks to count hook calls.
1066
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1067
self.install_counter_hook(config.ConfigHooks, hook_name,
1068
'config.%s' % (hook_name,))
1070
# The OldConfigHooks are private and need special handling to protect
1071
# against recursive tests (tests that run other tests), so we just do
1072
# manually what registering them into _builtin_known_hooks will provide
1074
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1075
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1076
self.install_counter_hook(config.OldConfigHooks, hook_name,
1077
'old_config.%s' % (hook_name,))
865
1079
def _clear_debug_flags(self):
866
1080
"""Prevent externally set debug flags affecting tests.
878
1092
def _clear_hooks(self):
879
1093
# prevent hooks affecting tests
1094
known_hooks = hooks.known_hooks
880
1095
self._preserved_hooks = {}
881
for key, factory in hooks.known_hooks.items():
882
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
883
current_hooks = hooks.known_hooks_key_to_object(key)
1096
for key, (parent, name) in known_hooks.iter_parent_objects():
1097
current_hooks = getattr(parent, name)
884
1098
self._preserved_hooks[parent] = (name, current_hooks)
1099
self._preserved_lazy_hooks = hooks._lazy_hooks
1100
hooks._lazy_hooks = {}
885
1101
self.addCleanup(self._restoreHooks)
886
for key, factory in hooks.known_hooks.items():
887
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
1102
for key, (parent, name) in known_hooks.iter_parent_objects():
1103
factory = known_hooks.get(key)
888
1104
setattr(parent, name, factory())
889
1105
# this hook should always be installed
890
1106
request._install_hook()
1147
1367
'st_mtime did not match')
1148
1368
self.assertEqual(expected.st_ctime, actual.st_ctime,
1149
1369
'st_ctime did not match')
1150
if sys.platform != 'win32':
1370
if sys.platform == 'win32':
1151
1371
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1152
1372
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1153
# odd. Regardless we shouldn't actually try to assert anything
1154
# about their values
1373
# odd. We just force it to always be 0 to avoid any problems.
1374
self.assertEqual(0, expected.st_dev)
1375
self.assertEqual(0, actual.st_dev)
1376
self.assertEqual(0, expected.st_ino)
1377
self.assertEqual(0, actual.st_ino)
1155
1379
self.assertEqual(expected.st_dev, actual.st_dev,
1156
1380
'st_dev did not match')
1157
1381
self.assertEqual(expected.st_ino, actual.st_ino,
1166
1390
length, len(obj_with_len), obj_with_len))
1168
1392
def assertLogsError(self, exception_class, func, *args, **kwargs):
1169
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1393
"""Assert that `func(*args, **kwargs)` quietly logs a specific error.
1171
from bzrlib import trace
1173
1396
orig_log_exception_quietly = trace.log_exception_quietly
1176
1399
orig_log_exception_quietly()
1177
captured.append(sys.exc_info())
1400
captured.append(sys.exc_info()[1])
1178
1401
trace.log_exception_quietly = capture
1179
1402
func(*args, **kwargs)
1181
1404
trace.log_exception_quietly = orig_log_exception_quietly
1182
1405
self.assertLength(1, captured)
1183
err = captured[0][1]
1184
1407
self.assertIsInstance(err, exception_class)
1334
1561
self.assertEqual(expected_docstring, obj.__doc__)
1563
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1336
1564
def failUnlessExists(self, path):
1565
return self.assertPathExists(path)
1567
def assertPathExists(self, path):
1337
1568
"""Fail unless path or paths, which may be abs or relative, exist."""
1338
1569
if not isinstance(path, basestring):
1340
self.failUnlessExists(p)
1571
self.assertPathExists(p)
1342
self.failUnless(osutils.lexists(path),path+" does not exist")
1573
self.assertTrue(osutils.lexists(path),
1574
path + " does not exist")
1576
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1344
1577
def failIfExists(self, path):
1578
return self.assertPathDoesNotExist(path)
1580
def assertPathDoesNotExist(self, path):
1345
1581
"""Fail if path or paths, which may be abs or relative, exist."""
1346
1582
if not isinstance(path, basestring):
1348
self.failIfExists(p)
1584
self.assertPathDoesNotExist(p)
1350
self.failIf(osutils.lexists(path),path+" exists")
1586
self.assertFalse(osutils.lexists(path),
1352
1589
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1353
1590
"""A helper for callDeprecated and applyDeprecated.
1468
1706
The file is removed as the test is torn down.
1470
self._log_file = StringIO()
1471
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1708
pseudo_log_file = StringIO()
1709
def _get_log_contents_for_weird_testtools_api():
1710
return [pseudo_log_file.getvalue().decode(
1711
"utf-8", "replace").encode("utf-8")]
1712
self.addDetail("log", content.Content(content.ContentType("text",
1713
"plain", {"charset": "utf8"}),
1714
_get_log_contents_for_weird_testtools_api))
1715
self._log_file = pseudo_log_file
1716
self._log_memento = trace.push_log_file(self._log_file)
1472
1717
self.addCleanup(self._finishLogFile)
1474
1719
def _finishLogFile(self):
1475
1720
"""Finished with the log file.
1477
Close the file and delete it, unless setKeepLogfile was called.
1722
Close the file and delete it.
1479
if bzrlib.trace._trace_file:
1724
if trace._trace_file:
1480
1725
# flush the log file, to get all content
1481
bzrlib.trace._trace_file.flush()
1482
bzrlib.trace.pop_log_file(self._log_memento)
1483
# Cache the log result and delete the file on disk
1484
self._get_log(False)
1726
trace._trace_file.flush()
1727
trace.pop_log_file(self._log_memento)
1486
1729
def thisFailsStrictLockCheck(self):
1487
1730
"""It is known that this test would fail with -Dstrict_locks.
1515
1761
setattr(obj, attr_name, new)
1764
def overrideEnv(self, name, new):
1765
"""Set an environment variable, and reset it after the test.
1767
:param name: The environment variable name.
1769
:param new: The value to set the variable to. If None, the
1770
variable is deleted from the environment.
1772
:returns: The actual variable value.
1774
value = osutils.set_or_unset_env(name, new)
1775
self.addCleanup(osutils.set_or_unset_env, name, value)
1778
def recordCalls(self, obj, attr_name):
1779
"""Monkeypatch in a wrapper that will record calls.
1781
The monkeypatch is automatically removed when the test concludes.
1783
:param obj: The namespace holding the reference to be replaced;
1784
typically a module, class, or object.
1785
:param attr_name: A string for the name of the attribute to
1787
:returns: A list that will be extended with one item every time the
1788
function is called, with a tuple of (args, kwargs).
1792
def decorator(*args, **kwargs):
1793
calls.append((args, kwargs))
1794
return orig(*args, **kwargs)
1795
orig = self.overrideAttr(obj, attr_name, decorator)
1518
1798
def _cleanEnvironment(self):
1520
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1521
'HOME': os.getcwd(),
1522
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1523
# tests do check our impls match APPDATA
1524
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1528
'BZREMAIL': None, # may still be present in the environment
1529
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1530
'BZR_PROGRESS_BAR': None,
1532
'BZR_PLUGIN_PATH': None,
1533
'BZR_DISABLE_PLUGINS': None,
1534
'BZR_PLUGINS_AT': None,
1535
'BZR_CONCURRENCY': None,
1536
# Make sure that any text ui tests are consistent regardless of
1537
# the environment the test case is run in; you may want tests that
1538
# test other combinations. 'dumb' is a reasonable guess for tests
1539
# going to a pipe or a StringIO.
1543
'BZR_COLUMNS': '80',
1545
'SSH_AUTH_SOCK': None,
1549
'https_proxy': None,
1550
'HTTPS_PROXY': None,
1555
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1556
# least. If you do (care), please update this comment
1560
'BZR_REMOTE_PATH': None,
1561
# Generally speaking, we don't want apport reporting on crashes in
1562
# the test environment unless we're specifically testing apport,
1563
# so that it doesn't leak into the real system environment. We
1564
# use an env var so it propagates to subprocesses.
1565
'APPORT_DISABLE': '1',
1568
self.addCleanup(self._restoreEnvironment)
1569
for name, value in new_env.iteritems():
1570
self._captureVar(name, value)
1572
def _captureVar(self, name, newvalue):
1573
"""Set an environment variable, and reset it when finished."""
1574
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1576
def _restoreEnvironment(self):
1577
for name, value in self._old_env.iteritems():
1578
osutils.set_or_unset_env(name, value)
1799
for name, value in isolated_environ.iteritems():
1800
self.overrideEnv(name, value)
1580
1802
def _restoreHooks(self):
1581
1803
for klass, (name, hooks) in self._preserved_hooks.items():
1582
1804
setattr(klass, name, hooks)
1805
self._preserved_hooks.clear()
1806
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1807
self._preserved_lazy_hooks.clear()
1584
1809
def knownFailure(self, reason):
1585
"""This test has failed for some known reason."""
1586
raise KnownFailure(reason)
1810
"""Declare that this test fails for a known reason
1812
Tests that are known to fail should generally be using expectedFailure
1813
with an appropriate reverse assertion if a change could cause the test
1814
to start passing. Conversely if the test has no immediate prospect of
1815
succeeding then using skip is more suitable.
1817
When this method is called while an exception is being handled, that
1818
traceback will be used, otherwise a new exception will be thrown to
1819
provide one but won't be reported.
1821
self._add_reason(reason)
1823
exc_info = sys.exc_info()
1824
if exc_info != (None, None, None):
1825
self._report_traceback(exc_info)
1828
raise self.failureException(reason)
1829
except self.failureException:
1830
exc_info = sys.exc_info()
1831
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1832
raise testtools.testcase._ExpectedFailure(exc_info)
1588
1836
def _suppress_log(self):
1589
1837
"""Remove the log info from details."""
1675
1923
self._benchtime += time.time() - start
1677
1925
def log(self, *args):
1680
def _get_log(self, keep_log_file=False):
1681
"""Internal helper to get the log from bzrlib.trace for this test.
1683
Please use self.getDetails, or self.get_log to access this in test case
1686
:param keep_log_file: When True, if the log is still a file on disk
1687
leave it as a file on disk. When False, if the log is still a file
1688
on disk, the log file is deleted and the log preserved as
1690
:return: A string containing the log.
1692
if self._log_contents is not None:
1694
self._log_contents.decode('utf8')
1695
except UnicodeDecodeError:
1696
unicodestr = self._log_contents.decode('utf8', 'replace')
1697
self._log_contents = unicodestr.encode('utf8')
1698
return self._log_contents
1699
if self._log_file is not None:
1700
log_contents = self._log_file.getvalue()
1702
log_contents.decode('utf8')
1703
except UnicodeDecodeError:
1704
unicodestr = log_contents.decode('utf8', 'replace')
1705
log_contents = unicodestr.encode('utf8')
1706
if not keep_log_file:
1707
self._log_file = None
1708
# Permit multiple calls to get_log until we clean it up in
1710
self._log_contents = log_contents
1713
return "No log file content."
1715
1928
def get_log(self):
1716
1929
"""Get a unicode string containing the log from bzrlib.trace.
2205
def _add_subprocess_log(self, log_file_path):
2206
if len(self._log_files) == 0:
2207
# Register an addCleanup func. We do this on the first call to
2208
# _add_subprocess_log rather than in TestCase.setUp so that this
2209
# addCleanup is registered after any cleanups for tempdirs that
2210
# subclasses might create, which will probably remove the log file
2212
self.addCleanup(self._subprocess_log_cleanup)
2213
# self._log_files is a set, so if a log file is reused we won't grab it
2215
self._log_files.add(log_file_path)
2217
def _subprocess_log_cleanup(self):
2218
for count, log_file_path in enumerate(self._log_files):
2219
# We use buffer_now=True to avoid holding the file open beyond
2220
# the life of this function, which might interfere with e.g.
2221
# cleaning tempdirs on Windows.
2222
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2223
#detail_content = content.content_from_file(
2224
# log_file_path, buffer_now=True)
2225
with open(log_file_path, 'rb') as log_file:
2226
log_file_bytes = log_file.read()
2227
detail_content = content.Content(content.ContentType("text",
2228
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2229
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
1985
2232
def _popen(self, *args, **kwargs):
1986
2233
"""Place a call to Popen.
2024
2271
if retcode is not None and retcode != process.returncode:
2025
2272
if process_args is None:
2026
2273
process_args = "(unknown args)"
2027
mutter('Output of bzr %s:\n%s', process_args, out)
2028
mutter('Error for bzr %s:\n%s', process_args, err)
2274
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2275
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2029
2276
self.fail('Command bzr %s failed with retcode %s != %s'
2030
2277
% (process_args, retcode, process.returncode))
2031
2278
return [out, err]
2033
def check_inventory_shape(self, inv, shape):
2034
"""Compare an inventory to a list of expected names.
2280
def check_tree_shape(self, tree, shape):
2281
"""Compare a tree to a list of expected names.
2036
2283
Fail if they are not precisely equal.
2039
2286
shape = list(shape) # copy
2040
for path, ie in inv.entries():
2287
for path, ie in tree.iter_entries_by_dir():
2041
2288
name = path.replace('\\', '/')
2042
2289
if ie.kind == 'directory':
2043
2290
name = name + '/'
2292
pass # ignore root entry
2045
2294
shape.remove(name)
2047
2296
extras.append(name)
2137
2386
class TestCaseWithMemoryTransport(TestCase):
2138
2387
"""Common test class for tests that do not need disk resources.
2140
Tests that need disk resources should derive from TestCaseWithTransport.
2389
Tests that need disk resources should derive from TestCaseInTempDir
2390
or TestCaseWithTransport.
2142
2392
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2144
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2394
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2145
2395
a directory which does not exist. This serves to help ensure test isolation
2146
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2147
must exist. However, TestCaseWithMemoryTransport does not offer local
2148
file defaults for the transport in tests, nor does it obey the command line
2396
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2397
must exist. However, TestCaseWithMemoryTransport does not offer local file
2398
defaults for the transport in tests, nor does it obey the command line
2149
2399
override, so tests that accidentally write to the common directory should
2152
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2153
a .bzr directory that stops us ascending higher into the filesystem.
2402
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2403
``.bzr`` directory that stops us ascending higher into the filesystem.
2156
2406
TEST_ROOT = None
2315
2566
root = TestCaseWithMemoryTransport.TEST_ROOT
2316
bzrdir.BzrDir.create_standalone_workingtree(root)
2568
# Make sure we get a readable and accessible home for .bzr.log
2569
# and/or config files, and not fallback to weird defaults (see
2570
# http://pad.lv/825027).
2571
self.assertIs(None, os.environ.get('BZR_HOME', None))
2572
os.environ['BZR_HOME'] = root
2573
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2574
del os.environ['BZR_HOME']
2575
except Exception, e:
2576
self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2577
# Hack for speed: remember the raw bytes of the dirstate file so that
2578
# we don't need to re-open the wt to check it hasn't changed.
2579
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2580
wt.control_transport.get_bytes('dirstate'))
2318
2582
def _check_safety_net(self):
2319
2583
"""Check that the safety .bzr directory has not been touched.
2366
2630
def make_branch(self, relpath, format=None):
2367
2631
"""Create a branch on the transport at relpath."""
2368
2632
repo = self.make_repository(relpath, format=format)
2369
return repo.bzrdir.create_branch()
2633
return repo.bzrdir.create_branch(append_revisions_only=False)
2635
def resolve_format(self, format):
2636
"""Resolve an object to a ControlDir format object.
2638
The initial format object can either already be
2639
a ControlDirFormat, None (for the default format),
2640
or a string with the name of the control dir format.
2642
:param format: Object to resolve
2643
:return: A ControlDirFormat instance
2647
if isinstance(format, basestring):
2648
format = bzrdir.format_registry.make_bzrdir(format)
2651
def resolve_format(self, format):
2652
"""Resolve an object to a ControlDir format object.
2654
The initial format object can either already be
2655
a ControlDirFormat, None (for the default format),
2656
or a string with the name of the control dir format.
2658
:param format: Object to resolve
2659
:return: A ControlDirFormat instance
2663
if isinstance(format, basestring):
2664
format = bzrdir.format_registry.make_bzrdir(format)
2371
2667
def make_bzrdir(self, relpath, format=None):
2418
2711
test_home_dir = self.test_home_dir
2419
2712
if isinstance(test_home_dir, unicode):
2420
2713
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2421
os.environ['HOME'] = test_home_dir
2422
os.environ['BZR_HOME'] = test_home_dir
2714
self.overrideEnv('HOME', test_home_dir)
2715
self.overrideEnv('BZR_HOME', test_home_dir)
2424
2717
def setUp(self):
2425
2718
super(TestCaseWithMemoryTransport, self).setUp()
2426
2719
# Ensure that ConnectedTransport doesn't leak sockets
2427
def get_transport_with_cleanup(*args, **kwargs):
2428
t = orig_get_transport(*args, **kwargs)
2720
def get_transport_from_url_with_cleanup(*args, **kwargs):
2721
t = orig_get_transport_from_url(*args, **kwargs)
2429
2722
if isinstance(t, _mod_transport.ConnectedTransport):
2430
2723
self.addCleanup(t.disconnect)
2433
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2434
get_transport_with_cleanup)
2726
orig_get_transport_from_url = self.overrideAttr(
2727
_mod_transport, 'get_transport_from_url',
2728
get_transport_from_url_with_cleanup)
2435
2729
self._make_test_root()
2436
2730
self.addCleanup(os.chdir, os.getcwdu())
2437
2731
self.makeAndChdirToTestDir()
3334
class ForwardingResult(unittest.TestResult):
3336
def __init__(self, target):
3337
unittest.TestResult.__init__(self)
3338
self.result = target
3340
def startTest(self, test):
3341
self.result.startTest(test)
3343
def stopTest(self, test):
3344
self.result.stopTest(test)
3346
def startTestRun(self):
3347
self.result.startTestRun()
3349
def stopTestRun(self):
3350
self.result.stopTestRun()
3352
def addSkip(self, test, reason):
3353
self.result.addSkip(test, reason)
3355
def addSuccess(self, test):
3356
self.result.addSuccess(test)
3358
def addError(self, test, err):
3359
self.result.addError(test, err)
3361
def addFailure(self, test, err):
3362
self.result.addFailure(test, err)
3363
ForwardingResult = testtools.ExtendedToOriginalDecorator
3366
class ProfileResult(ForwardingResult):
3634
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3367
3635
"""Generate profiling data for all activity between start and success.
3369
3637
The profile data is appended to the test's _benchcalls attribute and can
3703
3975
'bzrlib.tests.test_commit_merge',
3704
3976
'bzrlib.tests.test_config',
3705
3977
'bzrlib.tests.test_conflicts',
3978
'bzrlib.tests.test_controldir',
3706
3979
'bzrlib.tests.test_counted_lock',
3707
3980
'bzrlib.tests.test_crash',
3708
3981
'bzrlib.tests.test_decorators',
3709
3982
'bzrlib.tests.test_delta',
3710
3983
'bzrlib.tests.test_debug',
3711
'bzrlib.tests.test_deprecated_graph',
3712
3984
'bzrlib.tests.test_diff',
3713
3985
'bzrlib.tests.test_directory_service',
3714
3986
'bzrlib.tests.test_dirstate',
3715
3987
'bzrlib.tests.test_email_message',
3716
3988
'bzrlib.tests.test_eol_filters',
3717
3989
'bzrlib.tests.test_errors',
3990
'bzrlib.tests.test_estimate_compressed_size',
3718
3991
'bzrlib.tests.test_export',
3992
'bzrlib.tests.test_export_pot',
3719
3993
'bzrlib.tests.test_extract',
3994
'bzrlib.tests.test_features',
3720
3995
'bzrlib.tests.test_fetch',
3721
3996
'bzrlib.tests.test_fixtures',
3722
3997
'bzrlib.tests.test_fifo_cache',
3723
3998
'bzrlib.tests.test_filters',
3999
'bzrlib.tests.test_filter_tree',
3724
4000
'bzrlib.tests.test_ftp_transport',
3725
4001
'bzrlib.tests.test_foreign',
3726
4002
'bzrlib.tests.test_generate_docs',
3961
4244
# Some tests mentioned in the list are not in the test suite. The
3962
4245
# list may be out of date, report to the tester.
3963
4246
for id in not_found:
3964
bzrlib.trace.warning('"%s" not found in the test suite', id)
4247
trace.warning('"%s" not found in the test suite', id)
3965
4248
for id in duplicates:
3966
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
4249
trace.warning('"%s" is used as an id by several tests', id)
3971
def multiply_scenarios(scenarios_left, scenarios_right):
4254
def multiply_scenarios(*scenarios):
4255
"""Multiply two or more iterables of scenarios.
4257
It is safe to pass scenario generators or iterators.
4259
:returns: A list of compound scenarios: the cross-product of all
4260
scenarios, with the names concatenated and the parameters
4263
return reduce(_multiply_two_scenarios, map(list, scenarios))
4266
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3972
4267
"""Multiply two sets of scenarios.
3974
4269
:returns: the cartesian product of the two sets of scenarios, that is
4144
4440
% (os.path.basename(dirname), printable_e))
4147
class Feature(object):
4148
"""An operating system Feature."""
4151
self._available = None
4153
def available(self):
4154
"""Is the feature available?
4156
:return: True if the feature is available.
4158
if self._available is None:
4159
self._available = self._probe()
4160
return self._available
4163
"""Implement this method in concrete features.
4165
:return: True if the feature is available.
4167
raise NotImplementedError
4170
if getattr(self, 'feature_name', None):
4171
return self.feature_name()
4172
return self.__class__.__name__
4175
class _SymlinkFeature(Feature):
4178
return osutils.has_symlinks()
4180
def feature_name(self):
4183
SymlinkFeature = _SymlinkFeature()
4186
class _HardlinkFeature(Feature):
4189
return osutils.has_hardlinks()
4191
def feature_name(self):
4194
HardlinkFeature = _HardlinkFeature()
4197
class _OsFifoFeature(Feature):
4200
return getattr(os, 'mkfifo', None)
4202
def feature_name(self):
4203
return 'filesystem fifos'
4205
OsFifoFeature = _OsFifoFeature()
4208
class _UnicodeFilenameFeature(Feature):
4209
"""Does the filesystem support Unicode filenames?"""
4213
# Check for character combinations unlikely to be covered by any
4214
# single non-unicode encoding. We use the characters
4215
# - greek small letter alpha (U+03B1) and
4216
# - braille pattern dots-123456 (U+283F).
4217
os.stat(u'\u03b1\u283f')
4218
except UnicodeEncodeError:
4220
except (IOError, OSError):
4221
# The filesystem allows the Unicode filename but the file doesn't
4225
# The filesystem allows the Unicode filename and the file exists,
4229
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4232
class _CompatabilityThunkFeature(Feature):
4233
"""This feature is just a thunk to another feature.
4235
It issues a deprecation warning if it is accessed, to let you know that you
4236
should really use a different feature.
4239
def __init__(self, dep_version, module, name,
4240
replacement_name, replacement_module=None):
4241
super(_CompatabilityThunkFeature, self).__init__()
4242
self._module = module
4243
if replacement_module is None:
4244
replacement_module = module
4245
self._replacement_module = replacement_module
4247
self._replacement_name = replacement_name
4248
self._dep_version = dep_version
4249
self._feature = None
4252
if self._feature is None:
4253
depr_msg = self._dep_version % ('%s.%s'
4254
% (self._module, self._name))
4255
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4256
self._replacement_name)
4257
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4258
# Import the new feature and use it as a replacement for the
4260
mod = __import__(self._replacement_module, {}, {},
4261
[self._replacement_name])
4262
self._feature = getattr(mod, self._replacement_name)
4266
return self._feature._probe()
4269
class ModuleAvailableFeature(Feature):
4270
"""This is a feature than describes a module we want to be available.
4272
Declare the name of the module in __init__(), and then after probing, the
4273
module will be available as 'self.module'.
4275
:ivar module: The module if it is available, else None.
4278
def __init__(self, module_name):
4279
super(ModuleAvailableFeature, self).__init__()
4280
self.module_name = module_name
4284
self._module = __import__(self.module_name, {}, {}, [''])
4291
if self.available(): # Make sure the probe has been done
4295
def feature_name(self):
4296
return self.module_name
4299
# This is kept here for compatibility, it is recommended to use
4300
# 'bzrlib.tests.features.paramiko' instead
4301
ParamikoFeature = _CompatabilityThunkFeature(
4302
deprecated_in((2,1,0)),
4303
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4306
4443
def probe_unicode_in_user_encoding():
4307
4444
"""Try to encode several unicode strings to use in unicode-aware tests.
4308
4445
Return first successful match.
4339
class _HTTPSServerFeature(Feature):
4340
"""Some tests want an https Server, check if one is available.
4342
Right now, the only way this is available is under python2.6 which provides
4353
def feature_name(self):
4354
return 'HTTPSServer'
4357
HTTPSServerFeature = _HTTPSServerFeature()
4360
class _UnicodeFilename(Feature):
4361
"""Does the filesystem support Unicode filenames?"""
4366
except UnicodeEncodeError:
4368
except (IOError, OSError):
4369
# The filesystem allows the Unicode filename but the file doesn't
4373
# The filesystem allows the Unicode filename and the file exists,
4377
UnicodeFilename = _UnicodeFilename()
4380
class _ByteStringNamedFilesystem(Feature):
4381
"""Is the filesystem based on bytes?"""
4384
if os.name == "posix":
4388
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4391
class _UTF8Filesystem(Feature):
4392
"""Is the filesystem UTF-8?"""
4395
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4399
UTF8Filesystem = _UTF8Filesystem()
4402
class _BreakinFeature(Feature):
4403
"""Does this platform support the breakin feature?"""
4406
from bzrlib import breakin
4407
if breakin.determine_signal() is None:
4409
if sys.platform == 'win32':
4410
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4411
# We trigger SIGBREAK via a Console api so we need ctypes to
4412
# access the function
4419
def feature_name(self):
4420
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4423
BreakinFeature = _BreakinFeature()
4426
class _CaseInsCasePresFilenameFeature(Feature):
4427
"""Is the file-system case insensitive, but case-preserving?"""
4430
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4432
# first check truly case-preserving for created files, then check
4433
# case insensitive when opening existing files.
4434
name = osutils.normpath(name)
4435
base, rel = osutils.split(name)
4436
found_rel = osutils.canonical_relpath(base, name)
4437
return (found_rel == rel
4438
and os.path.isfile(name.upper())
4439
and os.path.isfile(name.lower()))
4444
def feature_name(self):
4445
return "case-insensitive case-preserving filesystem"
4447
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4450
class _CaseInsensitiveFilesystemFeature(Feature):
4451
"""Check if underlying filesystem is case-insensitive but *not* case
4454
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4455
# more likely to be case preserving, so this case is rare.
4458
if CaseInsCasePresFilenameFeature.available():
4461
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4462
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4463
TestCaseWithMemoryTransport.TEST_ROOT = root
4465
root = TestCaseWithMemoryTransport.TEST_ROOT
4466
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4468
name_a = osutils.pathjoin(tdir, 'a')
4469
name_A = osutils.pathjoin(tdir, 'A')
4471
result = osutils.isdir(name_A)
4472
_rmtree_temp_dir(tdir)
4475
def feature_name(self):
4476
return 'case-insensitive filesystem'
4478
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4481
class _CaseSensitiveFilesystemFeature(Feature):
4484
if CaseInsCasePresFilenameFeature.available():
4486
elif CaseInsensitiveFilesystemFeature.available():
4491
def feature_name(self):
4492
return 'case-sensitive filesystem'
4494
# new coding style is for feature instances to be lowercase
4495
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4498
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4499
SubUnitFeature = _CompatabilityThunkFeature(
4500
deprecated_in((2,1,0)),
4501
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4502
4476
# Only define SubUnitBzrRunner if subunit is available.
4504
4478
from subunit import TestProtocolClient