50
# nb: check this before importing anything else from within it
51
_testtools_version = getattr(testtools, '__version__', ())
52
if _testtools_version < (0, 9, 5):
53
raise ImportError("need at least testtools 0.9.5: %s is %r"
54
% (testtools.__file__, _testtools_version))
55
54
from testtools import content
58
56
from bzrlib import (
62
commands as _mod_commands,
71
plugin as _mod_plugin,
78
transport as _mod_transport,
74
import bzrlib.commands
75
import bzrlib.timestamp
77
import bzrlib.inventory
78
import bzrlib.iterablefile
82
81
import bzrlib.lsprof
83
82
except ImportError:
84
83
# lsprof not available
86
from bzrlib.smart import client, request
87
from bzrlib.transport import (
85
from bzrlib.merge import merge_inner
88
from bzrlib.smart import client, request, server
90
from bzrlib import symbol_versioning
91
91
from bzrlib.symbol_versioning import (
92
93
deprecated_function,
95
from bzrlib.tests import (
98
from bzrlib.transport import get_transport, pathfilter
99
import bzrlib.transport
100
from bzrlib.transport.local import LocalURLServer
101
from bzrlib.transport.memory import MemoryServer
102
from bzrlib.transport.readonly import ReadonlyServer
103
from bzrlib.trace import mutter, note
104
from bzrlib.tests import TestUtil
105
from bzrlib.tests.http_server import HttpServer
106
from bzrlib.tests.TestUtil import (
110
from bzrlib.tests.treeshape import build_tree_contents
100
111
from bzrlib.ui import NullProgressView
101
112
from bzrlib.ui.text import TextUIFactory
113
import bzrlib.version_info_formats.format_custom
114
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
103
116
# Mark this python module as being part of the implementation
104
117
# of unittest: this gives us better tracebacks where the last
105
118
# shown frame is the test code, not our assertXYZ.
108
default_transport = test_server.LocalURLServer
111
_unitialized_attr = object()
112
"""A sentinel needed to act as a default value in a method signature."""
121
default_transport = LocalURLServer
115
123
# Subunit result codes, defined here to prevent a hard dependency on subunit.
116
124
SUBUNIT_SEEK_SET = 0
117
125
SUBUNIT_SEEK_CUR = 1
119
# These are intentionally brought into this namespace. That way plugins, etc
120
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
121
TestSuite = TestUtil.TestSuite
122
TestLoader = TestUtil.TestLoader
124
# Tests should run in a clean and clearly defined environment. The goal is to
125
# keep them isolated from the running environment as much as possible. The test
126
# framework ensures the variables defined below are set (or deleted if the
127
# value is None) before a test is run and reset to their original value after
128
# the test is run. Generally if some code depends on an environment variable,
129
# the tests should start without this variable in the environment. There are a
130
# few exceptions but you shouldn't violate this rule lightly.
134
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
135
# tests do check our impls match APPDATA
136
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
140
'BZREMAIL': None, # may still be present in the environment
141
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
142
'BZR_PROGRESS_BAR': None,
143
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
144
# as a base class instead of TestCaseInTempDir. Tests inheriting from
145
# TestCase should not use disk resources, BZR_LOG is one.
146
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
147
'BZR_PLUGIN_PATH': None,
148
'BZR_DISABLE_PLUGINS': None,
149
'BZR_PLUGINS_AT': None,
150
'BZR_CONCURRENCY': None,
151
# Make sure that any text ui tests are consistent regardless of
152
# the environment the test case is run in; you may want tests that
153
# test other combinations. 'dumb' is a reasonable guess for tests
154
# going to a pipe or a StringIO.
160
'SSH_AUTH_SOCK': None,
170
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
171
# least. If you do (care), please update this comment
175
'BZR_REMOTE_PATH': None,
176
# Generally speaking, we don't want apport reporting on crashes in
177
# the test environment unless we're specifically testing apport,
178
# so that it doesn't leak into the real system environment. We
179
# use an env var so it propagates to subprocesses.
180
'APPORT_DISABLE': '1',
184
def override_os_environ(test, env=None):
185
"""Modify os.environ keeping a copy.
187
:param test: A test instance
189
:param env: A dict containing variable definitions to be installed
192
env = isolated_environ
193
test._original_os_environ = dict([(var, value)
194
for var, value in os.environ.iteritems()])
195
for var, value in env.iteritems():
196
osutils.set_or_unset_env(var, value)
197
if var not in test._original_os_environ:
198
# The var is new, add it with a value of None, so
199
# restore_os_environ will delete it
200
test._original_os_environ[var] = None
203
def restore_os_environ(test):
204
"""Restore os.environ to its original state.
206
:param test: A test instance previously passed to override_os_environ.
208
for var, value in test._original_os_environ.iteritems():
209
# Restore the original value (or delete it if the value has been set to
210
# None in override_os_environ).
211
osutils.set_or_unset_env(var, value)
214
class ExtendedTestResult(testtools.TextTestResult):
128
class ExtendedTestResult(unittest._TextTestResult):
215
129
"""Accepts, reports and accumulates the results of running tests.
217
131
Compared to the unittest version this class adds support for
293
202
if failed or errored: self.stream.write(", ")
294
203
self.stream.write("known_failure_count=%d" %
295
204
self.known_failure_count)
296
self.stream.write(")\n")
205
self.stream.writeln(")")
298
207
if self.known_failure_count:
299
self.stream.write("OK (known_failures=%d)\n" %
208
self.stream.writeln("OK (known_failures=%d)" %
300
209
self.known_failure_count)
302
self.stream.write("OK\n")
211
self.stream.writeln("OK")
303
212
if self.skip_count > 0:
304
213
skipped = self.skip_count
305
self.stream.write('%d test%s skipped\n' %
214
self.stream.writeln('%d test%s skipped' %
306
215
(skipped, skipped != 1 and "s" or ""))
307
216
if self.unsupported:
308
217
for feature, count in sorted(self.unsupported.items()):
309
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
218
self.stream.writeln("Missing feature '%s' skipped %d tests." %
310
219
(feature, count))
312
221
ok = self.wasStrictlySuccessful()
314
223
ok = self.wasSuccessful()
315
if self._first_thread_leaker_id:
224
if TestCase._first_thread_leaker_id:
316
225
self.stream.write(
317
226
'%s is leaking threads among %d leaking tests.\n' % (
318
self._first_thread_leaker_id,
319
self._tests_leaking_threads_count))
227
TestCase._first_thread_leaker_id,
228
TestCase._leaking_threads_tests))
320
229
# We don't report the main thread as an active one.
321
230
self.stream.write(
322
231
'%d non-main threads were left active in the end.\n'
323
% (len(self._active_threads) - 1))
325
def getDescription(self, test):
232
% (TestCase._active_threads - 1))
328
234
def _extractBenchmarkTime(self, testCase, details=None):
329
235
"""Add a benchmark time for the current test case."""
352
257
def _shortened_test_description(self, test):
354
what = re.sub(r'^bzrlib\.tests\.', '', what)
259
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
357
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
358
# multiple times in a row, because the handler is added for
359
# each test but the container list is shared between cases.
360
# See lp:498869 lp:625574 and lp:637725 for background.
361
def _record_traceback_from_test(self, exc_info):
362
"""Store the traceback from passed exc_info tuple till"""
363
self._traceback_from_test = exc_info[2]
365
262
def startTest(self, test):
366
super(ExtendedTestResult, self).startTest(test)
263
unittest.TestResult.startTest(self, test)
367
264
if self.count == 0:
368
265
self.startTests()
370
266
self.report_test_start(test)
371
267
test.number = self.count
372
268
self._recordTestStartTime()
373
# Make testtools cases give us the real traceback on failure
374
addOnException = getattr(test, "addOnException", None)
375
if addOnException is not None:
376
addOnException(self._record_traceback_from_test)
377
# Only check for thread leaks on bzrlib derived test cases
378
if isinstance(test, TestCase):
379
test.addCleanup(self._check_leaked_threads, test)
381
def stopTest(self, test):
382
super(ExtendedTestResult, self).stopTest(test)
383
# Manually break cycles, means touching various private things but hey
384
getDetails = getattr(test, "getDetails", None)
385
if getDetails is not None:
387
# Clear _type_equality_funcs to try to stop TestCase instances
388
# from wasting memory. 'clear' is not available in all Python
389
# versions (bug 809048)
390
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
391
if type_equality_funcs is not None:
392
tef_clear = getattr(type_equality_funcs, "clear", None)
393
if tef_clear is None:
394
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
395
if tef_instance_dict is not None:
396
tef_clear = tef_instance_dict.clear
397
if tef_clear is not None:
399
self._traceback_from_test = None
401
270
def startTests(self):
402
self.report_tests_starting()
403
self._active_threads = threading.enumerate()
405
def _check_leaked_threads(self, test):
406
"""See if any threads have leaked since last call
408
A sample of live threads is stored in the _active_threads attribute,
409
when this method runs it compares the current live threads and any not
410
in the previous sample are treated as having leaked.
412
now_active_threads = set(threading.enumerate())
413
threads_leaked = now_active_threads.difference(self._active_threads)
415
self._report_thread_leak(test, threads_leaked, now_active_threads)
416
self._tests_leaking_threads_count += 1
417
if self._first_thread_leaker_id is None:
418
self._first_thread_leaker_id = test.id()
419
self._active_threads = now_active_threads
272
if getattr(sys, 'frozen', None) is None:
273
bzr_path = osutils.realpath(sys.argv[0])
275
bzr_path = sys.executable
277
'bzr selftest: %s\n' % (bzr_path,))
280
bzrlib.__path__[0],))
282
' bzr-%s python-%s %s\n' % (
283
bzrlib.version_string,
284
bzrlib._format_version_tuple(sys.version_info),
285
platform.platform(aliased=1),
287
self.stream.write('\n')
421
289
def _recordTestStartTime(self):
422
290
"""Record that a test has started."""
423
self._start_datetime = self._now()
291
self._start_time = time.time()
293
def _cleanupLogFile(self, test):
294
# We can only do this if we have one of our TestCases, not if
296
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
297
if setKeepLogfile is not None:
425
300
def addError(self, test, err):
426
301
"""Tell result that test finished with an error.
460
337
self._formatTime(benchmark_time),
462
339
self.report_success(test)
463
super(ExtendedTestResult, self).addSuccess(test)
340
self._cleanupLogFile(test)
341
unittest.TestResult.addSuccess(self, test)
464
342
test._log_contents = ''
466
344
def addExpectedFailure(self, test, err):
467
345
self.known_failure_count += 1
468
346
self.report_known_failure(test, err)
470
def addUnexpectedSuccess(self, test, details=None):
471
"""Tell result the test unexpectedly passed, counting as a failure
473
When the minimum version of testtools required becomes 0.9.8 this
474
can be updated to use the new handling there.
476
super(ExtendedTestResult, self).addFailure(test, details=details)
477
self.failure_count += 1
478
self.report_unexpected_success(test,
479
"".join(details["reason"].iter_text()))
483
348
def addNotSupported(self, test, feature):
484
349
"""The test will not be run because of a missing feature.
518
382
raise errors.BzrError("Unknown whence %r" % whence)
520
def report_tests_starting(self):
521
"""Display information before the test run begins"""
522
if getattr(sys, 'frozen', None) is None:
523
bzr_path = osutils.realpath(sys.argv[0])
525
bzr_path = sys.executable
527
'bzr selftest: %s\n' % (bzr_path,))
530
bzrlib.__path__[0],))
532
' bzr-%s python-%s %s\n' % (
533
bzrlib.version_string,
534
bzrlib._format_version_tuple(sys.version_info),
535
platform.platform(aliased=1),
537
self.stream.write('\n')
539
def report_test_start(self, test):
    """Display information on the test just about to be run.

    This implementation has no statements of its own, so it reports
    nothing and returns None.

    :param test: The test that is about to start.
    """
542
def _report_thread_leak(self, test, leaked_threads, active_threads):
    """Display information on a test that leaked one or more threads.

    Nothing is written unless 'threads' is present in
    selftest_debug_flags.

    :param test: The leaking test; its id() is included in the message.
    :param leaked_threads: The threads considered leaked. Not used by this
        implementation.
    :param active_threads: A sized collection of the threads alive now;
        only its length is reported.
    """
    # GZ 2010-09-09: A leak summary reported separately from the general
    #                thread debugging would be nice. Tests under subunit
    #                need something not using stream, perhaps adding a
    #                testtools details object would be fitting.
    if 'threads' in selftest_debug_flags:
        self.stream.write('%s is leaking, active is now %d\n' %
            (test.id(), len(active_threads)))
384
def report_cleaning_up(self):
552
387
def startTestRun(self):
553
388
self.startTime = time.time()
694
533
return '%s%s' % (indent, err[1])
696
535
def report_error(self, test, err):
697
self.stream.write('ERROR %s\n%s\n'
536
self.stream.writeln('ERROR %s\n%s'
698
537
% (self._testTimeString(test),
699
538
self._error_summary(err)))
701
540
def report_failure(self, test, err):
702
self.stream.write(' FAIL %s\n%s\n'
541
self.stream.writeln(' FAIL %s\n%s'
703
542
% (self._testTimeString(test),
704
543
self._error_summary(err)))
706
545
def report_known_failure(self, test, err):
707
self.stream.write('XFAIL %s\n%s\n'
546
self.stream.writeln('XFAIL %s\n%s'
708
547
% (self._testTimeString(test),
709
548
self._error_summary(err)))
711
def report_unexpected_success(self, test, reason):
712
self.stream.write(' FAIL %s\n%s: %s\n'
713
% (self._testTimeString(test),
714
"Unexpected success. Should have failed",
717
550
def report_success(self, test):
718
self.stream.write(' OK %s\n' % self._testTimeString(test))
551
self.stream.writeln(' OK %s' % self._testTimeString(test))
719
552
for bench_called, stats in getattr(test, '_benchcalls', []):
720
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
553
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
721
554
stats.pprint(file=self.stream)
722
555
# flush the stream so that we get smooth output. This verbose mode is
723
556
# used to show the output in PQM.
724
557
self.stream.flush()
726
559
def report_skip(self, test, reason):
727
self.stream.write(' SKIP %s\n%s\n'
560
self.stream.writeln(' SKIP %s\n%s'
728
561
% (self._testTimeString(test), reason))
730
563
def report_not_applicable(self, test, reason):
731
self.stream.write(' N/A %s\n %s\n'
564
self.stream.writeln(' N/A %s\n %s'
732
565
% (self._testTimeString(test), reason))
734
567
def report_unsupported(self, test, feature):
735
568
"""test cannot be run because feature is missing."""
736
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
569
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
737
570
%(self._testTimeString(test), feature))
990
808
self._track_transports()
991
809
self._track_locks()
992
810
self._clear_debug_flags()
993
# Isolate global verbosity level, to make sure it's reproducible
994
# between tests. We should get rid of this altogether: bug 656694. --
996
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
997
# Isolate config option expansion until its default value for bzrlib is
998
# settled on or the FIXME associated with _get_expand_default_value
999
# is addressed -- vila 20110219
1000
self.overrideAttr(config, '_expand_default_value', None)
1001
self._log_files = set()
1002
# Each key in the ``_counters`` dict holds a value for a different
1003
# counter. When the test ends, addDetail() should be used to output the
1004
# counter values. This happens in install_counter_hook().
1006
if 'config_stats' in selftest_debug_flags:
1007
self._install_config_stats_hooks()
811
TestCase._active_threads = threading.activeCount()
812
self.addCleanup(self._check_leaked_threads)
1009
814
def debug(self):
1010
815
# debug a frame up.
1012
817
pdb.Pdb().set_trace(sys._getframe().f_back)
1014
def discardDetail(self, name):
1015
"""Extend the addDetail, getDetails api so we can remove a detail.
1017
eg. bzr always adds the 'log' detail at startup, but we don't want to
1018
include it for skipped, xfail, etc tests.
1020
It is safe to call this for a detail that doesn't exist, in case this
1021
gets called multiple times.
1023
# We cheat. details is stored in __details which means we shouldn't
1024
# touch it. but getDetails() returns the dict directly, so we can
1026
details = self.getDetails()
1030
def install_counter_hook(self, hooks, name, counter_name=None):
1031
"""Install a counting hook.
1033
Any hook can be counted as long as it doesn't need to return a value.
1035
:param hooks: Where the hook should be installed.
1037
:param name: The hook name that will be counted.
1039
:param counter_name: The counter identifier in ``_counters``, defaults
1042
_counters = self._counters # Avoid closing over self
1043
if counter_name is None:
1045
if _counters.has_key(counter_name):
1046
raise AssertionError('%s is already used as a counter name'
1048
_counters[counter_name] = 0
1049
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1050
lambda: ['%d' % (_counters[counter_name],)]))
1051
def increment_counter(*args, **kwargs):
1052
_counters[counter_name] += 1
1053
label = 'count %s calls' % (counter_name,)
1054
hooks.install_named_hook(name, increment_counter, label)
1055
self.addCleanup(hooks.uninstall_named_hook, name, label)
1057
def _install_config_stats_hooks(self):
1058
"""Install config hooks to count hook calls.
1061
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1062
self.install_counter_hook(config.ConfigHooks, hook_name,
1063
'config.%s' % (hook_name,))
1065
# The OldConfigHooks are private and need special handling to protect
1066
# against recursive tests (tests that run other tests), so we just do
1067
# manually what registering them into _builtin_known_hooks will provide
1069
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1070
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1071
self.install_counter_hook(config.OldConfigHooks, hook_name,
1072
'old_config.%s' % (hook_name,))
819
def _check_leaked_threads(self):
    """Note on TestCase whether the live thread count grew during this test.

    The current threading.activeCount() is compared with the count held in
    TestCase._active_threads, which is then updated to the current value.
    When the count has grown, TestCase._leaking_threads_tests is
    incremented and, if no leaker has been recorded yet, this test's id()
    is stored in TestCase._first_thread_leaker_id.
    """
    active = threading.activeCount()
    leaked_threads = active - TestCase._active_threads
    TestCase._active_threads = active
    # If some tests make the number of threads *decrease*, we'll consider
    # that they are just observing old threads dying, not aggressively
    # killing random threads. So we don't report these tests as leaking. The
    # risk is that we have false positives that way (the test sees 2 threads
    # going away but leaks one) but it seems less likely than the actual
    # false positives (the test sees threads going away and does not leak).
    if leaked_threads > 0:
        TestCase._leaking_threads_tests += 1
        if TestCase._first_thread_leaker_id is None:
            TestCase._first_thread_leaker_id = self.id()
1074
834
def _clear_debug_flags(self):
1075
835
"""Prevent externally set debug flags affecting tests.
1077
837
Tests that want to use debug flags can just set them in the
1078
838
debug_flags set during setup/teardown.
1080
# Start with a copy of the current debug flags we can safely modify.
1081
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
840
self._preserved_debug_flags = set(debug.debug_flags)
1082
841
if 'allow_debug' not in selftest_debug_flags:
1083
842
debug.debug_flags.clear()
1084
843
if 'disable_lock_checks' not in selftest_debug_flags:
1085
844
debug.debug_flags.add('strict_locks')
845
self.addCleanup(self._restore_debug_flags)
1087
847
def _clear_hooks(self):
1088
848
# prevent hooks affecting tests
1089
known_hooks = hooks.known_hooks
1090
849
self._preserved_hooks = {}
1091
for key, (parent, name) in known_hooks.iter_parent_objects():
1092
current_hooks = getattr(parent, name)
850
for key, factory in hooks.known_hooks.items():
851
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
852
current_hooks = hooks.known_hooks_key_to_object(key)
1093
853
self._preserved_hooks[parent] = (name, current_hooks)
1094
self._preserved_lazy_hooks = hooks._lazy_hooks
1095
hooks._lazy_hooks = {}
1096
854
self.addCleanup(self._restoreHooks)
1097
for key, (parent, name) in known_hooks.iter_parent_objects():
1098
factory = known_hooks.get(key)
855
for key, factory in hooks.known_hooks.items():
856
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
1099
857
setattr(parent, name, factory())
1100
858
# this hook should always be installed
1101
859
request._install_hook()
1548
1294
self.assertEqualDiff(content, s)
1550
def assertDocstring(self, expected_docstring, obj):
1551
"""Fail if obj does not have expected_docstring"""
1553
# With -OO the docstring should be None instead
1554
self.assertIs(obj.__doc__, None)
1556
self.assertEqual(expected_docstring, obj.__doc__)
1558
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1559
1296
def failUnlessExists(self, path):
1560
return self.assertPathExists(path)
1562
def assertPathExists(self, path):
1563
1297
"""Fail unless path or paths, which may be abs or relative, exist."""
1564
1298
if not isinstance(path, basestring):
1566
self.assertPathExists(p)
1300
self.failUnlessExists(p)
1568
self.assertTrue(osutils.lexists(path),
1569
path + " does not exist")
1302
self.failUnless(osutils.lexists(path),path+" does not exist")
1571
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1572
1304
def failIfExists(self, path):
1573
return self.assertPathDoesNotExist(path)
1575
def assertPathDoesNotExist(self, path):
1576
1305
"""Fail if path or paths, which may be abs or relative, exist."""
1577
1306
if not isinstance(path, basestring):
1579
self.assertPathDoesNotExist(p)
1308
self.failIfExists(p)
1581
self.assertFalse(osutils.lexists(path),
1310
self.failIf(osutils.lexists(path),path+" exists")
1584
1312
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1585
1313
"""A helper for callDeprecated and applyDeprecated.
1701
1428
The file is removed as the test is torn down.
1703
pseudo_log_file = StringIO()
1704
def _get_log_contents_for_weird_testtools_api():
1705
return [pseudo_log_file.getvalue().decode(
1706
"utf-8", "replace").encode("utf-8")]
1707
self.addDetail("log", content.Content(content.ContentType("text",
1708
"plain", {"charset": "utf8"}),
1709
_get_log_contents_for_weird_testtools_api))
1710
self._log_file = pseudo_log_file
1711
self._log_memento = trace.push_log_file(self._log_file)
1430
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1431
self._log_file = os.fdopen(fileno, 'w+')
1432
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1433
self._log_file_name = name
1712
1434
self.addCleanup(self._finishLogFile)
1714
1436
def _finishLogFile(self):
1715
1437
"""Finished with the log file.
1717
Close the file and delete it.
1439
Close the file and delete it, unless setKeepLogfile was called.
1719
if trace._trace_file:
1441
if bzrlib.trace._trace_file:
1720
1442
# flush the log file, to get all content
1721
trace._trace_file.flush()
1722
trace.pop_log_file(self._log_memento)
1443
bzrlib.trace._trace_file.flush()
1444
bzrlib.trace.pop_log_file(self._log_memento)
1445
# Cache the log result and delete the file on disk
1446
self._get_log(False)
1724
1448
def thisFailsStrictLockCheck(self):
1725
1449
"""It is known that this test would fail with -Dstrict_locks.
1735
1459
debug.debug_flags.discard('strict_locks')
1737
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1738
"""Overrides an object attribute restoring it after the test.
1740
:note: This should be used with discretion; you should think about
1741
whether it's better to make the code testable without monkey-patching.
1743
:param obj: The object that will be mutated.
1745
:param attr_name: The attribute name we want to preserve/override in
1748
:param new: The optional value we want to set the attribute to.
1750
:returns: The actual attr value.
1752
value = getattr(obj, attr_name)
1753
# The actual value is captured by the call below
1754
self.addCleanup(setattr, obj, attr_name, value)
1755
if new is not _unitialized_attr:
1756
setattr(obj, attr_name, new)
1759
def overrideEnv(self, name, new):
1760
"""Set an environment variable, and reset it after the test.
1762
:param name: The environment variable name.
1764
:param new: The value to set the variable to. If None, the
1765
variable is deleted from the environment.
1767
:returns: The actual variable value.
1769
value = osutils.set_or_unset_env(name, new)
1770
self.addCleanup(osutils.set_or_unset_env, name, value)
1773
def recordCalls(self, obj, attr_name):
1774
"""Monkeypatch in a wrapper that will record calls.
1776
The monkeypatch is automatically removed when the test concludes.
1778
:param obj: The namespace holding the reference to be replaced;
1779
typically a module, class, or object.
1780
:param attr_name: A string for the name of the attribute to
1782
:returns: A list that will be extended with one item every time the
1783
function is called, with a tuple of (args, kwargs).
1787
def decorator(*args, **kwargs):
1788
calls.append((args, kwargs))
1789
return orig(*args, **kwargs)
1790
orig = self.overrideAttr(obj, attr_name, decorator)
1461
def addCleanup(self, callable, *args, **kwargs):
1462
"""Arrange to run a callable when this case is torn down.
1464
Callables are run in the reverse of the order they are registered,
1465
ie last-in first-out.
1467
self._cleanups.append((callable, args, kwargs))
1793
1469
def _cleanEnvironment(self):
1794
for name, value in isolated_environ.iteritems():
1795
self.overrideEnv(name, value)
1471
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1472
'HOME': os.getcwd(),
1473
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1474
# tests do check our impls match APPDATA
1475
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1479
'BZREMAIL': None, # may still be present in the environment
1481
'BZR_PROGRESS_BAR': None,
1483
'BZR_PLUGIN_PATH': None,
1484
'BZR_CONCURRENCY': None,
1485
# Make sure that any text ui tests are consistent regardless of
1486
# the environment the test case is run in; you may want tests that
1487
# test other combinations. 'dumb' is a reasonable guess for tests
1488
# going to a pipe or a StringIO.
1492
'BZR_COLUMNS': '80',
1494
'SSH_AUTH_SOCK': None,
1498
'https_proxy': None,
1499
'HTTPS_PROXY': None,
1504
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1505
# least. If you do (care), please update this comment
1509
'BZR_REMOTE_PATH': None,
1512
self.addCleanup(self._restoreEnvironment)
1513
for name, value in new_env.iteritems():
1514
self._captureVar(name, value)
1516
def _captureVar(self, name, newvalue):
    """Set an environment variable, and reset it when finished.

    Whatever osutils.set_or_unset_env returns (presumably the previous
    value of the variable) is remembered in self.__old_env under ``name``;
    _restoreEnvironment later feeds those entries back to
    set_or_unset_env.

    :param name: The environment variable name.
    :param newvalue: The value handed to osutils.set_or_unset_env.
    """
    self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1520
def _restore_debug_flags(self):
    """Reset debug.debug_flags to the flags saved in _preserved_debug_flags.

    The existing debug.debug_flags object is emptied and refilled in place
    rather than being rebound to a new set.
    """
    debug.debug_flags.clear()
    debug.debug_flags.update(self._preserved_debug_flags)
1524
def _restoreEnvironment(self):
    """Reapply every (name, value) pair recorded in self.__old_env.

    Each pair is passed to osutils.set_or_unset_env, the same helper
    _captureVar used when it recorded the entry.
    """
    for name, value in self.__old_env.iteritems():
        osutils.set_or_unset_env(name, value)
1797
1528
def _restoreHooks(self):
1798
1529
for klass, (name, hooks) in self._preserved_hooks.items():
1799
1530
setattr(klass, name, hooks)
1800
self._preserved_hooks.clear()
1801
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1802
self._preserved_lazy_hooks.clear()
1804
1532
def knownFailure(self, reason):
1805
"""Declare that this test fails for a known reason
1807
Tests that are known to fail should generally be using expectedFailure
1808
with an appropriate reverse assertion if a change could cause the test
1809
to start passing. Conversely if the test has no immediate prospect of
1810
succeeding then using skip is more suitable.
1812
When this method is called while an exception is being handled, that
1813
traceback will be used, otherwise a new exception will be thrown to
1814
provide one but won't be reported.
1816
self._add_reason(reason)
1818
exc_info = sys.exc_info()
1819
if exc_info != (None, None, None):
1820
self._report_traceback(exc_info)
1823
raise self.failureException(reason)
1824
except self.failureException:
1825
exc_info = sys.exc_info()
1826
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1827
raise testtools.testcase._ExpectedFailure(exc_info)
1831
def _suppress_log(self):
1832
"""Remove the log info from details."""
1833
self.discardDetail('log')
1533
"""This test has failed for some known reason."""
1534
raise KnownFailure(reason)
1835
1536
def _do_skip(self, result, reason):
1836
self._suppress_log()
1837
1537
addSkip = getattr(result, 'addSkip', None)
1838
1538
if not callable(addSkip):
1839
1539
result.addSuccess(result)
1918
1595
self._benchtime += time.time() - start
1920
1597
def log(self, *args):
1600
def _get_log(self, keep_log_file=False):
1601
"""Internal helper to get the log from bzrlib.trace for this test.
1603
Please use self.getDetails, or self.get_log to access this in test case
1606
:param keep_log_file: When True, if the log is still a file on disk
1607
leave it as a file on disk. When False, if the log is still a file
1608
on disk, the log file is deleted and the log preserved as
1610
:return: A string containing the log.
1612
if self._log_contents is not None:
1614
self._log_contents.decode('utf8')
1615
except UnicodeDecodeError:
1616
unicodestr = self._log_contents.decode('utf8', 'replace')
1617
self._log_contents = unicodestr.encode('utf8')
1618
return self._log_contents
1620
if bzrlib.trace._trace_file:
1621
# flush the log file, to get all content
1622
bzrlib.trace._trace_file.flush()
1623
if self._log_file_name is not None:
1624
logfile = open(self._log_file_name)
1626
log_contents = logfile.read()
1630
log_contents.decode('utf8')
1631
except UnicodeDecodeError:
1632
unicodestr = log_contents.decode('utf8', 'replace')
1633
log_contents = unicodestr.encode('utf8')
1634
if not keep_log_file:
1635
self._log_file.close()
1636
self._log_file = None
1637
# Permit multiple calls to get_log until we clean it up in
1639
self._log_contents = log_contents
1641
os.remove(self._log_file_name)
1643
if sys.platform == 'win32' and e.errno == errno.EACCES:
1644
sys.stderr.write(('Unable to delete log file '
1645
' %r\n' % self._log_file_name))
1648
self._log_file_name = None
1651
return "No log file content and no log file name."
1923
1653
def get_log(self):
1924
1654
"""Get a unicode string containing the log from bzrlib.trace.
2200
def _add_subprocess_log(self, log_file_path):
2201
if len(self._log_files) == 0:
2202
# Register an addCleanup func. We do this on the first call to
2203
# _add_subprocess_log rather than in TestCase.setUp so that this
2204
# addCleanup is registered after any cleanups for tempdirs that
2205
# subclasses might create, which will probably remove the log file
2207
self.addCleanup(self._subprocess_log_cleanup)
2208
# self._log_files is a set, so if a log file is reused we won't grab it
2210
self._log_files.add(log_file_path)
2212
def _subprocess_log_cleanup(self):
2213
for count, log_file_path in enumerate(self._log_files):
2214
# We use buffer_now=True to avoid holding the file open beyond
2215
# the life of this function, which might interfere with e.g.
2216
# cleaning tempdirs on Windows.
2217
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2218
#detail_content = content.content_from_file(
2219
# log_file_path, buffer_now=True)
2220
with open(log_file_path, 'rb') as log_file:
2221
log_file_bytes = log_file.read()
2222
detail_content = content.Content(content.ContentType("text",
2223
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2224
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2227
1921
def _popen(self, *args, **kwargs):
2228
1922
"""Place a call to Popen.
2230
1924
Allows tests to override this method to intercept the calls made to
2231
1925
Popen for introspection.
2233
return subprocess.Popen(*args, **kwargs)
1927
return Popen(*args, **kwargs)
2235
1929
def get_source_path(self):
2236
1930
"""Return the path of the directory containing bzrlib."""
2266
1960
if retcode is not None and retcode != process.returncode:
2267
1961
if process_args is None:
2268
1962
process_args = "(unknown args)"
2269
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2270
trace.mutter('Error for bzr %s:\n%s', process_args, err)
1963
mutter('Output of bzr %s:\n%s', process_args, out)
1964
mutter('Error for bzr %s:\n%s', process_args, err)
2271
1965
self.fail('Command bzr %s failed with retcode %s != %s'
2272
1966
% (process_args, retcode, process.returncode))
2273
1967
return [out, err]
2275
def check_tree_shape(self, tree, shape):
2276
"""Compare a tree to a list of expected names.
1969
def check_inventory_shape(self, inv, shape):
1970
"""Compare an inventory to a list of expected names.
2278
1972
Fail if they are not precisely equal.
2281
1975
shape = list(shape) # copy
2282
for path, ie in tree.iter_entries_by_dir():
1976
for path, ie in inv.entries():
2283
1977
name = path.replace('\\', '/')
2284
1978
if ie.kind == 'directory':
2285
1979
name = name + '/'
2287
pass # ignore root entry
2289
1981
shape.remove(name)
2291
1983
extras.append(name)
2381
2079
class TestCaseWithMemoryTransport(TestCase):
2382
2080
"""Common test class for tests that do not need disk resources.
2384
Tests that need disk resources should derive from TestCaseInTempDir
2385
or TestCaseWithTransport.
2082
Tests that need disk resources should derive from TestCaseWithTransport.
2387
2084
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2389
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2086
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2390
2087
a directory which does not exist. This serves to help ensure test isolation
2391
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2392
must exist. However, TestCaseWithMemoryTransport does not offer local file
2393
defaults for the transport in tests, nor does it obey the command line
2088
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2089
must exist. However, TestCaseWithMemoryTransport does not offer local
2090
file defaults for the transport in tests, nor does it obey the command line
2394
2091
override, so tests that accidentally write to the common directory should
2397
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2398
``.bzr`` directory that stops us ascending higher into the filesystem.
2094
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2095
a .bzr directory that stops us ascending higher into the filesystem.
2401
2098
TEST_ROOT = None
2667
2357
test_home_dir = self.test_home_dir
2668
2358
if isinstance(test_home_dir, unicode):
2669
2359
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2670
self.overrideEnv('HOME', test_home_dir)
2671
self.overrideEnv('BZR_HOME', test_home_dir)
2360
os.environ['HOME'] = test_home_dir
2361
os.environ['BZR_HOME'] = test_home_dir
2673
2363
def setUp(self):
2674
2364
super(TestCaseWithMemoryTransport, self).setUp()
2675
# Ensure that ConnectedTransport doesn't leak sockets
2676
def get_transport_from_url_with_cleanup(*args, **kwargs):
2677
t = orig_get_transport_from_url(*args, **kwargs)
2678
if isinstance(t, _mod_transport.ConnectedTransport):
2679
self.addCleanup(t.disconnect)
2682
orig_get_transport_from_url = self.overrideAttr(
2683
_mod_transport, 'get_transport_from_url',
2684
get_transport_from_url_with_cleanup)
2685
2365
self._make_test_root()
2686
self.addCleanup(os.chdir, os.getcwdu())
2366
_currentdir = os.getcwdu()
2367
def _leaveDirectory():
2368
os.chdir(_currentdir)
2369
self.addCleanup(_leaveDirectory)
2687
2370
self.makeAndChdirToTestDir()
2688
2371
self.overrideEnvironmentForTesting()
2689
2372
self.__readonly_server = None
3453
3124
def partition_tests(suite, count):
3454
3125
"""Partition suite into count lists of tests."""
3455
# This just assigns tests in a round-robin fashion. On one hand this
3456
# splits up blocks of related tests that might run faster if they shared
3457
# resources, but on the other it avoids assigning blocks of slow tests to
3458
# just one partition. So the slowest partition shouldn't be much slower
3460
partitions = [list() for i in range(count)]
3461
tests = iter_suite_tests(suite)
3462
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3463
partition.append(test)
3467
def workaround_zealous_crypto_random():
3468
"""Crypto.Random wants to help us be secure, but we don't care here.
3470
This works around some test failures related to the sftp server. Once paramiko
3471
stops using the controversial API in Crypto.Random, we may get rid of it.
3474
from Crypto.Random import atfork
3127
tests = list(iter_suite_tests(suite))
3128
tests_per_process = int(math.ceil(float(len(tests)) / count))
3129
for block in range(count):
3130
low_test = block * tests_per_process
3131
high_test = low_test + tests_per_process
3132
process_tests = tests[low_test:high_test]
3133
result.append(process_tests)
3480
3137
def fork_for_tests(suite):
3590
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3243
class ForwardingResult(unittest.TestResult):
    """A TestResult that relays each event to another result object.

    Every overridden method simply calls the same-named method on the
    target; the base class implementations are not invoked, so this
    object's own counters are not updated by those events.

    :ivar result: The target result that receives the forwarded calls.
    """

    def __init__(self, target):
        """Create a forwarder.

        :param target: The result object that events are passed on to.
        """
        unittest.TestResult.__init__(self)
        self.result = target

    def startTest(self, test):
        self.result.startTest(test)

    def stopTest(self, test):
        self.result.stopTest(test)

    def startTestRun(self):
        self.result.startTestRun()

    def stopTestRun(self):
        self.result.stopTestRun()

    def addSkip(self, test, reason):
        self.result.addSkip(test, reason)

    def addSuccess(self, test):
        self.result.addSuccess(test)

    def addError(self, test, err):
        self.result.addError(test, err)

    def addFailure(self, test, err):
        self.result.addFailure(test, err)
3272
ForwardingResult = testtools.ExtendedToOriginalDecorator
3275
from subunit.test_results import AutoTimingTestResultDecorator
3278
class ProfileResult(ForwardingResult):
3591
3279
"""Generate profiling data for all activity between start and success.
3593
3281
The profile data is appended to the test's _benchcalls attribute and can
3874
3554
'bzrlib.tests.blackbox',
3875
3555
'bzrlib.tests.commands',
3876
'bzrlib.tests.doc_generate',
3877
3556
'bzrlib.tests.per_branch',
3878
3557
'bzrlib.tests.per_bzrdir',
3879
'bzrlib.tests.per_controldir',
3880
'bzrlib.tests.per_controldir_colo',
3881
3558
'bzrlib.tests.per_foreign_vcs',
3882
3559
'bzrlib.tests.per_interrepository',
3883
3560
'bzrlib.tests.per_intertree',
3884
3561
'bzrlib.tests.per_inventory',
3885
3562
'bzrlib.tests.per_interbranch',
3886
3563
'bzrlib.tests.per_lock',
3887
'bzrlib.tests.per_merger',
3888
3564
'bzrlib.tests.per_transport',
3889
3565
'bzrlib.tests.per_tree',
3890
3566
'bzrlib.tests.per_pack_repository',
3891
3567
'bzrlib.tests.per_repository',
3892
3568
'bzrlib.tests.per_repository_chk',
3893
3569
'bzrlib.tests.per_repository_reference',
3894
'bzrlib.tests.per_repository_vf',
3895
3570
'bzrlib.tests.per_uifactory',
3896
3571
'bzrlib.tests.per_versionedfile',
3897
3572
'bzrlib.tests.per_workingtree',
3898
3573
'bzrlib.tests.test__annotator',
3899
'bzrlib.tests.test__bencode',
3900
'bzrlib.tests.test__btree_serializer',
3901
3574
'bzrlib.tests.test__chk_map',
3902
3575
'bzrlib.tests.test__dirstate_helpers',
3903
3576
'bzrlib.tests.test__groupcompress',
3925
3599
'bzrlib.tests.test_chunk_writer',
3926
3600
'bzrlib.tests.test_clean_tree',
3927
3601
'bzrlib.tests.test_cleanup',
3928
'bzrlib.tests.test_cmdline',
3929
3602
'bzrlib.tests.test_commands',
3930
3603
'bzrlib.tests.test_commit',
3931
3604
'bzrlib.tests.test_commit_merge',
3932
3605
'bzrlib.tests.test_config',
3933
3606
'bzrlib.tests.test_conflicts',
3934
'bzrlib.tests.test_controldir',
3935
3607
'bzrlib.tests.test_counted_lock',
3936
3608
'bzrlib.tests.test_crash',
3937
3609
'bzrlib.tests.test_decorators',
3938
3610
'bzrlib.tests.test_delta',
3939
3611
'bzrlib.tests.test_debug',
3612
'bzrlib.tests.test_deprecated_graph',
3940
3613
'bzrlib.tests.test_diff',
3941
3614
'bzrlib.tests.test_directory_service',
3942
3615
'bzrlib.tests.test_dirstate',
3966
3635
'bzrlib.tests.test_http',
3967
3636
'bzrlib.tests.test_http_response',
3968
3637
'bzrlib.tests.test_https_ca_bundle',
3969
'bzrlib.tests.test_i18n',
3970
3638
'bzrlib.tests.test_identitymap',
3971
3639
'bzrlib.tests.test_ignores',
3972
3640
'bzrlib.tests.test_index',
3973
'bzrlib.tests.test_import_tariff',
3974
3641
'bzrlib.tests.test_info',
3975
3642
'bzrlib.tests.test_inv',
3976
3643
'bzrlib.tests.test_inventory_delta',
3977
3644
'bzrlib.tests.test_knit',
3978
3645
'bzrlib.tests.test_lazy_import',
3979
3646
'bzrlib.tests.test_lazy_regex',
3980
'bzrlib.tests.test_library_state',
3981
3647
'bzrlib.tests.test_lock',
3982
3648
'bzrlib.tests.test_lockable_files',
3983
3649
'bzrlib.tests.test_lockdir',
4199
3850
# Some tests mentioned in the list are not in the test suite. The
4200
3851
# list may be out of date, report to the tester.
4201
3852
for id in not_found:
4202
trace.warning('"%s" not found in the test suite', id)
3853
bzrlib.trace.warning('"%s" not found in the test suite', id)
4203
3854
for id in duplicates:
4204
trace.warning('"%s" is used as an id by several tests', id)
3855
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
4209
def multiply_scenarios(*scenarios):
    """Multiply two or more iterables of scenarios.

    It is safe to pass scenario generators or iterators.

    :returns: A list of compound scenarios: the cross-product of all
        scenarios, with the names concatenated and the parameters
        combined by _multiply_two_scenarios.
    """
    # Each input is turned into a list first, so one-shot generators and
    # iterators can safely take part in the pairwise reduction.
    return reduce(_multiply_two_scenarios, map(list, scenarios))
4221
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3860
def multiply_scenarios(scenarios_left, scenarios_right):
4222
3861
"""Multiply two sets of scenarios.
4224
3863
:returns: the cartesian product of the two sets of scenarios, that is
4308
3947
:param new_id: The id to assign to it.
4309
3948
:return: The new test.
4311
new_test = copy.copy(test)
3950
new_test = copy(test)
4312
3951
new_test.id = lambda: new_id
4313
# XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4314
# causes cloned tests to share the 'details' dict. This makes it hard to
4315
# read the test output for parameterized tests, because tracebacks will be
4316
# associated with irrelevant tests.
4318
details = new_test._TestCase__details
4319
except AttributeError:
4320
# must be a different version of testtools than expected. Do nothing.
4323
# Reset the '__details' dict.
4324
new_test._TestCase__details = {}
4325
3952
return new_test
4328
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4330
"""Helper for permutating tests against an extension module.
4332
This is meant to be used inside a module's 'load_tests()' function. It will
4333
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4334
against both implementations. Setting 'test.module' to the appropriate
4335
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4337
:param standard_tests: A test suite to permute
4338
:param loader: A TestLoader
4339
:param py_module_name: The python path to a python module that can always
4340
be loaded, and will be considered the 'python' implementation. (eg
4341
'bzrlib._chk_map_py')
4342
:param ext_module_name: The python path to an extension module. If the
4343
module cannot be loaded, a single test will be added, which notes that
4344
the module is not available. If it can be loaded, all standard_tests
4345
will be run against that module.
4346
:return: (suite, feature) suite is a test-suite that has all the permuted
4347
tests. feature is the Feature object that can be used to determine if
4348
the module is available.
4351
from bzrlib.tests.features import ModuleAvailableFeature
4352
py_module = pyutils.get_named_object(py_module_name)
4354
('python', {'module': py_module}),
4356
suite = loader.suiteClass()
4357
feature = ModuleAvailableFeature(ext_module_name)
4358
if feature.available():
4359
scenarios.append(('C', {'module': feature.module}))
4361
# the compiled module isn't available, so we add a failing test
4362
class FailWithoutFeature(TestCase):
4363
def test_fail(self):
4364
self.requireFeature(feature)
4365
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4366
result = multiply_tests(standard_tests, scenarios, suite)
4367
return result, feature
4370
3955
def _rmtree_temp_dir(dirname, test_id=None):
4371
3956
# If LANG=C we probably have created some bogus paths
4372
3957
# which rmtree(unicode) will fail to delete
4388
3973
if test_id != None:
4389
3974
ui.ui_factory.clear_term()
4390
3975
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4391
# Ugly, but the last thing we want here is fail, so bear with it.
4392
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4393
).encode('ascii', 'replace')
4394
3976
sys.stderr.write('Unable to remove testing dir %s\n%s'
4395
% (os.path.basename(dirname), printable_e))
3977
% (os.path.basename(dirname), e))
3980
class Feature(object):
3981
"""An operating system Feature."""
3984
self._available = None
3986
def available(self):
3987
"""Is the feature available?
3989
:return: True if the feature is available.
3991
if self._available is None:
3992
self._available = self._probe()
3993
return self._available
3996
"""Implement this method in concrete features.
3998
:return: True if the feature is available.
4000
raise NotImplementedError
4003
if getattr(self, 'feature_name', None):
4004
return self.feature_name()
4005
return self.__class__.__name__
4008
class _SymlinkFeature(Feature):
4011
return osutils.has_symlinks()
4013
def feature_name(self):
4016
SymlinkFeature = _SymlinkFeature()
4019
class _HardlinkFeature(Feature):
4022
return osutils.has_hardlinks()
4024
def feature_name(self):
4027
HardlinkFeature = _HardlinkFeature()
4030
class _OsFifoFeature(Feature):
4033
return getattr(os, 'mkfifo', None)
4035
def feature_name(self):
4036
return 'filesystem fifos'
4038
OsFifoFeature = _OsFifoFeature()
4041
class _UnicodeFilenameFeature(Feature):
4042
"""Does the filesystem support Unicode filenames?"""
4046
# Check for character combinations unlikely to be covered by any
4047
# single non-unicode encoding. We use the characters
4048
# - greek small letter alpha (U+03B1) and
4049
# - braille pattern dots-123456 (U+283F).
4050
os.stat(u'\u03b1\u283f')
4051
except UnicodeEncodeError:
4053
except (IOError, OSError):
4054
# The filesystem allows the Unicode filename but the file doesn't
4058
# The filesystem allows the Unicode filename and the file exists,
4062
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4065
class ModuleAvailableFeature(Feature):
4066
"""This is a feature that describes a module we want to be available.
4068
Declare the name of the module in __init__(), and then after probing, the
4069
module will be available as 'self.module'.
4071
:ivar module: The module if it is available, else None.
4074
def __init__(self, module_name):
4075
super(ModuleAvailableFeature, self).__init__()
4076
self.module_name = module_name
4080
self._module = __import__(self.module_name, {}, {}, [''])
4087
if self.available(): # Make sure the probe has been done
4091
def feature_name(self):
4092
return self.module_name
4398
4096
def probe_unicode_in_user_encoding():
4129
class _HTTPSServerFeature(Feature):
4130
"""Some tests want an https Server, check if one is available.
4132
Right now, the only way this is available is under python2.6 which provides
4143
def feature_name(self):
4144
return 'HTTPSServer'
4147
HTTPSServerFeature = _HTTPSServerFeature()
4150
class _ParamikoFeature(Feature):
4151
"""Is paramiko available?"""
4155
from bzrlib.transport.sftp import SFTPAbsoluteServer
4157
except errors.ParamikoNotPresent:
4160
def feature_name(self):
4164
ParamikoFeature = _ParamikoFeature()
4167
class _UnicodeFilename(Feature):
4168
"""Does the filesystem support Unicode filenames?"""
4173
except UnicodeEncodeError:
4175
except (IOError, OSError):
4176
# The filesystem allows the Unicode filename but the file doesn't
4180
# The filesystem allows the Unicode filename and the file exists,
4184
UnicodeFilename = _UnicodeFilename()
4187
class _UTF8Filesystem(Feature):
4188
"""Is the filesystem UTF-8?"""
4191
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4195
UTF8Filesystem = _UTF8Filesystem()
4198
class _BreakinFeature(Feature):
4199
"""Does this platform support the breakin feature?"""
4202
from bzrlib import breakin
4203
if breakin.determine_signal() is None:
4205
if sys.platform == 'win32':
4206
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4207
# We trigger SIGBREAK via a Console api so we need ctypes to
4208
# access the function
4215
def feature_name(self):
4216
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4219
BreakinFeature = _BreakinFeature()
4222
class _CaseInsCasePresFilenameFeature(Feature):
4223
"""Is the file-system case insensitive, but case-preserving?"""
4226
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4228
# first check truly case-preserving for created files, then check
4229
# case insensitive when opening existing files.
4230
name = osutils.normpath(name)
4231
base, rel = osutils.split(name)
4232
found_rel = osutils.canonical_relpath(base, name)
4233
return (found_rel == rel
4234
and os.path.isfile(name.upper())
4235
and os.path.isfile(name.lower()))
4240
def feature_name(self):
4241
return "case-insensitive case-preserving filesystem"
4243
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4246
class _CaseInsensitiveFilesystemFeature(Feature):
4247
"""Check if underlying filesystem is case-insensitive but *not* case
4250
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4251
# more likely to be case preserving, so this case is rare.
4254
if CaseInsCasePresFilenameFeature.available():
4257
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4258
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4259
TestCaseWithMemoryTransport.TEST_ROOT = root
4261
root = TestCaseWithMemoryTransport.TEST_ROOT
4262
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4264
name_a = osutils.pathjoin(tdir, 'a')
4265
name_A = osutils.pathjoin(tdir, 'A')
4267
result = osutils.isdir(name_A)
4268
_rmtree_temp_dir(tdir)
4271
def feature_name(self):
4272
return 'case-insensitive filesystem'
4274
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4277
class _SubUnitFeature(Feature):
4278
"""Check if subunit is available."""
4287
def feature_name(self):
4290
SubUnitFeature = _SubUnitFeature()
4431
4291
# Only define SubUnitBzrRunner if subunit is available.
4433
4293
from subunit import TestProtocolClient
4434
from subunit.test_results import AutoTimingTestResultDecorator
4435
class SubUnitBzrProtocolClient(TestProtocolClient):
4437
def addSuccess(self, test, details=None):
4438
# The subunit client always includes the details in the subunit
4439
# stream, but we don't want to include it in ours.
4440
if details is not None and 'log' in details:
4442
return super(SubUnitBzrProtocolClient, self).addSuccess(
4445
4294
class SubUnitBzrRunner(TextTestRunner):
4446
4295
def run(self, test):
4447
4296
result = AutoTimingTestResultDecorator(
4448
SubUnitBzrProtocolClient(self.stream))
4297
TestProtocolClient(self.stream))
4449
4298
test.run(result)
4451
4300
except ImportError:
4455
@deprecated_function(deprecated_in((2, 5, 0)))
def ModuleAvailableFeature(name):
    """Deprecated shim for ``bzrlib.tests.features.ModuleAvailableFeature``.

    :param name: Passed straight through to the real constructor.
    :return: The ``features.ModuleAvailableFeature`` instance for ``name``.
    """
    # Imported inside the function rather than at module level; the real
    # implementation lives in bzrlib.tests.features.
    from bzrlib.tests import features
    return features.ModuleAvailableFeature(name)