56
53
# nb: check this before importing anything else from within it
57
54
_testtools_version = getattr(testtools, '__version__', ())
58
if _testtools_version < (0, 9, 2):
59
raise ImportError("need at least testtools 0.9.2: %s is %r"
55
if _testtools_version < (0, 9, 5):
56
raise ImportError("need at least testtools 0.9.5: %s is %r"
60
57
% (testtools.__file__, _testtools_version))
61
58
from testtools import content
63
61
from bzrlib import (
65
commands as _mod_commands,
75
plugin as _mod_plugin,
77
82
transport as _mod_transport,
81
import bzrlib.commands
82
import bzrlib.timestamp
84
import bzrlib.inventory
85
import bzrlib.iterablefile
88
86
import bzrlib.lsprof
89
87
except ImportError:
90
88
# lsprof not available
92
from bzrlib.merge import merge_inner
95
from bzrlib.smart import client, request, server
97
from bzrlib import symbol_versioning
90
from bzrlib.smart import client, request
91
from bzrlib.transport import (
98
95
from bzrlib.symbol_versioning import (
100
96
deprecated_function,
106
from bzrlib.transport import (
110
from bzrlib.trace import mutter, note
111
99
from bzrlib.tests import (
116
105
from bzrlib.ui import NullProgressView
117
106
from bzrlib.ui.text import TextUIFactory
118
import bzrlib.version_info_formats.format_custom
119
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
107
from bzrlib.tests.features import _CompatabilityThunkFeature
121
109
# Mark this python module as being part of the implementation
122
110
# of unittest: this gives us better tracebacks where the last
139
127
TestSuite = TestUtil.TestSuite
140
128
TestLoader = TestUtil.TestLoader
130
# Tests should run in a clean and clearly defined environment. The goal is to
131
# keep them isolated from the running environment as mush as possible. The test
132
# framework ensures the variables defined below are set (or deleted if the
133
# value is None) before a test is run and reset to their original value after
134
# the test is run. Generally if some code depends on an environment variable,
135
# the tests should start without this variable in the environment. There are a
136
# few exceptions but you shouldn't violate this rule lightly.
140
'XDG_CONFIG_HOME': None,
141
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
142
# tests do check our impls match APPDATA
143
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
147
'BZREMAIL': None, # may still be present in the environment
148
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
149
'BZR_PROGRESS_BAR': None,
150
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
151
# as a base class instead of TestCaseInTempDir. Tests inheriting from
152
# TestCase should not use disk resources, BZR_LOG is one.
153
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
154
'BZR_PLUGIN_PATH': None,
155
'BZR_DISABLE_PLUGINS': None,
156
'BZR_PLUGINS_AT': None,
157
'BZR_CONCURRENCY': None,
158
# Make sure that any text ui tests are consistent regardless of
159
# the environment the test case is run in; you may want tests that
160
# test other combinations. 'dumb' is a reasonable guess for tests
161
# going to a pipe or a StringIO.
167
'SSH_AUTH_SOCK': None,
177
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
178
# least. If you do (care), please update this comment
182
'BZR_REMOTE_PATH': None,
183
# Generally speaking, we don't want apport reporting on crashes in
184
# the test envirnoment unless we're specifically testing apport,
185
# so that it doesn't leak into the real system environment. We
186
# use an env var so it propagates to subprocesses.
187
'APPORT_DISABLE': '1',
191
def override_os_environ(test, env=None):
192
"""Modify os.environ keeping a copy.
194
:param test: A test instance
196
:param env: A dict containing variable definitions to be installed
199
env = isolated_environ
200
test._original_os_environ = dict([(var, value)
201
for var, value in os.environ.iteritems()])
202
for var, value in env.iteritems():
203
osutils.set_or_unset_env(var, value)
204
if var not in test._original_os_environ:
205
# The var is new, add it with a value of None, so
206
# restore_os_environ will delete it
207
test._original_os_environ[var] = None
210
def restore_os_environ(test):
211
"""Restore os.environ to its original state.
213
:param test: A test instance previously passed to override_os_environ.
215
for var, value in test._original_os_environ.iteritems():
216
# Restore the original value (or delete it if the value has been set to
217
# None in override_os_environ).
218
osutils.set_or_unset_env(var, value)
221
def _clear__type_equality_funcs(test):
222
"""Cleanup bound methods stored on TestCase instances
224
Clear the dict breaking a few (mostly) harmless cycles in the affected
225
unittests released with Python 2.6 and initial Python 2.7 versions.
227
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
228
shipped in Oneiric, an object with no clear method was used, hence the
229
extra complications, see bug 809048 for details.
231
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
232
if type_equality_funcs is not None:
233
tef_clear = getattr(type_equality_funcs, "clear", None)
234
if tef_clear is None:
235
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
236
if tef_instance_dict is not None:
237
tef_clear = tef_instance_dict.clear
238
if tef_clear is not None:
142
242
class ExtendedTestResult(testtools.TextTestResult):
143
243
"""Accepts, reports and accumulates the results of running tests.
288
398
self.report_test_start(test)
289
399
test.number = self.count
290
400
self._recordTestStartTime()
291
# Only check for thread leaks if the test case supports cleanups
292
addCleanup = getattr(test, "addCleanup", None)
293
if addCleanup is not None:
294
addCleanup(self._check_leaked_threads, test)
401
# Make testtools cases give us the real traceback on failure
402
addOnException = getattr(test, "addOnException", None)
403
if addOnException is not None:
404
addOnException(self._record_traceback_from_test)
405
# Only check for thread leaks on bzrlib derived test cases
406
if isinstance(test, TestCase):
407
test.addCleanup(self._check_leaked_threads, test)
409
def stopTest(self, test):
410
super(ExtendedTestResult, self).stopTest(test)
411
# Manually break cycles, means touching various private things but hey
412
getDetails = getattr(test, "getDetails", None)
413
if getDetails is not None:
415
_clear__type_equality_funcs(test)
416
self._traceback_from_test = None
296
418
def startTests(self):
297
419
self.report_tests_starting()
862
1062
if name in details:
863
1063
del details[name]
1065
def install_counter_hook(self, hooks, name, counter_name=None):
1066
"""Install a counting hook.
1068
Any hook can be counted as long as it doesn't need to return a value.
1070
:param hooks: Where the hook should be installed.
1072
:param name: The hook name that will be counted.
1074
:param counter_name: The counter identifier in ``_counters``, defaults
1077
_counters = self._counters # Avoid closing over self
1078
if counter_name is None:
1080
if _counters.has_key(counter_name):
1081
raise AssertionError('%s is already used as a counter name'
1083
_counters[counter_name] = 0
1084
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1085
lambda: ['%d' % (_counters[counter_name],)]))
1086
def increment_counter(*args, **kwargs):
1087
_counters[counter_name] += 1
1088
label = 'count %s calls' % (counter_name,)
1089
hooks.install_named_hook(name, increment_counter, label)
1090
self.addCleanup(hooks.uninstall_named_hook, name, label)
1092
def _install_config_stats_hooks(self):
1093
"""Install config hooks to count hook calls.
1096
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1097
self.install_counter_hook(config.ConfigHooks, hook_name,
1098
'config.%s' % (hook_name,))
1100
# The OldConfigHooks are private and need special handling to protect
1101
# against recursive tests (tests that run other tests), so we just do
1102
# manually what registering them into _builtin_known_hooks will provide
1104
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1105
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1106
self.install_counter_hook(config.OldConfigHooks, hook_name,
1107
'old_config.%s' % (hook_name,))
865
1109
def _clear_debug_flags(self):
866
1110
"""Prevent externally set debug flags affecting tests.
878
1122
def _clear_hooks(self):
879
1123
# prevent hooks affecting tests
1124
known_hooks = hooks.known_hooks
880
1125
self._preserved_hooks = {}
881
for key, factory in hooks.known_hooks.items():
882
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
883
current_hooks = hooks.known_hooks_key_to_object(key)
1126
for key, (parent, name) in known_hooks.iter_parent_objects():
1127
current_hooks = getattr(parent, name)
884
1128
self._preserved_hooks[parent] = (name, current_hooks)
1129
self._preserved_lazy_hooks = hooks._lazy_hooks
1130
hooks._lazy_hooks = {}
885
1131
self.addCleanup(self._restoreHooks)
886
for key, factory in hooks.known_hooks.items():
887
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
1132
for key, (parent, name) in known_hooks.iter_parent_objects():
1133
factory = known_hooks.get(key)
888
1134
setattr(parent, name, factory())
889
1135
# this hook should always be installed
890
1136
request._install_hook()
1334
1591
self.assertEqual(expected_docstring, obj.__doc__)
1593
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1336
1594
def failUnlessExists(self, path):
1595
return self.assertPathExists(path)
1597
def assertPathExists(self, path):
1337
1598
"""Fail unless path or paths, which may be abs or relative, exist."""
1338
1599
if not isinstance(path, basestring):
1340
self.failUnlessExists(p)
1601
self.assertPathExists(p)
1342
self.failUnless(osutils.lexists(path),path+" does not exist")
1603
self.assertTrue(osutils.lexists(path),
1604
path + " does not exist")
1606
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1344
1607
def failIfExists(self, path):
1608
return self.assertPathDoesNotExist(path)
1610
def assertPathDoesNotExist(self, path):
1345
1611
"""Fail if path or paths, which may be abs or relative, exist."""
1346
1612
if not isinstance(path, basestring):
1348
self.failIfExists(p)
1614
self.assertPathDoesNotExist(p)
1350
self.failIf(osutils.lexists(path),path+" exists")
1616
self.assertFalse(osutils.lexists(path),
1352
1619
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1353
1620
"""A helper for callDeprecated and applyDeprecated.
1465
1733
def _startLogFile(self):
1466
"""Send bzr and test log messages to a temporary file.
1468
The file is removed as the test is torn down.
1470
self._log_file = StringIO()
1471
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1734
"""Setup a in-memory target for bzr and testcase log messages"""
1735
pseudo_log_file = StringIO()
1736
def _get_log_contents_for_weird_testtools_api():
1737
return [pseudo_log_file.getvalue().decode(
1738
"utf-8", "replace").encode("utf-8")]
1739
self.addDetail("log", content.Content(content.ContentType("text",
1740
"plain", {"charset": "utf8"}),
1741
_get_log_contents_for_weird_testtools_api))
1742
self._log_file = pseudo_log_file
1743
self._log_memento = trace.push_log_file(self._log_file)
1472
1744
self.addCleanup(self._finishLogFile)
1474
1746
def _finishLogFile(self):
1475
"""Finished with the log file.
1477
Close the file and delete it, unless setKeepLogfile was called.
1479
if bzrlib.trace._trace_file:
1747
"""Flush and dereference the in-memory log for this testcase"""
1748
if trace._trace_file:
1480
1749
# flush the log file, to get all content
1481
bzrlib.trace._trace_file.flush()
1482
bzrlib.trace.pop_log_file(self._log_memento)
1483
# Cache the log result and delete the file on disk
1484
self._get_log(False)
1750
trace._trace_file.flush()
1751
trace.pop_log_file(self._log_memento)
1752
# The logging module now tracks references for cleanup so discard ours
1753
del self._log_memento
1486
1755
def thisFailsStrictLockCheck(self):
1487
1756
"""It is known that this test would fail with -Dstrict_locks.
1509
1781
:returns: The actual attr value.
1511
value = getattr(obj, attr_name)
1512
1783
# The actual value is captured by the call below
1513
self.addCleanup(setattr, obj, attr_name, value)
1784
value = getattr(obj, attr_name, _unitialized_attr)
1785
if value is _unitialized_attr:
1786
# When the test completes, the attribute should not exist, but if
1787
# we aren't setting a value, we don't need to do anything.
1788
if new is not _unitialized_attr:
1789
self.addCleanup(delattr, obj, attr_name)
1791
self.addCleanup(setattr, obj, attr_name, value)
1514
1792
if new is not _unitialized_attr:
1515
1793
setattr(obj, attr_name, new)
1796
def overrideEnv(self, name, new):
1797
"""Set an environment variable, and reset it after the test.
1799
:param name: The environment variable name.
1801
:param new: The value to set the variable to. If None, the
1802
variable is deleted from the environment.
1804
:returns: The actual variable value.
1806
value = osutils.set_or_unset_env(name, new)
1807
self.addCleanup(osutils.set_or_unset_env, name, value)
1810
def recordCalls(self, obj, attr_name):
1811
"""Monkeypatch in a wrapper that will record calls.
1813
The monkeypatch is automatically removed when the test concludes.
1815
:param obj: The namespace holding the reference to be replaced;
1816
typically a module, class, or object.
1817
:param attr_name: A string for the name of the attribute to
1819
:returns: A list that will be extended with one item every time the
1820
function is called, with a tuple of (args, kwargs).
1824
def decorator(*args, **kwargs):
1825
calls.append((args, kwargs))
1826
return orig(*args, **kwargs)
1827
orig = self.overrideAttr(obj, attr_name, decorator)
1518
1830
def _cleanEnvironment(self):
1520
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1521
'HOME': os.getcwd(),
1522
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1523
# tests do check our impls match APPDATA
1524
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1528
'BZREMAIL': None, # may still be present in the environment
1529
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1530
'BZR_PROGRESS_BAR': None,
1532
'BZR_PLUGIN_PATH': None,
1533
'BZR_DISABLE_PLUGINS': None,
1534
'BZR_PLUGINS_AT': None,
1535
'BZR_CONCURRENCY': None,
1536
# Make sure that any text ui tests are consistent regardless of
1537
# the environment the test case is run in; you may want tests that
1538
# test other combinations. 'dumb' is a reasonable guess for tests
1539
# going to a pipe or a StringIO.
1543
'BZR_COLUMNS': '80',
1545
'SSH_AUTH_SOCK': None,
1549
'https_proxy': None,
1550
'HTTPS_PROXY': None,
1555
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1556
# least. If you do (care), please update this comment
1560
'BZR_REMOTE_PATH': None,
1561
# Generally speaking, we don't want apport reporting on crashes in
1562
# the test envirnoment unless we're specifically testing apport,
1563
# so that it doesn't leak into the real system environment. We
1564
# use an env var so it propagates to subprocesses.
1565
'APPORT_DISABLE': '1',
1568
self.addCleanup(self._restoreEnvironment)
1569
for name, value in new_env.iteritems():
1570
self._captureVar(name, value)
1572
def _captureVar(self, name, newvalue):
1573
"""Set an environment variable, and reset it when finished."""
1574
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1576
def _restoreEnvironment(self):
1577
for name, value in self._old_env.iteritems():
1578
osutils.set_or_unset_env(name, value)
1831
for name, value in isolated_environ.iteritems():
1832
self.overrideEnv(name, value)
1580
1834
def _restoreHooks(self):
1581
1835
for klass, (name, hooks) in self._preserved_hooks.items():
1582
1836
setattr(klass, name, hooks)
1837
self._preserved_hooks.clear()
1838
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1839
self._preserved_lazy_hooks.clear()
1584
1841
def knownFailure(self, reason):
1585
"""This test has failed for some known reason."""
1586
raise KnownFailure(reason)
1842
"""Declare that this test fails for a known reason
1844
Tests that are known to fail should generally be using expectedFailure
1845
with an appropriate reverse assertion if a change could cause the test
1846
to start passing. Conversely if the test has no immediate prospect of
1847
succeeding then using skip is more suitable.
1849
When this method is called while an exception is being handled, that
1850
traceback will be used, otherwise a new exception will be thrown to
1851
provide one but won't be reported.
1853
self._add_reason(reason)
1855
exc_info = sys.exc_info()
1856
if exc_info != (None, None, None):
1857
self._report_traceback(exc_info)
1860
raise self.failureException(reason)
1861
except self.failureException:
1862
exc_info = sys.exc_info()
1863
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1864
raise testtools.testcase._ExpectedFailure(exc_info)
1588
1868
def _suppress_log(self):
1589
1869
"""Remove the log info from details."""
1675
1955
self._benchtime += time.time() - start
1677
1957
def log(self, *args):
1680
def _get_log(self, keep_log_file=False):
1681
"""Internal helper to get the log from bzrlib.trace for this test.
1683
Please use self.getDetails, or self.get_log to access this in test case
1686
:param keep_log_file: When True, if the log is still a file on disk
1687
leave it as a file on disk. When False, if the log is still a file
1688
on disk, the log file is deleted and the log preserved as
1690
:return: A string containing the log.
1692
if self._log_contents is not None:
1694
self._log_contents.decode('utf8')
1695
except UnicodeDecodeError:
1696
unicodestr = self._log_contents.decode('utf8', 'replace')
1697
self._log_contents = unicodestr.encode('utf8')
1698
return self._log_contents
1699
if self._log_file is not None:
1700
log_contents = self._log_file.getvalue()
1702
log_contents.decode('utf8')
1703
except UnicodeDecodeError:
1704
unicodestr = log_contents.decode('utf8', 'replace')
1705
log_contents = unicodestr.encode('utf8')
1706
if not keep_log_file:
1707
self._log_file = None
1708
# Permit multiple calls to get_log until we clean it up in
1710
self._log_contents = log_contents
1713
return "No log file content."
1715
1960
def get_log(self):
1716
1961
"""Get a unicode string containing the log from bzrlib.trace.
2242
def _add_subprocess_log(self, log_file_path):
2243
if len(self._log_files) == 0:
2244
# Register an addCleanup func. We do this on the first call to
2245
# _add_subprocess_log rather than in TestCase.setUp so that this
2246
# addCleanup is registered after any cleanups for tempdirs that
2247
# subclasses might create, which will probably remove the log file
2249
self.addCleanup(self._subprocess_log_cleanup)
2250
# self._log_files is a set, so if a log file is reused we won't grab it
2252
self._log_files.add(log_file_path)
2254
def _subprocess_log_cleanup(self):
2255
for count, log_file_path in enumerate(self._log_files):
2256
# We use buffer_now=True to avoid holding the file open beyond
2257
# the life of this function, which might interfere with e.g.
2258
# cleaning tempdirs on Windows.
2259
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2260
#detail_content = content.content_from_file(
2261
# log_file_path, buffer_now=True)
2262
with open(log_file_path, 'rb') as log_file:
2263
log_file_bytes = log_file.read()
2264
detail_content = content.Content(content.ContentType("text",
2265
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2266
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
1985
2269
def _popen(self, *args, **kwargs):
1986
2270
"""Place a call to Popen.
2024
2308
if retcode is not None and retcode != process.returncode:
2025
2309
if process_args is None:
2026
2310
process_args = "(unknown args)"
2027
mutter('Output of bzr %s:\n%s', process_args, out)
2028
mutter('Error for bzr %s:\n%s', process_args, err)
2311
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2312
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2029
2313
self.fail('Command bzr %s failed with retcode %s != %s'
2030
2314
% (process_args, retcode, process.returncode))
2031
2315
return [out, err]
2033
def check_inventory_shape(self, inv, shape):
2034
"""Compare an inventory to a list of expected names.
2317
def check_tree_shape(self, tree, shape):
2318
"""Compare a tree to a list of expected names.
2036
2320
Fail if they are not precisely equal.
2039
2323
shape = list(shape) # copy
2040
for path, ie in inv.entries():
2324
for path, ie in tree.iter_entries_by_dir():
2041
2325
name = path.replace('\\', '/')
2042
2326
if ie.kind == 'directory':
2043
2327
name = name + '/'
2329
pass # ignore root entry
2045
2331
shape.remove(name)
2047
2333
extras.append(name)
2137
2425
class TestCaseWithMemoryTransport(TestCase):
2138
2426
"""Common test class for tests that do not need disk resources.
2140
Tests that need disk resources should derive from TestCaseWithTransport.
2428
Tests that need disk resources should derive from TestCaseInTempDir
2429
orTestCaseWithTransport.
2142
2431
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2144
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2433
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2145
2434
a directory which does not exist. This serves to help ensure test isolation
2146
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2147
must exist. However, TestCaseWithMemoryTransport does not offer local
2148
file defaults for the transport in tests, nor does it obey the command line
2435
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2436
must exist. However, TestCaseWithMemoryTransport does not offer local file
2437
defaults for the transport in tests, nor does it obey the command line
2149
2438
override, so tests that accidentally write to the common directory should
2152
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2153
a .bzr directory that stops us ascending higher into the filesystem.
2441
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2442
``.bzr`` directory that stops us ascending higher into the filesystem.
2156
2445
TEST_ROOT = None
2166
2455
self.transport_readonly_server = None
2167
2456
self.__vfs_server = None
2459
super(TestCaseWithMemoryTransport, self).setUp()
2461
def _add_disconnect_cleanup(transport):
2462
"""Schedule disconnection of given transport at test cleanup
2464
This needs to happen for all connected transports or leaks occur.
2466
Note reconnections may mean we call disconnect multiple times per
2467
transport which is suboptimal but seems harmless.
2469
self.addCleanup(transport.disconnect)
2471
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2472
_add_disconnect_cleanup, None)
2474
self._make_test_root()
2475
self.addCleanup(os.chdir, os.getcwdu())
2476
self.makeAndChdirToTestDir()
2477
self.overrideEnvironmentForTesting()
2478
self.__readonly_server = None
2479
self.__server = None
2480
self.reduceLockdirTimeout()
2481
# Each test may use its own config files even if the local config files
2482
# don't actually exist. They'll rightly fail if they try to create them
2484
self.overrideAttr(config, '_shared_stores', {})
2169
2486
def get_transport(self, relpath=None):
2170
2487
"""Return a writeable transport.
2363
2694
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2364
2695
self.permit_dir(self.test_dir)
2366
def make_branch(self, relpath, format=None):
2697
def make_branch(self, relpath, format=None, name=None):
2367
2698
"""Create a branch on the transport at relpath."""
2368
2699
repo = self.make_repository(relpath, format=format)
2369
return repo.bzrdir.create_branch()
2700
return repo.bzrdir.create_branch(append_revisions_only=False,
2703
def get_default_format(self):
2706
def resolve_format(self, format):
2707
"""Resolve an object to a ControlDir format object.
2709
The initial format object can either already be
2710
a ControlDirFormat, None (for the default format),
2711
or a string with the name of the control dir format.
2713
:param format: Object to resolve
2714
:return A ControlDirFormat instance
2717
format = self.get_default_format()
2718
if isinstance(format, basestring):
2719
format = controldir.format_registry.make_bzrdir(format)
2371
2722
def make_bzrdir(self, relpath, format=None):
2418
2766
test_home_dir = self.test_home_dir
2419
2767
if isinstance(test_home_dir, unicode):
2420
2768
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2421
os.environ['HOME'] = test_home_dir
2422
os.environ['BZR_HOME'] = test_home_dir
2425
super(TestCaseWithMemoryTransport, self).setUp()
2426
# Ensure that ConnectedTransport doesn't leak sockets
2427
def get_transport_with_cleanup(*args, **kwargs):
2428
t = orig_get_transport(*args, **kwargs)
2429
if isinstance(t, _mod_transport.ConnectedTransport):
2430
self.addCleanup(t.disconnect)
2433
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2434
get_transport_with_cleanup)
2435
self._make_test_root()
2436
self.addCleanup(os.chdir, os.getcwdu())
2437
self.makeAndChdirToTestDir()
2438
self.overrideEnvironmentForTesting()
2439
self.__readonly_server = None
2440
self.__server = None
2441
self.reduceLockdirTimeout()
2769
self.overrideEnv('HOME', test_home_dir)
2770
self.overrideEnv('BZR_HOME', test_home_dir)
2443
2772
def setup_smart_server_with_call_log(self):
2444
2773
"""Sets up a smart server as the transport server with a call log."""
2445
2774
self.transport_server = test_server.SmartTCPServer_for_testing
2775
self.hpss_connections = []
2446
2776
self.hpss_calls = []
2447
2777
import traceback
2448
2778
# Skip the current stack down to the caller of
3108
3440
"""A decorator which excludes test matching an exclude pattern."""
3110
3442
def __init__(self, suite, exclude_pattern):
3111
TestDecorator.__init__(self, suite)
3112
self.exclude_pattern = exclude_pattern
3113
self.excluded = False
3117
return iter(self._tests)
3118
self.excluded = True
3119
suite = exclude_tests_by_re(self, self.exclude_pattern)
3121
self.addTests(suite)
3122
return iter(self._tests)
3443
super(ExcludeDecorator, self).__init__(
3444
exclude_tests_by_re(suite, exclude_pattern))
3125
3447
class FilterTestsDecorator(TestDecorator):
3126
3448
"""A decorator which filters tests to those matching a pattern."""
3128
3450
def __init__(self, suite, pattern):
3129
TestDecorator.__init__(self, suite)
3130
self.pattern = pattern
3131
self.filtered = False
3135
return iter(self._tests)
3136
self.filtered = True
3137
suite = filter_suite_by_re(self, self.pattern)
3139
self.addTests(suite)
3140
return iter(self._tests)
3451
super(FilterTestsDecorator, self).__init__(
3452
filter_suite_by_re(suite, pattern))
3143
3455
class RandomDecorator(TestDecorator):
3144
3456
"""A decorator which randomises the order of its tests."""
3146
3458
def __init__(self, suite, random_seed, stream):
3147
TestDecorator.__init__(self, suite)
3148
self.random_seed = random_seed
3149
self.randomised = False
3150
self.stream = stream
3154
return iter(self._tests)
3155
self.randomised = True
3156
self.stream.write("Randomizing test order using seed %s\n\n" %
3157
(self.actual_seed()))
3459
random_seed = self.actual_seed(random_seed)
3460
stream.write("Randomizing test order using seed %s\n\n" %
3158
3462
# Initialise the random number generator.
3159
random.seed(self.actual_seed())
3160
suite = randomize_suite(self)
3162
self.addTests(suite)
3163
return iter(self._tests)
3463
random.seed(random_seed)
3464
super(RandomDecorator, self).__init__(randomize_suite(suite))
3165
def actual_seed(self):
3166
if self.random_seed == "now":
3467
def actual_seed(seed):
3167
3469
# We convert the seed to a long to make it reuseable across
3168
3470
# invocations (because the user can reenter it).
3169
self.random_seed = long(time.time())
3471
return long(time.time())
3171
3473
# Convert the seed to a long if we can
3173
self.random_seed = long(self.random_seed)
3476
except (TypeError, ValueError):
3176
return self.random_seed
3179
3481
class TestFirstDecorator(TestDecorator):
3180
3482
"""A decorator which moves named tests to the front."""
3182
3484
def __init__(self, suite, pattern):
3183
TestDecorator.__init__(self, suite)
3184
self.pattern = pattern
3185
self.filtered = False
3189
return iter(self._tests)
3190
self.filtered = True
3191
suites = split_suite_by_re(self, self.pattern)
3193
self.addTests(suites)
3194
return iter(self._tests)
3485
super(TestFirstDecorator, self).__init__()
3486
self.addTests(split_suite_by_re(suite, pattern))
3197
3489
def partition_tests(suite, count):
3242
3534
ProtocolTestCase.run(self, result)
3244
os.waitpid(self.pid, 0)
3536
pid, status = os.waitpid(self.pid, 0)
3537
# GZ 2011-10-18: If status is nonzero, should report to the result
3538
# that something went wrong.
3246
3540
test_blocks = partition_tests(suite, concurrency)
3541
# Clear the tests from the original suite so it doesn't keep them alive
3542
suite._tests[:] = []
3247
3543
for process_tests in test_blocks:
3248
process_suite = TestUtil.TestSuite()
3249
process_suite.addTests(process_tests)
3544
process_suite = TestUtil.TestSuite(process_tests)
3545
# Also clear each split list so new suite has only reference
3546
process_tests[:] = []
3250
3547
c2pread, c2pwrite = os.pipe()
3251
3548
pid = os.fork()
3253
workaround_zealous_crypto_random()
3551
stream = os.fdopen(c2pwrite, 'wb', 1)
3552
workaround_zealous_crypto_random()
3255
3553
os.close(c2pread)
3256
3554
# Leave stderr and stdout open so we can see test noise
3257
3555
# Close stdin so that the child goes away if it decides to
3258
3556
# read from stdin (otherwise its a roulette to see what
3259
3557
# child actually gets keystrokes for pdb etc).
3260
3558
sys.stdin.close()
3262
stream = os.fdopen(c2pwrite, 'wb', 1)
3263
3559
subunit_result = AutoTimingTestResultDecorator(
3264
TestProtocolClient(stream))
3560
SubUnitBzrProtocolClient(stream))
3265
3561
process_suite.run(subunit_result)
3563
# Try and report traceback on stream, but exit with error even
3564
# if stream couldn't be created or something else goes wrong.
3565
# The traceback is formatted to a string and written in one go
3566
# to avoid interleaving lines from multiple failing children.
3568
stream.write(traceback.format_exc())
3269
3573
os.close(c2pwrite)
3270
3574
stream = os.fdopen(c2pread, 'rb', 1)
3334
class ForwardingResult(unittest.TestResult):
3336
def __init__(self, target):
3337
unittest.TestResult.__init__(self)
3338
self.result = target
3340
def startTest(self, test):
3341
self.result.startTest(test)
3343
def stopTest(self, test):
3344
self.result.stopTest(test)
3346
def startTestRun(self):
3347
self.result.startTestRun()
3349
def stopTestRun(self):
3350
self.result.stopTestRun()
3352
def addSkip(self, test, reason):
3353
self.result.addSkip(test, reason)
3355
def addSuccess(self, test):
3356
self.result.addSuccess(test)
3358
def addError(self, test, err):
3359
self.result.addError(test, err)
3361
def addFailure(self, test, err):
3362
self.result.addFailure(test, err)
3363
ForwardingResult = testtools.ExtendedToOriginalDecorator
3366
class ProfileResult(ForwardingResult):
3638
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3367
3639
"""Generate profiling data for all activity between start and success.
3369
3641
The profile data is appended to the test's _benchcalls attribute and can
3703
3979
'bzrlib.tests.test_commit_merge',
3704
3980
'bzrlib.tests.test_config',
3705
3981
'bzrlib.tests.test_conflicts',
3982
'bzrlib.tests.test_controldir',
3706
3983
'bzrlib.tests.test_counted_lock',
3707
3984
'bzrlib.tests.test_crash',
3708
3985
'bzrlib.tests.test_decorators',
3709
3986
'bzrlib.tests.test_delta',
3710
3987
'bzrlib.tests.test_debug',
3711
'bzrlib.tests.test_deprecated_graph',
3712
3988
'bzrlib.tests.test_diff',
3713
3989
'bzrlib.tests.test_directory_service',
3714
3990
'bzrlib.tests.test_dirstate',
3715
3991
'bzrlib.tests.test_email_message',
3716
3992
'bzrlib.tests.test_eol_filters',
3717
3993
'bzrlib.tests.test_errors',
3994
'bzrlib.tests.test_estimate_compressed_size',
3718
3995
'bzrlib.tests.test_export',
3996
'bzrlib.tests.test_export_pot',
3719
3997
'bzrlib.tests.test_extract',
3998
'bzrlib.tests.test_features',
3720
3999
'bzrlib.tests.test_fetch',
3721
4000
'bzrlib.tests.test_fixtures',
3722
4001
'bzrlib.tests.test_fifo_cache',
3723
4002
'bzrlib.tests.test_filters',
4003
'bzrlib.tests.test_filter_tree',
3724
4004
'bzrlib.tests.test_ftp_transport',
3725
4005
'bzrlib.tests.test_foreign',
3726
4006
'bzrlib.tests.test_generate_docs',
3961
4252
# Some tests mentioned in the list are not in the test suite. The
3962
4253
# list may be out of date, report to the tester.
3963
4254
for id in not_found:
3964
bzrlib.trace.warning('"%s" not found in the test suite', id)
4255
trace.warning('"%s" not found in the test suite', id)
3965
4256
for id in duplicates:
3966
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
4257
trace.warning('"%s" is used as an id by several tests', id)
3971
def multiply_scenarios(scenarios_left, scenarios_right):
4262
def multiply_scenarios(*scenarios):
4263
"""Multiply two or more iterables of scenarios.
4265
It is safe to pass scenario generators or iterators.
4267
:returns: A list of compound scenarios: the cross-product of all
4268
scenarios, with the names concatenated and the parameters
4271
return reduce(_multiply_two_scenarios, map(list, scenarios))
4274
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3972
4275
"""Multiply two sets of scenarios.
3974
4277
:returns: the cartesian product of the two sets of scenarios, that is
4144
4448
% (os.path.basename(dirname), printable_e))
4147
class Feature(object):
4148
"""An operating system Feature."""
4151
self._available = None
4153
def available(self):
4154
"""Is the feature available?
4156
:return: True if the feature is available.
4158
if self._available is None:
4159
self._available = self._probe()
4160
return self._available
4163
"""Implement this method in concrete features.
4165
:return: True if the feature is available.
4167
raise NotImplementedError
4170
if getattr(self, 'feature_name', None):
4171
return self.feature_name()
4172
return self.__class__.__name__
4175
class _SymlinkFeature(Feature):
4178
return osutils.has_symlinks()
4180
def feature_name(self):
4183
SymlinkFeature = _SymlinkFeature()
4186
class _HardlinkFeature(Feature):
4189
return osutils.has_hardlinks()
4191
def feature_name(self):
4194
HardlinkFeature = _HardlinkFeature()
4197
class _OsFifoFeature(Feature):
4200
return getattr(os, 'mkfifo', None)
4202
def feature_name(self):
4203
return 'filesystem fifos'
4205
OsFifoFeature = _OsFifoFeature()
4208
class _UnicodeFilenameFeature(Feature):
4209
"""Does the filesystem support Unicode filenames?"""
4213
# Check for character combinations unlikely to be covered by any
4214
# single non-unicode encoding. We use the characters
4215
# - greek small letter alpha (U+03B1) and
4216
# - braille pattern dots-123456 (U+283F).
4217
os.stat(u'\u03b1\u283f')
4218
except UnicodeEncodeError:
4220
except (IOError, OSError):
4221
# The filesystem allows the Unicode filename but the file doesn't
4225
# The filesystem allows the Unicode filename and the file exists,
4229
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4232
class _CompatabilityThunkFeature(Feature):
4233
"""This feature is just a thunk to another feature.
4235
It issues a deprecation warning if it is accessed, to let you know that you
4236
should really use a different feature.
4239
def __init__(self, dep_version, module, name,
4240
replacement_name, replacement_module=None):
4241
super(_CompatabilityThunkFeature, self).__init__()
4242
self._module = module
4243
if replacement_module is None:
4244
replacement_module = module
4245
self._replacement_module = replacement_module
4247
self._replacement_name = replacement_name
4248
self._dep_version = dep_version
4249
self._feature = None
4252
if self._feature is None:
4253
depr_msg = self._dep_version % ('%s.%s'
4254
% (self._module, self._name))
4255
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4256
self._replacement_name)
4257
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4258
# Import the new feature and use it as a replacement for the
4260
mod = __import__(self._replacement_module, {}, {},
4261
[self._replacement_name])
4262
self._feature = getattr(mod, self._replacement_name)
4266
return self._feature._probe()
4269
class ModuleAvailableFeature(Feature):
4270
"""This is a feature than describes a module we want to be available.
4272
Declare the name of the module in __init__(), and then after probing, the
4273
module will be available as 'self.module'.
4275
:ivar module: The module if it is available, else None.
4278
def __init__(self, module_name):
4279
super(ModuleAvailableFeature, self).__init__()
4280
self.module_name = module_name
4284
self._module = __import__(self.module_name, {}, {}, [''])
4291
if self.available(): # Make sure the probe has been done
4295
def feature_name(self):
4296
return self.module_name
4299
# This is kept here for compatibility, it is recommended to use
4300
# 'bzrlib.tests.feature.paramiko' instead
4301
ParamikoFeature = _CompatabilityThunkFeature(
4302
deprecated_in((2,1,0)),
4303
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4306
4451
def probe_unicode_in_user_encoding():
4307
4452
"""Try to encode several unicode strings to use in unicode-aware tests.
4308
4453
Return first successfull match.
4339
class _HTTPSServerFeature(Feature):
4340
"""Some tests want an https Server, check if one is available.
4342
Right now, the only way this is available is under python2.6 which provides
4353
def feature_name(self):
4354
return 'HTTPSServer'
4357
HTTPSServerFeature = _HTTPSServerFeature()
4360
class _UnicodeFilename(Feature):
4361
"""Does the filesystem support Unicode filenames?"""
4366
except UnicodeEncodeError:
4368
except (IOError, OSError):
4369
# The filesystem allows the Unicode filename but the file doesn't
4373
# The filesystem allows the Unicode filename and the file exists,
4377
UnicodeFilename = _UnicodeFilename()
4380
class _ByteStringNamedFilesystem(Feature):
4381
"""Is the filesystem based on bytes?"""
4384
if os.name == "posix":
4388
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4391
class _UTF8Filesystem(Feature):
4392
"""Is the filesystem UTF-8?"""
4395
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4399
UTF8Filesystem = _UTF8Filesystem()
4402
class _BreakinFeature(Feature):
4403
"""Does this platform support the breakin feature?"""
4406
from bzrlib import breakin
4407
if breakin.determine_signal() is None:
4409
if sys.platform == 'win32':
4410
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4411
# We trigger SIGBREAK via a Console api so we need ctypes to
4412
# access the function
4419
def feature_name(self):
4420
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4423
BreakinFeature = _BreakinFeature()
4426
class _CaseInsCasePresFilenameFeature(Feature):
4427
"""Is the file-system case insensitive, but case-preserving?"""
4430
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4432
# first check truly case-preserving for created files, then check
4433
# case insensitive when opening existing files.
4434
name = osutils.normpath(name)
4435
base, rel = osutils.split(name)
4436
found_rel = osutils.canonical_relpath(base, name)
4437
return (found_rel == rel
4438
and os.path.isfile(name.upper())
4439
and os.path.isfile(name.lower()))
4444
def feature_name(self):
4445
return "case-insensitive case-preserving filesystem"
4447
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4450
class _CaseInsensitiveFilesystemFeature(Feature):
4451
"""Check if underlying filesystem is case-insensitive but *not* case
4454
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4455
# more likely to be case preserving, so this case is rare.
4458
if CaseInsCasePresFilenameFeature.available():
4461
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4462
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4463
TestCaseWithMemoryTransport.TEST_ROOT = root
4465
root = TestCaseWithMemoryTransport.TEST_ROOT
4466
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4468
name_a = osutils.pathjoin(tdir, 'a')
4469
name_A = osutils.pathjoin(tdir, 'A')
4471
result = osutils.isdir(name_A)
4472
_rmtree_temp_dir(tdir)
4475
def feature_name(self):
4476
return 'case-insensitive filesystem'
4478
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4481
class _CaseSensitiveFilesystemFeature(Feature):
4484
if CaseInsCasePresFilenameFeature.available():
4486
elif CaseInsensitiveFilesystemFeature.available():
4491
def feature_name(self):
4492
return 'case-sensitive filesystem'
4494
# new coding style is for feature instances to be lowercase
4495
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4498
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4499
SubUnitFeature = _CompatabilityThunkFeature(
4500
deprecated_in((2,1,0)),
4501
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4502
4484
# Only define SubUnitBzrRunner if subunit is available.
4504
4486
from subunit import TestProtocolClient
4505
4487
from subunit.test_results import AutoTimingTestResultDecorator
4506
4488
class SubUnitBzrProtocolClient(TestProtocolClient):
4490
def stopTest(self, test):
4491
super(SubUnitBzrProtocolClient, self).stopTest(test)
4492
_clear__type_equality_funcs(test)
4508
4494
def addSuccess(self, test, details=None):
4509
4495
# The subunit client always includes the details in the subunit
4510
4496
# stream, but we don't want to include it in ours.
4522
4508
except ImportError:
4525
class _PosixPermissionsFeature(Feature):
4529
# create temporary file and check if specified perms are maintained.
4532
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4533
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4536
os.chmod(name, write_perms)
4538
read_perms = os.stat(name).st_mode & 0777
4540
return (write_perms == read_perms)
4542
return (os.name == 'posix') and has_perms()
4544
def feature_name(self):
4545
return 'POSIX permissions support'
4547
posix_permissions_feature = _PosixPermissionsFeature()
4512
# API compatibility for old plugins; see bug 892622.
4515
'HTTPServerFeature',
4516
'ModuleAvailableFeature',
4517
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4518
'OsFifoFeature', 'UnicodeFilenameFeature',
4519
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4520
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4521
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4522
'posix_permissions_feature',
4524
globals()[name] = _CompatabilityThunkFeature(
4525
symbol_versioning.deprecated_in((2, 5, 0)),
4526
'bzrlib.tests', name,
4527
name, 'bzrlib.tests.features')
4530
for (old_name, new_name) in [
4531
('UnicodeFilename', 'UnicodeFilenameFeature'),
4533
globals()[name] = _CompatabilityThunkFeature(
4534
symbol_versioning.deprecated_in((2, 5, 0)),
4535
'bzrlib.tests', old_name,
4536
new_name, 'bzrlib.tests.features')