54
56
# nb: check this before importing anything else from within it
55
57
_testtools_version = getattr(testtools, '__version__', ())
56
if _testtools_version < (0, 9, 5):
57
raise ImportError("need at least testtools 0.9.5: %s is %r"
58
if _testtools_version < (0, 9, 2):
59
raise ImportError("need at least testtools 0.9.2: %s is %r"
58
60
% (testtools.__file__, _testtools_version))
59
61
from testtools import content
62
63
from bzrlib import (
66
commands as _mod_commands,
76
plugin as _mod_plugin,
83
77
transport as _mod_transport,
81
import bzrlib.commands
82
import bzrlib.timestamp
84
import bzrlib.inventory
85
import bzrlib.iterablefile
87
88
import bzrlib.lsprof
88
89
except ImportError:
89
90
# lsprof not available
91
from bzrlib.smart import client, request
92
from bzrlib.merge import merge_inner
95
from bzrlib.smart import client, request, server
97
from bzrlib import symbol_versioning
98
from bzrlib.symbol_versioning import (
92
106
from bzrlib.transport import (
96
from bzrlib.symbol_versioning import (
110
from bzrlib.trace import mutter, note
100
111
from bzrlib.tests import (
106
116
from bzrlib.ui import NullProgressView
107
117
from bzrlib.ui.text import TextUIFactory
108
from bzrlib.tests.features import _CompatabilityThunkFeature
118
import bzrlib.version_info_formats.format_custom
119
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
110
121
# Mark this python module as being part of the implementation
111
122
# of unittest: this gives us better tracebacks where the last
128
139
TestSuite = TestUtil.TestSuite
129
140
TestLoader = TestUtil.TestLoader
131
# Tests should run in a clean and clearly defined environment. The goal is to
132
# keep them isolated from the running environment as much as possible. The test
133
# framework ensures the variables defined below are set (or deleted if the
134
# value is None) before a test is run and reset to their original value after
135
# the test is run. Generally if some code depends on an environment variable,
136
# the tests should start without this variable in the environment. There are a
137
# few exceptions but you shouldn't violate this rule lightly.
141
'XDG_CONFIG_HOME': None,
142
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
143
# tests do check our impls match APPDATA
144
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
148
'BZREMAIL': None, # may still be present in the environment
149
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
150
'BZR_PROGRESS_BAR': None,
151
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
152
# as a base class instead of TestCaseInTempDir. Tests inheriting from
153
# TestCase should not use disk resources, BZR_LOG is one.
154
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
155
'BZR_PLUGIN_PATH': None,
156
'BZR_DISABLE_PLUGINS': None,
157
'BZR_PLUGINS_AT': None,
158
'BZR_CONCURRENCY': None,
159
# Make sure that any text ui tests are consistent regardless of
160
# the environment the test case is run in; you may want tests that
161
# test other combinations. 'dumb' is a reasonable guess for tests
162
# going to a pipe or a StringIO.
168
'SSH_AUTH_SOCK': None,
178
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
179
# least. If you do (care), please update this comment
183
'BZR_REMOTE_PATH': None,
184
# Generally speaking, we don't want apport reporting on crashes in
185
# the test environment unless we're specifically testing apport,
186
# so that it doesn't leak into the real system environment. We
187
# use an env var so it propagates to subprocesses.
188
'APPORT_DISABLE': '1',
192
def override_os_environ(test, env=None):
193
"""Modify os.environ keeping a copy.
195
:param test: A test instance
197
:param env: A dict containing variable definitions to be installed
200
env = isolated_environ
201
test._original_os_environ = dict([(var, value)
202
for var, value in os.environ.iteritems()])
203
for var, value in env.iteritems():
204
osutils.set_or_unset_env(var, value)
205
if var not in test._original_os_environ:
206
# The var is new, add it with a value of None, so
207
# restore_os_environ will delete it
208
test._original_os_environ[var] = None
211
def restore_os_environ(test):
212
"""Restore os.environ to its original state.
214
:param test: A test instance previously passed to override_os_environ.
216
for var, value in test._original_os_environ.iteritems():
217
# Restore the original value (or delete it if the value has been set to
218
# None in override_os_environ).
219
osutils.set_or_unset_env(var, value)
222
def _clear__type_equality_funcs(test):
223
"""Cleanup bound methods stored on TestCase instances
225
Clear the dict breaking a few (mostly) harmless cycles in the affected
226
unittests released with Python 2.6 and initial Python 2.7 versions.
228
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
229
shipped in Oneiric, an object with no clear method was used, hence the
230
extra complications, see bug 809048 for details.
232
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
233
if type_equality_funcs is not None:
234
tef_clear = getattr(type_equality_funcs, "clear", None)
235
if tef_clear is None:
236
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
237
if tef_instance_dict is not None:
238
tef_clear = tef_instance_dict.clear
239
if tef_clear is not None:
243
142
class ExtendedTestResult(testtools.TextTestResult):
244
143
"""Accepts, reports and accumulates the results of running tests.
408
288
self.report_test_start(test)
409
289
test.number = self.count
410
290
self._recordTestStartTime()
411
# Make testtools cases give us the real traceback on failure
412
addOnException = getattr(test, "addOnException", None)
413
if addOnException is not None:
414
addOnException(self._record_traceback_from_test)
415
# Only check for thread leaks on bzrlib derived test cases
416
if isinstance(test, TestCase):
417
test.addCleanup(self._check_leaked_threads, test)
419
def stopTest(self, test):
420
super(ExtendedTestResult, self).stopTest(test)
421
# Manually break cycles, means touching various private things but hey
422
getDetails = getattr(test, "getDetails", None)
423
if getDetails is not None:
425
_clear__type_equality_funcs(test)
426
self._traceback_from_test = None
291
# Only check for thread leaks if the test case supports cleanups
292
addCleanup = getattr(test, "addCleanup", None)
293
if addCleanup is not None:
294
addCleanup(self._check_leaked_threads, test)
428
296
def startTests(self):
429
297
self.report_tests_starting()
1072
862
if name in details:
1073
863
del details[name]
1075
def install_counter_hook(self, hooks, name, counter_name=None):
1076
"""Install a counting hook.
1078
Any hook can be counted as long as it doesn't need to return a value.
1080
:param hooks: Where the hook should be installed.
1082
:param name: The hook name that will be counted.
1084
:param counter_name: The counter identifier in ``_counters``, defaults
1087
_counters = self._counters # Avoid closing over self
1088
if counter_name is None:
1090
if _counters.has_key(counter_name):
1091
raise AssertionError('%s is already used as a counter name'
1093
_counters[counter_name] = 0
1094
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1095
lambda: ['%d' % (_counters[counter_name],)]))
1096
def increment_counter(*args, **kwargs):
1097
_counters[counter_name] += 1
1098
label = 'count %s calls' % (counter_name,)
1099
hooks.install_named_hook(name, increment_counter, label)
1100
self.addCleanup(hooks.uninstall_named_hook, name, label)
1102
def _install_config_stats_hooks(self):
1103
"""Install config hooks to count hook calls.
1106
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1107
self.install_counter_hook(config.ConfigHooks, hook_name,
1108
'config.%s' % (hook_name,))
1110
# The OldConfigHooks are private and need special handling to protect
1111
# against recursive tests (tests that run other tests), so we just do
1112
# manually what registering them into _builtin_known_hooks will provide
1114
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1115
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1116
self.install_counter_hook(config.OldConfigHooks, hook_name,
1117
'old_config.%s' % (hook_name,))
1119
865
def _clear_debug_flags(self):
1120
866
"""Prevent externally set debug flags affecting tests.
1132
878
def _clear_hooks(self):
1133
879
# prevent hooks affecting tests
1134
known_hooks = hooks.known_hooks
1135
880
self._preserved_hooks = {}
1136
for key, (parent, name) in known_hooks.iter_parent_objects():
1137
current_hooks = getattr(parent, name)
881
for key, factory in hooks.known_hooks.items():
882
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
883
current_hooks = hooks.known_hooks_key_to_object(key)
1138
884
self._preserved_hooks[parent] = (name, current_hooks)
1139
self._preserved_lazy_hooks = hooks._lazy_hooks
1140
hooks._lazy_hooks = {}
1141
885
self.addCleanup(self._restoreHooks)
1142
for key, (parent, name) in known_hooks.iter_parent_objects():
1143
factory = known_hooks.get(key)
886
for key, factory in hooks.known_hooks.items():
887
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
1144
888
setattr(parent, name, factory())
1145
889
# this hook should always be installed
1146
890
request._install_hook()
1603
1334
self.assertEqual(expected_docstring, obj.__doc__)
1605
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1606
1336
def failUnlessExists(self, path):
1607
return self.assertPathExists(path)
1609
def assertPathExists(self, path):
1610
1337
"""Fail unless path or paths, which may be abs or relative, exist."""
1611
1338
if not isinstance(path, basestring):
1613
self.assertPathExists(p)
1340
self.failUnlessExists(p)
1615
self.assertTrue(osutils.lexists(path),
1616
path + " does not exist")
1342
self.failUnless(osutils.lexists(path),path+" does not exist")
1618
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1619
1344
def failIfExists(self, path):
1620
return self.assertPathDoesNotExist(path)
1622
def assertPathDoesNotExist(self, path):
1623
1345
"""Fail if path or paths, which may be abs or relative, exist."""
1624
1346
if not isinstance(path, basestring):
1626
self.assertPathDoesNotExist(p)
1348
self.failIfExists(p)
1628
self.assertFalse(osutils.lexists(path),
1350
self.failIf(osutils.lexists(path),path+" exists")
1631
1352
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1632
1353
"""A helper for callDeprecated and applyDeprecated.
1745
1465
def _startLogFile(self):
1746
"""Setup a in-memory target for bzr and testcase log messages"""
1747
pseudo_log_file = StringIO()
1748
def _get_log_contents_for_weird_testtools_api():
1749
return [pseudo_log_file.getvalue().decode(
1750
"utf-8", "replace").encode("utf-8")]
1751
self.addDetail("log", content.Content(content.ContentType("text",
1752
"plain", {"charset": "utf8"}),
1753
_get_log_contents_for_weird_testtools_api))
1754
self._log_file = pseudo_log_file
1755
self._log_memento = trace.push_log_file(self._log_file)
1466
"""Send bzr and test log messages to a temporary file.
1468
The file is removed as the test is torn down.
1470
self._log_file = StringIO()
1471
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1756
1472
self.addCleanup(self._finishLogFile)
1758
1474
def _finishLogFile(self):
1759
"""Flush and dereference the in-memory log for this testcase"""
1760
if trace._trace_file:
1475
"""Finished with the log file.
1477
Close the file and delete it, unless setKeepLogfile was called.
1479
if bzrlib.trace._trace_file:
1761
1480
# flush the log file, to get all content
1762
trace._trace_file.flush()
1763
trace.pop_log_file(self._log_memento)
1764
# The logging module now tracks references for cleanup so discard ours
1765
del self._log_memento
1481
bzrlib.trace._trace_file.flush()
1482
bzrlib.trace.pop_log_file(self._log_memento)
1483
# Cache the log result and delete the file on disk
1484
self._get_log(False)
1767
1486
def thisFailsStrictLockCheck(self):
1768
1487
"""It is known that this test would fail with -Dstrict_locks.
1793
1509
:returns: The actual attr value.
1511
value = getattr(obj, attr_name)
1795
1512
# The actual value is captured by the call below
1796
value = getattr(obj, attr_name, _unitialized_attr)
1797
if value is _unitialized_attr:
1798
# When the test completes, the attribute should not exist, but if
1799
# we aren't setting a value, we don't need to do anything.
1800
if new is not _unitialized_attr:
1801
self.addCleanup(delattr, obj, attr_name)
1803
self.addCleanup(setattr, obj, attr_name, value)
1513
self.addCleanup(setattr, obj, attr_name, value)
1804
1514
if new is not _unitialized_attr:
1805
1515
setattr(obj, attr_name, new)
1808
def overrideEnv(self, name, new):
1809
"""Set an environment variable, and reset it after the test.
1811
:param name: The environment variable name.
1813
:param new: The value to set the variable to. If None, the
1814
variable is deleted from the environment.
1816
:returns: The actual variable value.
1818
value = osutils.set_or_unset_env(name, new)
1819
self.addCleanup(osutils.set_or_unset_env, name, value)
1822
def recordCalls(self, obj, attr_name):
1823
"""Monkeypatch in a wrapper that will record calls.
1825
The monkeypatch is automatically removed when the test concludes.
1827
:param obj: The namespace holding the reference to be replaced;
1828
typically a module, class, or object.
1829
:param attr_name: A string for the name of the attribute to
1831
:returns: A list that will be extended with one item every time the
1832
function is called, with a tuple of (args, kwargs).
1836
def decorator(*args, **kwargs):
1837
calls.append((args, kwargs))
1838
return orig(*args, **kwargs)
1839
orig = self.overrideAttr(obj, attr_name, decorator)
1842
1518
def _cleanEnvironment(self):
1843
for name, value in isolated_environ.iteritems():
1844
self.overrideEnv(name, value)
1520
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1521
'HOME': os.getcwd(),
1522
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1523
# tests do check our impls match APPDATA
1524
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1528
'BZREMAIL': None, # may still be present in the environment
1529
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1530
'BZR_PROGRESS_BAR': None,
1532
'BZR_PLUGIN_PATH': None,
1533
'BZR_DISABLE_PLUGINS': None,
1534
'BZR_PLUGINS_AT': None,
1535
'BZR_CONCURRENCY': None,
1536
# Make sure that any text ui tests are consistent regardless of
1537
# the environment the test case is run in; you may want tests that
1538
# test other combinations. 'dumb' is a reasonable guess for tests
1539
# going to a pipe or a StringIO.
1543
'BZR_COLUMNS': '80',
1545
'SSH_AUTH_SOCK': None,
1549
'https_proxy': None,
1550
'HTTPS_PROXY': None,
1555
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1556
# least. If you do (care), please update this comment
1560
'BZR_REMOTE_PATH': None,
1561
# Generally speaking, we don't want apport reporting on crashes in
1562
# the test environment unless we're specifically testing apport,
1563
# so that it doesn't leak into the real system environment. We
1564
# use an env var so it propagates to subprocesses.
1565
'APPORT_DISABLE': '1',
1568
self.addCleanup(self._restoreEnvironment)
1569
for name, value in new_env.iteritems():
1570
self._captureVar(name, value)
1572
def _captureVar(self, name, newvalue):
1573
"""Set an environment variable, and reset it when finished."""
1574
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1576
def _restoreEnvironment(self):
1577
for name, value in self._old_env.iteritems():
1578
osutils.set_or_unset_env(name, value)
1846
1580
def _restoreHooks(self):
1847
1581
for klass, (name, hooks) in self._preserved_hooks.items():
1848
1582
setattr(klass, name, hooks)
1849
self._preserved_hooks.clear()
1850
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1851
self._preserved_lazy_hooks.clear()
1853
1584
def knownFailure(self, reason):
1854
"""Declare that this test fails for a known reason
1856
Tests that are known to fail should generally be using expectedFailure
1857
with an appropriate reverse assertion if a change could cause the test
1858
to start passing. Conversely if the test has no immediate prospect of
1859
succeeding then using skip is more suitable.
1861
When this method is called while an exception is being handled, that
1862
traceback will be used, otherwise a new exception will be thrown to
1863
provide one but won't be reported.
1865
self._add_reason(reason)
1867
exc_info = sys.exc_info()
1868
if exc_info != (None, None, None):
1869
self._report_traceback(exc_info)
1872
raise self.failureException(reason)
1873
except self.failureException:
1874
exc_info = sys.exc_info()
1875
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1876
raise testtools.testcase._ExpectedFailure(exc_info)
1585
"""This test has failed for some known reason."""
1586
raise KnownFailure(reason)
1880
1588
def _suppress_log(self):
1881
1589
"""Remove the log info from details."""
1967
1675
self._benchtime += time.time() - start
1969
1677
def log(self, *args):
1680
def _get_log(self, keep_log_file=False):
1681
"""Internal helper to get the log from bzrlib.trace for this test.
1683
Please use self.getDetails, or self.get_log to access this in test case
1686
:param keep_log_file: When True, if the log is still a file on disk
1687
leave it as a file on disk. When False, if the log is still a file
1688
on disk, the log file is deleted and the log preserved as
1690
:return: A string containing the log.
1692
if self._log_contents is not None:
1694
self._log_contents.decode('utf8')
1695
except UnicodeDecodeError:
1696
unicodestr = self._log_contents.decode('utf8', 'replace')
1697
self._log_contents = unicodestr.encode('utf8')
1698
return self._log_contents
1699
if self._log_file is not None:
1700
log_contents = self._log_file.getvalue()
1702
log_contents.decode('utf8')
1703
except UnicodeDecodeError:
1704
unicodestr = log_contents.decode('utf8', 'replace')
1705
log_contents = unicodestr.encode('utf8')
1706
if not keep_log_file:
1707
self._log_file = None
1708
# Permit multiple calls to get_log until we clean it up in
1710
self._log_contents = log_contents
1713
return "No log file content."
1972
1715
def get_log(self):
1973
1716
"""Get a unicode string containing the log from bzrlib.trace.
2254
def _add_subprocess_log(self, log_file_path):
2255
if len(self._log_files) == 0:
2256
# Register an addCleanup func. We do this on the first call to
2257
# _add_subprocess_log rather than in TestCase.setUp so that this
2258
# addCleanup is registered after any cleanups for tempdirs that
2259
# subclasses might create, which will probably remove the log file
2261
self.addCleanup(self._subprocess_log_cleanup)
2262
# self._log_files is a set, so if a log file is reused we won't grab it
2264
self._log_files.add(log_file_path)
2266
def _subprocess_log_cleanup(self):
2267
for count, log_file_path in enumerate(self._log_files):
2268
# We use buffer_now=True to avoid holding the file open beyond
2269
# the life of this function, which might interfere with e.g.
2270
# cleaning tempdirs on Windows.
2271
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2272
#detail_content = content.content_from_file(
2273
# log_file_path, buffer_now=True)
2274
with open(log_file_path, 'rb') as log_file:
2275
log_file_bytes = log_file.read()
2276
detail_content = content.Content(content.ContentType("text",
2277
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2278
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2281
1985
def _popen(self, *args, **kwargs):
2282
1986
"""Place a call to Popen.
2320
2024
if retcode is not None and retcode != process.returncode:
2321
2025
if process_args is None:
2322
2026
process_args = "(unknown args)"
2323
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2324
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2027
mutter('Output of bzr %s:\n%s', process_args, out)
2028
mutter('Error for bzr %s:\n%s', process_args, err)
2325
2029
self.fail('Command bzr %s failed with retcode %s != %s'
2326
2030
% (process_args, retcode, process.returncode))
2327
2031
return [out, err]
2329
def check_tree_shape(self, tree, shape):
2330
"""Compare a tree to a list of expected names.
2033
def check_inventory_shape(self, inv, shape):
2034
"""Compare an inventory to a list of expected names.
2332
2036
Fail if they are not precisely equal.
2335
2039
shape = list(shape) # copy
2336
for path, ie in tree.iter_entries_by_dir():
2040
for path, ie in inv.entries():
2337
2041
name = path.replace('\\', '/')
2338
2042
if ie.kind == 'directory':
2339
2043
name = name + '/'
2341
pass # ignore root entry
2343
2045
shape.remove(name)
2345
2047
extras.append(name)
2437
2137
class TestCaseWithMemoryTransport(TestCase):
2438
2138
"""Common test class for tests that do not need disk resources.
2440
Tests that need disk resources should derive from TestCaseInTempDir
2441
or TestCaseWithTransport.
2140
Tests that need disk resources should derive from TestCaseWithTransport.
2443
2142
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2445
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2144
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2446
2145
a directory which does not exist. This serves to help ensure test isolation
2447
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2448
must exist. However, TestCaseWithMemoryTransport does not offer local file
2449
defaults for the transport in tests, nor does it obey the command line
2146
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2147
must exist. However, TestCaseWithMemoryTransport does not offer local
2148
file defaults for the transport in tests, nor does it obey the command line
2450
2149
override, so tests that accidentally write to the common directory should
2453
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2454
``.bzr`` directory that stops us ascending higher into the filesystem.
2152
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2153
a .bzr directory that stops us ascending higher into the filesystem.
2457
2156
TEST_ROOT = None
2467
2166
self.transport_readonly_server = None
2468
2167
self.__vfs_server = None
2471
super(TestCaseWithMemoryTransport, self).setUp()
2473
def _add_disconnect_cleanup(transport):
2474
"""Schedule disconnection of given transport at test cleanup
2476
This needs to happen for all connected transports or leaks occur.
2478
Note reconnections may mean we call disconnect multiple times per
2479
transport which is suboptimal but seems harmless.
2481
self.addCleanup(transport.disconnect)
2483
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2484
_add_disconnect_cleanup, None)
2486
self._make_test_root()
2487
self.addCleanup(os.chdir, os.getcwdu())
2488
self.makeAndChdirToTestDir()
2489
self.overrideEnvironmentForTesting()
2490
self.__readonly_server = None
2491
self.__server = None
2492
self.reduceLockdirTimeout()
2493
# Each test may use its own config files even if the local config files
2494
# don't actually exist. They'll rightly fail if they try to create them
2496
self.overrideAttr(config, '_shared_stores', {})
2498
2169
def get_transport(self, relpath=None):
2499
2170
"""Return a writeable transport.
2706
2363
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2707
2364
self.permit_dir(self.test_dir)
2709
def make_branch(self, relpath, format=None, name=None):
2366
def make_branch(self, relpath, format=None):
2710
2367
"""Create a branch on the transport at relpath."""
2711
2368
repo = self.make_repository(relpath, format=format)
2712
return repo.bzrdir.create_branch(append_revisions_only=False,
2715
def get_default_format(self):
2718
def resolve_format(self, format):
2719
"""Resolve an object to a ControlDir format object.
2721
The initial format object can either already be
2722
a ControlDirFormat, None (for the default format),
2723
or a string with the name of the control dir format.
2725
:param format: Object to resolve
2726
:return A ControlDirFormat instance
2729
format = self.get_default_format()
2730
if isinstance(format, basestring):
2731
format = controldir.format_registry.make_bzrdir(format)
2369
return repo.bzrdir.create_branch()
2734
2371
def make_bzrdir(self, relpath, format=None):
2778
2418
test_home_dir = self.test_home_dir
2779
2419
if isinstance(test_home_dir, unicode):
2780
2420
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2781
self.overrideEnv('HOME', test_home_dir)
2782
self.overrideEnv('BZR_HOME', test_home_dir)
2421
os.environ['HOME'] = test_home_dir
2422
os.environ['BZR_HOME'] = test_home_dir
2425
super(TestCaseWithMemoryTransport, self).setUp()
2426
# Ensure that ConnectedTransport doesn't leak sockets
2427
def get_transport_with_cleanup(*args, **kwargs):
2428
t = orig_get_transport(*args, **kwargs)
2429
if isinstance(t, _mod_transport.ConnectedTransport):
2430
self.addCleanup(t.disconnect)
2433
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2434
get_transport_with_cleanup)
2435
self._make_test_root()
2436
self.addCleanup(os.chdir, os.getcwdu())
2437
self.makeAndChdirToTestDir()
2438
self.overrideEnvironmentForTesting()
2439
self.__readonly_server = None
2440
self.__server = None
2441
self.reduceLockdirTimeout()
2784
2443
def setup_smart_server_with_call_log(self):
2785
2444
"""Sets up a smart server as the transport server with a call log."""
2786
2445
self.transport_server = test_server.SmartTCPServer_for_testing
2787
self.hpss_connections = []
2788
2446
self.hpss_calls = []
2789
2447
import traceback
2790
2448
# Skip the current stack down to the caller of
3452
3108
"""A decorator which excludes test matching an exclude pattern."""
3454
3110
def __init__(self, suite, exclude_pattern):
3455
super(ExcludeDecorator, self).__init__(
3456
exclude_tests_by_re(suite, exclude_pattern))
3111
TestDecorator.__init__(self, suite)
3112
self.exclude_pattern = exclude_pattern
3113
self.excluded = False
3117
return iter(self._tests)
3118
self.excluded = True
3119
suite = exclude_tests_by_re(self, self.exclude_pattern)
3121
self.addTests(suite)
3122
return iter(self._tests)
3459
3125
class FilterTestsDecorator(TestDecorator):
3460
3126
"""A decorator which filters tests to those matching a pattern."""
3462
3128
def __init__(self, suite, pattern):
3463
super(FilterTestsDecorator, self).__init__(
3464
filter_suite_by_re(suite, pattern))
3129
TestDecorator.__init__(self, suite)
3130
self.pattern = pattern
3131
self.filtered = False
3135
return iter(self._tests)
3136
self.filtered = True
3137
suite = filter_suite_by_re(self, self.pattern)
3139
self.addTests(suite)
3140
return iter(self._tests)
3467
3143
class RandomDecorator(TestDecorator):
3468
3144
"""A decorator which randomises the order of its tests."""
3470
3146
def __init__(self, suite, random_seed, stream):
3471
random_seed = self.actual_seed(random_seed)
3472
stream.write("Randomizing test order using seed %s\n\n" %
3147
TestDecorator.__init__(self, suite)
3148
self.random_seed = random_seed
3149
self.randomised = False
3150
self.stream = stream
3154
return iter(self._tests)
3155
self.randomised = True
3156
self.stream.write("Randomizing test order using seed %s\n\n" %
3157
(self.actual_seed()))
3474
3158
# Initialise the random number generator.
3475
random.seed(random_seed)
3476
super(RandomDecorator, self).__init__(randomize_suite(suite))
3159
random.seed(self.actual_seed())
3160
suite = randomize_suite(self)
3162
self.addTests(suite)
3163
return iter(self._tests)
3479
def actual_seed(seed):
3165
def actual_seed(self):
3166
if self.random_seed == "now":
3481
3167
# We convert the seed to a long to make it reuseable across
3482
3168
# invocations (because the user can reenter it).
3483
return long(time.time())
3169
self.random_seed = long(time.time())
3485
3171
# Convert the seed to a long if we can
3488
except (TypeError, ValueError):
3173
self.random_seed = long(self.random_seed)
3176
return self.random_seed
3493
3179
class TestFirstDecorator(TestDecorator):
3494
3180
"""A decorator which moves named tests to the front."""
3496
3182
def __init__(self, suite, pattern):
3497
super(TestFirstDecorator, self).__init__()
3498
self.addTests(split_suite_by_re(suite, pattern))
3183
TestDecorator.__init__(self, suite)
3184
self.pattern = pattern
3185
self.filtered = False
3189
return iter(self._tests)
3190
self.filtered = True
3191
suites = split_suite_by_re(self, self.pattern)
3193
self.addTests(suites)
3194
return iter(self._tests)
3501
3197
def partition_tests(suite, count):
3546
3242
ProtocolTestCase.run(self, result)
3548
pid, status = os.waitpid(self.pid, 0)
3549
# GZ 2011-10-18: If status is nonzero, should report to the result
3550
# that something went wrong.
3244
os.waitpid(self.pid, 0)
3552
3246
test_blocks = partition_tests(suite, concurrency)
3553
# Clear the tests from the original suite so it doesn't keep them alive
3554
suite._tests[:] = []
3555
3247
for process_tests in test_blocks:
3556
process_suite = TestUtil.TestSuite(process_tests)
3557
# Also clear each split list so new suite has only reference
3558
process_tests[:] = []
3248
process_suite = TestUtil.TestSuite()
3249
process_suite.addTests(process_tests)
3559
3250
c2pread, c2pwrite = os.pipe()
3560
3251
pid = os.fork()
3253
workaround_zealous_crypto_random()
3563
stream = os.fdopen(c2pwrite, 'wb', 1)
3564
workaround_zealous_crypto_random()
3565
3255
os.close(c2pread)
3566
3256
# Leave stderr and stdout open so we can see test noise
3567
3257
# Close stdin so that the child goes away if it decides to
3568
3258
# read from stdin (otherwise it's a roulette to see what
3569
3259
# child actually gets keystrokes for pdb etc).
3570
3260
sys.stdin.close()
3262
stream = os.fdopen(c2pwrite, 'wb', 1)
3571
3263
subunit_result = AutoTimingTestResultDecorator(
3572
SubUnitBzrProtocolClient(stream))
3264
TestProtocolClient(stream))
3573
3265
process_suite.run(subunit_result)
3575
# Try and report traceback on stream, but exit with error even
3576
# if stream couldn't be created or something else goes wrong.
3577
# The traceback is formatted to a string and written in one go
3578
# to avoid interleaving lines from multiple failing children.
3580
stream.write(traceback.format_exc())
3585
3269
os.close(c2pwrite)
3586
3270
stream = os.fdopen(c2pread, 'rb', 1)
3650
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3334
class ForwardingResult(unittest.TestResult):
    """A ``unittest.TestResult`` that relays events to another result.

    Only the methods defined here are relayed; they do not update this
    object's own bookkeeping.

    :ivar result: The result object that receives every relayed call.
    """

    def __init__(self, target):
        """Remember ``target`` as the result to relay to."""
        self.result = target
        # Explicit base-class call (not super()) as in the rest of this
        # class's original code.
        unittest.TestResult.__init__(self)

    # -- run / test lifecycle ------------------------------------------

    def startTestRun(self):
        """Relay the start of the whole run."""
        self.result.startTestRun()

    def stopTestRun(self):
        """Relay the end of the whole run."""
        self.result.stopTestRun()

    def startTest(self, test):
        """Relay the start of ``test``."""
        self.result.startTest(test)

    def stopTest(self, test):
        """Relay the end of ``test``."""
        self.result.stopTest(test)

    # -- outcomes --------------------------------------------------------

    def addSuccess(self, test):
        """Relay a success for ``test``."""
        self.result.addSuccess(test)

    def addSkip(self, test, reason):
        """Relay a skip of ``test`` together with its ``reason``."""
        self.result.addSkip(test, reason)

    def addError(self, test, err):
        """Relay an error for ``test``; ``err`` is passed through as is."""
        self.result.addError(test, err)

    def addFailure(self, test, err):
        """Relay a failure for ``test``; ``err`` is passed through as is."""
        self.result.addFailure(test, err)
3363
ForwardingResult = testtools.ExtendedToOriginalDecorator
3366
class ProfileResult(ForwardingResult):
3651
3367
"""Generate profiling data for all activity between start and success.
3653
3369
The profile data is appended to the test's _benchcalls attribute and can
3991
3703
'bzrlib.tests.test_commit_merge',
3992
3704
'bzrlib.tests.test_config',
3993
3705
'bzrlib.tests.test_conflicts',
3994
'bzrlib.tests.test_controldir',
3995
3706
'bzrlib.tests.test_counted_lock',
3996
3707
'bzrlib.tests.test_crash',
3997
3708
'bzrlib.tests.test_decorators',
3998
3709
'bzrlib.tests.test_delta',
3999
3710
'bzrlib.tests.test_debug',
3711
'bzrlib.tests.test_deprecated_graph',
4000
3712
'bzrlib.tests.test_diff',
4001
3713
'bzrlib.tests.test_directory_service',
4002
3714
'bzrlib.tests.test_dirstate',
4003
3715
'bzrlib.tests.test_email_message',
4004
3716
'bzrlib.tests.test_eol_filters',
4005
3717
'bzrlib.tests.test_errors',
4006
'bzrlib.tests.test_estimate_compressed_size',
4007
3718
'bzrlib.tests.test_export',
4008
'bzrlib.tests.test_export_pot',
4009
3719
'bzrlib.tests.test_extract',
4010
'bzrlib.tests.test_features',
4011
3720
'bzrlib.tests.test_fetch',
4012
3721
'bzrlib.tests.test_fixtures',
4013
3722
'bzrlib.tests.test_fifo_cache',
4014
3723
'bzrlib.tests.test_filters',
4015
'bzrlib.tests.test_filter_tree',
4016
3724
'bzrlib.tests.test_ftp_transport',
4017
3725
'bzrlib.tests.test_foreign',
4018
3726
'bzrlib.tests.test_generate_docs',
4264
3961
# Some tests mentioned in the list are not in the test suite. The
4265
3962
# list may be out of date, report to the tester.
4266
3963
for id in not_found:
4267
trace.warning('"%s" not found in the test suite', id)
3964
bzrlib.trace.warning('"%s" not found in the test suite', id)
4268
3965
for id in duplicates:
4269
trace.warning('"%s" is used as an id by several tests', id)
3966
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
4274
def multiply_scenarios(*scenarios):
4275
"""Multiply two or more iterables of scenarios.
4277
It is safe to pass scenario generators or iterators.
4279
:returns: A list of compound scenarios: the cross-product of all
4280
scenarios, with the names concatenated and the parameters
4283
return reduce(_multiply_two_scenarios, map(list, scenarios))
4286
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3971
def multiply_scenarios(scenarios_left, scenarios_right):
4287
3972
"""Multiply two sets of scenarios.
4289
3974
:returns: the cartesian product of the two sets of scenarios, that is
4460
4144
% (os.path.basename(dirname), printable_e))
4147
class Feature(object):
4148
"""An operating system Feature."""
4151
self._available = None
4153
def available(self):
4154
"""Is the feature available?
4156
:return: True if the feature is available.
4158
if self._available is None:
4159
self._available = self._probe()
4160
return self._available
4163
"""Implement this method in concrete features.
4165
:return: True if the feature is available.
4167
raise NotImplementedError
4170
if getattr(self, 'feature_name', None):
4171
return self.feature_name()
4172
return self.__class__.__name__
4175
class _SymlinkFeature(Feature):
4178
return osutils.has_symlinks()
4180
def feature_name(self):
4183
SymlinkFeature = _SymlinkFeature()
4186
class _HardlinkFeature(Feature):
4189
return osutils.has_hardlinks()
4191
def feature_name(self):
4194
HardlinkFeature = _HardlinkFeature()
4197
class _OsFifoFeature(Feature):
4200
return getattr(os, 'mkfifo', None)
4202
def feature_name(self):
4203
return 'filesystem fifos'
4205
OsFifoFeature = _OsFifoFeature()
4208
class _UnicodeFilenameFeature(Feature):
4209
"""Does the filesystem support Unicode filenames?"""
4213
# Check for character combinations unlikely to be covered by any
4214
# single non-unicode encoding. We use the characters
4215
# - greek small letter alpha (U+03B1) and
4216
# - braille pattern dots-123456 (U+283F).
4217
os.stat(u'\u03b1\u283f')
4218
except UnicodeEncodeError:
4220
except (IOError, OSError):
4221
# The filesystem allows the Unicode filename but the file doesn't
4225
# The filesystem allows the Unicode filename and the file exists,
4229
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4232
class _CompatabilityThunkFeature(Feature):
4233
"""This feature is just a thunk to another feature.
4235
It issues a deprecation warning if it is accessed, to let you know that you
4236
should really use a different feature.
4239
def __init__(self, dep_version, module, name,
4240
replacement_name, replacement_module=None):
4241
super(_CompatabilityThunkFeature, self).__init__()
4242
self._module = module
4243
if replacement_module is None:
4244
replacement_module = module
4245
self._replacement_module = replacement_module
4247
self._replacement_name = replacement_name
4248
self._dep_version = dep_version
4249
self._feature = None
4252
if self._feature is None:
4253
depr_msg = self._dep_version % ('%s.%s'
4254
% (self._module, self._name))
4255
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4256
self._replacement_name)
4257
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4258
# Import the new feature and use it as a replacement for the
4260
mod = __import__(self._replacement_module, {}, {},
4261
[self._replacement_name])
4262
self._feature = getattr(mod, self._replacement_name)
4266
return self._feature._probe()
4269
class ModuleAvailableFeature(Feature):
4270
"""This is a feature that describes a module we want to be available.
4272
Declare the name of the module in __init__(), and then after probing, the
4273
module will be available as 'self.module'.
4275
:ivar module: The module if it is available, else None.
4278
def __init__(self, module_name):
4279
super(ModuleAvailableFeature, self).__init__()
4280
self.module_name = module_name
4284
self._module = __import__(self.module_name, {}, {}, [''])
4291
if self.available(): # Make sure the probe has been done
4295
def feature_name(self):
4296
return self.module_name
4299
# This is kept here for compatibility, it is recommended to use
4300
# 'bzrlib.tests.features.paramiko' instead
4301
ParamikoFeature = _CompatabilityThunkFeature(
4302
deprecated_in((2,1,0)),
4303
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4463
4306
def probe_unicode_in_user_encoding():
4464
4307
"""Try to encode several unicode strings to use in unicode-aware tests.
4465
4308
Return first successful match.
4339
class _HTTPSServerFeature(Feature):
4340
"""Some tests want an https Server, check if one is available.
4342
Right now, the only way this is available is under python2.6 which provides
4353
def feature_name(self):
4354
return 'HTTPSServer'
4357
HTTPSServerFeature = _HTTPSServerFeature()
4360
class _UnicodeFilename(Feature):
4361
"""Does the filesystem support Unicode filenames?"""
4366
except UnicodeEncodeError:
4368
except (IOError, OSError):
4369
# The filesystem allows the Unicode filename but the file doesn't
4373
# The filesystem allows the Unicode filename and the file exists,
4377
UnicodeFilename = _UnicodeFilename()
4380
class _ByteStringNamedFilesystem(Feature):
4381
"""Is the filesystem based on bytes?"""
4384
if os.name == "posix":
4388
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4391
class _UTF8Filesystem(Feature):
4392
"""Is the filesystem UTF-8?"""
4395
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4399
UTF8Filesystem = _UTF8Filesystem()
4402
class _BreakinFeature(Feature):
4403
"""Does this platform support the breakin feature?"""
4406
from bzrlib import breakin
4407
if breakin.determine_signal() is None:
4409
if sys.platform == 'win32':
4410
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4411
# We trigger SIGBREAK via a Console api so we need ctypes to
4412
# access the function
4419
def feature_name(self):
4420
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4423
BreakinFeature = _BreakinFeature()
4426
class _CaseInsCasePresFilenameFeature(Feature):
4427
"""Is the file-system case insensitive, but case-preserving?"""
4430
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4432
# first check truly case-preserving for created files, then check
4433
# case insensitive when opening existing files.
4434
name = osutils.normpath(name)
4435
base, rel = osutils.split(name)
4436
found_rel = osutils.canonical_relpath(base, name)
4437
return (found_rel == rel
4438
and os.path.isfile(name.upper())
4439
and os.path.isfile(name.lower()))
4444
def feature_name(self):
4445
return "case-insensitive case-preserving filesystem"
4447
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4450
class _CaseInsensitiveFilesystemFeature(Feature):
4451
"""Check if underlying filesystem is case-insensitive but *not* case
4454
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4455
# more likely to be case preserving, so this case is rare.
4458
if CaseInsCasePresFilenameFeature.available():
4461
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4462
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4463
TestCaseWithMemoryTransport.TEST_ROOT = root
4465
root = TestCaseWithMemoryTransport.TEST_ROOT
4466
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4468
name_a = osutils.pathjoin(tdir, 'a')
4469
name_A = osutils.pathjoin(tdir, 'A')
4471
result = osutils.isdir(name_A)
4472
_rmtree_temp_dir(tdir)
4475
def feature_name(self):
4476
return 'case-insensitive filesystem'
4478
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4481
class _CaseSensitiveFilesystemFeature(Feature):
4484
if CaseInsCasePresFilenameFeature.available():
4486
elif CaseInsensitiveFilesystemFeature.available():
4491
def feature_name(self):
4492
return 'case-sensitive filesystem'
4494
# new coding style is for feature instances to be lowercase
4495
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4498
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4499
SubUnitFeature = _CompatabilityThunkFeature(
4500
deprecated_in((2,1,0)),
4501
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4496
4502
# Only define SubUnitBzrRunner if subunit is available.
4498
4504
from subunit import TestProtocolClient
4499
4505
from subunit.test_results import AutoTimingTestResultDecorator
4500
4506
class SubUnitBzrProtocolClient(TestProtocolClient):
4502
def stopTest(self, test):
4503
super(SubUnitBzrProtocolClient, self).stopTest(test)
4504
_clear__type_equality_funcs(test)
4506
4508
def addSuccess(self, test, details=None):
4507
4509
# The subunit client always includes the details in the subunit
4508
4510
# stream, but we don't want to include it in ours.
4520
4522
except ImportError:
4524
# API compatibility for old plugins; see bug 892622.
4527
'HTTPServerFeature',
4528
'ModuleAvailableFeature',
4529
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4530
'OsFifoFeature', 'UnicodeFilenameFeature',
4531
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4532
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4533
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4534
'posix_permissions_feature',
4536
globals()[name] = _CompatabilityThunkFeature(
4537
symbol_versioning.deprecated_in((2, 5, 0)),
4538
'bzrlib.tests', name,
4539
name, 'bzrlib.tests.features')
4542
for (old_name, new_name) in [
4543
('UnicodeFilename', 'UnicodeFilenameFeature'),
4545
globals()[name] = _CompatabilityThunkFeature(
4546
symbol_versioning.deprecated_in((2, 5, 0)),
4547
'bzrlib.tests', old_name,
4548
new_name, 'bzrlib.tests.features')
4525
class _PosixPermissionsFeature(Feature):
4529
# create temporary file and check if specified perms are maintained.
4532
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4533
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4536
os.chmod(name, write_perms)
4538
read_perms = os.stat(name).st_mode & 0777
4540
return (write_perms == read_perms)
4542
return (os.name == 'posix') and has_perms()
4544
def feature_name(self):
4545
return 'POSIX permissions support'
4547
posix_permissions_feature = _PosixPermissionsFeature()