53
55
# nb: check this before importing anything else from within it
54
56
_testtools_version = getattr(testtools, '__version__', ())
55
if _testtools_version < (0, 9, 5):
56
raise ImportError("need at least testtools 0.9.5: %s is %r"
57
if _testtools_version < (0, 9, 2):
58
raise ImportError("need at least testtools 0.9.2: %s is %r"
57
59
% (testtools.__file__, _testtools_version))
58
60
from testtools import content
61
62
from bzrlib import (
65
commands as _mod_commands,
75
plugin as _mod_plugin,
82
transport as _mod_transport,
80
import bzrlib.commands
81
import bzrlib.timestamp
83
import bzrlib.inventory
84
import bzrlib.iterablefile
86
87
import bzrlib.lsprof
87
88
except ImportError:
88
89
# lsprof not available
90
from bzrlib.smart import client, request
91
from bzrlib.merge import merge_inner
94
from bzrlib.smart import client, request, server
96
from bzrlib import symbol_versioning
97
from bzrlib.symbol_versioning import (
91
105
from bzrlib.transport import (
95
from bzrlib.symbol_versioning import (
110
import bzrlib.transport
111
from bzrlib.trace import mutter, note
99
112
from bzrlib.tests import (
116
from bzrlib.tests.http_server import HttpServer
117
from bzrlib.tests.TestUtil import (
121
from bzrlib.tests.treeshape import build_tree_contents
105
122
from bzrlib.ui import NullProgressView
106
123
from bzrlib.ui.text import TextUIFactory
107
from bzrlib.tests.features import _CompatabilityThunkFeature
124
import bzrlib.version_info_formats.format_custom
125
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
109
127
# Mark this python module as being part of the implementation
110
128
# of unittest: this gives us better tracebacks where the last
122
140
SUBUNIT_SEEK_SET = 0
123
141
SUBUNIT_SEEK_CUR = 1
125
# These are intentionally brought into this namespace. That way plugins, etc
126
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
127
TestSuite = TestUtil.TestSuite
128
TestLoader = TestUtil.TestLoader
130
# Tests should run in a clean and clearly defined environment. The goal is to
131
# keep them isolated from the running environment as much as possible. The test
132
# framework ensures the variables defined below are set (or deleted if the
133
# value is None) before a test is run and reset to their original value after
134
# the test is run. Generally if some code depends on an environment variable,
135
# the tests should start without this variable in the environment. There are a
136
# few exceptions but you shouldn't violate this rule lightly.
140
'XDG_CONFIG_HOME': None,
141
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
142
# tests do check our impls match APPDATA
143
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
147
'BZREMAIL': None, # may still be present in the environment
148
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
149
'BZR_PROGRESS_BAR': None,
150
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
151
# as a base class instead of TestCaseInTempDir. Tests inheriting from
152
# TestCase should not use disk resources, BZR_LOG is one.
153
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
154
'BZR_PLUGIN_PATH': None,
155
'BZR_DISABLE_PLUGINS': None,
156
'BZR_PLUGINS_AT': None,
157
'BZR_CONCURRENCY': None,
158
# Make sure that any text ui tests are consistent regardless of
159
# the environment the test case is run in; you may want tests that
160
# test other combinations. 'dumb' is a reasonable guess for tests
161
# going to a pipe or a StringIO.
167
'SSH_AUTH_SOCK': None,
177
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
178
# least. If you do (care), please update this comment
182
'BZR_REMOTE_PATH': None,
183
# Generally speaking, we don't want apport reporting on crashes in
184
# the test environment unless we're specifically testing apport,
185
# so that it doesn't leak into the real system environment. We
186
# use an env var so it propagates to subprocesses.
187
'APPORT_DISABLE': '1',
191
def override_os_environ(test, env=None):
192
"""Modify os.environ keeping a copy.
194
:param test: A test instance
196
:param env: A dict containing variable definitions to be installed
199
env = isolated_environ
200
test._original_os_environ = dict([(var, value)
201
for var, value in os.environ.iteritems()])
202
for var, value in env.iteritems():
203
osutils.set_or_unset_env(var, value)
204
if var not in test._original_os_environ:
205
# The var is new, add it with a value of None, so
206
# restore_os_environ will delete it
207
test._original_os_environ[var] = None
210
def restore_os_environ(test):
211
"""Restore os.environ to its original state.
213
:param test: A test instance previously passed to override_os_environ.
215
for var, value in test._original_os_environ.iteritems():
216
# Restore the original value (or delete it if the value has been set to
217
# None in override_os_environ).
218
osutils.set_or_unset_env(var, value)
221
def _clear__type_equality_funcs(test):
222
"""Cleanup bound methods stored on TestCase instances
224
Clear the dict breaking a few (mostly) harmless cycles in the affected
225
unittests released with Python 2.6 and initial Python 2.7 versions.
227
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
228
shipped in Oneiric, an object with no clear method was used, hence the
229
extra complications, see bug 809048 for details.
231
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
232
if type_equality_funcs is not None:
233
tef_clear = getattr(type_equality_funcs, "clear", None)
234
if tef_clear is None:
235
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
236
if tef_instance_dict is not None:
237
tef_clear = tef_instance_dict.clear
238
if tef_clear is not None:
242
class ExtendedTestResult(testtools.TextTestResult):
144
class ExtendedTestResult(unittest._TextTestResult):
243
145
"""Accepts, reports and accumulates the results of running tests.
245
147
Compared to the unittest version this class adds support for
321
218
if failed or errored: self.stream.write(", ")
322
219
self.stream.write("known_failure_count=%d" %
323
220
self.known_failure_count)
324
self.stream.write(")\n")
221
self.stream.writeln(")")
326
223
if self.known_failure_count:
327
self.stream.write("OK (known_failures=%d)\n" %
224
self.stream.writeln("OK (known_failures=%d)" %
328
225
self.known_failure_count)
330
self.stream.write("OK\n")
227
self.stream.writeln("OK")
331
228
if self.skip_count > 0:
332
229
skipped = self.skip_count
333
self.stream.write('%d test%s skipped\n' %
230
self.stream.writeln('%d test%s skipped' %
334
231
(skipped, skipped != 1 and "s" or ""))
335
232
if self.unsupported:
336
233
for feature, count in sorted(self.unsupported.items()):
337
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
234
self.stream.writeln("Missing feature '%s' skipped %d tests." %
338
235
(feature, count))
340
237
ok = self.wasStrictlySuccessful()
342
239
ok = self.wasSuccessful()
343
if self._first_thread_leaker_id:
240
if TestCase._first_thread_leaker_id:
344
241
self.stream.write(
345
242
'%s is leaking threads among %d leaking tests.\n' % (
346
self._first_thread_leaker_id,
347
self._tests_leaking_threads_count))
243
TestCase._first_thread_leaker_id,
244
TestCase._leaking_threads_tests))
348
245
# We don't report the main thread as an active one.
349
246
self.stream.write(
350
247
'%d non-main threads were left active in the end.\n'
351
% (len(self._active_threads) - 1))
248
% (TestCase._active_threads - 1))
353
250
def getDescription(self, test):
380
276
def _shortened_test_description(self, test):
382
what = re.sub(r'^bzrlib\.tests\.', '', what)
278
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
385
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
386
# multiple times in a row, because the handler is added for
387
# each test but the container list is shared between cases.
388
# See lp:498869 lp:625574 and lp:637725 for background.
389
def _record_traceback_from_test(self, exc_info):
390
"""Store the traceback from passed exc_info tuple till"""
391
self._traceback_from_test = exc_info[2]
393
281
def startTest(self, test):
394
super(ExtendedTestResult, self).startTest(test)
282
unittest.TestResult.startTest(self, test)
395
283
if self.count == 0:
396
284
self.startTests()
398
285
self.report_test_start(test)
399
286
test.number = self.count
400
287
self._recordTestStartTime()
401
# Make testtools cases give us the real traceback on failure
402
addOnException = getattr(test, "addOnException", None)
403
if addOnException is not None:
404
addOnException(self._record_traceback_from_test)
405
# Only check for thread leaks on bzrlib derived test cases
406
if isinstance(test, TestCase):
407
test.addCleanup(self._check_leaked_threads, test)
409
def stopTest(self, test):
410
super(ExtendedTestResult, self).stopTest(test)
411
# Manually break cycles, means touching various private things but hey
412
getDetails = getattr(test, "getDetails", None)
413
if getDetails is not None:
415
_clear__type_equality_funcs(test)
416
self._traceback_from_test = None
418
289
def startTests(self):
419
self.report_tests_starting()
420
self._active_threads = threading.enumerate()
422
def _check_leaked_threads(self, test):
423
"""See if any threads have leaked since last call
425
A sample of live threads is stored in the _active_threads attribute,
426
when this method runs it compares the current live threads and any not
427
in the previous sample are treated as having leaked.
429
now_active_threads = set(threading.enumerate())
430
threads_leaked = now_active_threads.difference(self._active_threads)
432
self._report_thread_leak(test, threads_leaked, now_active_threads)
433
self._tests_leaking_threads_count += 1
434
if self._first_thread_leaker_id is None:
435
self._first_thread_leaker_id = test.id()
436
self._active_threads = now_active_threads
291
if getattr(sys, 'frozen', None) is None:
292
bzr_path = osutils.realpath(sys.argv[0])
294
bzr_path = sys.executable
296
'bzr selftest: %s\n' % (bzr_path,))
299
bzrlib.__path__[0],))
301
' bzr-%s python-%s %s\n' % (
302
bzrlib.version_string,
303
bzrlib._format_version_tuple(sys.version_info),
304
platform.platform(aliased=1),
306
self.stream.write('\n')
438
308
def _recordTestStartTime(self):
439
309
"""Record that a test has started."""
440
self._start_datetime = self._now()
310
self._start_time = time.time()
312
def _cleanupLogFile(self, test):
313
# We can only do this if we have one of our TestCases, not if
315
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
316
if setKeepLogfile is not None:
442
319
def addError(self, test, err):
443
320
"""Tell result that test finished with an error.
477
356
self._formatTime(benchmark_time),
479
358
self.report_success(test)
480
super(ExtendedTestResult, self).addSuccess(test)
359
self._cleanupLogFile(test)
360
unittest.TestResult.addSuccess(self, test)
481
361
test._log_contents = ''
483
363
def addExpectedFailure(self, test, err):
484
364
self.known_failure_count += 1
485
365
self.report_known_failure(test, err)
487
def addUnexpectedSuccess(self, test, details=None):
488
"""Tell result the test unexpectedly passed, counting as a failure
490
When the minimum version of testtools required becomes 0.9.8 this
491
can be updated to use the new handling there.
493
super(ExtendedTestResult, self).addFailure(test, details=details)
494
self.failure_count += 1
495
self.report_unexpected_success(test,
496
"".join(details["reason"].iter_text()))
500
367
def addNotSupported(self, test, feature):
501
368
"""The test will not be run because of a missing feature.
539
401
raise errors.BzrError("Unknown whence %r" % whence)
541
def report_tests_starting(self):
542
"""Display information before the test run begins"""
543
if getattr(sys, 'frozen', None) is None:
544
bzr_path = osutils.realpath(sys.argv[0])
546
bzr_path = sys.executable
548
'bzr selftest: %s\n' % (bzr_path,))
551
bzrlib.__path__[0],))
553
' bzr-%s python-%s %s\n' % (
554
bzrlib.version_string,
555
bzrlib._format_version_tuple(sys.version_info),
556
platform.platform(aliased=1),
558
self.stream.write('\n')
560
def report_test_start(self, test):
561
"""Display information on the test just about to be run"""
563
def _report_thread_leak(self, test, leaked_threads, active_threads):
564
"""Display information on a test that leaked one or more threads"""
565
# GZ 2010-09-09: A leak summary reported separately from the general
566
# thread debugging would be nice. Tests under subunit
567
# need something not using stream, perhaps adding a
568
# testtools details object would be fitting.
569
if 'threads' in selftest_debug_flags:
570
self.stream.write('%s is leaking, active is now %d\n' %
571
(test.id(), len(active_threads)))
403
def report_cleaning_up(self):
573
406
def startTestRun(self):
574
407
self.startTime = time.time()
715
551
return '%s%s' % (indent, err[1])
717
553
def report_error(self, test, err):
718
self.stream.write('ERROR %s\n%s\n'
554
self.stream.writeln('ERROR %s\n%s'
719
555
% (self._testTimeString(test),
720
556
self._error_summary(err)))
722
558
def report_failure(self, test, err):
723
self.stream.write(' FAIL %s\n%s\n'
559
self.stream.writeln(' FAIL %s\n%s'
724
560
% (self._testTimeString(test),
725
561
self._error_summary(err)))
727
563
def report_known_failure(self, test, err):
728
self.stream.write('XFAIL %s\n%s\n'
564
self.stream.writeln('XFAIL %s\n%s'
729
565
% (self._testTimeString(test),
730
566
self._error_summary(err)))
732
def report_unexpected_success(self, test, reason):
733
self.stream.write(' FAIL %s\n%s: %s\n'
734
% (self._testTimeString(test),
735
"Unexpected success. Should have failed",
738
568
def report_success(self, test):
739
self.stream.write(' OK %s\n' % self._testTimeString(test))
569
self.stream.writeln(' OK %s' % self._testTimeString(test))
740
570
for bench_called, stats in getattr(test, '_benchcalls', []):
741
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
571
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
742
572
stats.pprint(file=self.stream)
743
573
# flush the stream so that we get smooth output. This verbose mode is
744
574
# used to show the output in PQM.
745
575
self.stream.flush()
747
577
def report_skip(self, test, reason):
748
self.stream.write(' SKIP %s\n%s\n'
578
self.stream.writeln(' SKIP %s\n%s'
749
579
% (self._testTimeString(test), reason))
751
581
def report_not_applicable(self, test, reason):
752
self.stream.write(' N/A %s\n %s\n'
582
self.stream.writeln(' N/A %s\n %s'
753
583
% (self._testTimeString(test), reason))
755
585
def report_unsupported(self, test, feature):
756
586
"""test cannot be run because feature is missing."""
757
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
587
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
758
588
%(self._testTimeString(test), feature))
1025
827
self._track_transports()
1026
828
self._track_locks()
1027
829
self._clear_debug_flags()
1028
# Isolate global verbosity level, to make sure it's reproducible
1029
# between tests. We should get rid of this altogether: bug 656694. --
1031
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
1032
self._log_files = set()
1033
# Each key in the ``_counters`` dict holds a value for a different
1034
# counter. When the test ends, addDetail() should be used to output the
1035
# counter values. This happens in install_counter_hook().
1037
if 'config_stats' in selftest_debug_flags:
1038
self._install_config_stats_hooks()
1039
# Do not use i18n for tests (unless the test reverses this)
830
TestCase._active_threads = threading.activeCount()
831
self.addCleanup(self._check_leaked_threads)
1042
833
def debug(self):
1043
834
# debug a frame up.
1045
# The sys preserved stdin/stdout should allow blackbox tests debugging
1046
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1047
).set_trace(sys._getframe().f_back)
1049
def discardDetail(self, name):
1050
"""Extend the addDetail, getDetails api so we can remove a detail.
1052
eg. bzr always adds the 'log' detail at startup, but we don't want to
1053
include it for skipped, xfail, etc tests.
1055
It is safe to call this for a detail that doesn't exist, in case this
1056
gets called multiple times.
1058
# We cheat. details is stored in __details which means we shouldn't
1059
# touch it. but getDetails() returns the dict directly, so we can
1061
details = self.getDetails()
1065
def install_counter_hook(self, hooks, name, counter_name=None):
1066
"""Install a counting hook.
1068
Any hook can be counted as long as it doesn't need to return a value.
1070
:param hooks: Where the hook should be installed.
1072
:param name: The hook name that will be counted.
1074
:param counter_name: The counter identifier in ``_counters``, defaults
1077
_counters = self._counters # Avoid closing over self
1078
if counter_name is None:
1080
if _counters.has_key(counter_name):
1081
raise AssertionError('%s is already used as a counter name'
1083
_counters[counter_name] = 0
1084
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1085
lambda: ['%d' % (_counters[counter_name],)]))
1086
def increment_counter(*args, **kwargs):
1087
_counters[counter_name] += 1
1088
label = 'count %s calls' % (counter_name,)
1089
hooks.install_named_hook(name, increment_counter, label)
1090
self.addCleanup(hooks.uninstall_named_hook, name, label)
1092
def _install_config_stats_hooks(self):
1093
"""Install config hooks to count hook calls.
1096
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1097
self.install_counter_hook(config.ConfigHooks, hook_name,
1098
'config.%s' % (hook_name,))
1100
# The OldConfigHooks are private and need special handling to protect
1101
# against recursive tests (tests that run other tests), so we just do
1102
# manually what registering them into _builtin_known_hooks will provide
1104
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1105
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1106
self.install_counter_hook(config.OldConfigHooks, hook_name,
1107
'old_config.%s' % (hook_name,))
836
pdb.Pdb().set_trace(sys._getframe().f_back)
838
def _check_leaked_threads(self):
839
active = threading.activeCount()
840
leaked_threads = active - TestCase._active_threads
841
TestCase._active_threads = active
842
# If some tests make the number of threads *decrease*, we'll consider
843
# that they are just observing old threads dying, not aggressively killing
844
# random threads. So we don't report these tests as leaking. The risk
845
# is that we have false negatives that way (the test sees 2 threads
846
# going away but leaks one) but it seems less likely than the actual
847
# false positives (the test sees threads going away and does not leak).
848
if leaked_threads > 0:
849
TestCase._leaking_threads_tests += 1
850
if TestCase._first_thread_leaker_id is None:
851
TestCase._first_thread_leaker_id = self.id()
1109
853
def _clear_debug_flags(self):
1110
854
"""Prevent externally set debug flags affecting tests.
1591
1322
self.assertEqual(expected_docstring, obj.__doc__)
1593
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1594
1324
def failUnlessExists(self, path):
1595
return self.assertPathExists(path)
1597
def assertPathExists(self, path):
1598
1325
"""Fail unless path or paths, which may be abs or relative, exist."""
1599
1326
if not isinstance(path, basestring):
1601
self.assertPathExists(p)
1328
self.failUnlessExists(p)
1603
self.assertTrue(osutils.lexists(path),
1604
path + " does not exist")
1330
self.failUnless(osutils.lexists(path),path+" does not exist")
1606
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1607
1332
def failIfExists(self, path):
1608
return self.assertPathDoesNotExist(path)
1610
def assertPathDoesNotExist(self, path):
1611
1333
"""Fail if path or paths, which may be abs or relative, exist."""
1612
1334
if not isinstance(path, basestring):
1614
self.assertPathDoesNotExist(p)
1336
self.failIfExists(p)
1616
self.assertFalse(osutils.lexists(path),
1338
self.failIf(osutils.lexists(path),path+" exists")
1619
1340
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1620
1341
"""A helper for callDeprecated and applyDeprecated.
1733
1453
def _startLogFile(self):
1734
"""Setup a in-memory target for bzr and testcase log messages"""
1735
pseudo_log_file = StringIO()
1736
def _get_log_contents_for_weird_testtools_api():
1737
return [pseudo_log_file.getvalue().decode(
1738
"utf-8", "replace").encode("utf-8")]
1739
self.addDetail("log", content.Content(content.ContentType("text",
1740
"plain", {"charset": "utf8"}),
1741
_get_log_contents_for_weird_testtools_api))
1742
self._log_file = pseudo_log_file
1743
self._log_memento = trace.push_log_file(self._log_file)
1454
"""Send bzr and test log messages to a temporary file.
1456
The file is removed as the test is torn down.
1458
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1459
self._log_file = os.fdopen(fileno, 'w+')
1460
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1461
self._log_file_name = name
1744
1462
self.addCleanup(self._finishLogFile)
1746
1464
def _finishLogFile(self):
1747
"""Flush and dereference the in-memory log for this testcase"""
1748
if trace._trace_file:
1465
"""Finished with the log file.
1467
Close the file and delete it, unless setKeepLogfile was called.
1469
if bzrlib.trace._trace_file:
1749
1470
# flush the log file, to get all content
1750
trace._trace_file.flush()
1751
trace.pop_log_file(self._log_memento)
1752
# The logging module now tracks references for cleanup so discard ours
1753
del self._log_memento
1471
bzrlib.trace._trace_file.flush()
1472
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1755
1476
def thisFailsStrictLockCheck(self):
1756
1477
"""It is known that this test would fail with -Dstrict_locks.
1781
1507
:returns: The actual attr value.
1509
value = getattr(obj, attr_name)
1783
1510
# The actual value is captured by the call below
1784
value = getattr(obj, attr_name, _unitialized_attr)
1785
if value is _unitialized_attr:
1786
# When the test completes, the attribute should not exist, but if
1787
# we aren't setting a value, we don't need to do anything.
1788
if new is not _unitialized_attr:
1789
self.addCleanup(delattr, obj, attr_name)
1791
self.addCleanup(setattr, obj, attr_name, value)
1511
self.addCleanup(setattr, obj, attr_name, value)
1792
1512
if new is not _unitialized_attr:
1793
1513
setattr(obj, attr_name, new)
1796
def overrideEnv(self, name, new):
1797
"""Set an environment variable, and reset it after the test.
1799
:param name: The environment variable name.
1801
:param new: The value to set the variable to. If None, the
1802
variable is deleted from the environment.
1804
:returns: The actual variable value.
1806
value = osutils.set_or_unset_env(name, new)
1807
self.addCleanup(osutils.set_or_unset_env, name, value)
1810
def recordCalls(self, obj, attr_name):
1811
"""Monkeypatch in a wrapper that will record calls.
1813
The monkeypatch is automatically removed when the test concludes.
1815
:param obj: The namespace holding the reference to be replaced;
1816
typically a module, class, or object.
1817
:param attr_name: A string for the name of the attribute to
1819
:returns: A list that will be extended with one item every time the
1820
function is called, with a tuple of (args, kwargs).
1824
def decorator(*args, **kwargs):
1825
calls.append((args, kwargs))
1826
return orig(*args, **kwargs)
1827
orig = self.overrideAttr(obj, attr_name, decorator)
1830
1516
def _cleanEnvironment(self):
1831
for name, value in isolated_environ.iteritems():
1832
self.overrideEnv(name, value)
1518
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1519
'HOME': os.getcwd(),
1520
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1521
# tests do check our impls match APPDATA
1522
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1526
'BZREMAIL': None, # may still be present in the environment
1528
'BZR_PROGRESS_BAR': None,
1530
'BZR_PLUGIN_PATH': None,
1531
'BZR_DISABLE_PLUGINS': None,
1532
'BZR_PLUGINS_AT': None,
1533
'BZR_CONCURRENCY': None,
1534
# Make sure that any text ui tests are consistent regardless of
1535
# the environment the test case is run in; you may want tests that
1536
# test other combinations. 'dumb' is a reasonable guess for tests
1537
# going to a pipe or a StringIO.
1541
'BZR_COLUMNS': '80',
1543
'SSH_AUTH_SOCK': None,
1547
'https_proxy': None,
1548
'HTTPS_PROXY': None,
1553
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1554
# least. If you do (care), please update this comment
1558
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test environment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
self.addCleanup(self._restoreEnvironment)
1567
for name, value in new_env.iteritems():
1568
self._captureVar(name, value)
1570
def _captureVar(self, name, newvalue):
1571
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1574
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1576
osutils.set_or_unset_env(name, value)
1834
1578
def _restoreHooks(self):
1835
1579
for klass, (name, hooks) in self._preserved_hooks.items():
1836
1580
setattr(klass, name, hooks)
1837
self._preserved_hooks.clear()
1838
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1839
self._preserved_lazy_hooks.clear()
1841
1582
def knownFailure(self, reason):
1842
"""Declare that this test fails for a known reason
1844
Tests that are known to fail should generally be using expectedFailure
1845
with an appropriate reverse assertion if a change could cause the test
1846
to start passing. Conversely if the test has no immediate prospect of
1847
succeeding then using skip is more suitable.
1849
When this method is called while an exception is being handled, that
1850
traceback will be used, otherwise a new exception will be thrown to
1851
provide one but won't be reported.
1853
self._add_reason(reason)
1855
exc_info = sys.exc_info()
1856
if exc_info != (None, None, None):
1857
self._report_traceback(exc_info)
1860
raise self.failureException(reason)
1861
except self.failureException:
1862
exc_info = sys.exc_info()
1863
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1864
raise testtools.testcase._ExpectedFailure(exc_info)
1868
def _suppress_log(self):
1869
"""Remove the log info from details."""
1870
self.discardDetail('log')
1583
"""This test has failed for some known reason."""
1584
raise KnownFailure(reason)
1872
1586
def _do_skip(self, result, reason):
1873
self._suppress_log()
1874
1587
addSkip = getattr(result, 'addSkip', None)
1875
1588
if not callable(addSkip):
1876
1589
result.addSuccess(result)
1955
1645
self._benchtime += time.time() - start
1957
1647
def log(self, *args):
1650
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1656
:param keep_log_file: When True, if the log is still a file on disk
1657
leave it as a file on disk. When False, if the log is still a file
1658
on disk, the log file is deleted and the log preserved as
1660
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1668
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
if self._log_file_name is not None:
1674
logfile = open(self._log_file_name)
1676
log_contents = logfile.read()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
self._log_contents = log_contents
1717
os.remove(self._log_file_name)
1719
if sys.platform == 'win32' and e.errno == errno.EACCES:
1720
sys.stderr.write(('Unable to delete log file '
1721
' %r\n' % self._log_file_name))
1724
self._log_file_name = None
1727
return "No log file content and no log file name."
1960
1729
def get_log(self):
1961
1730
"""Get a unicode string containing the log from bzrlib.trace.
2177
1945
variables. A value of None will unset the env variable.
2178
1946
The values must be strings. The change will only occur in the
2179
1947
child, so you don't need to fix the environment after running.
2180
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2181
doesn't support signalling subprocesses.
1948
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
2182
1950
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2183
:param stderr: file to use for the subprocess's stderr. Valid values
2184
are those valid for the stderr argument of `subprocess.Popen`.
2185
Default value is ``subprocess.PIPE``.
2187
1952
:returns: Popen object for the started process.
2189
1954
if skip_if_plan_to_signal:
2190
if os.name != "posix":
2191
raise TestSkipped("Sending signals not supported")
1955
if not getattr(os, 'kill', None):
1956
raise TestSkipped("os.kill not available.")
2193
1958
if env_changes is None:
2194
1959
env_changes = {}
2195
# Because $HOME is set to a tempdir for the context of a test, modules
2196
# installed in the user dir will not be found unless $PYTHONUSERBASE
2197
# gets set to the computed directory of this parent process.
2198
if site.USER_BASE is not None:
2199
env_changes["PYTHONUSERBASE"] = site.USER_BASE
2202
1962
def cleanup_environment():
2242
def _add_subprocess_log(self, log_file_path):
2243
if len(self._log_files) == 0:
2244
# Register an addCleanup func. We do this on the first call to
2245
# _add_subprocess_log rather than in TestCase.setUp so that this
2246
# addCleanup is registered after any cleanups for tempdirs that
2247
# subclasses might create, which will probably remove the log file
2249
self.addCleanup(self._subprocess_log_cleanup)
2250
# self._log_files is a set, so if a log file is reused we won't grab it
2252
self._log_files.add(log_file_path)
2254
def _subprocess_log_cleanup(self):
2255
for count, log_file_path in enumerate(self._log_files):
2256
# We use buffer_now=True to avoid holding the file open beyond
2257
# the life of this function, which might interfere with e.g.
2258
# cleaning tempdirs on Windows.
2259
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2260
#detail_content = content.content_from_file(
2261
# log_file_path, buffer_now=True)
2262
with open(log_file_path, 'rb') as log_file:
2263
log_file_bytes = log_file.read()
2264
detail_content = content.Content(content.ContentType("text",
2265
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2266
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2269
1997
def _popen(self, *args, **kwargs):
2270
1998
"""Place a call to Popen.
2272
2000
Allows tests to override this method to intercept the calls made to
2273
2001
Popen for introspection.
2275
return subprocess.Popen(*args, **kwargs)
2003
return Popen(*args, **kwargs)
2277
2005
def get_source_path(self):
2278
2006
"""Return the path of the directory containing bzrlib."""
2308
2036
if retcode is not None and retcode != process.returncode:
2309
2037
if process_args is None:
2310
2038
process_args = "(unknown args)"
2311
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2312
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2039
mutter('Output of bzr %s:\n%s', process_args, out)
2040
mutter('Error for bzr %s:\n%s', process_args, err)
2313
2041
self.fail('Command bzr %s failed with retcode %s != %s'
2314
2042
% (process_args, retcode, process.returncode))
2315
2043
return [out, err]
2317
def check_tree_shape(self, tree, shape):
2318
"""Compare a tree to a list of expected names.
2045
def check_inventory_shape(self, inv, shape):
2046
"""Compare an inventory to a list of expected names.
2320
2048
Fail if they are not precisely equal.
2323
2051
shape = list(shape) # copy
2324
for path, ie in tree.iter_entries_by_dir():
2052
for path, ie in inv.entries():
2325
2053
name = path.replace('\\', '/')
2326
2054
if ie.kind == 'directory':
2327
2055
name = name + '/'
2329
pass # ignore root entry
2331
2057
shape.remove(name)
2333
2059
extras.append(name)
2425
2149
class TestCaseWithMemoryTransport(TestCase):
2426
2150
"""Common test class for tests that do not need disk resources.
2428
Tests that need disk resources should derive from TestCaseInTempDir
2429
or TestCaseWithTransport.
2152
Tests that need disk resources should derive from TestCaseWithTransport.
2431
2154
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2433
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2156
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2434
2157
a directory which does not exist. This serves to help ensure test isolation
2435
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2436
must exist. However, TestCaseWithMemoryTransport does not offer local file
2437
defaults for the transport in tests, nor does it obey the command line
2158
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2159
must exist. However, TestCaseWithMemoryTransport does not offer local
2160
file defaults for the transport in tests, nor does it obey the command line
2438
2161
override, so tests that accidentally write to the common directory should
2441
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2442
``.bzr`` directory that stops us ascending higher into the filesystem.
2164
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2165
a .bzr directory that stops us ascending higher into the filesystem.
2445
2168
TEST_ROOT = None
2455
2178
self.transport_readonly_server = None
2456
2179
self.__vfs_server = None
2459
super(TestCaseWithMemoryTransport, self).setUp()
2461
def _add_disconnect_cleanup(transport):
2462
"""Schedule disconnection of given transport at test cleanup
2464
This needs to happen for all connected transports or leaks occur.
2466
Note reconnections may mean we call disconnect multiple times per
2467
transport which is suboptimal but seems harmless.
2469
self.addCleanup(transport.disconnect)
2471
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2472
_add_disconnect_cleanup, None)
2474
self._make_test_root()
2475
self.addCleanup(os.chdir, os.getcwdu())
2476
self.makeAndChdirToTestDir()
2477
self.overrideEnvironmentForTesting()
2478
self.__readonly_server = None
2479
self.__server = None
2480
self.reduceLockdirTimeout()
2481
# Each test may use its own config files even if the local config files
2482
# don't actually exist. They'll rightly fail if they try to create them
2484
self.overrideAttr(config, '_shared_stores', {})
2486
2181
def get_transport(self, relpath=None):
2487
2182
"""Return a writeable transport.
2694
2375
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2695
2376
self.permit_dir(self.test_dir)
2697
def make_branch(self, relpath, format=None, name=None):
2378
def make_branch(self, relpath, format=None):
2698
2379
"""Create a branch on the transport at relpath."""
2699
2380
repo = self.make_repository(relpath, format=format)
2700
return repo.bzrdir.create_branch(append_revisions_only=False,
2703
def get_default_format(self):
2706
def resolve_format(self, format):
2707
"""Resolve an object to a ControlDir format object.
2709
The initial format object can either already be
2710
a ControlDirFormat, None (for the default format),
2711
or a string with the name of the control dir format.
2713
:param format: Object to resolve
2714
:return: A ControlDirFormat instance
2717
format = self.get_default_format()
2718
if isinstance(format, basestring):
2719
format = controldir.format_registry.make_bzrdir(format)
2381
return repo.bzrdir.create_branch()
2722
2383
def make_bzrdir(self, relpath, format=None):
2724
2385
# might be a relative or absolute path
2725
2386
maybe_a_url = self.get_url(relpath)
2726
2387
segments = maybe_a_url.rsplit('/', 1)
2727
t = _mod_transport.get_transport(maybe_a_url)
2388
t = get_transport(maybe_a_url)
2728
2389
if len(segments) > 1 and segments[-1] not in ('', '.'):
2729
2390
t.ensure_base()
2730
format = self.resolve_format(format)
2393
if isinstance(format, basestring):
2394
format = bzrdir.format_registry.make_bzrdir(format)
2731
2395
return format.initialize_on_transport(t)
2732
2396
except errors.UninitializableFormat:
2733
2397
raise TestSkipped("Format %s is not initializable." % format)
2735
def make_repository(self, relpath, shared=None, format=None):
2399
def make_repository(self, relpath, shared=False, format=None):
2736
2400
"""Create a repository on our default transport at relpath.
2738
2402
Note that relpath must be a relative path, not a full url.
3440
3102
"""A decorator which excludes tests matching an exclude pattern."""
3442
3104
def __init__(self, suite, exclude_pattern):
3443
super(ExcludeDecorator, self).__init__(
3444
exclude_tests_by_re(suite, exclude_pattern))
3105
TestDecorator.__init__(self, suite)
3106
self.exclude_pattern = exclude_pattern
3107
self.excluded = False
3111
return iter(self._tests)
3112
self.excluded = True
3113
suite = exclude_tests_by_re(self, self.exclude_pattern)
3115
self.addTests(suite)
3116
return iter(self._tests)
3447
3119
class FilterTestsDecorator(TestDecorator):
3448
3120
"""A decorator which filters tests to those matching a pattern."""
3450
3122
def __init__(self, suite, pattern):
3451
super(FilterTestsDecorator, self).__init__(
3452
filter_suite_by_re(suite, pattern))
3123
TestDecorator.__init__(self, suite)
3124
self.pattern = pattern
3125
self.filtered = False
3129
return iter(self._tests)
3130
self.filtered = True
3131
suite = filter_suite_by_re(self, self.pattern)
3133
self.addTests(suite)
3134
return iter(self._tests)
3455
3137
class RandomDecorator(TestDecorator):
3456
3138
"""A decorator which randomises the order of its tests."""
3458
3140
def __init__(self, suite, random_seed, stream):
3459
random_seed = self.actual_seed(random_seed)
3460
stream.write("Randomizing test order using seed %s\n\n" %
3141
TestDecorator.__init__(self, suite)
3142
self.random_seed = random_seed
3143
self.randomised = False
3144
self.stream = stream
3148
return iter(self._tests)
3149
self.randomised = True
3150
self.stream.write("Randomizing test order using seed %s\n\n" %
3151
(self.actual_seed()))
3462
3152
# Initialise the random number generator.
3463
random.seed(random_seed)
3464
super(RandomDecorator, self).__init__(randomize_suite(suite))
3153
random.seed(self.actual_seed())
3154
suite = randomize_suite(self)
3156
self.addTests(suite)
3157
return iter(self._tests)
3467
def actual_seed(seed):
3159
def actual_seed(self):
3160
if self.random_seed == "now":
3469
3161
# We convert the seed to a long to make it reuseable across
3470
3162
# invocations (because the user can reenter it).
3471
return long(time.time())
3163
self.random_seed = long(time.time())
3473
3165
# Convert the seed to a long if we can
3476
except (TypeError, ValueError):
3167
self.random_seed = long(self.random_seed)
3170
return self.random_seed
3481
3173
class TestFirstDecorator(TestDecorator):
3482
3174
"""A decorator which moves named tests to the front."""
3484
3176
def __init__(self, suite, pattern):
3485
super(TestFirstDecorator, self).__init__()
3486
self.addTests(split_suite_by_re(suite, pattern))
3177
TestDecorator.__init__(self, suite)
3178
self.pattern = pattern
3179
self.filtered = False
3183
return iter(self._tests)
3184
self.filtered = True
3185
suites = split_suite_by_re(self, self.pattern)
3187
self.addTests(suites)
3188
return iter(self._tests)
3489
3191
def partition_tests(suite, count):
3490
3192
"""Partition suite into count lists of tests."""
3491
# This just assigns tests in a round-robin fashion. On one hand this
3492
# splits up blocks of related tests that might run faster if they shared
3493
# resources, but on the other it avoids assigning blocks of slow tests to
3494
# just one partition. So the slowest partition shouldn't be much slower
3496
partitions = [list() for i in range(count)]
3497
tests = iter_suite_tests(suite)
3498
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3499
partition.append(test)
3194
tests = list(iter_suite_tests(suite))
3195
tests_per_process = int(math.ceil(float(len(tests)) / count))
3196
for block in range(count):
3197
low_test = block * tests_per_process
3198
high_test = low_test + tests_per_process
3199
process_tests = tests[low_test:high_test]
3200
result.append(process_tests)
3503
3204
def workaround_zealous_crypto_random():
3534
3235
ProtocolTestCase.run(self, result)
3536
pid, status = os.waitpid(self.pid, 0)
3537
# GZ 2011-10-18: If status is nonzero, should report to the result
3538
# that something went wrong.
3237
os.waitpid(self.pid, 0)
3540
3239
test_blocks = partition_tests(suite, concurrency)
3541
# Clear the tests from the original suite so it doesn't keep them alive
3542
suite._tests[:] = []
3543
3240
for process_tests in test_blocks:
3544
process_suite = TestUtil.TestSuite(process_tests)
3545
# Also clear each split list so new suite has only reference
3546
process_tests[:] = []
3241
process_suite = TestSuite()
3242
process_suite.addTests(process_tests)
3547
3243
c2pread, c2pwrite = os.pipe()
3548
3244
pid = os.fork()
3246
workaround_zealous_crypto_random()
3551
stream = os.fdopen(c2pwrite, 'wb', 1)
3552
workaround_zealous_crypto_random()
3553
3248
os.close(c2pread)
3554
3249
# Leave stderr and stdout open so we can see test noise
3555
3250
# Close stdin so that the child goes away if it decides to
3556
3251
# read from stdin (otherwise it's a roulette to see what
3557
3252
# child actually gets keystrokes for pdb etc).
3558
3253
sys.stdin.close()
3255
stream = os.fdopen(c2pwrite, 'wb', 1)
3559
3256
subunit_result = AutoTimingTestResultDecorator(
3560
SubUnitBzrProtocolClient(stream))
3257
TestProtocolClient(stream))
3561
3258
process_suite.run(subunit_result)
3563
# Try and report traceback on stream, but exit with error even
3564
# if stream couldn't be created or something else goes wrong.
3565
# The traceback is formatted to a string and written in one go
3566
# to avoid interleaving lines from multiple failing children.
3568
stream.write(traceback.format_exc())
3573
3262
os.close(c2pwrite)
3574
3263
stream = os.fdopen(c2pread, 'rb', 1)
3638
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3325
class ForwardingResult(unittest.TestResult):
    """A TestResult that passes the events it receives on to another result.

    Nothing is recorded on this object itself: each handled event is
    handed straight to the wrapped result, which is kept in ``self.result``.

    :ivar result: The result object that events are forwarded to.
    """

    def __init__(self, target):
        """Create a forwarder.

        :param target: The result object to forward events to.
        """
        unittest.TestResult.__init__(self)
        self.result = target

    def startTest(self, test):
        self.result.startTest(test)

    def stopTest(self, test):
        self.result.stopTest(test)

    def startTestRun(self):
        self.result.startTestRun()

    def stopTestRun(self):
        self.result.stopTestRun()

    def addSkip(self, test, reason):
        self.result.addSkip(test, reason)

    def addSuccess(self, test):
        self.result.addSuccess(test)

    def addError(self, test, err):
        self.result.addError(test, err)

    def addFailure(self, test, err):
        self.result.addFailure(test, err)

    def wasSuccessful(self):
        """Report whether the wrapped result considers the run successful.

        Errors and failures are forwarded rather than stored here, so the
        inherited implementation, which inspects this object's own lists,
        would report success regardless of what was forwarded.
        """
        return self.result.wasSuccessful()
3354
ForwardingResult = testtools.ExtendedToOriginalDecorator
3357
class ProfileResult(ForwardingResult):
3639
3358
"""Generate profiling data for all activity between start and success.
3641
3360
The profile data is appended to the test's _benchcalls attribute and can
3979
3686
'bzrlib.tests.test_commit_merge',
3980
3687
'bzrlib.tests.test_config',
3981
3688
'bzrlib.tests.test_conflicts',
3982
'bzrlib.tests.test_controldir',
3983
3689
'bzrlib.tests.test_counted_lock',
3984
3690
'bzrlib.tests.test_crash',
3985
3691
'bzrlib.tests.test_decorators',
3986
3692
'bzrlib.tests.test_delta',
3987
3693
'bzrlib.tests.test_debug',
3694
'bzrlib.tests.test_deprecated_graph',
3988
3695
'bzrlib.tests.test_diff',
3989
3696
'bzrlib.tests.test_directory_service',
3990
3697
'bzrlib.tests.test_dirstate',
3991
3698
'bzrlib.tests.test_email_message',
3992
3699
'bzrlib.tests.test_eol_filters',
3993
3700
'bzrlib.tests.test_errors',
3994
'bzrlib.tests.test_estimate_compressed_size',
3995
3701
'bzrlib.tests.test_export',
3996
'bzrlib.tests.test_export_pot',
3997
3702
'bzrlib.tests.test_extract',
3998
'bzrlib.tests.test_features',
3999
3703
'bzrlib.tests.test_fetch',
4000
'bzrlib.tests.test_fixtures',
4001
3704
'bzrlib.tests.test_fifo_cache',
4002
3705
'bzrlib.tests.test_filters',
4003
'bzrlib.tests.test_filter_tree',
4004
3706
'bzrlib.tests.test_ftp_transport',
4005
3707
'bzrlib.tests.test_foreign',
4006
3708
'bzrlib.tests.test_generate_docs',
4441
4101
if test_id != None:
4442
4102
ui.ui_factory.clear_term()
4443
4103
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4444
# Ugly, but the last thing we want here is fail, so bear with it.
4445
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4446
).encode('ascii', 'replace')
4447
4104
sys.stderr.write('Unable to remove testing dir %s\n%s'
4448
% (os.path.basename(dirname), printable_e))
4105
% (os.path.basename(dirname), e))
4108
class Feature(object):
4109
"""An operating system Feature."""
4112
self._available = None
4114
def available(self):
4115
"""Is the feature available?
4117
:return: True if the feature is available.
4119
if self._available is None:
4120
self._available = self._probe()
4121
return self._available
4124
"""Implement this method in concrete features.
4126
:return: True if the feature is available.
4128
raise NotImplementedError
4131
if getattr(self, 'feature_name', None):
4132
return self.feature_name()
4133
return self.__class__.__name__
4136
class _SymlinkFeature(Feature):
4139
return osutils.has_symlinks()
4141
def feature_name(self):
4144
SymlinkFeature = _SymlinkFeature()
4147
class _HardlinkFeature(Feature):
4150
return osutils.has_hardlinks()
4152
def feature_name(self):
4155
HardlinkFeature = _HardlinkFeature()
4158
class _OsFifoFeature(Feature):
4161
return getattr(os, 'mkfifo', None)
4163
def feature_name(self):
4164
return 'filesystem fifos'
4166
OsFifoFeature = _OsFifoFeature()
4169
class _UnicodeFilenameFeature(Feature):
4170
"""Does the filesystem support Unicode filenames?"""
4174
# Check for character combinations unlikely to be covered by any
4175
# single non-unicode encoding. We use the characters
4176
# - greek small letter alpha (U+03B1) and
4177
# - braille pattern dots-123456 (U+283F).
4178
os.stat(u'\u03b1\u283f')
4179
except UnicodeEncodeError:
4181
except (IOError, OSError):
4182
# The filesystem allows the Unicode filename but the file doesn't
4186
# The filesystem allows the Unicode filename and the file exists,
4190
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4193
class _CompatabilityThunkFeature(Feature):
4194
"""This feature is just a thunk to another feature.
4196
It issues a deprecation warning if it is accessed, to let you know that you
4197
should really use a different feature.
4200
def __init__(self, dep_version, module, name,
4201
replacement_name, replacement_module=None):
4202
super(_CompatabilityThunkFeature, self).__init__()
4203
self._module = module
4204
if replacement_module is None:
4205
replacement_module = module
4206
self._replacement_module = replacement_module
4208
self._replacement_name = replacement_name
4209
self._dep_version = dep_version
4210
self._feature = None
4213
if self._feature is None:
4214
depr_msg = self._dep_version % ('%s.%s'
4215
% (self._module, self._name))
4216
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4217
self._replacement_name)
4218
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4219
# Import the new feature and use it as a replacement for the
4221
mod = __import__(self._replacement_module, {}, {},
4222
[self._replacement_name])
4223
self._feature = getattr(mod, self._replacement_name)
4227
return self._feature._probe()
4230
class ModuleAvailableFeature(Feature):
4231
"""This is a feature that describes a module we want to be available.
4233
Declare the name of the module in __init__(), and then after probing, the
4234
module will be available as 'self.module'.
4236
:ivar module: The module if it is available, else None.
4239
def __init__(self, module_name):
4240
super(ModuleAvailableFeature, self).__init__()
4241
self.module_name = module_name
4245
self._module = __import__(self.module_name, {}, {}, [''])
4252
if self.available(): # Make sure the probe has been done
4256
def feature_name(self):
4257
return self.module_name
4260
# This is kept here for compatibility, it is recommended to use
4261
# 'bzrlib.tests.features.paramiko' instead
4262
ParamikoFeature = _CompatabilityThunkFeature(
4263
deprecated_in((2,1,0)),
4264
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4451
4267
def probe_unicode_in_user_encoding():
4300
class _HTTPSServerFeature(Feature):
4301
"""Some tests want an https Server, check if one is available.
4303
Right now, the only way this is available is under python2.6 which provides
4314
def feature_name(self):
4315
return 'HTTPSServer'
4318
HTTPSServerFeature = _HTTPSServerFeature()
4321
class _UnicodeFilename(Feature):
4322
"""Does the filesystem support Unicode filenames?"""
4327
except UnicodeEncodeError:
4329
except (IOError, OSError):
4330
# The filesystem allows the Unicode filename but the file doesn't
4334
# The filesystem allows the Unicode filename and the file exists,
4338
UnicodeFilename = _UnicodeFilename()
4341
class _UTF8Filesystem(Feature):
4342
"""Is the filesystem UTF-8?"""
4345
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4349
UTF8Filesystem = _UTF8Filesystem()
4352
class _BreakinFeature(Feature):
4353
"""Does this platform support the breakin feature?"""
4356
from bzrlib import breakin
4357
if breakin.determine_signal() is None:
4359
if sys.platform == 'win32':
4360
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4361
# We trigger SIGBREAK via a Console api so we need ctypes to
4362
# access the function
4369
def feature_name(self):
4370
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4373
BreakinFeature = _BreakinFeature()
4376
class _CaseInsCasePresFilenameFeature(Feature):
4377
"""Is the file-system case insensitive, but case-preserving?"""
4380
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4382
# first check truly case-preserving for created files, then check
4383
# case insensitive when opening existing files.
4384
name = osutils.normpath(name)
4385
base, rel = osutils.split(name)
4386
found_rel = osutils.canonical_relpath(base, name)
4387
return (found_rel == rel
4388
and os.path.isfile(name.upper())
4389
and os.path.isfile(name.lower()))
4394
def feature_name(self):
4395
return "case-insensitive case-preserving filesystem"
4397
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4400
class _CaseInsensitiveFilesystemFeature(Feature):
4401
"""Check if underlying filesystem is case-insensitive but *not* case
4404
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4405
# more likely to be case preserving, so this case is rare.
4408
if CaseInsCasePresFilenameFeature.available():
4411
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4412
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4413
TestCaseWithMemoryTransport.TEST_ROOT = root
4415
root = TestCaseWithMemoryTransport.TEST_ROOT
4416
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4418
name_a = osutils.pathjoin(tdir, 'a')
4419
name_A = osutils.pathjoin(tdir, 'A')
4421
result = osutils.isdir(name_A)
4422
_rmtree_temp_dir(tdir)
4425
def feature_name(self):
4426
return 'case-insensitive filesystem'
4428
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4431
class _CaseSensitiveFilesystemFeature(Feature):
4434
if CaseInsCasePresFilenameFeature.available():
4436
elif CaseInsensitiveFilesystemFeature.available():
4441
def feature_name(self):
4442
return 'case-sensitive filesystem'
4444
# new coding style is for feature instances to be lowercase
4445
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4448
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4449
SubUnitFeature = _CompatabilityThunkFeature(
4450
deprecated_in((2,1,0)),
4451
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4484
4452
# Only define SubUnitBzrRunner if subunit is available.
4486
4454
from subunit import TestProtocolClient
4487
4455
from subunit.test_results import AutoTimingTestResultDecorator
4488
class SubUnitBzrProtocolClient(TestProtocolClient):
4490
def stopTest(self, test):
4491
super(SubUnitBzrProtocolClient, self).stopTest(test)
4492
_clear__type_equality_funcs(test)
4494
def addSuccess(self, test, details=None):
4495
# The subunit client always includes the details in the subunit
4496
# stream, but we don't want to include it in ours.
4497
if details is not None and 'log' in details:
4499
return super(SubUnitBzrProtocolClient, self).addSuccess(
4502
4456
class SubUnitBzrRunner(TextTestRunner):
4503
4457
def run(self, test):
4504
4458
result = AutoTimingTestResultDecorator(
4505
SubUnitBzrProtocolClient(self.stream))
4459
TestProtocolClient(self.stream))
4506
4460
test.run(result)
4508
4462
except ImportError:
4512
# API compatibility for old plugins; see bug 892622.
4515
'HTTPServerFeature',
4516
'ModuleAvailableFeature',
4517
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4518
'OsFifoFeature', 'UnicodeFilenameFeature',
4519
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4520
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4521
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4522
'posix_permissions_feature',
4524
globals()[name] = _CompatabilityThunkFeature(
4525
symbol_versioning.deprecated_in((2, 5, 0)),
4526
'bzrlib.tests', name,
4527
name, 'bzrlib.tests.features')
4530
for (old_name, new_name) in [
4531
('UnicodeFilename', 'UnicodeFilenameFeature'),
4533
globals()[name] = _CompatabilityThunkFeature(
4534
symbol_versioning.deprecated_in((2, 5, 0)),
4535
'bzrlib.tests', old_name,
4536
new_name, 'bzrlib.tests.features')
4465
class _PosixPermissionsFeature(Feature):
4469
# create temporary file and check if specified perms are maintained.
4472
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4473
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4476
os.chmod(name, write_perms)
4478
read_perms = os.stat(name).st_mode & 0777
4480
return (write_perms == read_perms)
4482
return (os.name == 'posix') and has_perms()
4484
def feature_name(self):
4485
return 'POSIX permissions support'
4487
posix_permissions_feature = _PosixPermissionsFeature()