210
218
osutils.set_or_unset_env(var, value)
221
def _clear__type_equality_funcs(test):
222
"""Cleanup bound methods stored on TestCase instances
224
Clear the dict breaking a few (mostly) harmless cycles in the affected
225
unittests released with Python 2.6 and initial Python 2.7 versions.
227
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
228
shipped in Oneiric, an object with no clear method was used, hence the
229
extra complications, see bug 809048 for details.
231
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
232
if type_equality_funcs is not None:
233
tef_clear = getattr(type_equality_funcs, "clear", None)
234
if tef_clear is None:
235
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
236
if tef_instance_dict is not None:
237
tef_clear = tef_instance_dict.clear
238
if tef_clear is not None:
213
242
class ExtendedTestResult(testtools.TextTestResult):
214
243
"""Accepts, reports and accumulates the results of running tests.
377
406
if isinstance(test, TestCase):
378
407
test.addCleanup(self._check_leaked_threads, test)
409
def stopTest(self, test):
410
super(ExtendedTestResult, self).stopTest(test)
411
# Manually break cycles, means touching various private things but hey
412
getDetails = getattr(test, "getDetails", None)
413
if getDetails is not None:
415
_clear__type_equality_funcs(test)
416
self._traceback_from_test = None
380
418
def startTests(self):
    """Announce the start of the run and snapshot the live threads.

    The list returned by ``threading.enumerate()`` at this point is kept
    on ``self._active_threads``.
    """
    self.report_tests_starting()
    live_threads = threading.enumerate()
    self._active_threads = live_threads
384
def stopTest(self, test):
385
self._traceback_from_test = None
387
422
def _check_leaked_threads(self, test):
388
423
"""See if any threads have leaked since last call
449
484
self.known_failure_count += 1
450
485
self.report_known_failure(test, err)
487
def addUnexpectedSuccess(self, test, details=None):
    """Record a test that unexpectedly passed, counting it as a failure.

    The test is forwarded to ``addFailure`` on the parent class, the
    failure counter is bumped, and the text of the ``"reason"`` detail is
    handed to ``report_unexpected_success``.

    When the minimum version of testtools required becomes 0.9.8 this
    can be updated to use the new handling there.

    :param test: The test that unexpectedly succeeded.
    :param details: Details mapping; its ``"reason"`` entry is read.
    """
    super(ExtendedTestResult, self).addFailure(test, details=details)
    self.failure_count += 1
    reason_text = "".join(details["reason"].iter_text())
    self.report_unexpected_success(test, reason_text)
452
500
def addNotSupported(self, test, feature):
453
501
"""The test will not be run because of a missing feature.
936
1001
def setUp(self):
937
1002
super(TestCase, self).setUp()
1004
# At this point we're still accessing the config files in $BZR_HOME (as
1005
# set by the user running selftest).
1006
timeout = config.GlobalStack().get('selftest.timeout')
1008
timeout_fixture = fixtures.TimeoutFixture(timeout)
1009
timeout_fixture.setUp()
1010
self.addCleanup(timeout_fixture.cleanUp)
938
1012
for feature in getattr(self, '_test_needs_features', []):
939
1013
self.requireFeature(feature)
940
self._log_contents = None
941
self.addDetail("log", content.Content(content.ContentType("text",
942
"plain", {"charset": "utf8"}),
943
lambda:[self._get_log(keep_log_file=True)]))
944
1014
self._cleanEnvironment()
1016
if bzrlib.global_state is not None:
1017
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1018
config.CommandLineStore())
945
1020
self._silenceUI()
946
1021
self._startLogFile()
947
1022
self._benchcalls = []
954
1029
# between tests. We should get rid of this altogether: bug 656694. --
956
1031
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
957
# Isolate config option expansion until its default value for bzrlib is
958
# settled on or the FIXME associated with _get_expand_default_value
959
# is addressed -- vila 20110219
960
self.overrideAttr(config, '_expand_default_value', None)
1032
self._log_files = set()
1033
# Each key in the ``_counters`` dict holds a value for a different
1034
# counter. When the test ends, addDetail() should be used to output the
1035
# counter values. This happens in install_counter_hook().
1037
if 'config_stats' in selftest_debug_flags:
1038
self._install_config_stats_hooks()
1039
# Do not use i18n for tests (unless the test reverses this)
962
1042
def debug(self):
963
1043
# debug a frame up.
965
pdb.Pdb().set_trace(sys._getframe().f_back)
1045
# The sys preserved stdin/stdout should allow blackbox tests debugging
1046
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1047
).set_trace(sys._getframe().f_back)
967
1049
def discardDetail(self, name):
968
1050
"""Extend the addDetail, getDetails api so we can remove a detail.
980
1062
if name in details:
981
1063
del details[name]
1065
def install_counter_hook(self, hooks, name, counter_name=None):
    """Install a counting hook.

    Any hook can be counted as long as it doesn't need to return a value.

    :param hooks: Where the hook should be installed.

    :param name: The hook name that will be counted.

    :param counter_name: The counter identifier in ``_counters``, defaults
        to ``name``.

    :raises AssertionError: If ``counter_name`` is already present in
        ``_counters``.
    """
    _counters = self._counters # Avoid closing over self
    if counter_name is None:
        counter_name = name
    # Membership test instead of the deprecated dict.has_key().
    if counter_name in _counters:
        raise AssertionError('%s is already used as a counter name'
                             % (counter_name,))
    _counters[counter_name] = 0
    # The detail is produced by a callable, so it reports the count at the
    # time the detail is read rather than the count at registration.
    self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
        lambda: ['%d' % (_counters[counter_name],)]))
    def increment_counter(*args, **kwargs):
        _counters[counter_name] += 1
    label = 'count %s calls' % (counter_name,)
    hooks.install_named_hook(name, increment_counter, label)
    self.addCleanup(hooks.uninstall_named_hook, name, label)
1092
def _install_config_stats_hooks(self):
    """Install counting hooks on both config hook registries.

    One counter per hook name is registered for ``config.ConfigHooks``
    (named ``config.<hook>``) and for ``config.OldConfigHooks`` (named
    ``old_config.<hook>``).
    """
    counted_hooks = ('get', 'set', 'remove', 'load', 'save')
    for hook_name in counted_hooks:
        self.install_counter_hook(config.ConfigHooks, hook_name,
                                  'config.' + hook_name)

    # The OldConfigHooks are private and need special handling to protect
    # against recursive tests (tests that run other tests), so a fresh
    # instance is swapped in by hand before the counters are attached.
    fresh_old_hooks = config._OldConfigHooks()
    self.overrideAttr(config, 'OldConfigHooks', fresh_old_hooks)
    for hook_name in counted_hooks:
        self.install_counter_hook(config.OldConfigHooks, hook_name,
                                  'old_config.' + hook_name)
983
1109
def _clear_debug_flags(self):
984
1110
"""Prevent externally set debug flags affecting tests.
1039
1165
# break some locks on purpose and should be taken into account by
1040
1166
# considering that breaking a lock is just a dirty way of releasing it.
1041
1167
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1042
message = ('Different number of acquired and '
1043
'released or broken locks. (%s, %s + %s)' %
1044
(acquired_locks, released_locks, broken_locks))
1169
'Different number of acquired and '
1170
'released or broken locks.\n'
1174
(acquired_locks, released_locks, broken_locks))
1045
1175
if not self._lock_check_thorough:
1046
1176
# Rather than fail, just warn
1047
1177
print "Broken test %s: %s" % (self, message)
1290
1420
length, len(obj_with_len), obj_with_len))
1292
1422
def assertLogsError(self, exception_class, func, *args, **kwargs):
1293
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1423
"""Assert that `func(*args, **kwargs)` quietly logs a specific error.
1296
1426
orig_log_exception_quietly = trace.log_exception_quietly
1299
1429
orig_log_exception_quietly()
1300
captured.append(sys.exc_info())
1430
captured.append(sys.exc_info()[1])
1301
1431
trace.log_exception_quietly = capture
1302
1432
func(*args, **kwargs)
1304
1434
trace.log_exception_quietly = orig_log_exception_quietly
1305
1435
self.assertLength(1, captured)
1306
err = captured[0][1]
1307
1437
self.assertIsInstance(err, exception_class)
1602
1733
def _startLogFile(self):
1603
"""Send bzr and test log messages to a temporary file.
1605
The file is removed as the test is torn down.
1607
self._log_file = StringIO()
1734
"""Setup a in-memory target for bzr and testcase log messages"""
1735
pseudo_log_file = StringIO()
1736
def _get_log_contents_for_weird_testtools_api():
1737
return [pseudo_log_file.getvalue().decode(
1738
"utf-8", "replace").encode("utf-8")]
1739
self.addDetail("log", content.Content(content.ContentType("text",
1740
"plain", {"charset": "utf8"}),
1741
_get_log_contents_for_weird_testtools_api))
1742
self._log_file = pseudo_log_file
1608
1743
self._log_memento = trace.push_log_file(self._log_file)
1609
1744
self.addCleanup(self._finishLogFile)
1611
1746
def _finishLogFile(self):
1612
"""Finished with the log file.
1614
Close the file and delete it, unless setKeepLogfile was called.
1747
"""Flush and dereference the in-memory log for this testcase"""
1616
1748
if trace._trace_file:
1617
1749
# flush the log file, to get all content
1618
1750
trace._trace_file.flush()
1619
1751
trace.pop_log_file(self._log_memento)
1620
# Cache the log result and delete the file on disk
1621
self._get_log(False)
1752
# The logging module now tracks references for cleanup so discard ours
1753
del self._log_memento
1623
1755
def thisFailsStrictLockCheck(self):
1624
1756
"""It is known that this test would fail with -Dstrict_locks.
1646
1781
:returns: The actual attr value.
1648
value = getattr(obj, attr_name)
1649
1783
# The actual value is captured by the call below
1650
self.addCleanup(setattr, obj, attr_name, value)
1784
value = getattr(obj, attr_name, _unitialized_attr)
1785
if value is _unitialized_attr:
1786
# When the test completes, the attribute should not exist, but if
1787
# we aren't setting a value, we don't need to do anything.
1788
if new is not _unitialized_attr:
1789
self.addCleanup(delattr, obj, attr_name)
1791
self.addCleanup(setattr, obj, attr_name, value)
1651
1792
if new is not _unitialized_attr:
1652
1793
setattr(obj, attr_name, new)
1666
1807
self.addCleanup(osutils.set_or_unset_env, name, value)
1810
def recordCalls(self, obj, attr_name):
1811
"""Monkeypatch in a wrapper that will record calls.
1813
The monkeypatch is automatically removed when the test concludes.
1815
:param obj: The namespace holding the reference to be replaced;
1816
typically a module, class, or object.
1817
:param attr_name: A string for the name of the attribute to
1819
:returns: A list that will be extended with one item every time the
1820
function is called, with a tuple of (args, kwargs).
1824
def decorator(*args, **kwargs):
1825
calls.append((args, kwargs))
1826
return orig(*args, **kwargs)
1827
orig = self.overrideAttr(obj, attr_name, decorator)
1669
1830
def _cleanEnvironment(self):
    """Pass every ``isolated_environ`` entry through ``overrideEnv``."""
    for env_name, env_value in isolated_environ.iteritems():
        self.overrideEnv(env_name, env_value)
1673
1834
def _restoreHooks(self):
1674
1835
for klass, (name, hooks) in self._preserved_hooks.items():
1675
1836
setattr(klass, name, hooks)
1676
hooks._lazy_hooks = self._preserved_lazy_hooks
1837
self._preserved_hooks.clear()
1838
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1839
self._preserved_lazy_hooks.clear()
1678
1841
def knownFailure(self, reason):
1679
"""This test has failed for some known reason."""
1680
raise KnownFailure(reason)
1842
"""Declare that this test fails for a known reason
1844
Tests that are known to fail should generally be using expectedFailure
1845
with an appropriate reverse assertion if a change could cause the test
1846
to start passing. Conversely if the test has no immediate prospect of
1847
succeeding then using skip is more suitable.
1849
When this method is called while an exception is being handled, that
1850
traceback will be used, otherwise a new exception will be thrown to
1851
provide one but won't be reported.
1853
self._add_reason(reason)
1855
exc_info = sys.exc_info()
1856
if exc_info != (None, None, None):
1857
self._report_traceback(exc_info)
1860
raise self.failureException(reason)
1861
except self.failureException:
1862
exc_info = sys.exc_info()
1863
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1864
raise testtools.testcase._ExpectedFailure(exc_info)
1682
1868
def _suppress_log(self):
1683
1869
"""Remove the log info from details."""
1771
1957
def log(self, *args):
    """Forward ``args`` unchanged to ``trace.mutter``."""
    trace.mutter(*args)
1774
def _get_log(self, keep_log_file=False):
1775
"""Internal helper to get the log from bzrlib.trace for this test.
1777
Please use self.getDetails, or self.get_log to access this in test case
1780
:param keep_log_file: When True, if the log is still a file on disk
1781
leave it as a file on disk. When False, if the log is still a file
1782
on disk, the log file is deleted and the log preserved as
1784
:return: A string containing the log.
1786
if self._log_contents is not None:
1788
self._log_contents.decode('utf8')
1789
except UnicodeDecodeError:
1790
unicodestr = self._log_contents.decode('utf8', 'replace')
1791
self._log_contents = unicodestr.encode('utf8')
1792
return self._log_contents
1793
if self._log_file is not None:
1794
log_contents = self._log_file.getvalue()
1796
log_contents.decode('utf8')
1797
except UnicodeDecodeError:
1798
unicodestr = log_contents.decode('utf8', 'replace')
1799
log_contents = unicodestr.encode('utf8')
1800
if not keep_log_file:
1801
self._log_file = None
1802
# Permit multiple calls to get_log until we clean it up in
1804
self._log_contents = log_contents
1807
return "No log file content."
1809
1960
def get_log(self):
1810
1961
"""Get a unicode string containing the log from bzrlib.trace.
2242
def _add_subprocess_log(self, log_file_path):
2243
if len(self._log_files) == 0:
2244
# Register an addCleanup func. We do this on the first call to
2245
# _add_subprocess_log rather than in TestCase.setUp so that this
2246
# addCleanup is registered after any cleanups for tempdirs that
2247
# subclasses might create, which will probably remove the log file
2249
self.addCleanup(self._subprocess_log_cleanup)
2250
# self._log_files is a set, so if a log file is reused we won't grab it
2252
self._log_files.add(log_file_path)
2254
def _subprocess_log_cleanup(self):
    """Attach the bytes of every recorded subprocess log as a test detail.

    Each path in ``self._log_files`` is read in full and added under the
    detail name ``start_bzr_subprocess-log-<index>``.
    """
    for count, log_file_path in enumerate(self._log_files):
        # Read the whole file now to avoid holding it open beyond the life
        # of this function, which might interfere with e.g. cleaning
        # tempdirs on Windows.
        # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
        # (content.content_from_file(log_file_path, buffer_now=True)),
        # hence the manual read.
        with open(log_file_path, 'rb') as log_file:
            log_file_bytes = log_file.read()
        # Bind the bytes as a default argument. A plain closure looks the
        # variable up when called, so if the content is read after the loop
        # has moved on every detail would yield the last file's bytes.
        detail_content = content.Content(
            content.ContentType("text", "plain", {"charset": "utf8"}),
            lambda data=log_file_bytes: [data])
        self.addDetail("start_bzr_subprocess-log-%d" % (count,),
            detail_content)
2080
2269
def _popen(self, *args, **kwargs):
2081
2270
"""Place a call to Popen.
2234
2425
class TestCaseWithMemoryTransport(TestCase):
2235
2426
"""Common test class for tests that do not need disk resources.
2237
Tests that need disk resources should derive from TestCaseWithTransport.
2428
Tests that need disk resources should derive from TestCaseInTempDir
2429
or TestCaseWithTransport.
2239
2431
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2241
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2433
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2242
2434
a directory which does not exist. This serves to help ensure test isolation
2243
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2244
must exist. However, TestCaseWithMemoryTransport does not offer local
2245
file defaults for the transport in tests, nor does it obey the command line
2435
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2436
must exist. However, TestCaseWithMemoryTransport does not offer local file
2437
defaults for the transport in tests, nor does it obey the command line
2246
2438
override, so tests that accidentally write to the common directory should
2249
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2250
a .bzr directory that stops us ascending higher into the filesystem.
2441
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2442
``.bzr`` directory that stops us ascending higher into the filesystem.
2253
2445
TEST_ROOT = None
2263
2455
self.transport_readonly_server = None
2264
2456
self.__vfs_server = None
2459
super(TestCaseWithMemoryTransport, self).setUp()
2461
def _add_disconnect_cleanup(transport):
2462
"""Schedule disconnection of given transport at test cleanup
2464
This needs to happen for all connected transports or leaks occur.
2466
Note reconnections may mean we call disconnect multiple times per
2467
transport which is suboptimal but seems harmless.
2469
self.addCleanup(transport.disconnect)
2471
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2472
_add_disconnect_cleanup, None)
2474
self._make_test_root()
2475
self.addCleanup(os.chdir, os.getcwdu())
2476
self.makeAndChdirToTestDir()
2477
self.overrideEnvironmentForTesting()
2478
self.__readonly_server = None
2479
self.__server = None
2480
self.reduceLockdirTimeout()
2481
# Each test may use its own config files even if the local config files
2482
# don't actually exist. They'll rightly fail if they try to create them
2484
self.overrideAttr(config, '_shared_stores', {})
2266
2486
def get_transport(self, relpath=None):
2267
2487
"""Return a writeable transport.
2412
2633
root = TestCaseWithMemoryTransport.TEST_ROOT
2413
bzrdir.BzrDir.create_standalone_workingtree(root)
2635
# Make sure we get a readable and accessible home for .bzr.log
2636
# and/or config files, and not fallback to weird defaults (see
2637
# http://pad.lv/825027).
2638
self.assertIs(None, os.environ.get('BZR_HOME', None))
2639
os.environ['BZR_HOME'] = root
2640
wt = controldir.ControlDir.create_standalone_workingtree(root)
2641
del os.environ['BZR_HOME']
2642
except Exception, e:
2643
self.fail("Fail to initialize the safety net: %r\n" % (e,))
2644
# Hack for speed: remember the raw bytes of the dirstate file so that
2645
# we don't need to re-open the wt to check it hasn't changed.
2646
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2647
wt.control_transport.get_bytes('dirstate'))
2415
2649
def _check_safety_net(self):
2416
2650
"""Check that the safety .bzr directory have not been touched.
2460
2694
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2461
2695
self.permit_dir(self.test_dir)
2463
def make_branch(self, relpath, format=None):
2697
def make_branch(self, relpath, format=None, name=None):
2464
2698
"""Create a branch on the transport at relpath."""
2465
2699
repo = self.make_repository(relpath, format=format)
2466
return repo.bzrdir.create_branch()
2700
return repo.bzrdir.create_branch(append_revisions_only=False,
2703
def get_default_format(self):
2706
def resolve_format(self, format):
2707
"""Resolve an object to a ControlDir format object.
2709
The initial format object can either already be
2710
a ControlDirFormat, None (for the default format),
2711
or a string with the name of the control dir format.
2713
:param format: Object to resolve
2714
:return A ControlDirFormat instance
2717
format = self.get_default_format()
2718
if isinstance(format, basestring):
2719
format = controldir.format_registry.make_bzrdir(format)
2468
2722
def make_bzrdir(self, relpath, format=None):
2473
2727
t = _mod_transport.get_transport(maybe_a_url)
2474
2728
if len(segments) > 1 and segments[-1] not in ('', '.'):
2475
2729
t.ensure_base()
2478
if isinstance(format, basestring):
2479
format = bzrdir.format_registry.make_bzrdir(format)
2730
format = self.resolve_format(format)
2480
2731
return format.initialize_on_transport(t)
2481
2732
except errors.UninitializableFormat:
2482
2733
raise TestSkipped("Format %s is not initializable." % format)
2484
def make_repository(self, relpath, shared=False, format=None):
2735
def make_repository(self, relpath, shared=None, format=None):
2485
2736
"""Create a repository on our default transport at relpath.
2487
2738
Note that relpath must be a relative path, not a full url.
2518
2769
self.overrideEnv('HOME', test_home_dir)
2519
2770
self.overrideEnv('BZR_HOME', test_home_dir)
2522
super(TestCaseWithMemoryTransport, self).setUp()
2523
# Ensure that ConnectedTransport doesn't leak sockets
2524
def get_transport_with_cleanup(*args, **kwargs):
2525
t = orig_get_transport(*args, **kwargs)
2526
if isinstance(t, _mod_transport.ConnectedTransport):
2527
self.addCleanup(t.disconnect)
2530
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2531
get_transport_with_cleanup)
2532
self._make_test_root()
2533
self.addCleanup(os.chdir, os.getcwdu())
2534
self.makeAndChdirToTestDir()
2535
self.overrideEnvironmentForTesting()
2536
self.__readonly_server = None
2537
self.__server = None
2538
self.reduceLockdirTimeout()
2540
2772
def setup_smart_server_with_call_log(self):
2541
2773
"""Sets up a smart server as the transport server with a call log."""
2542
2774
self.transport_server = test_server.SmartTCPServer_for_testing
2775
self.hpss_connections = []
2543
2776
self.hpss_calls = []
2544
2777
import traceback
2545
2778
# Skip the current stack down to the caller of
3162
3414
class TestDecorator(TestUtil.TestSuite):
3163
3415
"""A decorator for TestCase/TestSuite objects.
3165
Usually, subclasses should override __iter__(used when flattening test
3166
suites), which we do to filter, reorder, parallelise and so on, run() and
3417
Contains rather than flattening suite passed on construction
3170
def __init__(self, suite):
3171
TestUtil.TestSuite.__init__(self)
3174
def countTestCases(self):
3177
cases += test.countTestCases()
3184
def run(self, result):
3185
# Use iteration on self, not self._tests, to allow subclasses to hook
3188
if result.shouldStop:
3420
def __init__(self, suite=None):
3421
super(TestDecorator, self).__init__()
3422
if suite is not None:
3425
# Don't need subclass run method with suite emptying
3426
run = unittest.TestSuite.run
3194
3429
class CountingDecorator(TestDecorator):
3205
3440
"""A decorator which excludes test matching an exclude pattern."""
3207
3442
def __init__(self, suite, exclude_pattern):
3208
TestDecorator.__init__(self, suite)
3209
self.exclude_pattern = exclude_pattern
3210
self.excluded = False
3214
return iter(self._tests)
3215
self.excluded = True
3216
suite = exclude_tests_by_re(self, self.exclude_pattern)
3218
self.addTests(suite)
3219
return iter(self._tests)
3443
super(ExcludeDecorator, self).__init__(
3444
exclude_tests_by_re(suite, exclude_pattern))
3222
3447
class FilterTestsDecorator(TestDecorator):
3223
3448
"""A decorator which filters tests to those matching a pattern."""
3225
3450
def __init__(self, suite, pattern):
3226
TestDecorator.__init__(self, suite)
3227
self.pattern = pattern
3228
self.filtered = False
3232
return iter(self._tests)
3233
self.filtered = True
3234
suite = filter_suite_by_re(self, self.pattern)
3236
self.addTests(suite)
3237
return iter(self._tests)
3451
super(FilterTestsDecorator, self).__init__(
3452
filter_suite_by_re(suite, pattern))
3240
3455
class RandomDecorator(TestDecorator):
3241
3456
"""A decorator which randomises the order of its tests."""
3243
3458
def __init__(self, suite, random_seed, stream):
3244
TestDecorator.__init__(self, suite)
3245
self.random_seed = random_seed
3246
self.randomised = False
3247
self.stream = stream
3251
return iter(self._tests)
3252
self.randomised = True
3253
self.stream.write("Randomizing test order using seed %s\n\n" %
3254
(self.actual_seed()))
3459
random_seed = self.actual_seed(random_seed)
3460
stream.write("Randomizing test order using seed %s\n\n" %
3255
3462
# Initialise the random number generator.
3256
random.seed(self.actual_seed())
3257
suite = randomize_suite(self)
3259
self.addTests(suite)
3260
return iter(self._tests)
3463
random.seed(random_seed)
3464
super(RandomDecorator, self).__init__(randomize_suite(suite))
3262
def actual_seed(self):
3263
if self.random_seed == "now":
3467
def actual_seed(seed):
3264
3469
# We convert the seed to a long to make it reuseable across
3265
3470
# invocations (because the user can reenter it).
3266
self.random_seed = long(time.time())
3471
return long(time.time())
3268
3473
# Convert the seed to a long if we can
3270
self.random_seed = long(self.random_seed)
3476
except (TypeError, ValueError):
3273
return self.random_seed
3276
3481
class TestFirstDecorator(TestDecorator):
3277
3482
"""A decorator which moves named tests to the front."""
3279
3484
def __init__(self, suite, pattern):
3280
TestDecorator.__init__(self, suite)
3281
self.pattern = pattern
3282
self.filtered = False
3286
return iter(self._tests)
3287
self.filtered = True
3288
suites = split_suite_by_re(self, self.pattern)
3290
self.addTests(suites)
3291
return iter(self._tests)
3485
super(TestFirstDecorator, self).__init__()
3486
self.addTests(split_suite_by_re(suite, pattern))
3294
3489
def partition_tests(suite, count):
3339
3534
ProtocolTestCase.run(self, result)
3341
os.waitpid(self.pid, 0)
3536
pid, status = os.waitpid(self.pid, 0)
3537
# GZ 2011-10-18: If status is nonzero, should report to the result
3538
# that something went wrong.
3343
3540
test_blocks = partition_tests(suite, concurrency)
3541
# Clear the tests from the original suite so it doesn't keep them alive
3542
suite._tests[:] = []
3344
3543
for process_tests in test_blocks:
3345
process_suite = TestUtil.TestSuite()
3346
process_suite.addTests(process_tests)
3544
process_suite = TestUtil.TestSuite(process_tests)
3545
# Also clear each split list so new suite has only reference
3546
process_tests[:] = []
3347
3547
c2pread, c2pwrite = os.pipe()
3348
3548
pid = os.fork()
3350
workaround_zealous_crypto_random()
3551
stream = os.fdopen(c2pwrite, 'wb', 1)
3552
workaround_zealous_crypto_random()
3352
3553
os.close(c2pread)
3353
3554
# Leave stderr and stdout open so we can see test noise
3354
3555
# Close stdin so that the child goes away if it decides to
3355
3556
# read from stdin (otherwise it's a roulette to see what
3356
3557
# child actually gets keystrokes for pdb etc).
3357
3558
sys.stdin.close()
3359
stream = os.fdopen(c2pwrite, 'wb', 1)
3360
3559
subunit_result = AutoTimingTestResultDecorator(
3361
TestProtocolClient(stream))
3560
SubUnitBzrProtocolClient(stream))
3362
3561
process_suite.run(subunit_result)
3563
# Try and report traceback on stream, but exit with error even
3564
# if stream couldn't be created or something else goes wrong.
3565
# The traceback is formatted to a string and written in one go
3566
# to avoid interleaving lines from multiple failing children.
3568
stream.write(traceback.format_exc())
3366
3573
os.close(c2pwrite)
3367
3574
stream = os.fdopen(c2pread, 'rb', 1)
3782
3991
'bzrlib.tests.test_email_message',
3783
3992
'bzrlib.tests.test_eol_filters',
3784
3993
'bzrlib.tests.test_errors',
3994
'bzrlib.tests.test_estimate_compressed_size',
3785
3995
'bzrlib.tests.test_export',
3996
'bzrlib.tests.test_export_pot',
3786
3997
'bzrlib.tests.test_extract',
3998
'bzrlib.tests.test_features',
3787
3999
'bzrlib.tests.test_fetch',
3788
4000
'bzrlib.tests.test_fixtures',
3789
4001
'bzrlib.tests.test_fifo_cache',
3790
4002
'bzrlib.tests.test_filters',
4003
'bzrlib.tests.test_filter_tree',
3791
4004
'bzrlib.tests.test_ftp_transport',
3792
4005
'bzrlib.tests.test_foreign',
3793
4006
'bzrlib.tests.test_generate_docs',
3899
4116
'bzrlib.tests.test_upgrade',
3900
4117
'bzrlib.tests.test_upgrade_stacked',
3901
4118
'bzrlib.tests.test_urlutils',
4119
'bzrlib.tests.test_utextwrap',
3902
4120
'bzrlib.tests.test_version',
3903
4121
'bzrlib.tests.test_version_info',
3904
4122
'bzrlib.tests.test_versionedfile',
4123
'bzrlib.tests.test_vf_search',
3905
4124
'bzrlib.tests.test_weave',
3906
4125
'bzrlib.tests.test_whitebox',
3907
4126
'bzrlib.tests.test_win32utils',
4228
4448
% (os.path.basename(dirname), printable_e))
4231
class Feature(object):
4232
"""An operating system Feature."""
4235
self._available = None
4237
def available(self):
4238
"""Is the feature available?
4240
:return: True if the feature is available.
4242
if self._available is None:
4243
self._available = self._probe()
4244
return self._available
4247
"""Implement this method in concrete features.
4249
:return: True if the feature is available.
4251
raise NotImplementedError
4254
if getattr(self, 'feature_name', None):
4255
return self.feature_name()
4256
return self.__class__.__name__
4259
class _SymlinkFeature(Feature):
4262
return osutils.has_symlinks()
4264
def feature_name(self):
4267
SymlinkFeature = _SymlinkFeature()
4270
class _HardlinkFeature(Feature):
4273
return osutils.has_hardlinks()
4275
def feature_name(self):
4278
HardlinkFeature = _HardlinkFeature()
4281
class _OsFifoFeature(Feature):
4284
return getattr(os, 'mkfifo', None)
4286
def feature_name(self):
4287
return 'filesystem fifos'
4289
OsFifoFeature = _OsFifoFeature()
4292
class _UnicodeFilenameFeature(Feature):
4293
"""Does the filesystem support Unicode filenames?"""
4297
# Check for character combinations unlikely to be covered by any
4298
# single non-unicode encoding. We use the characters
4299
# - greek small letter alpha (U+03B1) and
4300
# - braille pattern dots-123456 (U+283F).
4301
os.stat(u'\u03b1\u283f')
4302
except UnicodeEncodeError:
4304
except (IOError, OSError):
4305
# The filesystem allows the Unicode filename but the file doesn't
4309
# The filesystem allows the Unicode filename and the file exists,
4313
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4316
class _CompatabilityThunkFeature(Feature):
4317
"""This feature is just a thunk to another feature.
4319
It issues a deprecation warning if it is accessed, to let you know that you
4320
should really use a different feature.
4323
def __init__(self, dep_version, module, name,
4324
replacement_name, replacement_module=None):
4325
super(_CompatabilityThunkFeature, self).__init__()
4326
self._module = module
4327
if replacement_module is None:
4328
replacement_module = module
4329
self._replacement_module = replacement_module
4331
self._replacement_name = replacement_name
4332
self._dep_version = dep_version
4333
self._feature = None
4336
if self._feature is None:
4337
depr_msg = self._dep_version % ('%s.%s'
4338
% (self._module, self._name))
4339
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4340
self._replacement_name)
4341
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4342
# Import the new feature and use it as a replacement for the
4344
self._feature = pyutils.get_named_object(
4345
self._replacement_module, self._replacement_name)
4349
return self._feature._probe()
4352
class ModuleAvailableFeature(Feature):
4353
"""This is a feature than describes a module we want to be available.
4355
Declare the name of the module in __init__(), and then after probing, the
4356
module will be available as 'self.module'.
4358
:ivar module: The module if it is available, else None.
4361
def __init__(self, module_name):
4362
super(ModuleAvailableFeature, self).__init__()
4363
self.module_name = module_name
4367
self._module = __import__(self.module_name, {}, {}, [''])
4374
if self.available(): # Make sure the probe has been done
4378
def feature_name(self):
4379
return self.module_name
4382
4451
def probe_unicode_in_user_encoding():
4383
4452
"""Try to encode several unicode strings to use in unicode-aware tests.
4384
4453
Return first successful match.
4415
class _HTTPSServerFeature(Feature):
4416
"""Some tests want an https Server, check if one is available.
4418
Right now, the only way this is available is under python2.6 which provides
4429
def feature_name(self):
4430
return 'HTTPSServer'
4433
HTTPSServerFeature = _HTTPSServerFeature()
4436
class _UnicodeFilename(Feature):
4437
"""Does the filesystem support Unicode filenames?"""
4442
except UnicodeEncodeError:
4444
except (IOError, OSError):
4445
# The filesystem allows the Unicode filename but the file doesn't
4449
# The filesystem allows the Unicode filename and the file exists,
4453
UnicodeFilename = _UnicodeFilename()
4456
class _ByteStringNamedFilesystem(Feature):
4457
"""Is the filesystem based on bytes?"""
4460
if os.name == "posix":
4464
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4467
class _UTF8Filesystem(Feature):
4468
"""Is the filesystem UTF-8?"""
4471
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4475
UTF8Filesystem = _UTF8Filesystem()
4478
class _BreakinFeature(Feature):
4479
"""Does this platform support the breakin feature?"""
4482
from bzrlib import breakin
4483
if breakin.determine_signal() is None:
4485
if sys.platform == 'win32':
4486
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4487
# We trigger SIGBREAK via a Console api so we need ctypes to
4488
# access the function
4495
def feature_name(self):
4496
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4499
BreakinFeature = _BreakinFeature()
4502
class _CaseInsCasePresFilenameFeature(Feature):
4503
"""Is the file-system case insensitive, but case-preserving?"""
4506
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4508
# first check truly case-preserving for created files, then check
4509
# case insensitive when opening existing files.
4510
name = osutils.normpath(name)
4511
base, rel = osutils.split(name)
4512
found_rel = osutils.canonical_relpath(base, name)
4513
return (found_rel == rel
4514
and os.path.isfile(name.upper())
4515
and os.path.isfile(name.lower()))
4520
def feature_name(self):
4521
return "case-insensitive case-preserving filesystem"
4523
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4526
class _CaseInsensitiveFilesystemFeature(Feature):
4527
"""Check if underlying filesystem is case-insensitive but *not* case
4530
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4531
# more likely to be case preserving, so this case is rare.
4534
if CaseInsCasePresFilenameFeature.available():
4537
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4538
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4539
TestCaseWithMemoryTransport.TEST_ROOT = root
4541
root = TestCaseWithMemoryTransport.TEST_ROOT
4542
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4544
name_a = osutils.pathjoin(tdir, 'a')
4545
name_A = osutils.pathjoin(tdir, 'A')
4547
result = osutils.isdir(name_A)
4548
_rmtree_temp_dir(tdir)
4551
def feature_name(self):
4552
return 'case-insensitive filesystem'
4554
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4557
class _CaseSensitiveFilesystemFeature(Feature):
4560
if CaseInsCasePresFilenameFeature.available():
4562
elif CaseInsensitiveFilesystemFeature.available():
4567
def feature_name(self):
4568
return 'case-sensitive filesystem'
4570
# new coding style is for feature instances to be lowercase
4571
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4574
4484
# Only define SubUnitBzrRunner if subunit is available.
4576
4486
from subunit import TestProtocolClient
4577
4487
from subunit.test_results import AutoTimingTestResultDecorator
4578
4488
class SubUnitBzrProtocolClient(TestProtocolClient):
4490
def stopTest(self, test):
4491
super(SubUnitBzrProtocolClient, self).stopTest(test)
4492
_clear__type_equality_funcs(test)
4580
4494
def addSuccess(self, test, details=None):
4581
4495
# The subunit client always includes the details in the subunit
4582
4496
# stream, but we don't want to include it in ours.
4594
4508
except ImportError:
4597
class _PosixPermissionsFeature(Feature):
4601
# create temporary file and check if specified perms are maintained.
4604
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4605
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4608
os.chmod(name, write_perms)
4610
read_perms = os.stat(name).st_mode & 0777
4612
return (write_perms == read_perms)
4614
return (os.name == 'posix') and has_perms()
4616
def feature_name(self):
4617
return 'POSIX permissions support'
4619
posix_permissions_feature = _PosixPermissionsFeature()
4512
# API compatibility for old plugins; see bug 892622.
4515
'HTTPServerFeature',
4516
'ModuleAvailableFeature',
4517
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4518
'OsFifoFeature', 'UnicodeFilenameFeature',
4519
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4520
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4521
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4522
'posix_permissions_feature',
4524
globals()[name] = _CompatabilityThunkFeature(
4525
symbol_versioning.deprecated_in((2, 5, 0)),
4526
'bzrlib.tests', name,
4527
name, 'bzrlib.tests.features')
4530
# Compatibility thunks for features whose replacement in
# bzrlib.tests.features is published under a different name than the old
# bzrlib.tests attribute.
for (old_name, new_name) in [
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
    ]:
    # Bind the thunk under old_name. The previous code indexed globals() with
    # `name`, which is not a target of this loop, so the thunk was stored
    # under whatever stale value `name` held and old_name was never defined.
    globals()[old_name] = _CompatabilityThunkFeature(
        symbol_versioning.deprecated_in((2, 5, 0)),
        'bzrlib.tests', old_name,
        new_name, 'bzrlib.tests.features')