55
53
# nb: check this before importing anything else from within it
56
54
_testtools_version = getattr(testtools, '__version__', ())
57
if _testtools_version < (0, 9, 2):
58
raise ImportError("need at least testtools 0.9.2: %s is %r"
55
if _testtools_version < (0, 9, 5):
56
raise ImportError("need at least testtools 0.9.5: %s is %r"
59
57
% (testtools.__file__, _testtools_version))
60
58
from testtools import content
62
61
from bzrlib import (
65
commands as _mod_commands,
75
plugin as _mod_plugin,
82
transport as _mod_transport,
80
import bzrlib.commands
81
import bzrlib.timestamp
83
import bzrlib.inventory
84
import bzrlib.iterablefile
87
86
import bzrlib.lsprof
88
87
except ImportError:
89
88
# lsprof not available
91
from bzrlib.merge import merge_inner
94
from bzrlib.smart import client, request, server
96
from bzrlib import symbol_versioning
90
from bzrlib.smart import client, request
91
from bzrlib.transport import (
97
95
from bzrlib.symbol_versioning import (
99
96
deprecated_function,
105
from bzrlib.transport import (
110
import bzrlib.transport
111
from bzrlib.trace import mutter, note
112
99
from bzrlib.tests import (
116
from bzrlib.tests.http_server import HttpServer
117
from bzrlib.tests.TestUtil import (
121
from bzrlib.tests.treeshape import build_tree_contents
122
105
from bzrlib.ui import NullProgressView
123
106
from bzrlib.ui.text import TextUIFactory
124
import bzrlib.version_info_formats.format_custom
125
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
107
from bzrlib.tests.features import _CompatabilityThunkFeature
127
109
# Mark this python module as being part of the implementation
128
110
# of unittest: this gives us better tracebacks where the last
140
122
SUBUNIT_SEEK_SET = 0
141
123
SUBUNIT_SEEK_CUR = 1
144
class ExtendedTestResult(unittest._TextTestResult):
125
# These are intentionally brought into this namespace. That way plugins, etc
126
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
127
TestSuite = TestUtil.TestSuite
128
TestLoader = TestUtil.TestLoader
130
# Tests should run in a clean and clearly defined environment. The goal is to
131
# keep them isolated from the running environment as much as possible. The test
132
# framework ensures the variables defined below are set (or deleted if the
133
# value is None) before a test is run and reset to their original value after
134
# the test is run. Generally if some code depends on an environment variable,
135
# the tests should start without this variable in the environment. There are a
136
# few exceptions but you shouldn't violate this rule lightly.
140
'XDG_CONFIG_HOME': None,
141
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
142
# tests do check our impls match APPDATA
143
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
147
'BZREMAIL': None, # may still be present in the environment
148
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
149
'BZR_PROGRESS_BAR': None,
150
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
151
# as a base class instead of TestCaseInTempDir. Tests inheriting from
152
# TestCase should not use disk resources, BZR_LOG is one.
153
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
154
'BZR_PLUGIN_PATH': None,
155
'BZR_DISABLE_PLUGINS': None,
156
'BZR_PLUGINS_AT': None,
157
'BZR_CONCURRENCY': None,
158
# Make sure that any text ui tests are consistent regardless of
159
# the environment the test case is run in; you may want tests that
160
# test other combinations. 'dumb' is a reasonable guess for tests
161
# going to a pipe or a StringIO.
167
'SSH_AUTH_SOCK': None,
177
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
178
# least. If you do (care), please update this comment
182
'BZR_REMOTE_PATH': None,
183
# Generally speaking, we don't want apport reporting on crashes in
184
# the test environment unless we're specifically testing apport,
185
# so that it doesn't leak into the real system environment. We
186
# use an env var so it propagates to subprocesses.
187
'APPORT_DISABLE': '1',
191
def override_os_environ(test, env=None):
192
"""Modify os.environ keeping a copy.
194
:param test: A test instance
196
:param env: A dict containing variable definitions to be installed
199
env = isolated_environ
200
test._original_os_environ = dict([(var, value)
201
for var, value in os.environ.iteritems()])
202
for var, value in env.iteritems():
203
osutils.set_or_unset_env(var, value)
204
if var not in test._original_os_environ:
205
# The var is new, add it with a value of None, so
206
# restore_os_environ will delete it
207
test._original_os_environ[var] = None
210
def restore_os_environ(test):
211
"""Restore os.environ to its original state.
213
:param test: A test instance previously passed to override_os_environ.
215
for var, value in test._original_os_environ.iteritems():
216
# Restore the original value (or delete it if the value has been set to
217
# None in override_os_environ).
218
osutils.set_or_unset_env(var, value)
221
def _clear__type_equality_funcs(test):
222
"""Cleanup bound methods stored on TestCase instances
224
Clear the dict breaking a few (mostly) harmless cycles in the affected
225
unittests released with Python 2.6 and initial Python 2.7 versions.
227
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
228
shipped in Oneiric, an object with no clear method was used, hence the
229
extra complications, see bug 809048 for details.
231
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
232
if type_equality_funcs is not None:
233
tef_clear = getattr(type_equality_funcs, "clear", None)
234
if tef_clear is None:
235
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
236
if tef_instance_dict is not None:
237
tef_clear = tef_instance_dict.clear
238
if tef_clear is not None:
242
class ExtendedTestResult(testtools.TextTestResult):
145
243
"""Accepts, reports and accumulates the results of running tests.
147
245
Compared to the unittest version this class adds support for
218
321
if failed or errored: self.stream.write(", ")
219
322
self.stream.write("known_failure_count=%d" %
220
323
self.known_failure_count)
221
self.stream.writeln(")")
324
self.stream.write(")\n")
223
326
if self.known_failure_count:
224
self.stream.writeln("OK (known_failures=%d)" %
327
self.stream.write("OK (known_failures=%d)\n" %
225
328
self.known_failure_count)
227
self.stream.writeln("OK")
330
self.stream.write("OK\n")
228
331
if self.skip_count > 0:
229
332
skipped = self.skip_count
230
self.stream.writeln('%d test%s skipped' %
333
self.stream.write('%d test%s skipped\n' %
231
334
(skipped, skipped != 1 and "s" or ""))
232
335
if self.unsupported:
233
336
for feature, count in sorted(self.unsupported.items()):
234
self.stream.writeln("Missing feature '%s' skipped %d tests." %
337
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
235
338
(feature, count))
237
340
ok = self.wasStrictlySuccessful()
239
342
ok = self.wasSuccessful()
240
if TestCase._first_thread_leaker_id:
343
if self._first_thread_leaker_id:
241
344
self.stream.write(
242
345
'%s is leaking threads among %d leaking tests.\n' % (
243
TestCase._first_thread_leaker_id,
244
TestCase._leaking_threads_tests))
346
self._first_thread_leaker_id,
347
self._tests_leaking_threads_count))
245
348
# We don't report the main thread as an active one.
246
349
self.stream.write(
247
350
'%d non-main threads were left active in the end.\n'
248
% (TestCase._active_threads - 1))
351
% (len(self._active_threads) - 1))
250
353
def getDescription(self, test):
276
380
def _shortened_test_description(self, test):
278
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
382
what = re.sub(r'^bzrlib\.tests\.', '', what)
385
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
386
# multiple times in a row, because the handler is added for
387
# each test but the container list is shared between cases.
388
# See lp:498869 lp:625574 and lp:637725 for background.
389
def _record_traceback_from_test(self, exc_info):
390
"""Store the traceback from passed exc_info tuple till"""
391
self._traceback_from_test = exc_info[2]
281
393
def startTest(self, test):
282
unittest.TestResult.startTest(self, test)
394
super(ExtendedTestResult, self).startTest(test)
283
395
if self.count == 0:
284
396
self.startTests()
285
398
self.report_test_start(test)
286
399
test.number = self.count
287
400
self._recordTestStartTime()
401
# Make testtools cases give us the real traceback on failure
402
addOnException = getattr(test, "addOnException", None)
403
if addOnException is not None:
404
addOnException(self._record_traceback_from_test)
405
# Only check for thread leaks on bzrlib derived test cases
406
if isinstance(test, TestCase):
407
test.addCleanup(self._check_leaked_threads, test)
409
def stopTest(self, test):
410
super(ExtendedTestResult, self).stopTest(test)
411
# Manually break cycles, means touching various private things but hey
412
getDetails = getattr(test, "getDetails", None)
413
if getDetails is not None:
415
_clear__type_equality_funcs(test)
416
self._traceback_from_test = None
289
418
def startTests(self):
291
if getattr(sys, 'frozen', None) is None:
292
bzr_path = osutils.realpath(sys.argv[0])
294
bzr_path = sys.executable
296
'bzr selftest: %s\n' % (bzr_path,))
299
bzrlib.__path__[0],))
301
' bzr-%s python-%s %s\n' % (
302
bzrlib.version_string,
303
bzrlib._format_version_tuple(sys.version_info),
304
platform.platform(aliased=1),
306
self.stream.write('\n')
419
self.report_tests_starting()
420
self._active_threads = threading.enumerate()
422
def _check_leaked_threads(self, test):
423
"""See if any threads have leaked since last call
425
A sample of live threads is stored in the _active_threads attribute,
426
when this method runs it compares the current live threads and any not
427
in the previous sample are treated as having leaked.
429
now_active_threads = set(threading.enumerate())
430
threads_leaked = now_active_threads.difference(self._active_threads)
432
self._report_thread_leak(test, threads_leaked, now_active_threads)
433
self._tests_leaking_threads_count += 1
434
if self._first_thread_leaker_id is None:
435
self._first_thread_leaker_id = test.id()
436
self._active_threads = now_active_threads
308
438
def _recordTestStartTime(self):
309
439
"""Record that a test has started."""
310
self._start_time = time.time()
312
def _cleanupLogFile(self, test):
313
# We can only do this if we have one of our TestCases, not if
315
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
316
if setKeepLogfile is not None:
440
self._start_datetime = self._now()
319
442
def addError(self, test, err):
320
443
"""Tell result that test finished with an error.
356
477
self._formatTime(benchmark_time),
358
479
self.report_success(test)
359
self._cleanupLogFile(test)
360
unittest.TestResult.addSuccess(self, test)
480
super(ExtendedTestResult, self).addSuccess(test)
361
481
test._log_contents = ''
363
483
def addExpectedFailure(self, test, err):
364
484
self.known_failure_count += 1
365
485
self.report_known_failure(test, err)
487
def addUnexpectedSuccess(self, test, details=None):
488
"""Tell result the test unexpectedly passed, counting as a failure
490
When the minimum version of testtools required becomes 0.9.8 this
491
can be updated to use the new handling there.
493
super(ExtendedTestResult, self).addFailure(test, details=details)
494
self.failure_count += 1
495
self.report_unexpected_success(test,
496
"".join(details["reason"].iter_text()))
367
500
def addNotSupported(self, test, feature):
368
501
"""The test will not be run because of a missing feature.
401
539
raise errors.BzrError("Unknown whence %r" % whence)
403
def report_cleaning_up(self):
541
def report_tests_starting(self):
542
"""Display information before the test run begins"""
543
if getattr(sys, 'frozen', None) is None:
544
bzr_path = osutils.realpath(sys.argv[0])
546
bzr_path = sys.executable
548
'bzr selftest: %s\n' % (bzr_path,))
551
bzrlib.__path__[0],))
553
' bzr-%s python-%s %s\n' % (
554
bzrlib.version_string,
555
bzrlib._format_version_tuple(sys.version_info),
556
platform.platform(aliased=1),
558
self.stream.write('\n')
560
def report_test_start(self, test):
561
"""Display information on the test just about to be run"""
563
def _report_thread_leak(self, test, leaked_threads, active_threads):
564
"""Display information on a test that leaked one or more threads"""
565
# GZ 2010-09-09: A leak summary reported separately from the general
566
# thread debugging would be nice. Tests under subunit
567
# need something not using stream, perhaps adding a
568
# testtools details object would be fitting.
569
if 'threads' in selftest_debug_flags:
570
self.stream.write('%s is leaking, active is now %d\n' %
571
(test.id(), len(active_threads)))
406
573
def startTestRun(self):
407
574
self.startTime = time.time()
551
715
return '%s%s' % (indent, err[1])
553
717
def report_error(self, test, err):
554
self.stream.writeln('ERROR %s\n%s'
718
self.stream.write('ERROR %s\n%s\n'
555
719
% (self._testTimeString(test),
556
720
self._error_summary(err)))
558
722
def report_failure(self, test, err):
559
self.stream.writeln(' FAIL %s\n%s'
723
self.stream.write(' FAIL %s\n%s\n'
560
724
% (self._testTimeString(test),
561
725
self._error_summary(err)))
563
727
def report_known_failure(self, test, err):
564
self.stream.writeln('XFAIL %s\n%s'
728
self.stream.write('XFAIL %s\n%s\n'
565
729
% (self._testTimeString(test),
566
730
self._error_summary(err)))
732
def report_unexpected_success(self, test, reason):
733
self.stream.write(' FAIL %s\n%s: %s\n'
734
% (self._testTimeString(test),
735
"Unexpected success. Should have failed",
568
738
def report_success(self, test):
569
self.stream.writeln(' OK %s' % self._testTimeString(test))
739
self.stream.write(' OK %s\n' % self._testTimeString(test))
570
740
for bench_called, stats in getattr(test, '_benchcalls', []):
571
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
741
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
572
742
stats.pprint(file=self.stream)
573
743
# flush the stream so that we get smooth output. This verbose mode is
574
744
# used to show the output in PQM.
575
745
self.stream.flush()
577
747
def report_skip(self, test, reason):
578
self.stream.writeln(' SKIP %s\n%s'
748
self.stream.write(' SKIP %s\n%s\n'
579
749
% (self._testTimeString(test), reason))
581
751
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
752
self.stream.write(' N/A %s\n %s\n'
583
753
% (self._testTimeString(test), reason))
585
755
def report_unsupported(self, test, feature):
586
756
"""test cannot be run because feature is missing."""
587
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
757
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
588
758
%(self._testTimeString(test), feature))
827
1023
self._track_transports()
828
1024
self._track_locks()
829
1025
self._clear_debug_flags()
830
TestCase._active_threads = threading.activeCount()
831
self.addCleanup(self._check_leaked_threads)
1026
# Isolate global verbosity level, to make sure it's reproducible
1027
# between tests. We should get rid of this altogether: bug 656694. --
1029
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
1030
# Isolate config option expansion until its default value for bzrlib is
1031
# settled on or the FIXME associated with _get_expand_default_value
1032
# is addressed -- vila 20110219
1033
self.overrideAttr(config, '_expand_default_value', None)
1034
self._log_files = set()
1035
# Each key in the ``_counters`` dict holds a value for a different
1036
# counter. When the test ends, addDetail() should be used to output the
1037
# counter values. This happens in install_counter_hook().
1039
if 'config_stats' in selftest_debug_flags:
1040
self._install_config_stats_hooks()
1041
# Do not use i18n for tests (unless the test reverses this)
833
1044
def debug(self):
834
1045
# debug a frame up.
836
pdb.Pdb().set_trace(sys._getframe().f_back)
838
def _check_leaked_threads(self):
839
active = threading.activeCount()
840
leaked_threads = active - TestCase._active_threads
841
TestCase._active_threads = active
842
# If some tests make the number of threads *decrease*, we'll consider
843
# that they are just observing old threads dying, not aggressively killing
844
# random threads. So we don't report these tests as leaking. The risk
845
# is that we have false positives that way (the test see 2 threads
846
# going away but leak one) but it seems less likely than the actual
847
# false positives (the test see threads going away and does not leak).
848
if leaked_threads > 0:
849
TestCase._leaking_threads_tests += 1
850
if TestCase._first_thread_leaker_id is None:
851
TestCase._first_thread_leaker_id = self.id()
1047
# The sys preserved stdin/stdout should allow blackbox tests debugging
1048
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1049
).set_trace(sys._getframe().f_back)
1051
def discardDetail(self, name):
1052
"""Extend the addDetail, getDetails api so we can remove a detail.
1054
eg. bzr always adds the 'log' detail at startup, but we don't want to
1055
include it for skipped, xfail, etc tests.
1057
It is safe to call this for a detail that doesn't exist, in case this
1058
gets called multiple times.
1060
# We cheat. details is stored in __details which means we shouldn't
1061
# touch it. but getDetails() returns the dict directly, so we can
1063
details = self.getDetails()
1067
def install_counter_hook(self, hooks, name, counter_name=None):
1068
"""Install a counting hook.
1070
Any hook can be counted as long as it doesn't need to return a value.
1072
:param hooks: Where the hook should be installed.
1074
:param name: The hook name that will be counted.
1076
:param counter_name: The counter identifier in ``_counters``, defaults
1079
_counters = self._counters # Avoid closing over self
1080
if counter_name is None:
1082
if _counters.has_key(counter_name):
1083
raise AssertionError('%s is already used as a counter name'
1085
_counters[counter_name] = 0
1086
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1087
lambda: ['%d' % (_counters[counter_name],)]))
1088
def increment_counter(*args, **kwargs):
1089
_counters[counter_name] += 1
1090
label = 'count %s calls' % (counter_name,)
1091
hooks.install_named_hook(name, increment_counter, label)
1092
self.addCleanup(hooks.uninstall_named_hook, name, label)
1094
def _install_config_stats_hooks(self):
1095
"""Install config hooks to count hook calls.
1098
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1099
self.install_counter_hook(config.ConfigHooks, hook_name,
1100
'config.%s' % (hook_name,))
1102
# The OldConfigHooks are private and need special handling to protect
1103
# against recursive tests (tests that run other tests), so we just do
1104
# manually what registering them into _builtin_known_hooks will provide
1106
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1107
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1108
self.install_counter_hook(config.OldConfigHooks, hook_name,
1109
'old_config.%s' % (hook_name,))
853
1111
def _clear_debug_flags(self):
854
1112
"""Prevent externally set debug flags affecting tests.
1322
1593
self.assertEqual(expected_docstring, obj.__doc__)
1595
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1324
1596
def failUnlessExists(self, path):
1597
return self.assertPathExists(path)
1599
def assertPathExists(self, path):
1325
1600
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1601
if not isinstance(path, basestring):
1328
self.failUnlessExists(p)
1603
self.assertPathExists(p)
1330
self.failUnless(osutils.lexists(path),path+" does not exist")
1605
self.assertTrue(osutils.lexists(path),
1606
path + " does not exist")
1608
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1332
1609
def failIfExists(self, path):
1610
return self.assertPathDoesNotExist(path)
1612
def assertPathDoesNotExist(self, path):
1333
1613
"""Fail if path or paths, which may be abs or relative, exist."""
1334
1614
if not isinstance(path, basestring):
1336
self.failIfExists(p)
1616
self.assertPathDoesNotExist(p)
1338
self.failIf(osutils.lexists(path),path+" exists")
1618
self.assertFalse(osutils.lexists(path),
1340
1621
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1341
1622
"""A helper for callDeprecated and applyDeprecated.
1453
1735
def _startLogFile(self):
1454
"""Send bzr and test log messages to a temporary file.
1456
The file is removed as the test is torn down.
1458
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1459
self._log_file = os.fdopen(fileno, 'w+')
1460
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1461
self._log_file_name = name
1736
"""Setup a in-memory target for bzr and testcase log messages"""
1737
pseudo_log_file = StringIO()
1738
def _get_log_contents_for_weird_testtools_api():
1739
return [pseudo_log_file.getvalue().decode(
1740
"utf-8", "replace").encode("utf-8")]
1741
self.addDetail("log", content.Content(content.ContentType("text",
1742
"plain", {"charset": "utf8"}),
1743
_get_log_contents_for_weird_testtools_api))
1744
self._log_file = pseudo_log_file
1745
self._log_memento = trace.push_log_file(self._log_file)
1462
1746
self.addCleanup(self._finishLogFile)
1464
1748
def _finishLogFile(self):
1465
"""Finished with the log file.
1467
Close the file and delete it, unless setKeepLogfile was called.
1469
if bzrlib.trace._trace_file:
1749
"""Flush and dereference the in-memory log for this testcase"""
1750
if trace._trace_file:
1470
1751
# flush the log file, to get all content
1471
bzrlib.trace._trace_file.flush()
1472
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1752
trace._trace_file.flush()
1753
trace.pop_log_file(self._log_memento)
1754
# The logging module now tracks references for cleanup so discard ours
1755
del self._log_memento
1476
1757
def thisFailsStrictLockCheck(self):
1477
1758
"""It is known that this test would fail with -Dstrict_locks.
1513
1789
setattr(obj, attr_name, new)
1792
def overrideEnv(self, name, new):
1793
"""Set an environment variable, and reset it after the test.
1795
:param name: The environment variable name.
1797
:param new: The value to set the variable to. If None, the
1798
variable is deleted from the environment.
1800
:returns: The actual variable value.
1802
value = osutils.set_or_unset_env(name, new)
1803
self.addCleanup(osutils.set_or_unset_env, name, value)
1806
def recordCalls(self, obj, attr_name):
1807
"""Monkeypatch in a wrapper that will record calls.
1809
The monkeypatch is automatically removed when the test concludes.
1811
:param obj: The namespace holding the reference to be replaced;
1812
typically a module, class, or object.
1813
:param attr_name: A string for the name of the attribute to
1815
:returns: A list that will be extended with one item every time the
1816
function is called, with a tuple of (args, kwargs).
1820
def decorator(*args, **kwargs):
1821
calls.append((args, kwargs))
1822
return orig(*args, **kwargs)
1823
orig = self.overrideAttr(obj, attr_name, decorator)
1516
1826
def _cleanEnvironment(self):
1518
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1519
'HOME': os.getcwd(),
1520
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1521
# tests do check our impls match APPDATA
1522
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1526
'BZREMAIL': None, # may still be present in the environment
1528
'BZR_PROGRESS_BAR': None,
1530
'BZR_PLUGIN_PATH': None,
1531
'BZR_DISABLE_PLUGINS': None,
1532
'BZR_PLUGINS_AT': None,
1533
'BZR_CONCURRENCY': None,
1534
# Make sure that any text ui tests are consistent regardless of
1535
# the environment the test case is run in; you may want tests that
1536
# test other combinations. 'dumb' is a reasonable guess for tests
1537
# going to a pipe or a StringIO.
1541
'BZR_COLUMNS': '80',
1543
'SSH_AUTH_SOCK': None,
1547
'https_proxy': None,
1548
'HTTPS_PROXY': None,
1553
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1554
# least. If you do (care), please update this comment
1558
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test environment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
self.addCleanup(self._restoreEnvironment)
1567
for name, value in new_env.iteritems():
1568
self._captureVar(name, value)
1570
def _captureVar(self, name, newvalue):
1571
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1574
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1576
osutils.set_or_unset_env(name, value)
1827
for name, value in isolated_environ.iteritems():
1828
self.overrideEnv(name, value)
1578
1830
def _restoreHooks(self):
1579
1831
for klass, (name, hooks) in self._preserved_hooks.items():
1580
1832
setattr(klass, name, hooks)
1833
self._preserved_hooks.clear()
1834
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1835
self._preserved_lazy_hooks.clear()
1582
1837
def knownFailure(self, reason):
1583
"""This test has failed for some known reason."""
1584
raise KnownFailure(reason)
1838
"""Declare that this test fails for a known reason
1840
Tests that are known to fail should generally be using expectedFailure
1841
with an appropriate reverse assertion if a change could cause the test
1842
to start passing. Conversely if the test has no immediate prospect of
1843
succeeding then using skip is more suitable.
1845
When this method is called while an exception is being handled, that
1846
traceback will be used, otherwise a new exception will be thrown to
1847
provide one but won't be reported.
1849
self._add_reason(reason)
1851
exc_info = sys.exc_info()
1852
if exc_info != (None, None, None):
1853
self._report_traceback(exc_info)
1856
raise self.failureException(reason)
1857
except self.failureException:
1858
exc_info = sys.exc_info()
1859
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1860
raise testtools.testcase._ExpectedFailure(exc_info)
1864
def _suppress_log(self):
1865
"""Remove the log info from details."""
1866
self.discardDetail('log')
1586
1868
def _do_skip(self, result, reason):
1869
self._suppress_log()
1587
1870
addSkip = getattr(result, 'addSkip', None)
1588
1871
if not callable(addSkip):
1589
1872
result.addSuccess(result)
1645
1951
self._benchtime += time.time() - start
1647
1953
def log(self, *args):
1650
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1656
:param keep_log_file: When True, if the log is still a file on disk
1657
leave it as a file on disk. When False, if the log is still a file
1658
on disk, the log file is deleted and the log preserved as
1660
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1668
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
if self._log_file_name is not None:
1674
logfile = open(self._log_file_name)
1676
log_contents = logfile.read()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
self._log_contents = log_contents
1717
os.remove(self._log_file_name)
1719
if sys.platform == 'win32' and e.errno == errno.EACCES:
1720
sys.stderr.write(('Unable to delete log file '
1721
' %r\n' % self._log_file_name))
1724
self._log_file_name = None
1727
return "No log file content and no log file name."
1729
1956
def get_log(self):
1730
1957
"""Get a unicode string containing the log from bzrlib.trace.
1945
2173
variables. A value of None will unset the env variable.
1946
2174
The values must be strings. The change will only occur in the
1947
2175
child, so you don't need to fix the environment after running.
1948
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
2176
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2177
doesn't support signalling subprocesses.
1950
2178
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2179
:param stderr: file to use for the subprocess's stderr. Valid values
2180
are those valid for the stderr argument of `subprocess.Popen`.
2181
Default value is ``subprocess.PIPE``.
1952
2183
:returns: Popen object for the started process.
1954
2185
if skip_if_plan_to_signal:
1955
if not getattr(os, 'kill', None):
1956
raise TestSkipped("os.kill not available.")
2186
if os.name != "posix":
2187
raise TestSkipped("Sending signals not supported")
1958
2189
if env_changes is None:
1959
2190
env_changes = {}
2191
# Because $HOME is set to a tempdir for the context of a test, modules
2192
# installed in the user dir will not be found unless $PYTHONUSERBASE
2193
# gets set to the computed directory of this parent process.
2194
if site.USER_BASE is not None:
2195
env_changes["PYTHONUSERBASE"] = site.USER_BASE
1962
2198
def cleanup_environment():
2238
def _add_subprocess_log(self, log_file_path):
2239
if len(self._log_files) == 0:
2240
# Register an addCleanup func. We do this on the first call to
2241
# _add_subprocess_log rather than in TestCase.setUp so that this
2242
# addCleanup is registered after any cleanups for tempdirs that
2243
# subclasses might create, which will probably remove the log file
2245
self.addCleanup(self._subprocess_log_cleanup)
2246
# self._log_files is a set, so if a log file is reused we won't grab it
2248
self._log_files.add(log_file_path)
2250
def _subprocess_log_cleanup(self):
2251
for count, log_file_path in enumerate(self._log_files):
2252
# We use buffer_now=True to avoid holding the file open beyond
2253
# the life of this function, which might interfere with e.g.
2254
# cleaning tempdirs on Windows.
2255
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2256
#detail_content = content.content_from_file(
2257
# log_file_path, buffer_now=True)
2258
with open(log_file_path, 'rb') as log_file:
2259
log_file_bytes = log_file.read()
2260
detail_content = content.Content(content.ContentType("text",
2261
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2262
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
1997
2265
def _popen(self, *args, **kwargs):
1998
2266
"""Place a call to Popen.
2000
2268
Allows tests to override this method to intercept the calls made to
2001
2269
Popen for introspection.
2003
return Popen(*args, **kwargs)
2271
return subprocess.Popen(*args, **kwargs)
2005
2273
def get_source_path(self):
2006
2274
"""Return the path of the directory containing bzrlib."""
2036
2304
if retcode is not None and retcode != process.returncode:
2037
2305
if process_args is None:
2038
2306
process_args = "(unknown args)"
2039
mutter('Output of bzr %s:\n%s', process_args, out)
2040
mutter('Error for bzr %s:\n%s', process_args, err)
2307
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2308
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2041
2309
self.fail('Command bzr %s failed with retcode %s != %s'
2042
2310
% (process_args, retcode, process.returncode))
2043
2311
return [out, err]
2045
def check_inventory_shape(self, inv, shape):
2046
"""Compare an inventory to a list of expected names.
2313
def check_tree_shape(self, tree, shape):
2314
"""Compare a tree to a list of expected names.
2048
2316
Fail if they are not precisely equal.
2051
2319
shape = list(shape) # copy
2052
for path, ie in inv.entries():
2320
for path, ie in tree.iter_entries_by_dir():
2053
2321
name = path.replace('\\', '/')
2054
2322
if ie.kind == 'directory':
2055
2323
name = name + '/'
2325
pass # ignore root entry
2057
2327
shape.remove(name)
2059
2329
extras.append(name)
2149
2421
class TestCaseWithMemoryTransport(TestCase):
2150
2422
"""Common test class for tests that do not need disk resources.
2152
Tests that need disk resources should derive from TestCaseWithTransport.
2424
Tests that need disk resources should derive from TestCaseInTempDir
2425
or TestCaseWithTransport.
2154
2427
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2156
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2429
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2157
2430
a directory which does not exist. This serves to help ensure test isolation
2158
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2159
must exist. However, TestCaseWithMemoryTransport does not offer local
2160
file defaults for the transport in tests, nor does it obey the command line
2431
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2432
must exist. However, TestCaseWithMemoryTransport does not offer local file
2433
defaults for the transport in tests, nor does it obey the command line
2161
2434
override, so tests that accidentally write to the common directory should
2164
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2165
a .bzr directory that stops us ascending higher into the filesystem.
2437
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2438
``.bzr`` directory that stops us ascending higher into the filesystem.
2168
2441
TEST_ROOT = None
2375
2659
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2376
2660
self.permit_dir(self.test_dir)
2378
def make_branch(self, relpath, format=None):
2662
def make_branch(self, relpath, format=None, name=None):
2379
2663
"""Create a branch on the transport at relpath."""
2380
2664
repo = self.make_repository(relpath, format=format)
2381
return repo.bzrdir.create_branch()
2665
return repo.bzrdir.create_branch(append_revisions_only=False,
2668
def get_default_format(self):
2671
def resolve_format(self, format):
2672
"""Resolve an object to a ControlDir format object.
2674
The initial format object can either already be
2675
a ControlDirFormat, None (for the default format),
2676
or a string with the name of the control dir format.
2678
:param format: Object to resolve
2679
:return: A ControlDirFormat instance
2682
format = self.get_default_format()
2683
if isinstance(format, basestring):
2684
format = bzrdir.format_registry.make_bzrdir(format)
2383
2687
def make_bzrdir(self, relpath, format=None):
2385
2689
# might be a relative or absolute path
2386
2690
maybe_a_url = self.get_url(relpath)
2387
2691
segments = maybe_a_url.rsplit('/', 1)
2388
t = get_transport(maybe_a_url)
2692
t = _mod_transport.get_transport(maybe_a_url)
2389
2693
if len(segments) > 1 and segments[-1] not in ('', '.'):
2390
2694
t.ensure_base()
2393
if isinstance(format, basestring):
2394
format = bzrdir.format_registry.make_bzrdir(format)
2695
format = self.resolve_format(format)
2395
2696
return format.initialize_on_transport(t)
2396
2697
except errors.UninitializableFormat:
2397
2698
raise TestSkipped("Format %s is not initializable." % format)
2399
def make_repository(self, relpath, shared=False, format=None):
2700
def make_repository(self, relpath, shared=None, format=None):
2400
2701
"""Create a repository on our default transport at relpath.
2402
2703
Note that relpath must be a relative path, not a full url.
3102
3428
"""A decorator which excludes test matching an exclude pattern."""
3104
3430
def __init__(self, suite, exclude_pattern):
3105
TestDecorator.__init__(self, suite)
3106
self.exclude_pattern = exclude_pattern
3107
self.excluded = False
3111
return iter(self._tests)
3112
self.excluded = True
3113
suite = exclude_tests_by_re(self, self.exclude_pattern)
3115
self.addTests(suite)
3116
return iter(self._tests)
3431
super(ExcludeDecorator, self).__init__(
3432
exclude_tests_by_re(suite, exclude_pattern))
3119
3435
class FilterTestsDecorator(TestDecorator):
3120
3436
"""A decorator which filters tests to those matching a pattern."""
3122
3438
def __init__(self, suite, pattern):
3123
TestDecorator.__init__(self, suite)
3124
self.pattern = pattern
3125
self.filtered = False
3129
return iter(self._tests)
3130
self.filtered = True
3131
suite = filter_suite_by_re(self, self.pattern)
3133
self.addTests(suite)
3134
return iter(self._tests)
3439
super(FilterTestsDecorator, self).__init__(
3440
filter_suite_by_re(suite, pattern))
3137
3443
class RandomDecorator(TestDecorator):
3138
3444
"""A decorator which randomises the order of its tests."""
3140
3446
def __init__(self, suite, random_seed, stream):
3141
TestDecorator.__init__(self, suite)
3142
self.random_seed = random_seed
3143
self.randomised = False
3144
self.stream = stream
3148
return iter(self._tests)
3149
self.randomised = True
3150
self.stream.write("Randomizing test order using seed %s\n\n" %
3151
(self.actual_seed()))
3447
random_seed = self.actual_seed(random_seed)
3448
stream.write("Randomizing test order using seed %s\n\n" %
3152
3450
# Initialise the random number generator.
3153
random.seed(self.actual_seed())
3154
suite = randomize_suite(self)
3156
self.addTests(suite)
3157
return iter(self._tests)
3451
random.seed(random_seed)
3452
super(RandomDecorator, self).__init__(randomize_suite(suite))
3159
def actual_seed(self):
3160
if self.random_seed == "now":
3455
def actual_seed(seed):
3161
3457
# We convert the seed to a long to make it reusable across
3162
3458
# invocations (because the user can reenter it).
3163
self.random_seed = long(time.time())
3459
return long(time.time())
3165
3461
# Convert the seed to a long if we can
3167
self.random_seed = long(self.random_seed)
3464
except (TypeError, ValueError):
3170
return self.random_seed
3173
3469
class TestFirstDecorator(TestDecorator):
3174
3470
"""A decorator which moves named tests to the front."""
3176
3472
def __init__(self, suite, pattern):
3177
TestDecorator.__init__(self, suite)
3178
self.pattern = pattern
3179
self.filtered = False
3183
return iter(self._tests)
3184
self.filtered = True
3185
suites = split_suite_by_re(self, self.pattern)
3187
self.addTests(suites)
3188
return iter(self._tests)
3473
super(TestFirstDecorator, self).__init__()
3474
self.addTests(split_suite_by_re(suite, pattern))
3191
3477
def partition_tests(suite, count):
3192
3478
"""Partition suite into count lists of tests."""
3194
tests = list(iter_suite_tests(suite))
3195
tests_per_process = int(math.ceil(float(len(tests)) / count))
3196
for block in range(count):
3197
low_test = block * tests_per_process
3198
high_test = low_test + tests_per_process
3199
process_tests = tests[low_test:high_test]
3200
result.append(process_tests)
3479
# This just assigns tests in a round-robin fashion. On one hand this
3480
# splits up blocks of related tests that might run faster if they shared
3481
# resources, but on the other it avoids assigning blocks of slow tests to
3482
# just one partition. So the slowest partition shouldn't be much slower
3484
partitions = [list() for i in range(count)]
3485
tests = iter_suite_tests(suite)
3486
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3487
partition.append(test)
3204
3491
def workaround_zealous_crypto_random():
3235
3522
ProtocolTestCase.run(self, result)
3237
os.waitpid(self.pid, 0)
3524
pid, status = os.waitpid(self.pid, 0)
3525
# GZ 2011-10-18: If status is nonzero, should report to the result
3526
# that something went wrong.
3239
3528
test_blocks = partition_tests(suite, concurrency)
3529
# Clear the tests from the original suite so it doesn't keep them alive
3530
suite._tests[:] = []
3240
3531
for process_tests in test_blocks:
3241
process_suite = TestSuite()
3242
process_suite.addTests(process_tests)
3532
process_suite = TestUtil.TestSuite(process_tests)
3533
# Also clear each split list so new suite has only reference
3534
process_tests[:] = []
3243
3535
c2pread, c2pwrite = os.pipe()
3244
3536
pid = os.fork()
3246
workaround_zealous_crypto_random()
3539
stream = os.fdopen(c2pwrite, 'wb', 1)
3540
workaround_zealous_crypto_random()
3248
3541
os.close(c2pread)
3249
3542
# Leave stderr and stdout open so we can see test noise
3250
3543
# Close stdin so that the child goes away if it decides to
3251
3544
# read from stdin (otherwise it's a roulette to see what
3252
3545
# child actually gets keystrokes for pdb etc).
3253
3546
sys.stdin.close()
3255
stream = os.fdopen(c2pwrite, 'wb', 1)
3256
3547
subunit_result = AutoTimingTestResultDecorator(
3257
TestProtocolClient(stream))
3548
SubUnitBzrProtocolClient(stream))
3258
3549
process_suite.run(subunit_result)
3551
# Try and report traceback on stream, but exit with error even
3552
# if stream couldn't be created or something else goes wrong.
3553
# The traceback is formatted to a string and written in one go
3554
# to avoid interleaving lines from multiple failing children.
3556
stream.write(traceback.format_exc())
3262
3561
os.close(c2pwrite)
3263
3562
stream = os.fdopen(c2pread, 'rb', 1)
3325
class ForwardingResult(unittest.TestResult):
3327
def __init__(self, target):
3328
unittest.TestResult.__init__(self)
3329
self.result = target
3331
def startTest(self, test):
3332
self.result.startTest(test)
3334
def stopTest(self, test):
3335
self.result.stopTest(test)
3337
def startTestRun(self):
3338
self.result.startTestRun()
3340
def stopTestRun(self):
3341
self.result.stopTestRun()
3343
def addSkip(self, test, reason):
3344
self.result.addSkip(test, reason)
3346
def addSuccess(self, test):
3347
self.result.addSuccess(test)
3349
def addError(self, test, err):
3350
self.result.addError(test, err)
3352
def addFailure(self, test, err):
3353
self.result.addFailure(test, err)
3354
ForwardingResult = testtools.ExtendedToOriginalDecorator
3357
class ProfileResult(ForwardingResult):
3626
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3358
3627
"""Generate profiling data for all activity between start and success.
3360
3629
The profile data is appended to the test's _benchcalls attribute and can
3686
3967
'bzrlib.tests.test_commit_merge',
3687
3968
'bzrlib.tests.test_config',
3688
3969
'bzrlib.tests.test_conflicts',
3970
'bzrlib.tests.test_controldir',
3689
3971
'bzrlib.tests.test_counted_lock',
3690
3972
'bzrlib.tests.test_crash',
3691
3973
'bzrlib.tests.test_decorators',
3692
3974
'bzrlib.tests.test_delta',
3693
3975
'bzrlib.tests.test_debug',
3694
'bzrlib.tests.test_deprecated_graph',
3695
3976
'bzrlib.tests.test_diff',
3696
3977
'bzrlib.tests.test_directory_service',
3697
3978
'bzrlib.tests.test_dirstate',
3698
3979
'bzrlib.tests.test_email_message',
3699
3980
'bzrlib.tests.test_eol_filters',
3700
3981
'bzrlib.tests.test_errors',
3982
'bzrlib.tests.test_estimate_compressed_size',
3701
3983
'bzrlib.tests.test_export',
3984
'bzrlib.tests.test_export_pot',
3702
3985
'bzrlib.tests.test_extract',
3986
'bzrlib.tests.test_features',
3703
3987
'bzrlib.tests.test_fetch',
3988
'bzrlib.tests.test_fixtures',
3704
3989
'bzrlib.tests.test_fifo_cache',
3705
3990
'bzrlib.tests.test_filters',
3991
'bzrlib.tests.test_filter_tree',
3706
3992
'bzrlib.tests.test_ftp_transport',
3707
3993
'bzrlib.tests.test_foreign',
3708
3994
'bzrlib.tests.test_generate_docs',
4101
4429
if test_id != None:
4102
4430
ui.ui_factory.clear_term()
4103
4431
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4432
# Ugly, but the last thing we want here is to fail, so bear with it.
4433
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4434
).encode('ascii', 'replace')
4104
4435
sys.stderr.write('Unable to remove testing dir %s\n%s'
4105
% (os.path.basename(dirname), e))
4108
class Feature(object):
4109
"""An operating system Feature."""
4112
self._available = None
4114
def available(self):
4115
"""Is the feature available?
4117
:return: True if the feature is available.
4119
if self._available is None:
4120
self._available = self._probe()
4121
return self._available
4124
"""Implement this method in concrete features.
4126
:return: True if the feature is available.
4128
raise NotImplementedError
4131
if getattr(self, 'feature_name', None):
4132
return self.feature_name()
4133
return self.__class__.__name__
4136
class _SymlinkFeature(Feature):
4139
return osutils.has_symlinks()
4141
def feature_name(self):
4144
SymlinkFeature = _SymlinkFeature()
4147
class _HardlinkFeature(Feature):
4150
return osutils.has_hardlinks()
4152
def feature_name(self):
4155
HardlinkFeature = _HardlinkFeature()
4158
class _OsFifoFeature(Feature):
4161
return getattr(os, 'mkfifo', None)
4163
def feature_name(self):
4164
return 'filesystem fifos'
4166
OsFifoFeature = _OsFifoFeature()
4169
class _UnicodeFilenameFeature(Feature):
4170
"""Does the filesystem support Unicode filenames?"""
4174
# Check for character combinations unlikely to be covered by any
4175
# single non-unicode encoding. We use the characters
4176
# - greek small letter alpha (U+03B1) and
4177
# - braille pattern dots-123456 (U+283F).
4178
os.stat(u'\u03b1\u283f')
4179
except UnicodeEncodeError:
4181
except (IOError, OSError):
4182
# The filesystem allows the Unicode filename but the file doesn't
4186
# The filesystem allows the Unicode filename and the file exists,
4190
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4193
class _CompatabilityThunkFeature(Feature):
4194
"""This feature is just a thunk to another feature.
4196
It issues a deprecation warning if it is accessed, to let you know that you
4197
should really use a different feature.
4200
def __init__(self, dep_version, module, name,
4201
replacement_name, replacement_module=None):
4202
super(_CompatabilityThunkFeature, self).__init__()
4203
self._module = module
4204
if replacement_module is None:
4205
replacement_module = module
4206
self._replacement_module = replacement_module
4208
self._replacement_name = replacement_name
4209
self._dep_version = dep_version
4210
self._feature = None
4213
if self._feature is None:
4214
depr_msg = self._dep_version % ('%s.%s'
4215
% (self._module, self._name))
4216
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4217
self._replacement_name)
4218
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4219
# Import the new feature and use it as a replacement for the
4221
mod = __import__(self._replacement_module, {}, {},
4222
[self._replacement_name])
4223
self._feature = getattr(mod, self._replacement_name)
4227
return self._feature._probe()
4230
class ModuleAvailableFeature(Feature):
4231
"""This is a feature than describes a module we want to be available.
4233
Declare the name of the module in __init__(), and then after probing, the
4234
module will be available as 'self.module'.
4236
:ivar module: The module if it is available, else None.
4239
def __init__(self, module_name):
4240
super(ModuleAvailableFeature, self).__init__()
4241
self.module_name = module_name
4245
self._module = __import__(self.module_name, {}, {}, [''])
4252
if self.available(): # Make sure the probe has been done
4256
def feature_name(self):
4257
return self.module_name
4260
# This is kept here for compatibility, it is recommended to use
4261
# 'bzrlib.tests.features.paramiko' instead
4262
ParamikoFeature = _CompatabilityThunkFeature(
4263
deprecated_in((2,1,0)),
4264
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4436
% (os.path.basename(dirname), printable_e))
4267
4439
def probe_unicode_in_user_encoding():
4300
class _HTTPSServerFeature(Feature):
4301
"""Some tests want an https Server, check if one is available.
4303
Right now, the only way this is available is under python2.6 which provides
4314
def feature_name(self):
4315
return 'HTTPSServer'
4318
HTTPSServerFeature = _HTTPSServerFeature()
4321
class _UnicodeFilename(Feature):
4322
"""Does the filesystem support Unicode filenames?"""
4327
except UnicodeEncodeError:
4329
except (IOError, OSError):
4330
# The filesystem allows the Unicode filename but the file doesn't
4334
# The filesystem allows the Unicode filename and the file exists,
4338
UnicodeFilename = _UnicodeFilename()
4341
class _UTF8Filesystem(Feature):
4342
"""Is the filesystem UTF-8?"""
4345
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4349
UTF8Filesystem = _UTF8Filesystem()
4352
class _BreakinFeature(Feature):
4353
"""Does this platform support the breakin feature?"""
4356
from bzrlib import breakin
4357
if breakin.determine_signal() is None:
4359
if sys.platform == 'win32':
4360
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4361
# We trigger SIGBREAK via a Console api so we need ctypes to
4362
# access the function
4369
def feature_name(self):
4370
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4373
BreakinFeature = _BreakinFeature()
4376
class _CaseInsCasePresFilenameFeature(Feature):
4377
"""Is the file-system case insensitive, but case-preserving?"""
4380
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4382
# first check truly case-preserving for created files, then check
4383
# case insensitive when opening existing files.
4384
name = osutils.normpath(name)
4385
base, rel = osutils.split(name)
4386
found_rel = osutils.canonical_relpath(base, name)
4387
return (found_rel == rel
4388
and os.path.isfile(name.upper())
4389
and os.path.isfile(name.lower()))
4394
def feature_name(self):
4395
return "case-insensitive case-preserving filesystem"
4397
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4400
class _CaseInsensitiveFilesystemFeature(Feature):
4401
"""Check if underlying filesystem is case-insensitive but *not* case
4404
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4405
# more likely to be case preserving, so this case is rare.
4408
if CaseInsCasePresFilenameFeature.available():
4411
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4412
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4413
TestCaseWithMemoryTransport.TEST_ROOT = root
4415
root = TestCaseWithMemoryTransport.TEST_ROOT
4416
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4418
name_a = osutils.pathjoin(tdir, 'a')
4419
name_A = osutils.pathjoin(tdir, 'A')
4421
result = osutils.isdir(name_A)
4422
_rmtree_temp_dir(tdir)
4425
def feature_name(self):
4426
return 'case-insensitive filesystem'
4428
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4431
class _CaseSensitiveFilesystemFeature(Feature):
4434
if CaseInsCasePresFilenameFeature.available():
4436
elif CaseInsensitiveFilesystemFeature.available():
4441
def feature_name(self):
4442
return 'case-sensitive filesystem'
4444
# new coding style is for feature instances to be lowercase
4445
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4448
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4449
SubUnitFeature = _CompatabilityThunkFeature(
4450
deprecated_in((2,1,0)),
4451
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4452
4472
# Only define SubUnitBzrRunner if subunit is available.
4454
4474
from subunit import TestProtocolClient
4455
4475
from subunit.test_results import AutoTimingTestResultDecorator
4476
class SubUnitBzrProtocolClient(TestProtocolClient):
4478
def stopTest(self, test):
4479
super(SubUnitBzrProtocolClient, self).stopTest(test)
4480
_clear__type_equality_funcs(test)
4482
def addSuccess(self, test, details=None):
4483
# The subunit client always includes the details in the subunit
4484
# stream, but we don't want to include it in ours.
4485
if details is not None and 'log' in details:
4487
return super(SubUnitBzrProtocolClient, self).addSuccess(
4456
4490
class SubUnitBzrRunner(TextTestRunner):
4457
4491
def run(self, test):
4458
4492
result = AutoTimingTestResultDecorator(
4459
TestProtocolClient(self.stream))
4493
SubUnitBzrProtocolClient(self.stream))
4460
4494
test.run(result)
4462
4496
except ImportError:
4465
class _PosixPermissionsFeature(Feature):
4469
# create temporary file and check if specified perms are maintained.
4472
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4473
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4476
os.chmod(name, write_perms)
4478
read_perms = os.stat(name).st_mode & 0777
4480
return (write_perms == read_perms)
4482
return (os.name == 'posix') and has_perms()
4484
def feature_name(self):
4485
return 'POSIX permissions support'
4487
posix_permissions_feature = _PosixPermissionsFeature()
4500
# API compatibility for old plugins; see bug 892622.
4503
'HTTPServerFeature',
4504
'ModuleAvailableFeature',
4505
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4506
'OsFifoFeature', 'UnicodeFilenameFeature',
4507
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4508
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4509
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4510
'posix_permissions_feature',
4512
globals()[name] = _CompatabilityThunkFeature(
4513
symbol_versioning.deprecated_in((2, 5, 0)),
4514
'bzrlib.tests', name,
4515
name, 'bzrlib.tests.features')
4518
# Feature names that were renamed when they moved to bzrlib.tests.features:
# bind each old spelling in this module to a compatibility thunk that
# points at the new name in the features module.
for (old_name, new_name) in [
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
    ]:
    # The key must be old_name. Using 'name' here would leave the old
    # spelling undefined and instead rebind whatever 'name' last referred
    # to in the preceding compatibility block.
    globals()[old_name] = _CompatabilityThunkFeature(
        symbol_versioning.deprecated_in((2, 5, 0)),
        'bzrlib.tests', old_name,
        new_name, 'bzrlib.tests.features')