55
50
# nb: check this before importing anything else from within it
56
51
_testtools_version = getattr(testtools, '__version__', ())
57
if _testtools_version < (0, 9, 2):
58
raise ImportError("need at least testtools 0.9.2: %s is %r"
52
if _testtools_version < (0, 9, 5):
53
raise ImportError("need at least testtools 0.9.5: %s is %r"
59
54
% (testtools.__file__, _testtools_version))
60
55
from testtools import content
62
58
from bzrlib import (
62
commands as _mod_commands,
72
plugin as _mod_plugin,
77
79
transport as _mod_transport,
81
import bzrlib.commands
82
import bzrlib.timestamp
84
import bzrlib.inventory
85
import bzrlib.iterablefile
88
83
import bzrlib.lsprof
89
84
except ImportError:
90
85
# lsprof not available
92
from bzrlib.merge import merge_inner
95
from bzrlib.smart import client, request, server
97
from bzrlib import symbol_versioning
87
from bzrlib.smart import client, request
88
from bzrlib.transport import (
98
92
from bzrlib.symbol_versioning import (
100
93
deprecated_function,
106
from bzrlib.transport import (
110
from bzrlib.trace import mutter, note
111
96
from bzrlib.tests import (
116
from bzrlib.tests.http_server import HttpServer
117
from bzrlib.tests.TestUtil import (
121
101
from bzrlib.ui import NullProgressView
122
102
from bzrlib.ui.text import TextUIFactory
123
import bzrlib.version_info_formats.format_custom
124
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
126
104
# Mark this python module as being part of the implementation
127
105
# of unittest: this gives us better tracebacks where the last
139
117
SUBUNIT_SEEK_SET = 0
140
118
SUBUNIT_SEEK_CUR = 1
143
class ExtendedTestResult(unittest._TextTestResult):
120
# These are intentionally brought into this namespace. That way plugins, etc
121
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
122
TestSuite = TestUtil.TestSuite
123
TestLoader = TestUtil.TestLoader
125
# Tests should run in a clean and clearly defined environment. The goal is to
126
# keep them isolated from the running environment as mush as possible. The test
127
# framework ensures the variables defined below are set (or deleted if the
128
# value is None) before a test is run and reset to their original value after
129
# the test is run. Generally if some code depends on an environment variable,
130
# the tests should start without this variable in the environment. There are a
131
# few exceptions but you shouldn't violate this rule lightly.
135
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
136
# tests do check our impls match APPDATA
137
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
141
'BZREMAIL': None, # may still be present in the environment
142
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
143
'BZR_PROGRESS_BAR': None,
144
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
145
# as a base class instead of TestCaseInTempDir. Tests inheriting from
146
# TestCase should not use disk resources, BZR_LOG is one.
147
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
148
'BZR_PLUGIN_PATH': None,
149
'BZR_DISABLE_PLUGINS': None,
150
'BZR_PLUGINS_AT': None,
151
'BZR_CONCURRENCY': None,
152
# Make sure that any text ui tests are consistent regardless of
153
# the environment the test case is run in; you may want tests that
154
# test other combinations. 'dumb' is a reasonable guess for tests
155
# going to a pipe or a StringIO.
161
'SSH_AUTH_SOCK': None,
171
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
172
# least. If you do (care), please update this comment
176
'BZR_REMOTE_PATH': None,
177
# Generally speaking, we don't want apport reporting on crashes in
178
# the test envirnoment unless we're specifically testing apport,
179
# so that it doesn't leak into the real system environment. We
180
# use an env var so it propagates to subprocesses.
181
'APPORT_DISABLE': '1',
185
def override_os_environ(test, env=None):
186
"""Modify os.environ keeping a copy.
188
:param test: A test instance
190
:param env: A dict containing variable definitions to be installed
193
env = isolated_environ
194
test._original_os_environ = dict([(var, value)
195
for var, value in os.environ.iteritems()])
196
for var, value in env.iteritems():
197
osutils.set_or_unset_env(var, value)
198
if var not in test._original_os_environ:
199
# The var is new, add it with a value of None, so
200
# restore_os_environ will delete it
201
test._original_os_environ[var] = None
204
def restore_os_environ(test):
205
"""Restore os.environ to its original state.
207
:param test: A test instance previously passed to override_os_environ.
209
for var, value in test._original_os_environ.iteritems():
210
# Restore the original value (or delete it if the value has been set to
211
# None in override_os_environ).
212
osutils.set_or_unset_env(var, value)
215
def _clear__type_equality_funcs(test):
216
"""Cleanup bound methods stored on TestCase instances
218
Clear the dict breaking a few (mostly) harmless cycles in the affected
219
unittests released with Python 2.6 and initial Python 2.7 versions.
221
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
222
shipped in Oneiric, an object with no clear method was used, hence the
223
extra complications, see bug 809048 for details.
225
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
226
if type_equality_funcs is not None:
227
tef_clear = getattr(type_equality_funcs, "clear", None)
228
if tef_clear is None:
229
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
230
if tef_instance_dict is not None:
231
tef_clear = tef_instance_dict.clear
232
if tef_clear is not None:
236
class ExtendedTestResult(testtools.TextTestResult):
144
237
"""Accepts, reports and accumulates the results of running tests.
146
239
Compared to the unittest version this class adds support for
195
288
self._overall_start_time = time.time()
196
289
self._strict = strict
290
self._first_thread_leaker_id = None
291
self._tests_leaking_threads_count = 0
292
self._traceback_from_test = None
198
294
def stopTestRun(self):
199
295
run = self.testsRun
200
296
actionTaken = "Ran"
201
297
stopTime = time.time()
202
298
timeTaken = stopTime - self.startTime
204
self.stream.writeln(self.separator2)
205
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
299
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
300
# the parent class method is similar have to duplicate
301
self._show_list('ERROR', self.errors)
302
self._show_list('FAIL', self.failures)
303
self.stream.write(self.sep2)
304
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
206
305
run, run != 1 and "s" or "", timeTaken))
207
self.stream.writeln()
208
306
if not self.wasSuccessful():
209
307
self.stream.write("FAILED (")
210
308
failed, errored = map(len, (self.failures, self.errors))
217
315
if failed or errored: self.stream.write(", ")
218
316
self.stream.write("known_failure_count=%d" %
219
317
self.known_failure_count)
220
self.stream.writeln(")")
318
self.stream.write(")\n")
222
320
if self.known_failure_count:
223
self.stream.writeln("OK (known_failures=%d)" %
321
self.stream.write("OK (known_failures=%d)\n" %
224
322
self.known_failure_count)
226
self.stream.writeln("OK")
324
self.stream.write("OK\n")
227
325
if self.skip_count > 0:
228
326
skipped = self.skip_count
229
self.stream.writeln('%d test%s skipped' %
327
self.stream.write('%d test%s skipped\n' %
230
328
(skipped, skipped != 1 and "s" or ""))
231
329
if self.unsupported:
232
330
for feature, count in sorted(self.unsupported.items()):
233
self.stream.writeln("Missing feature '%s' skipped %d tests." %
331
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
234
332
(feature, count))
236
334
ok = self.wasStrictlySuccessful()
238
336
ok = self.wasSuccessful()
239
if TestCase._first_thread_leaker_id:
337
if self._first_thread_leaker_id:
240
338
self.stream.write(
241
339
'%s is leaking threads among %d leaking tests.\n' % (
242
TestCase._first_thread_leaker_id,
243
TestCase._leaking_threads_tests))
340
self._first_thread_leaker_id,
341
self._tests_leaking_threads_count))
244
342
# We don't report the main thread as an active one.
245
343
self.stream.write(
246
344
'%d non-main threads were left active in the end.\n'
247
% (TestCase._active_threads - 1))
345
% (len(self._active_threads) - 1))
249
347
def getDescription(self, test):
277
376
what = re.sub(r'^bzrlib\.tests\.', '', what)
379
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
380
# multiple times in a row, because the handler is added for
381
# each test but the container list is shared between cases.
382
# See lp:498869 lp:625574 and lp:637725 for background.
383
def _record_traceback_from_test(self, exc_info):
384
"""Store the traceback from passed exc_info tuple till"""
385
self._traceback_from_test = exc_info[2]
280
387
def startTest(self, test):
281
unittest.TestResult.startTest(self, test)
388
super(ExtendedTestResult, self).startTest(test)
282
389
if self.count == 0:
283
390
self.startTests()
284
392
self.report_test_start(test)
285
393
test.number = self.count
286
394
self._recordTestStartTime()
395
# Make testtools cases give us the real traceback on failure
396
addOnException = getattr(test, "addOnException", None)
397
if addOnException is not None:
398
addOnException(self._record_traceback_from_test)
399
# Only check for thread leaks on bzrlib derived test cases
400
if isinstance(test, TestCase):
401
test.addCleanup(self._check_leaked_threads, test)
403
def stopTest(self, test):
404
super(ExtendedTestResult, self).stopTest(test)
405
# Manually break cycles, means touching various private things but hey
406
getDetails = getattr(test, "getDetails", None)
407
if getDetails is not None:
409
_clear__type_equality_funcs(test)
410
self._traceback_from_test = None
288
412
def startTests(self):
290
if getattr(sys, 'frozen', None) is None:
291
bzr_path = osutils.realpath(sys.argv[0])
293
bzr_path = sys.executable
295
'bzr selftest: %s\n' % (bzr_path,))
298
bzrlib.__path__[0],))
300
' bzr-%s python-%s %s\n' % (
301
bzrlib.version_string,
302
bzrlib._format_version_tuple(sys.version_info),
303
platform.platform(aliased=1),
305
self.stream.write('\n')
413
self.report_tests_starting()
414
self._active_threads = threading.enumerate()
416
def _check_leaked_threads(self, test):
417
"""See if any threads have leaked since last call
419
A sample of live threads is stored in the _active_threads attribute,
420
when this method runs it compares the current live threads and any not
421
in the previous sample are treated as having leaked.
423
now_active_threads = set(threading.enumerate())
424
threads_leaked = now_active_threads.difference(self._active_threads)
426
self._report_thread_leak(test, threads_leaked, now_active_threads)
427
self._tests_leaking_threads_count += 1
428
if self._first_thread_leaker_id is None:
429
self._first_thread_leaker_id = test.id()
430
self._active_threads = now_active_threads
307
432
def _recordTestStartTime(self):
308
433
"""Record that a test has started."""
309
self._start_time = time.time()
311
def _cleanupLogFile(self, test):
312
# We can only do this if we have one of our TestCases, not if
314
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
315
if setKeepLogfile is not None:
434
self._start_datetime = self._now()
318
436
def addError(self, test, err):
319
437
"""Tell result that test finished with an error.
355
471
self._formatTime(benchmark_time),
357
473
self.report_success(test)
358
self._cleanupLogFile(test)
359
unittest.TestResult.addSuccess(self, test)
474
super(ExtendedTestResult, self).addSuccess(test)
360
475
test._log_contents = ''
362
477
def addExpectedFailure(self, test, err):
363
478
self.known_failure_count += 1
364
479
self.report_known_failure(test, err)
481
def addUnexpectedSuccess(self, test, details=None):
482
"""Tell result the test unexpectedly passed, counting as a failure
484
When the minimum version of testtools required becomes 0.9.8 this
485
can be updated to use the new handling there.
487
super(ExtendedTestResult, self).addFailure(test, details=details)
488
self.failure_count += 1
489
self.report_unexpected_success(test,
490
"".join(details["reason"].iter_text()))
366
494
def addNotSupported(self, test, feature):
367
495
"""The test will not be run because of a missing feature.
400
533
raise errors.BzrError("Unknown whence %r" % whence)
402
def report_cleaning_up(self):
535
def report_tests_starting(self):
536
"""Display information before the test run begins"""
537
if getattr(sys, 'frozen', None) is None:
538
bzr_path = osutils.realpath(sys.argv[0])
540
bzr_path = sys.executable
542
'bzr selftest: %s\n' % (bzr_path,))
545
bzrlib.__path__[0],))
547
' bzr-%s python-%s %s\n' % (
548
bzrlib.version_string,
549
bzrlib._format_version_tuple(sys.version_info),
550
platform.platform(aliased=1),
552
self.stream.write('\n')
554
def report_test_start(self, test):
555
"""Display information on the test just about to be run"""
557
def _report_thread_leak(self, test, leaked_threads, active_threads):
558
"""Display information on a test that leaked one or more threads"""
559
# GZ 2010-09-09: A leak summary reported separately from the general
560
# thread debugging would be nice. Tests under subunit
561
# need something not using stream, perhaps adding a
562
# testtools details object would be fitting.
563
if 'threads' in selftest_debug_flags:
564
self.stream.write('%s is leaking, active is now %d\n' %
565
(test.id(), len(active_threads)))
405
567
def startTestRun(self):
406
568
self.startTime = time.time()
550
709
return '%s%s' % (indent, err[1])
552
711
def report_error(self, test, err):
553
self.stream.writeln('ERROR %s\n%s'
712
self.stream.write('ERROR %s\n%s\n'
554
713
% (self._testTimeString(test),
555
714
self._error_summary(err)))
557
716
def report_failure(self, test, err):
558
self.stream.writeln(' FAIL %s\n%s'
717
self.stream.write(' FAIL %s\n%s\n'
559
718
% (self._testTimeString(test),
560
719
self._error_summary(err)))
562
721
def report_known_failure(self, test, err):
563
self.stream.writeln('XFAIL %s\n%s'
722
self.stream.write('XFAIL %s\n%s\n'
564
723
% (self._testTimeString(test),
565
724
self._error_summary(err)))
726
def report_unexpected_success(self, test, reason):
727
self.stream.write(' FAIL %s\n%s: %s\n'
728
% (self._testTimeString(test),
729
"Unexpected success. Should have failed",
567
732
def report_success(self, test):
568
self.stream.writeln(' OK %s' % self._testTimeString(test))
733
self.stream.write(' OK %s\n' % self._testTimeString(test))
569
734
for bench_called, stats in getattr(test, '_benchcalls', []):
570
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
735
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
571
736
stats.pprint(file=self.stream)
572
737
# flush the stream so that we get smooth output. This verbose mode is
573
738
# used to show the output in PQM.
574
739
self.stream.flush()
576
741
def report_skip(self, test, reason):
577
self.stream.writeln(' SKIP %s\n%s'
742
self.stream.write(' SKIP %s\n%s\n'
578
743
% (self._testTimeString(test), reason))
580
745
def report_not_applicable(self, test, reason):
581
self.stream.writeln(' N/A %s\n %s'
746
self.stream.write(' N/A %s\n %s\n'
582
747
% (self._testTimeString(test), reason))
584
749
def report_unsupported(self, test, feature):
585
750
"""test cannot be run because feature is missing."""
586
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
751
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
587
752
%(self._testTimeString(test), feature))
826
1008
self._track_transports()
827
1009
self._track_locks()
828
1010
self._clear_debug_flags()
829
TestCase._active_threads = threading.activeCount()
830
self.addCleanup(self._check_leaked_threads)
1011
# Isolate global verbosity level, to make sure it's reproducible
1012
# between tests. We should get rid of this altogether: bug 656694. --
1014
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
1015
# Isolate config option expansion until its default value for bzrlib is
1016
# settled on or a the FIXME associated with _get_expand_default_value
1017
# is addressed -- vila 20110219
1018
self.overrideAttr(config, '_expand_default_value', None)
1019
self._log_files = set()
1020
# Each key in the ``_counters`` dict holds a value for a different
1021
# counter. When the test ends, addDetail() should be used to output the
1022
# counter values. This happens in install_counter_hook().
1024
if 'config_stats' in selftest_debug_flags:
1025
self._install_config_stats_hooks()
1026
# Do not use i18n for tests (unless the test reverses this)
832
1029
def debug(self):
833
1030
# debug a frame up.
835
pdb.Pdb().set_trace(sys._getframe().f_back)
837
def _check_leaked_threads(self):
838
active = threading.activeCount()
839
leaked_threads = active - TestCase._active_threads
840
TestCase._active_threads = active
841
# If some tests make the number of threads *decrease*, we'll consider
842
# that they are just observing old threads dieing, not agressively kill
843
# random threads. So we don't report these tests as leaking. The risk
844
# is that we have false positives that way (the test see 2 threads
845
# going away but leak one) but it seems less likely than the actual
846
# false positives (the test see threads going away and does not leak).
847
if leaked_threads > 0:
848
TestCase._leaking_threads_tests += 1
849
if TestCase._first_thread_leaker_id is None:
850
TestCase._first_thread_leaker_id = self.id()
1032
# The sys preserved stdin/stdout should allow blackbox tests debugging
1033
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1034
).set_trace(sys._getframe().f_back)
1036
def discardDetail(self, name):
1037
"""Extend the addDetail, getDetails api so we can remove a detail.
1039
eg. bzr always adds the 'log' detail at startup, but we don't want to
1040
include it for skipped, xfail, etc tests.
1042
It is safe to call this for a detail that doesn't exist, in case this
1043
gets called multiple times.
1045
# We cheat. details is stored in __details which means we shouldn't
1046
# touch it. but getDetails() returns the dict directly, so we can
1048
details = self.getDetails()
1052
def install_counter_hook(self, hooks, name, counter_name=None):
1053
"""Install a counting hook.
1055
Any hook can be counted as long as it doesn't need to return a value.
1057
:param hooks: Where the hook should be installed.
1059
:param name: The hook name that will be counted.
1061
:param counter_name: The counter identifier in ``_counters``, defaults
1064
_counters = self._counters # Avoid closing over self
1065
if counter_name is None:
1067
if _counters.has_key(counter_name):
1068
raise AssertionError('%s is already used as a counter name'
1070
_counters[counter_name] = 0
1071
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1072
lambda: ['%d' % (_counters[counter_name],)]))
1073
def increment_counter(*args, **kwargs):
1074
_counters[counter_name] += 1
1075
label = 'count %s calls' % (counter_name,)
1076
hooks.install_named_hook(name, increment_counter, label)
1077
self.addCleanup(hooks.uninstall_named_hook, name, label)
1079
def _install_config_stats_hooks(self):
1080
"""Install config hooks to count hook calls.
1083
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1084
self.install_counter_hook(config.ConfigHooks, hook_name,
1085
'config.%s' % (hook_name,))
1087
# The OldConfigHooks are private and need special handling to protect
1088
# against recursive tests (tests that run other tests), so we just do
1089
# manually what registering them into _builtin_known_hooks will provide
1091
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1092
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1093
self.install_counter_hook(config.OldConfigHooks, hook_name,
1094
'old_config.%s' % (hook_name,))
852
1096
def _clear_debug_flags(self):
853
1097
"""Prevent externally set debug flags affecting tests.
865
1109
def _clear_hooks(self):
866
1110
# prevent hooks affecting tests
1111
known_hooks = hooks.known_hooks
867
1112
self._preserved_hooks = {}
868
for key, factory in hooks.known_hooks.items():
869
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
870
current_hooks = hooks.known_hooks_key_to_object(key)
1113
for key, (parent, name) in known_hooks.iter_parent_objects():
1114
current_hooks = getattr(parent, name)
871
1115
self._preserved_hooks[parent] = (name, current_hooks)
1116
self._preserved_lazy_hooks = hooks._lazy_hooks
1117
hooks._lazy_hooks = {}
872
1118
self.addCleanup(self._restoreHooks)
873
for key, factory in hooks.known_hooks.items():
874
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
1119
for key, (parent, name) in known_hooks.iter_parent_objects():
1120
factory = known_hooks.get(key)
875
1121
setattr(parent, name, factory())
876
1122
# this hook should always be installed
877
1123
request._install_hook()
1134
1384
'st_mtime did not match')
1135
1385
self.assertEqual(expected.st_ctime, actual.st_ctime,
1136
1386
'st_ctime did not match')
1137
if sys.platform != 'win32':
1387
if sys.platform == 'win32':
1138
1388
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1139
1389
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1140
# odd. Regardless we shouldn't actually try to assert anything
1141
# about their values
1390
# odd. We just force it to always be 0 to avoid any problems.
1391
self.assertEqual(0, expected.st_dev)
1392
self.assertEqual(0, actual.st_dev)
1393
self.assertEqual(0, expected.st_ino)
1394
self.assertEqual(0, actual.st_ino)
1142
1396
self.assertEqual(expected.st_dev, actual.st_dev,
1143
1397
'st_dev did not match')
1144
1398
self.assertEqual(expected.st_ino, actual.st_ino,
1321
1578
self.assertEqual(expected_docstring, obj.__doc__)
1580
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1323
1581
def failUnlessExists(self, path):
1582
return self.assertPathExists(path)
1584
def assertPathExists(self, path):
1324
1585
"""Fail unless path or paths, which may be abs or relative, exist."""
1325
1586
if not isinstance(path, basestring):
1327
self.failUnlessExists(p)
1588
self.assertPathExists(p)
1329
self.failUnless(osutils.lexists(path),path+" does not exist")
1590
self.assertTrue(osutils.lexists(path),
1591
path + " does not exist")
1593
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1331
1594
def failIfExists(self, path):
1595
return self.assertPathDoesNotExist(path)
1597
def assertPathDoesNotExist(self, path):
1332
1598
"""Fail if path or paths, which may be abs or relative, exist."""
1333
1599
if not isinstance(path, basestring):
1335
self.failIfExists(p)
1601
self.assertPathDoesNotExist(p)
1337
self.failIf(osutils.lexists(path),path+" exists")
1603
self.assertFalse(osutils.lexists(path),
1339
1606
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1340
1607
"""A helper for callDeprecated and applyDeprecated.
1455
1723
The file is removed as the test is torn down.
1457
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1458
self._log_file = os.fdopen(fileno, 'w+')
1459
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1460
self._log_file_name = name
1725
pseudo_log_file = StringIO()
1726
def _get_log_contents_for_weird_testtools_api():
1727
return [pseudo_log_file.getvalue().decode(
1728
"utf-8", "replace").encode("utf-8")]
1729
self.addDetail("log", content.Content(content.ContentType("text",
1730
"plain", {"charset": "utf8"}),
1731
_get_log_contents_for_weird_testtools_api))
1732
self._log_file = pseudo_log_file
1733
self._log_memento = trace.push_log_file(self._log_file)
1461
1734
self.addCleanup(self._finishLogFile)
1463
1736
def _finishLogFile(self):
1464
1737
"""Finished with the log file.
1466
Close the file and delete it, unless setKeepLogfile was called.
1739
Close the file and delete it.
1468
if bzrlib.trace._trace_file:
1741
if trace._trace_file:
1469
1742
# flush the log file, to get all content
1470
bzrlib.trace._trace_file.flush()
1471
bzrlib.trace.pop_log_file(self._log_memento)
1472
# Cache the log result and delete the file on disk
1473
self._get_log(False)
1743
trace._trace_file.flush()
1744
trace.pop_log_file(self._log_memento)
1475
1746
def thisFailsStrictLockCheck(self):
1476
1747
"""It is known that this test would fail with -Dstrict_locks.
1512
1778
setattr(obj, attr_name, new)
1781
def overrideEnv(self, name, new):
1782
"""Set an environment variable, and reset it after the test.
1784
:param name: The environment variable name.
1786
:param new: The value to set the variable to. If None, the
1787
variable is deleted from the environment.
1789
:returns: The actual variable value.
1791
value = osutils.set_or_unset_env(name, new)
1792
self.addCleanup(osutils.set_or_unset_env, name, value)
1795
def recordCalls(self, obj, attr_name):
1796
"""Monkeypatch in a wrapper that will record calls.
1798
The monkeypatch is automatically removed when the test concludes.
1800
:param obj: The namespace holding the reference to be replaced;
1801
typically a module, class, or object.
1802
:param attr_name: A string for the name of the attribute to
1804
:returns: A list that will be extended with one item every time the
1805
function is called, with a tuple of (args, kwargs).
1809
def decorator(*args, **kwargs):
1810
calls.append((args, kwargs))
1811
return orig(*args, **kwargs)
1812
orig = self.overrideAttr(obj, attr_name, decorator)
1515
1815
def _cleanEnvironment(self):
1517
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1518
'HOME': os.getcwd(),
1519
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1520
# tests do check our impls match APPDATA
1521
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1525
'BZREMAIL': None, # may still be present in the environment
1526
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1527
'BZR_PROGRESS_BAR': None,
1529
'BZR_PLUGIN_PATH': None,
1530
'BZR_DISABLE_PLUGINS': None,
1531
'BZR_PLUGINS_AT': None,
1532
'BZR_CONCURRENCY': None,
1533
# Make sure that any text ui tests are consistent regardless of
1534
# the environment the test case is run in; you may want tests that
1535
# test other combinations. 'dumb' is a reasonable guess for tests
1536
# going to a pipe or a StringIO.
1540
'BZR_COLUMNS': '80',
1542
'SSH_AUTH_SOCK': None,
1546
'https_proxy': None,
1547
'HTTPS_PROXY': None,
1552
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1553
# least. If you do (care), please update this comment
1557
'BZR_REMOTE_PATH': None,
1558
# Generally speaking, we don't want apport reporting on crashes in
1559
# the test envirnoment unless we're specifically testing apport,
1560
# so that it doesn't leak into the real system environment. We
1561
# use an env var so it propagates to subprocesses.
1562
'APPORT_DISABLE': '1',
1565
self.addCleanup(self._restoreEnvironment)
1566
for name, value in new_env.iteritems():
1567
self._captureVar(name, value)
1569
def _captureVar(self, name, newvalue):
1570
"""Set an environment variable, and reset it when finished."""
1571
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1573
def _restoreEnvironment(self):
1574
for name, value in self._old_env.iteritems():
1575
osutils.set_or_unset_env(name, value)
1816
for name, value in isolated_environ.iteritems():
1817
self.overrideEnv(name, value)
1577
1819
def _restoreHooks(self):
1578
1820
for klass, (name, hooks) in self._preserved_hooks.items():
1579
1821
setattr(klass, name, hooks)
1822
self._preserved_hooks.clear()
1823
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1824
self._preserved_lazy_hooks.clear()
1581
1826
def knownFailure(self, reason):
1582
"""This test has failed for some known reason."""
1583
raise KnownFailure(reason)
1827
"""Declare that this test fails for a known reason
1829
Tests that are known to fail should generally be using expectedFailure
1830
with an appropriate reverse assertion if a change could cause the test
1831
to start passing. Conversely if the test has no immediate prospect of
1832
succeeding then using skip is more suitable.
1834
When this method is called while an exception is being handled, that
1835
traceback will be used, otherwise a new exception will be thrown to
1836
provide one but won't be reported.
1838
self._add_reason(reason)
1840
exc_info = sys.exc_info()
1841
if exc_info != (None, None, None):
1842
self._report_traceback(exc_info)
1845
raise self.failureException(reason)
1846
except self.failureException:
1847
exc_info = sys.exc_info()
1848
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1849
raise testtools.testcase._ExpectedFailure(exc_info)
1853
def _suppress_log(self):
1854
"""Remove the log info from details."""
1855
self.discardDetail('log')
1585
1857
def _do_skip(self, result, reason):
1858
self._suppress_log()
1586
1859
addSkip = getattr(result, 'addSkip', None)
1587
1860
if not callable(addSkip):
1588
1861
result.addSuccess(result)
1611
1886
self._do_skip(result, reason)
1889
def _report_skip(self, result, err):
1890
"""Override the default _report_skip.
1892
We want to strip the 'log' detail. If we waint until _do_skip, it has
1893
already been formatted into the 'reason' string, and we can't pull it
1896
self._suppress_log()
1897
super(TestCase, self)._report_skip(self, result, err)
1900
def _report_expected_failure(self, result, err):
1903
See _report_skip for motivation.
1905
self._suppress_log()
1906
super(TestCase, self)._report_expected_failure(self, result, err)
1614
1909
def _do_unsupported_or_skip(self, result, e):
1615
1910
reason = e.args[0]
1911
self._suppress_log()
1616
1912
addNotSupported = getattr(result, 'addNotSupported', None)
1617
1913
if addNotSupported is not None:
1618
1914
result.addNotSupported(self, reason)
1644
1940
self._benchtime += time.time() - start
1646
1942
def log(self, *args):
1649
def _get_log(self, keep_log_file=False):
1650
"""Internal helper to get the log from bzrlib.trace for this test.
1652
Please use self.getDetails, or self.get_log to access this in test case
1655
:param keep_log_file: When True, if the log is still a file on disk
1656
leave it as a file on disk. When False, if the log is still a file
1657
on disk, the log file is deleted and the log preserved as
1659
:return: A string containing the log.
1661
if self._log_contents is not None:
1663
self._log_contents.decode('utf8')
1664
except UnicodeDecodeError:
1665
unicodestr = self._log_contents.decode('utf8', 'replace')
1666
self._log_contents = unicodestr.encode('utf8')
1667
return self._log_contents
1669
if bzrlib.trace._trace_file:
1670
# flush the log file, to get all content
1671
bzrlib.trace._trace_file.flush()
1672
if self._log_file_name is not None:
1673
logfile = open(self._log_file_name)
1675
log_contents = logfile.read()
1679
log_contents.decode('utf8')
1680
except UnicodeDecodeError:
1681
unicodestr = log_contents.decode('utf8', 'replace')
1682
log_contents = unicodestr.encode('utf8')
1683
if not keep_log_file:
1685
max_close_attempts = 100
1686
first_close_error = None
1687
while close_attempts < max_close_attempts:
1690
self._log_file.close()
1691
except IOError, ioe:
1692
if ioe.errno is None:
1693
# No errno implies 'close() called during
1694
# concurrent operation on the same file object', so
1695
# retry. Probably a thread is trying to write to
1697
if first_close_error is None:
1698
first_close_error = ioe
1703
if close_attempts > 1:
1705
'Unable to close log file on first attempt, '
1706
'will retry: %s\n' % (first_close_error,))
1707
if close_attempts == max_close_attempts:
1709
'Unable to close log file after %d attempts.\n'
1710
% (max_close_attempts,))
1711
self._log_file = None
1712
# Permit multiple calls to get_log until we clean it up in
1714
self._log_contents = log_contents
1716
os.remove(self._log_file_name)
1718
if sys.platform == 'win32' and e.errno == errno.EACCES:
1719
sys.stderr.write(('Unable to delete log file '
1720
' %r\n' % self._log_file_name))
1723
self._log_file_name = None
1726
return "No log file content and no log file name."
1728
1945
def get_log(self):
1729
1946
"""Get a unicode string containing the log from bzrlib.trace.
2222
def _add_subprocess_log(self, log_file_path):
2223
if len(self._log_files) == 0:
2224
# Register an addCleanup func. We do this on the first call to
2225
# _add_subprocess_log rather than in TestCase.setUp so that this
2226
# addCleanup is registered after any cleanups for tempdirs that
2227
# subclasses might create, which will probably remove the log file
2229
self.addCleanup(self._subprocess_log_cleanup)
2230
# self._log_files is a set, so if a log file is reused we won't grab it
2232
self._log_files.add(log_file_path)
2234
def _subprocess_log_cleanup(self):
2235
for count, log_file_path in enumerate(self._log_files):
2236
# We use buffer_now=True to avoid holding the file open beyond
2237
# the life of this function, which might interfere with e.g.
2238
# cleaning tempdirs on Windows.
2239
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2240
#detail_content = content.content_from_file(
2241
# log_file_path, buffer_now=True)
2242
with open(log_file_path, 'rb') as log_file:
2243
log_file_bytes = log_file.read()
2244
detail_content = content.Content(content.ContentType("text",
2245
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2246
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
1998
2249
def _popen(self, *args, **kwargs):
1999
2250
"""Place a call to Popen.
2037
2288
if retcode is not None and retcode != process.returncode:
2038
2289
if process_args is None:
2039
2290
process_args = "(unknown args)"
2040
mutter('Output of bzr %s:\n%s', process_args, out)
2041
mutter('Error for bzr %s:\n%s', process_args, err)
2291
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2292
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2042
2293
self.fail('Command bzr %s failed with retcode %s != %s'
2043
2294
% (process_args, retcode, process.returncode))
2044
2295
return [out, err]
2046
def check_inventory_shape(self, inv, shape):
2047
"""Compare an inventory to a list of expected names.
2297
def check_tree_shape(self, tree, shape):
2298
"""Compare a tree to a list of expected names.
2049
2300
Fail if they are not precisely equal.
2052
2303
shape = list(shape) # copy
2053
for path, ie in inv.entries():
2304
for path, ie in tree.iter_entries_by_dir():
2054
2305
name = path.replace('\\', '/')
2055
2306
if ie.kind == 'directory':
2056
2307
name = name + '/'
2309
pass # ignore root entry
2058
2311
shape.remove(name)
2060
2313
extras.append(name)
2150
2403
class TestCaseWithMemoryTransport(TestCase):
2151
2404
"""Common test class for tests that do not need disk resources.
2153
Tests that need disk resources should derive from TestCaseWithTransport.
2406
Tests that need disk resources should derive from TestCaseInTempDir
2407
orTestCaseWithTransport.
2155
2409
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2157
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2411
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2158
2412
a directory which does not exist. This serves to help ensure test isolation
2159
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2160
must exist. However, TestCaseWithMemoryTransport does not offer local
2161
file defaults for the transport in tests, nor does it obey the command line
2413
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2414
must exist. However, TestCaseWithMemoryTransport does not offer local file
2415
defaults for the transport in tests, nor does it obey the command line
2162
2416
override, so tests that accidentally write to the common directory should
2165
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2166
a .bzr directory that stops us ascending higher into the filesystem.
2419
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2420
``.bzr`` directory that stops us ascending higher into the filesystem.
2169
2423
TEST_ROOT = None
2379
2644
def make_branch(self, relpath, format=None):
2380
2645
"""Create a branch on the transport at relpath."""
2381
2646
repo = self.make_repository(relpath, format=format)
2382
return repo.bzrdir.create_branch()
2647
return repo.bzrdir.create_branch(append_revisions_only=False)
2649
def get_default_format(self):
2652
def resolve_format(self, format):
2653
"""Resolve an object to a ControlDir format object.
2655
The initial format object can either already be
2656
a ControlDirFormat, None (for the default format),
2657
or a string with the name of the control dir format.
2659
:param format: Object to resolve
2660
:return A ControlDirFormat instance
2663
format = self.get_default_format()
2664
if isinstance(format, basestring):
2665
format = bzrdir.format_registry.make_bzrdir(format)
2384
2668
def make_bzrdir(self, relpath, format=None):
2431
2712
test_home_dir = self.test_home_dir
2432
2713
if isinstance(test_home_dir, unicode):
2433
2714
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2434
os.environ['HOME'] = test_home_dir
2435
os.environ['BZR_HOME'] = test_home_dir
2715
self.overrideEnv('HOME', test_home_dir)
2716
self.overrideEnv('BZR_HOME', test_home_dir)
2437
2718
def setUp(self):
2438
2719
super(TestCaseWithMemoryTransport, self).setUp()
2720
# Ensure that ConnectedTransport doesn't leak sockets
2721
def get_transport_from_url_with_cleanup(*args, **kwargs):
2722
t = orig_get_transport_from_url(*args, **kwargs)
2723
if isinstance(t, _mod_transport.ConnectedTransport):
2724
self.addCleanup(t.disconnect)
2727
orig_get_transport_from_url = self.overrideAttr(
2728
_mod_transport, 'get_transport_from_url',
2729
get_transport_from_url_with_cleanup)
2439
2730
self._make_test_root()
2440
2731
self.addCleanup(os.chdir, os.getcwdu())
2441
2732
self.makeAndChdirToTestDir()
3065
class TestDecorator(TestSuite):
3373
class TestDecorator(TestUtil.TestSuite):
3066
3374
"""A decorator for TestCase/TestSuite objects.
3068
Usually, subclasses should override __iter__(used when flattening test
3069
suites), which we do to filter, reorder, parallelise and so on, run() and
3376
Contains rather than flattening suite passed on construction
3073
def __init__(self, suite):
3074
TestSuite.__init__(self)
3077
def countTestCases(self):
3080
cases += test.countTestCases()
3087
def run(self, result):
3088
# Use iteration on self, not self._tests, to allow subclasses to hook
3091
if result.shouldStop:
3379
def __init__(self, suite=None):
3380
super(TestDecorator, self).__init__()
3381
if suite is not None:
3384
# Don't need subclass run method with suite emptying
3385
run = unittest.TestSuite.run
3097
3388
class CountingDecorator(TestDecorator):
3108
3399
"""A decorator which excludes test matching an exclude pattern."""
3110
3401
def __init__(self, suite, exclude_pattern):
3111
TestDecorator.__init__(self, suite)
3112
self.exclude_pattern = exclude_pattern
3113
self.excluded = False
3117
return iter(self._tests)
3118
self.excluded = True
3119
suite = exclude_tests_by_re(self, self.exclude_pattern)
3121
self.addTests(suite)
3122
return iter(self._tests)
3402
super(ExcludeDecorator, self).__init__(
3403
exclude_tests_by_re(suite, exclude_pattern))
3125
3406
class FilterTestsDecorator(TestDecorator):
3126
3407
"""A decorator which filters tests to those matching a pattern."""
3128
3409
def __init__(self, suite, pattern):
3129
TestDecorator.__init__(self, suite)
3130
self.pattern = pattern
3131
self.filtered = False
3135
return iter(self._tests)
3136
self.filtered = True
3137
suite = filter_suite_by_re(self, self.pattern)
3139
self.addTests(suite)
3140
return iter(self._tests)
3410
super(FilterTestsDecorator, self).__init__(
3411
filter_suite_by_re(suite, pattern))
3143
3414
class RandomDecorator(TestDecorator):
3144
3415
"""A decorator which randomises the order of its tests."""
3146
3417
def __init__(self, suite, random_seed, stream):
3147
TestDecorator.__init__(self, suite)
3148
self.random_seed = random_seed
3149
self.randomised = False
3150
self.stream = stream
3154
return iter(self._tests)
3155
self.randomised = True
3156
self.stream.write("Randomizing test order using seed %s\n\n" %
3157
(self.actual_seed()))
3418
random_seed = self.actual_seed(random_seed)
3419
stream.write("Randomizing test order using seed %s\n\n" %
3158
3421
# Initialise the random number generator.
3159
random.seed(self.actual_seed())
3160
suite = randomize_suite(self)
3162
self.addTests(suite)
3163
return iter(self._tests)
3422
random.seed(random_seed)
3423
super(RandomDecorator, self).__init__(randomize_suite(suite))
3165
def actual_seed(self):
3166
if self.random_seed == "now":
3426
def actual_seed(seed):
3167
3428
# We convert the seed to a long to make it reuseable across
3168
3429
# invocations (because the user can reenter it).
3169
self.random_seed = long(time.time())
3430
return long(time.time())
3171
3432
# Convert the seed to a long if we can
3173
self.random_seed = long(self.random_seed)
3435
except (TypeError, ValueError):
3176
return self.random_seed
3179
3440
class TestFirstDecorator(TestDecorator):
3180
3441
"""A decorator which moves named tests to the front."""
3182
3443
def __init__(self, suite, pattern):
3183
TestDecorator.__init__(self, suite)
3184
self.pattern = pattern
3185
self.filtered = False
3189
return iter(self._tests)
3190
self.filtered = True
3191
suites = split_suite_by_re(self, self.pattern)
3193
self.addTests(suites)
3194
return iter(self._tests)
3444
super(TestFirstDecorator, self).__init__()
3445
self.addTests(split_suite_by_re(suite, pattern))
3197
3448
def partition_tests(suite, count):
3198
3449
"""Partition suite into count lists of tests."""
3200
tests = list(iter_suite_tests(suite))
3201
tests_per_process = int(math.ceil(float(len(tests)) / count))
3202
for block in range(count):
3203
low_test = block * tests_per_process
3204
high_test = low_test + tests_per_process
3205
process_tests = tests[low_test:high_test]
3206
result.append(process_tests)
3450
# This just assigns tests in a round-robin fashion. On one hand this
3451
# splits up blocks of related tests that might run faster if they shared
3452
# resources, but on the other it avoids assigning blocks of slow tests to
3453
# just one partition. So the slowest partition shouldn't be much slower
3455
partitions = [list() for i in range(count)]
3456
tests = iter_suite_tests(suite)
3457
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3458
partition.append(test)
3210
3462
def workaround_zealous_crypto_random():
3333
class ForwardingResult(unittest.TestResult):
3335
def __init__(self, target):
3336
unittest.TestResult.__init__(self)
3337
self.result = target
3339
def startTest(self, test):
3340
self.result.startTest(test)
3342
def stopTest(self, test):
3343
self.result.stopTest(test)
3345
def startTestRun(self):
3346
self.result.startTestRun()
3348
def stopTestRun(self):
3349
self.result.stopTestRun()
3351
def addSkip(self, test, reason):
3352
self.result.addSkip(test, reason)
3354
def addSuccess(self, test):
3355
self.result.addSuccess(test)
3357
def addError(self, test, err):
3358
self.result.addError(test, err)
3360
def addFailure(self, test, err):
3361
self.result.addFailure(test, err)
3362
ForwardingResult = testtools.ExtendedToOriginalDecorator
3365
class ProfileResult(ForwardingResult):
3591
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3366
3592
"""Generate profiling data for all activity between start and success.
3368
3594
The profile data is appended to the test's _benchcalls attribute and can
3698
3933
'bzrlib.tests.test_commit_merge',
3699
3934
'bzrlib.tests.test_config',
3700
3935
'bzrlib.tests.test_conflicts',
3936
'bzrlib.tests.test_controldir',
3701
3937
'bzrlib.tests.test_counted_lock',
3702
3938
'bzrlib.tests.test_crash',
3703
3939
'bzrlib.tests.test_decorators',
3704
3940
'bzrlib.tests.test_delta',
3705
3941
'bzrlib.tests.test_debug',
3706
'bzrlib.tests.test_deprecated_graph',
3707
3942
'bzrlib.tests.test_diff',
3708
3943
'bzrlib.tests.test_directory_service',
3709
3944
'bzrlib.tests.test_dirstate',
3710
3945
'bzrlib.tests.test_email_message',
3711
3946
'bzrlib.tests.test_eol_filters',
3712
3947
'bzrlib.tests.test_errors',
3948
'bzrlib.tests.test_estimate_compressed_size',
3713
3949
'bzrlib.tests.test_export',
3950
'bzrlib.tests.test_export_pot',
3714
3951
'bzrlib.tests.test_extract',
3952
'bzrlib.tests.test_features',
3715
3953
'bzrlib.tests.test_fetch',
3716
3954
'bzrlib.tests.test_fixtures',
3717
3955
'bzrlib.tests.test_fifo_cache',
3718
3956
'bzrlib.tests.test_filters',
3957
'bzrlib.tests.test_filter_tree',
3719
3958
'bzrlib.tests.test_ftp_transport',
3720
3959
'bzrlib.tests.test_foreign',
3721
3960
'bzrlib.tests.test_generate_docs',
3954
4203
# Some tests mentioned in the list are not in the test suite. The
3955
4204
# list may be out of date, report to the tester.
3956
4205
for id in not_found:
3957
bzrlib.trace.warning('"%s" not found in the test suite', id)
4206
trace.warning('"%s" not found in the test suite', id)
3958
4207
for id in duplicates:
3959
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
4208
trace.warning('"%s" is used as an id by several tests', id)
3964
def multiply_scenarios(scenarios_left, scenarios_right):
4213
def multiply_scenarios(*scenarios):
4214
"""Multiply two or more iterables of scenarios.
4216
It is safe to pass scenario generators or iterators.
4218
:returns: A list of compound scenarios: the cross-product of all
4219
scenarios, with the names concatenated and the parameters
4222
return reduce(_multiply_two_scenarios, map(list, scenarios))
4225
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3965
4226
"""Multiply two sets of scenarios.
3967
4228
:returns: the cartesian product of the two sets of scenarios, that is
4125
4399
% (os.path.basename(dirname), printable_e))
4128
class Feature(object):
4129
"""An operating system Feature."""
4132
self._available = None
4134
def available(self):
4135
"""Is the feature available?
4137
:return: True if the feature is available.
4139
if self._available is None:
4140
self._available = self._probe()
4141
return self._available
4144
"""Implement this method in concrete features.
4146
:return: True if the feature is available.
4148
raise NotImplementedError
4151
if getattr(self, 'feature_name', None):
4152
return self.feature_name()
4153
return self.__class__.__name__
4156
class _SymlinkFeature(Feature):
4159
return osutils.has_symlinks()
4161
def feature_name(self):
4164
SymlinkFeature = _SymlinkFeature()
4167
class _HardlinkFeature(Feature):
4170
return osutils.has_hardlinks()
4172
def feature_name(self):
4175
HardlinkFeature = _HardlinkFeature()
4178
class _OsFifoFeature(Feature):
4181
return getattr(os, 'mkfifo', None)
4183
def feature_name(self):
4184
return 'filesystem fifos'
4186
OsFifoFeature = _OsFifoFeature()
4189
class _UnicodeFilenameFeature(Feature):
4190
"""Does the filesystem support Unicode filenames?"""
4194
# Check for character combinations unlikely to be covered by any
4195
# single non-unicode encoding. We use the characters
4196
# - greek small letter alpha (U+03B1) and
4197
# - braille pattern dots-123456 (U+283F).
4198
os.stat(u'\u03b1\u283f')
4199
except UnicodeEncodeError:
4201
except (IOError, OSError):
4202
# The filesystem allows the Unicode filename but the file doesn't
4206
# The filesystem allows the Unicode filename and the file exists,
4210
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4213
class _CompatabilityThunkFeature(Feature):
4214
"""This feature is just a thunk to another feature.
4216
It issues a deprecation warning if it is accessed, to let you know that you
4217
should really use a different feature.
4220
def __init__(self, dep_version, module, name,
4221
replacement_name, replacement_module=None):
4222
super(_CompatabilityThunkFeature, self).__init__()
4223
self._module = module
4224
if replacement_module is None:
4225
replacement_module = module
4226
self._replacement_module = replacement_module
4228
self._replacement_name = replacement_name
4229
self._dep_version = dep_version
4230
self._feature = None
4233
if self._feature is None:
4234
depr_msg = self._dep_version % ('%s.%s'
4235
% (self._module, self._name))
4236
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4237
self._replacement_name)
4238
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4239
# Import the new feature and use it as a replacement for the
4241
mod = __import__(self._replacement_module, {}, {},
4242
[self._replacement_name])
4243
self._feature = getattr(mod, self._replacement_name)
4247
return self._feature._probe()
4250
class ModuleAvailableFeature(Feature):
4251
"""This is a feature than describes a module we want to be available.
4253
Declare the name of the module in __init__(), and then after probing, the
4254
module will be available as 'self.module'.
4256
:ivar module: The module if it is available, else None.
4259
def __init__(self, module_name):
4260
super(ModuleAvailableFeature, self).__init__()
4261
self.module_name = module_name
4265
self._module = __import__(self.module_name, {}, {}, [''])
4272
if self.available(): # Make sure the probe has been done
4276
def feature_name(self):
4277
return self.module_name
4280
# This is kept here for compatibility, it is recommended to use
4281
# 'bzrlib.tests.feature.paramiko' instead
4282
ParamikoFeature = _CompatabilityThunkFeature(
4283
deprecated_in((2,1,0)),
4284
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4287
4402
def probe_unicode_in_user_encoding():
4288
4403
"""Try to encode several unicode strings to use in unicode-aware tests.
4289
4404
Return first successfull match.
4320
class _HTTPSServerFeature(Feature):
4321
"""Some tests want an https Server, check if one is available.
4323
Right now, the only way this is available is under python2.6 which provides
4334
def feature_name(self):
4335
return 'HTTPSServer'
4338
HTTPSServerFeature = _HTTPSServerFeature()
4341
class _UnicodeFilename(Feature):
4342
"""Does the filesystem support Unicode filenames?"""
4347
except UnicodeEncodeError:
4349
except (IOError, OSError):
4350
# The filesystem allows the Unicode filename but the file doesn't
4354
# The filesystem allows the Unicode filename and the file exists,
4358
UnicodeFilename = _UnicodeFilename()
4361
class _ByteStringNamedFilesystem(Feature):
4362
"""Is the filesystem based on bytes?"""
4365
if os.name == "posix":
4369
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4372
class _UTF8Filesystem(Feature):
4373
"""Is the filesystem UTF-8?"""
4376
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4380
UTF8Filesystem = _UTF8Filesystem()
4383
class _BreakinFeature(Feature):
4384
"""Does this platform support the breakin feature?"""
4387
from bzrlib import breakin
4388
if breakin.determine_signal() is None:
4390
if sys.platform == 'win32':
4391
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4392
# We trigger SIGBREAK via a Console api so we need ctypes to
4393
# access the function
4400
def feature_name(self):
4401
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4404
BreakinFeature = _BreakinFeature()
4407
class _CaseInsCasePresFilenameFeature(Feature):
4408
"""Is the file-system case insensitive, but case-preserving?"""
4411
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4413
# first check truly case-preserving for created files, then check
4414
# case insensitive when opening existing files.
4415
name = osutils.normpath(name)
4416
base, rel = osutils.split(name)
4417
found_rel = osutils.canonical_relpath(base, name)
4418
return (found_rel == rel
4419
and os.path.isfile(name.upper())
4420
and os.path.isfile(name.lower()))
4425
def feature_name(self):
4426
return "case-insensitive case-preserving filesystem"
4428
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4431
class _CaseInsensitiveFilesystemFeature(Feature):
4432
"""Check if underlying filesystem is case-insensitive but *not* case
4435
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4436
# more likely to be case preserving, so this case is rare.
4439
if CaseInsCasePresFilenameFeature.available():
4442
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4443
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4444
TestCaseWithMemoryTransport.TEST_ROOT = root
4446
root = TestCaseWithMemoryTransport.TEST_ROOT
4447
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4449
name_a = osutils.pathjoin(tdir, 'a')
4450
name_A = osutils.pathjoin(tdir, 'A')
4452
result = osutils.isdir(name_A)
4453
_rmtree_temp_dir(tdir)
4456
def feature_name(self):
4457
return 'case-insensitive filesystem'
4459
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4462
class _CaseSensitiveFilesystemFeature(Feature):
4465
if CaseInsCasePresFilenameFeature.available():
4467
elif CaseInsensitiveFilesystemFeature.available():
4472
def feature_name(self):
4473
return 'case-sensitive filesystem'
4475
# new coding style is for feature instances to be lowercase
4476
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4479
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4480
SubUnitFeature = _CompatabilityThunkFeature(
4481
deprecated_in((2,1,0)),
4482
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4483
4435
# Only define SubUnitBzrRunner if subunit is available.
4485
4437
from subunit import TestProtocolClient
4486
4438
from subunit.test_results import AutoTimingTestResultDecorator
4439
class SubUnitBzrProtocolClient(TestProtocolClient):
4441
def stopTest(self, test):
4442
super(SubUnitBzrProtocolClient, self).stopTest(test)
4443
_clear__type_equality_funcs(test)
4445
def addSuccess(self, test, details=None):
4446
# The subunit client always includes the details in the subunit
4447
# stream, but we don't want to include it in ours.
4448
if details is not None and 'log' in details:
4450
return super(SubUnitBzrProtocolClient, self).addSuccess(
4487
4453
class SubUnitBzrRunner(TextTestRunner):
4488
4454
def run(self, test):
4489
4455
result = AutoTimingTestResultDecorator(
4490
TestProtocolClient(self.stream))
4456
SubUnitBzrProtocolClient(self.stream))
4491
4457
test.run(result)
4493
4459
except ImportError:
4496
class _PosixPermissionsFeature(Feature):
4500
# create temporary file and check if specified perms are maintained.
4503
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4504
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4507
os.chmod(name, write_perms)
4509
read_perms = os.stat(name).st_mode & 0777
4511
return (write_perms == read_perms)
4513
return (os.name == 'posix') and has_perms()
4515
def feature_name(self):
4516
return 'POSIX permissions support'
4518
posix_permissions_feature = _PosixPermissionsFeature()
4463
@deprecated_function(deprecated_in((2, 5, 0)))
4464
def ModuleAvailableFeature(name):
4465
from bzrlib.tests import features
4466
return features.ModuleAvailableFeature(name)