50
54
# nb: check this before importing anything else from within it
51
55
_testtools_version = getattr(testtools, '__version__', ())
52
if _testtools_version < (0, 9, 5):
53
raise ImportError("need at least testtools 0.9.5: %s is %r"
56
if _testtools_version < (0, 9, 2):
57
raise ImportError("need at least testtools 0.9.2: %s is %r"
54
58
% (testtools.__file__, _testtools_version))
55
59
from testtools import content
58
61
from bzrlib import (
62
commands as _mod_commands,
71
plugin as _mod_plugin,
78
transport as _mod_transport,
79
import bzrlib.commands
80
import bzrlib.timestamp
82
import bzrlib.inventory
83
import bzrlib.iterablefile
82
86
import bzrlib.lsprof
83
87
except ImportError:
84
88
# lsprof not available
86
from bzrlib.smart import client, request
90
from bzrlib.merge import merge_inner
93
from bzrlib.smart import client, request, server
95
from bzrlib import symbol_versioning
96
from bzrlib.symbol_versioning import (
87
104
from bzrlib.transport import (
91
from bzrlib.symbol_versioning import (
109
import bzrlib.transport
110
from bzrlib.trace import mutter, note
95
111
from bzrlib.tests import (
115
from bzrlib.tests.http_server import HttpServer
116
from bzrlib.tests.TestUtil import (
120
from bzrlib.tests.treeshape import build_tree_contents
100
121
from bzrlib.ui import NullProgressView
101
122
from bzrlib.ui.text import TextUIFactory
123
import bzrlib.version_info_formats.format_custom
124
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
103
126
# Mark this python module as being part of the implementation
104
127
# of unittest: this gives us better tracebacks where the last
116
139
SUBUNIT_SEEK_SET = 0
117
140
SUBUNIT_SEEK_CUR = 1
119
# These are intentionally brought into this namespace. That way plugins, etc
120
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
121
TestSuite = TestUtil.TestSuite
122
TestLoader = TestUtil.TestLoader
124
# Tests should run in a clean and clearly defined environment. The goal is to
125
# keep them isolated from the running environment as much as possible. The test
126
# framework ensures the variables defined below are set (or deleted if the
127
# value is None) before a test is run and reset to their original value after
128
# the test is run. Generally if some code depends on an environment variable,
129
# the tests should start without this variable in the environment. There are a
130
# few exceptions but you shouldn't violate this rule lightly.
134
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
135
# tests do check our impls match APPDATA
136
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
140
'BZREMAIL': None, # may still be present in the environment
141
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
142
'BZR_PROGRESS_BAR': None,
143
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
144
# as a base class instead of TestCaseInTempDir. Tests inheriting from
145
# TestCase should not use disk resources, BZR_LOG is one.
146
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
147
'BZR_PLUGIN_PATH': None,
148
'BZR_DISABLE_PLUGINS': None,
149
'BZR_PLUGINS_AT': None,
150
'BZR_CONCURRENCY': None,
151
# Make sure that any text ui tests are consistent regardless of
152
# the environment the test case is run in; you may want tests that
153
# test other combinations. 'dumb' is a reasonable guess for tests
154
# going to a pipe or a StringIO.
160
'SSH_AUTH_SOCK': None,
170
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
171
# least. If you do (care), please update this comment
175
'BZR_REMOTE_PATH': None,
176
# Generally speaking, we don't want apport reporting on crashes in
177
# the test environment unless we're specifically testing apport,
178
# so that it doesn't leak into the real system environment. We
179
# use an env var so it propagates to subprocesses.
180
'APPORT_DISABLE': '1',
184
def override_os_environ(test, env=None):
185
"""Modify os.environ keeping a copy.
187
:param test: A test instance
189
:param env: A dict containing variable definitions to be installed
192
env = isolated_environ
193
test._original_os_environ = dict([(var, value)
194
for var, value in os.environ.iteritems()])
195
for var, value in env.iteritems():
196
osutils.set_or_unset_env(var, value)
197
if var not in test._original_os_environ:
198
# The var is new, add it with a value of None, so
199
# restore_os_environ will delete it
200
test._original_os_environ[var] = None
203
def restore_os_environ(test):
204
"""Restore os.environ to its original state.
206
:param test: A test instance previously passed to override_os_environ.
208
for var, value in test._original_os_environ.iteritems():
209
# Restore the original value (or delete it if the value has been set to
210
# None in override_os_environ).
211
osutils.set_or_unset_env(var, value)
214
class ExtendedTestResult(testtools.TextTestResult):
143
class ExtendedTestResult(unittest._TextTestResult):
215
144
"""Accepts, reports and accumulates the results of running tests.
217
146
Compared to the unittest version this class adds support for
266
195
self._overall_start_time = time.time()
267
196
self._strict = strict
268
self._first_thread_leaker_id = None
269
self._tests_leaking_threads_count = 0
270
self._traceback_from_test = None
272
198
def stopTestRun(self):
273
199
run = self.testsRun
274
200
actionTaken = "Ran"
275
201
stopTime = time.time()
276
202
timeTaken = stopTime - self.startTime
277
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
278
# the parent class method is similar have to duplicate
279
self._show_list('ERROR', self.errors)
280
self._show_list('FAIL', self.failures)
281
self.stream.write(self.sep2)
282
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
204
self.stream.writeln(self.separator2)
205
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
283
206
run, run != 1 and "s" or "", timeTaken))
207
self.stream.writeln()
284
208
if not self.wasSuccessful():
285
209
self.stream.write("FAILED (")
286
210
failed, errored = map(len, (self.failures, self.errors))
293
217
if failed or errored: self.stream.write(", ")
294
218
self.stream.write("known_failure_count=%d" %
295
219
self.known_failure_count)
296
self.stream.write(")\n")
220
self.stream.writeln(")")
298
222
if self.known_failure_count:
299
self.stream.write("OK (known_failures=%d)\n" %
223
self.stream.writeln("OK (known_failures=%d)" %
300
224
self.known_failure_count)
302
self.stream.write("OK\n")
226
self.stream.writeln("OK")
303
227
if self.skip_count > 0:
304
228
skipped = self.skip_count
305
self.stream.write('%d test%s skipped\n' %
229
self.stream.writeln('%d test%s skipped' %
306
230
(skipped, skipped != 1 and "s" or ""))
307
231
if self.unsupported:
308
232
for feature, count in sorted(self.unsupported.items()):
309
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
233
self.stream.writeln("Missing feature '%s' skipped %d tests." %
310
234
(feature, count))
312
236
ok = self.wasStrictlySuccessful()
314
238
ok = self.wasSuccessful()
315
if self._first_thread_leaker_id:
239
if TestCase._first_thread_leaker_id:
316
240
self.stream.write(
317
241
'%s is leaking threads among %d leaking tests.\n' % (
318
self._first_thread_leaker_id,
319
self._tests_leaking_threads_count))
242
TestCase._first_thread_leaker_id,
243
TestCase._leaking_threads_tests))
320
244
# We don't report the main thread as an active one.
321
245
self.stream.write(
322
246
'%d non-main threads were left active in the end.\n'
323
% (len(self._active_threads) - 1))
247
% (TestCase._active_threads - 1))
325
249
def getDescription(self, test):
352
275
def _shortened_test_description(self, test):
354
what = re.sub(r'^bzrlib\.tests\.', '', what)
277
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
357
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
358
# multiple times in a row, because the handler is added for
359
# each test but the container list is shared between cases.
360
# See lp:498869 lp:625574 and lp:637725 for background.
361
def _record_traceback_from_test(self, exc_info):
362
"""Store the traceback from the passed exc_info tuple until stopTest clears it"""
363
self._traceback_from_test = exc_info[2]
365
280
def startTest(self, test):
366
super(ExtendedTestResult, self).startTest(test)
281
unittest.TestResult.startTest(self, test)
367
282
if self.count == 0:
368
283
self.startTests()
370
284
self.report_test_start(test)
371
285
test.number = self.count
372
286
self._recordTestStartTime()
373
# Make testtools cases give us the real traceback on failure
374
addOnException = getattr(test, "addOnException", None)
375
if addOnException is not None:
376
addOnException(self._record_traceback_from_test)
377
# Only check for thread leaks on bzrlib derived test cases
378
if isinstance(test, TestCase):
379
test.addCleanup(self._check_leaked_threads, test)
381
def stopTest(self, test):
382
super(ExtendedTestResult, self).stopTest(test)
383
# Manually break cycles, means touching various private things but hey
384
getDetails = getattr(test, "getDetails", None)
385
if getDetails is not None:
387
# Clear _type_equality_funcs to try to stop TestCase instances
388
# from wasting memory. 'clear' is not available in all Python
389
# versions (bug 809048)
390
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
391
if type_equality_funcs is not None:
392
tef_clear = getattr(type_equality_funcs, "clear", None)
393
if tef_clear is None:
394
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
395
if tef_instance_dict is not None:
396
tef_clear = tef_instance_dict.clear
397
if tef_clear is not None:
399
self._traceback_from_test = None
401
288
def startTests(self):
402
self.report_tests_starting()
403
self._active_threads = threading.enumerate()
405
def _check_leaked_threads(self, test):
406
"""See if any threads have leaked since last call
408
A sample of live threads is stored in the _active_threads attribute,
409
when this method runs it compares the current live threads and any not
410
in the previous sample are treated as having leaked.
412
now_active_threads = set(threading.enumerate())
413
threads_leaked = now_active_threads.difference(self._active_threads)
415
self._report_thread_leak(test, threads_leaked, now_active_threads)
416
self._tests_leaking_threads_count += 1
417
if self._first_thread_leaker_id is None:
418
self._first_thread_leaker_id = test.id()
419
self._active_threads = now_active_threads
290
if getattr(sys, 'frozen', None) is None:
291
bzr_path = osutils.realpath(sys.argv[0])
293
bzr_path = sys.executable
295
'bzr selftest: %s\n' % (bzr_path,))
298
bzrlib.__path__[0],))
300
' bzr-%s python-%s %s\n' % (
301
bzrlib.version_string,
302
bzrlib._format_version_tuple(sys.version_info),
303
platform.platform(aliased=1),
305
self.stream.write('\n')
421
307
def _recordTestStartTime(self):
422
308
"""Record that a test has started."""
423
self._start_datetime = self._now()
309
self._start_time = time.time()
311
def _cleanupLogFile(self, test):
312
# We can only do this if we have one of our TestCases, not if
314
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
315
if setKeepLogfile is not None:
425
318
def addError(self, test, err):
426
319
"""Tell result that test finished with an error.
460
355
self._formatTime(benchmark_time),
462
357
self.report_success(test)
463
super(ExtendedTestResult, self).addSuccess(test)
358
self._cleanupLogFile(test)
359
unittest.TestResult.addSuccess(self, test)
464
360
test._log_contents = ''
466
362
def addExpectedFailure(self, test, err):
467
363
self.known_failure_count += 1
468
364
self.report_known_failure(test, err)
470
def addUnexpectedSuccess(self, test, details=None):
471
"""Tell result the test unexpectedly passed, counting as a failure
473
When the minimum version of testtools required becomes 0.9.8 this
474
can be updated to use the new handling there.
476
super(ExtendedTestResult, self).addFailure(test, details=details)
477
self.failure_count += 1
478
self.report_unexpected_success(test,
479
"".join(details["reason"].iter_text()))
483
366
def addNotSupported(self, test, feature):
484
367
"""The test will not be run because of a missing feature.
518
400
raise errors.BzrError("Unknown whence %r" % whence)
520
def report_tests_starting(self):
521
"""Display information before the test run begins"""
522
if getattr(sys, 'frozen', None) is None:
523
bzr_path = osutils.realpath(sys.argv[0])
525
bzr_path = sys.executable
527
'bzr selftest: %s\n' % (bzr_path,))
530
bzrlib.__path__[0],))
532
' bzr-%s python-%s %s\n' % (
533
bzrlib.version_string,
534
bzrlib._format_version_tuple(sys.version_info),
535
platform.platform(aliased=1),
537
self.stream.write('\n')
539
def report_test_start(self, test):
540
"""Display information on the test just about to be run"""
542
def _report_thread_leak(self, test, leaked_threads, active_threads):
543
"""Display information on a test that leaked one or more threads"""
544
# GZ 2010-09-09: A leak summary reported separately from the general
545
# thread debugging would be nice. Tests under subunit
546
# need something not using stream, perhaps adding a
547
# testtools details object would be fitting.
548
if 'threads' in selftest_debug_flags:
549
self.stream.write('%s is leaking, active is now %d\n' %
550
(test.id(), len(active_threads)))
402
def report_cleaning_up(self):
552
405
def startTestRun(self):
553
406
self.startTime = time.time()
694
550
return '%s%s' % (indent, err[1])
696
552
def report_error(self, test, err):
697
self.stream.write('ERROR %s\n%s\n'
553
self.stream.writeln('ERROR %s\n%s'
698
554
% (self._testTimeString(test),
699
555
self._error_summary(err)))
701
557
def report_failure(self, test, err):
702
self.stream.write(' FAIL %s\n%s\n'
558
self.stream.writeln(' FAIL %s\n%s'
703
559
% (self._testTimeString(test),
704
560
self._error_summary(err)))
706
562
def report_known_failure(self, test, err):
707
self.stream.write('XFAIL %s\n%s\n'
563
self.stream.writeln('XFAIL %s\n%s'
708
564
% (self._testTimeString(test),
709
565
self._error_summary(err)))
711
def report_unexpected_success(self, test, reason):
712
self.stream.write(' FAIL %s\n%s: %s\n'
713
% (self._testTimeString(test),
714
"Unexpected success. Should have failed",
717
567
def report_success(self, test):
718
self.stream.write(' OK %s\n' % self._testTimeString(test))
568
self.stream.writeln(' OK %s' % self._testTimeString(test))
719
569
for bench_called, stats in getattr(test, '_benchcalls', []):
720
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
570
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
721
571
stats.pprint(file=self.stream)
722
572
# flush the stream so that we get smooth output. This verbose mode is
723
573
# used to show the output in PQM.
724
574
self.stream.flush()
726
576
def report_skip(self, test, reason):
727
self.stream.write(' SKIP %s\n%s\n'
577
self.stream.writeln(' SKIP %s\n%s'
728
578
% (self._testTimeString(test), reason))
730
580
def report_not_applicable(self, test, reason):
731
self.stream.write(' N/A %s\n %s\n'
581
self.stream.writeln(' N/A %s\n %s'
732
582
% (self._testTimeString(test), reason))
734
584
def report_unsupported(self, test, feature):
735
585
"""test cannot be run because feature is missing."""
736
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
586
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
737
587
%(self._testTimeString(test), feature))
960
788
routine, and to build and check bzr trees.
962
790
In addition to the usual method of overriding tearDown(), this class also
963
allows subclasses to register cleanup functions via addCleanup, which are
791
allows subclasses to register functions into the _cleanups list, which is
964
792
run in order as the object is torn down. It's less likely this will be
965
793
accidentally overlooked.
796
_active_threads = None
797
_leaking_threads_tests = 0
798
_first_thread_leaker_id = None
799
_log_file_name = None
969
800
# record lsprof data when performing benchmark calls.
970
801
_gather_lsprof_in_benchmarks = False
972
803
def __init__(self, methodName='testMethod'):
973
804
super(TestCase, self).__init__(methodName)
974
806
self._directory_isolation = True
975
807
self.exception_handlers.insert(0,
976
808
(UnavailableFeature, self._do_unsupported_or_skip))
990
826
self._track_transports()
991
827
self._track_locks()
992
828
self._clear_debug_flags()
993
# Isolate global verbosity level, to make sure it's reproducible
994
# between tests. We should get rid of this altogether: bug 656694. --
996
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
997
# Isolate config option expansion until its default value for bzrlib is
998
# settled on or the FIXME associated with _get_expand_default_value
999
# is addressed -- vila 20110219
1000
self.overrideAttr(config, '_expand_default_value', None)
1001
self._log_files = set()
1002
# Each key in the ``_counters`` dict holds a value for a different
1003
# counter. When the test ends, addDetail() should be used to output the
1004
# counter values. This happens in install_counter_hook().
1006
if 'config_stats' in selftest_debug_flags:
1007
self._install_config_stats_hooks()
829
TestCase._active_threads = threading.activeCount()
830
self.addCleanup(self._check_leaked_threads)
1009
832
def debug(self):
1010
833
# debug a frame up.
1012
835
pdb.Pdb().set_trace(sys._getframe().f_back)
1014
def discardDetail(self, name):
1015
"""Extend the addDetail, getDetails api so we can remove a detail.
1017
eg. bzr always adds the 'log' detail at startup, but we don't want to
1018
include it for skipped, xfail, etc tests.
1020
It is safe to call this for a detail that doesn't exist, in case this
1021
gets called multiple times.
1023
# We cheat. details is stored in __details which means we shouldn't
1024
# touch it. but getDetails() returns the dict directly, so we can
1026
details = self.getDetails()
1030
def install_counter_hook(self, hooks, name, counter_name=None):
1031
"""Install a counting hook.
1033
Any hook can be counted as long as it doesn't need to return a value.
1035
:param hooks: Where the hook should be installed.
1037
:param name: The hook name that will be counted.
1039
:param counter_name: The counter identifier in ``_counters``, defaults
1042
_counters = self._counters # Avoid closing over self
1043
if counter_name is None:
1045
if _counters.has_key(counter_name):
1046
raise AssertionError('%s is already used as a counter name'
1048
_counters[counter_name] = 0
1049
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1050
lambda: ['%d' % (_counters[counter_name],)]))
1051
def increment_counter(*args, **kwargs):
1052
_counters[counter_name] += 1
1053
label = 'count %s calls' % (counter_name,)
1054
hooks.install_named_hook(name, increment_counter, label)
1055
self.addCleanup(hooks.uninstall_named_hook, name, label)
1057
def _install_config_stats_hooks(self):
1058
"""Install config hooks to count hook calls.
1061
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1062
self.install_counter_hook(config.ConfigHooks, hook_name,
1063
'config.%s' % (hook_name,))
1065
# The OldConfigHooks are private and need special handling to protect
1066
# against recursive tests (tests that run other tests), so we just do
1067
# manually what registering them into _builtin_known_hooks will provide
1069
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1070
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1071
self.install_counter_hook(config.OldConfigHooks, hook_name,
1072
'old_config.%s' % (hook_name,))
837
def _check_leaked_threads(self):
838
active = threading.activeCount()
839
leaked_threads = active - TestCase._active_threads
840
TestCase._active_threads = active
841
# If some tests make the number of threads *decrease*, we'll consider
842
# that they are just observing old threads dying, not aggressively killing
843
# random threads. So we don't report these tests as leaking. The risk
844
# is that we have false positives that way (the test sees 2 threads
845
# going away but leaks one) but it seems less likely than the actual
846
# false positives (the test sees threads going away and does not leak).
847
if leaked_threads > 0:
848
TestCase._leaking_threads_tests += 1
849
if TestCase._first_thread_leaker_id is None:
850
TestCase._first_thread_leaker_id = self.id()
1074
852
def _clear_debug_flags(self):
1075
853
"""Prevent externally set debug flags affecting tests.
1087
865
def _clear_hooks(self):
1088
866
# prevent hooks affecting tests
1089
known_hooks = hooks.known_hooks
1090
867
self._preserved_hooks = {}
1091
for key, (parent, name) in known_hooks.iter_parent_objects():
1092
current_hooks = getattr(parent, name)
868
for key, factory in hooks.known_hooks.items():
869
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
870
current_hooks = hooks.known_hooks_key_to_object(key)
1093
871
self._preserved_hooks[parent] = (name, current_hooks)
1094
self._preserved_lazy_hooks = hooks._lazy_hooks
1095
hooks._lazy_hooks = {}
1096
872
self.addCleanup(self._restoreHooks)
1097
for key, (parent, name) in known_hooks.iter_parent_objects():
1098
factory = known_hooks.get(key)
873
for key, factory in hooks.known_hooks.items():
874
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
1099
875
setattr(parent, name, factory())
1100
876
# this hook should always be installed
1101
877
request._install_hook()
1362
1134
'st_mtime did not match')
1363
1135
self.assertEqual(expected.st_ctime, actual.st_ctime,
1364
1136
'st_ctime did not match')
1365
if sys.platform == 'win32':
1137
if sys.platform != 'win32':
1366
1138
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1367
1139
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1368
# odd. We just force it to always be 0 to avoid any problems.
1369
self.assertEqual(0, expected.st_dev)
1370
self.assertEqual(0, actual.st_dev)
1371
self.assertEqual(0, expected.st_ino)
1372
self.assertEqual(0, actual.st_ino)
1140
# odd. Regardless we shouldn't actually try to assert anything
1141
# about their values
1374
1142
self.assertEqual(expected.st_dev, actual.st_dev,
1375
1143
'st_dev did not match')
1376
1144
self.assertEqual(expected.st_ino, actual.st_ino,
1548
1313
self.assertEqualDiff(content, s)
1550
def assertDocstring(self, expected_docstring, obj):
1551
"""Fail if obj does not have expected_docstring"""
1553
# With -OO the docstring should be None instead
1554
self.assertIs(obj.__doc__, None)
1556
self.assertEqual(expected_docstring, obj.__doc__)
1558
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1559
1315
def failUnlessExists(self, path):
1560
return self.assertPathExists(path)
1562
def assertPathExists(self, path):
1563
1316
"""Fail unless path or paths, which may be abs or relative, exist."""
1564
1317
if not isinstance(path, basestring):
1566
self.assertPathExists(p)
1319
self.failUnlessExists(p)
1568
self.assertTrue(osutils.lexists(path),
1569
path + " does not exist")
1321
self.failUnless(osutils.lexists(path),path+" does not exist")
1571
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1572
1323
def failIfExists(self, path):
1573
return self.assertPathDoesNotExist(path)
1575
def assertPathDoesNotExist(self, path):
1576
1324
"""Fail if path or paths, which may be abs or relative, exist."""
1577
1325
if not isinstance(path, basestring):
1579
self.assertPathDoesNotExist(p)
1327
self.failIfExists(p)
1581
self.assertFalse(osutils.lexists(path),
1329
self.failIf(osutils.lexists(path),path+" exists")
1584
1331
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1585
1332
"""A helper for callDeprecated and applyDeprecated.
1701
1447
The file is removed as the test is torn down.
1703
pseudo_log_file = StringIO()
1704
def _get_log_contents_for_weird_testtools_api():
1705
return [pseudo_log_file.getvalue().decode(
1706
"utf-8", "replace").encode("utf-8")]
1707
self.addDetail("log", content.Content(content.ContentType("text",
1708
"plain", {"charset": "utf8"}),
1709
_get_log_contents_for_weird_testtools_api))
1710
self._log_file = pseudo_log_file
1711
self._log_memento = trace.push_log_file(self._log_file)
1449
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1450
self._log_file = os.fdopen(fileno, 'w+')
1451
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1452
self._log_file_name = name
1712
1453
self.addCleanup(self._finishLogFile)
1714
1455
def _finishLogFile(self):
1715
1456
"""Finished with the log file.
1717
Close the file and delete it.
1458
Close the file and delete it, unless setKeepLogfile was called.
1719
if trace._trace_file:
1460
if bzrlib.trace._trace_file:
1720
1461
# flush the log file, to get all content
1721
trace._trace_file.flush()
1722
trace.pop_log_file(self._log_memento)
1462
bzrlib.trace._trace_file.flush()
1463
bzrlib.trace.pop_log_file(self._log_memento)
1464
# Cache the log result and delete the file on disk
1465
self._get_log(False)
1724
1467
def thisFailsStrictLockCheck(self):
1725
1468
"""It is known that this test would fail with -Dstrict_locks.
1756
1504
setattr(obj, attr_name, new)
1759
def overrideEnv(self, name, new):
1760
"""Set an environment variable, and reset it after the test.
1762
:param name: The environment variable name.
1764
:param new: The value to set the variable to. If None, the
1765
variable is deleted from the environment.
1767
:returns: The actual variable value.
1769
value = osutils.set_or_unset_env(name, new)
1770
self.addCleanup(osutils.set_or_unset_env, name, value)
1773
def recordCalls(self, obj, attr_name):
1774
"""Monkeypatch in a wrapper that will record calls.
1776
The monkeypatch is automatically removed when the test concludes.
1778
:param obj: The namespace holding the reference to be replaced;
1779
typically a module, class, or object.
1780
:param attr_name: A string for the name of the attribute to
1782
:returns: A list that will be extended with one item every time the
1783
function is called, with a tuple of (args, kwargs).
1787
def decorator(*args, **kwargs):
1788
calls.append((args, kwargs))
1789
return orig(*args, **kwargs)
1790
orig = self.overrideAttr(obj, attr_name, decorator)
1793
1507
def _cleanEnvironment(self):
1794
for name, value in isolated_environ.iteritems():
1795
self.overrideEnv(name, value)
1509
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1510
'HOME': os.getcwd(),
1511
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1512
# tests do check our impls match APPDATA
1513
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1517
'BZREMAIL': None, # may still be present in the environment
1519
'BZR_PROGRESS_BAR': None,
1521
'BZR_PLUGIN_PATH': None,
1522
'BZR_CONCURRENCY': None,
1523
# Make sure that any text ui tests are consistent regardless of
1524
# the environment the test case is run in; you may want tests that
1525
# test other combinations. 'dumb' is a reasonable guess for tests
1526
# going to a pipe or a StringIO.
1530
'BZR_COLUMNS': '80',
1532
'SSH_AUTH_SOCK': None,
1536
'https_proxy': None,
1537
'HTTPS_PROXY': None,
1542
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1543
# least. If you do (care), please update this comment
1547
'BZR_REMOTE_PATH': None,
1548
# Generally speaking, we don't want apport reporting on crashes in
1549
# the test environment unless we're specifically testing apport,
1550
# so that it doesn't leak into the real system environment. We
1551
# use an env var so it propagates to subprocesses.
1552
'APPORT_DISABLE': '1',
1555
self.addCleanup(self._restoreEnvironment)
1556
for name, value in new_env.iteritems():
1557
self._captureVar(name, value)
1559
def _captureVar(self, name, newvalue):
1560
"""Set an environment variable, and reset it when finished."""
1561
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1563
def _restoreEnvironment(self):
1564
for name, value in self._old_env.iteritems():
1565
osutils.set_or_unset_env(name, value)
1797
1567
def _restoreHooks(self):
1798
1568
for klass, (name, hooks) in self._preserved_hooks.items():
1799
1569
setattr(klass, name, hooks)
1800
self._preserved_hooks.clear()
1801
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1802
self._preserved_lazy_hooks.clear()
1804
1571
def knownFailure(self, reason):
1805
"""Declare that this test fails for a known reason
1807
Tests that are known to fail should generally be using expectedFailure
1808
with an appropriate reverse assertion if a change could cause the test
1809
to start passing. Conversely if the test has no immediate prospect of
1810
succeeding then using skip is more suitable.
1812
When this method is called while an exception is being handled, that
1813
traceback will be used, otherwise a new exception will be thrown to
1814
provide one but won't be reported.
1816
self._add_reason(reason)
1818
exc_info = sys.exc_info()
1819
if exc_info != (None, None, None):
1820
self._report_traceback(exc_info)
1823
raise self.failureException(reason)
1824
except self.failureException:
1825
exc_info = sys.exc_info()
1826
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1827
raise testtools.testcase._ExpectedFailure(exc_info)
1831
def _suppress_log(self):
1832
"""Remove the log info from details."""
1833
self.discardDetail('log')
1572
"""This test has failed for some known reason."""
1573
raise KnownFailure(reason)
1835
1575
def _do_skip(self, result, reason):
1836
self._suppress_log()
1837
1576
addSkip = getattr(result, 'addSkip', None)
1838
1577
if not callable(addSkip):
1839
1578
result.addSuccess(result)
1864
1601
self._do_skip(result, reason)
1867
def _report_skip(self, result, err):
1868
"""Override the default _report_skip.
1870
We want to strip the 'log' detail. If we wait until _do_skip, it has
1871
already been formatted into the 'reason' string, and we can't pull it
1874
self._suppress_log()
1875
super(TestCase, self)._report_skip(self, result, err)
1878
def _report_expected_failure(self, result, err):
1881
See _report_skip for motivation.
1883
self._suppress_log()
1884
super(TestCase, self)._report_expected_failure(self, result, err)
1887
1604
def _do_unsupported_or_skip(self, result, e):
1888
1605
reason = e.args[0]
1889
self._suppress_log()
1890
1606
addNotSupported = getattr(result, 'addNotSupported', None)
1891
1607
if addNotSupported is not None:
1892
1608
result.addNotSupported(self, reason)
1918
1634
self._benchtime += time.time() - start
1920
1636
def log(self, *args):
1639
def _get_log(self, keep_log_file=False):
1640
"""Internal helper to get the log from bzrlib.trace for this test.
1642
Please use self.getDetails, or self.get_log to access this in test case
1645
:param keep_log_file: When True, if the log is still a file on disk
1646
leave it as a file on disk. When False, if the log is still a file
1647
on disk, the log file is deleted and the log preserved as
1649
:return: A string containing the log.
1651
if self._log_contents is not None:
1653
self._log_contents.decode('utf8')
1654
except UnicodeDecodeError:
1655
unicodestr = self._log_contents.decode('utf8', 'replace')
1656
self._log_contents = unicodestr.encode('utf8')
1657
return self._log_contents
1659
if bzrlib.trace._trace_file:
1660
# flush the log file, to get all content
1661
bzrlib.trace._trace_file.flush()
1662
if self._log_file_name is not None:
1663
logfile = open(self._log_file_name)
1665
log_contents = logfile.read()
1669
log_contents.decode('utf8')
1670
except UnicodeDecodeError:
1671
unicodestr = log_contents.decode('utf8', 'replace')
1672
log_contents = unicodestr.encode('utf8')
1673
if not keep_log_file:
1674
self._log_file.close()
1675
self._log_file = None
1676
# Permit multiple calls to get_log until we clean it up in
1678
self._log_contents = log_contents
1680
os.remove(self._log_file_name)
1682
if sys.platform == 'win32' and e.errno == errno.EACCES:
1683
sys.stderr.write(('Unable to delete log file '
1684
' %r\n' % self._log_file_name))
1687
self._log_file_name = None
1690
return "No log file content and no log file name."
1923
1692
def get_log(self):
1924
1693
"""Get a unicode string containing the log from bzrlib.trace.
2200
def _add_subprocess_log(self, log_file_path):
2201
if len(self._log_files) == 0:
2202
# Register an addCleanup func. We do this on the first call to
2203
# _add_subprocess_log rather than in TestCase.setUp so that this
2204
# addCleanup is registered after any cleanups for tempdirs that
2205
# subclasses might create, which will probably remove the log file
2207
self.addCleanup(self._subprocess_log_cleanup)
2208
# self._log_files is a set, so if a log file is reused we won't grab it
2210
self._log_files.add(log_file_path)
2212
def _subprocess_log_cleanup(self):
2213
for count, log_file_path in enumerate(self._log_files):
2214
# We use buffer_now=True to avoid holding the file open beyond
2215
# the life of this function, which might interfere with e.g.
2216
# cleaning tempdirs on Windows.
2217
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2218
#detail_content = content.content_from_file(
2219
# log_file_path, buffer_now=True)
2220
with open(log_file_path, 'rb') as log_file:
2221
log_file_bytes = log_file.read()
2222
detail_content = content.Content(content.ContentType("text",
2223
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2224
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2227
1960
def _popen(self, *args, **kwargs):
2228
1961
"""Place a call to Popen.
2230
1963
Allows tests to override this method to intercept the calls made to
2231
1964
Popen for introspection.
2233
return subprocess.Popen(*args, **kwargs)
1966
return Popen(*args, **kwargs)
2235
1968
def get_source_path(self):
2236
1969
"""Return the path of the directory containing bzrlib."""
2266
1999
if retcode is not None and retcode != process.returncode:
2267
2000
if process_args is None:
2268
2001
process_args = "(unknown args)"
2269
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2270
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2002
mutter('Output of bzr %s:\n%s', process_args, out)
2003
mutter('Error for bzr %s:\n%s', process_args, err)
2271
2004
self.fail('Command bzr %s failed with retcode %s != %s'
2272
2005
% (process_args, retcode, process.returncode))
2273
2006
return [out, err]
2275
def check_tree_shape(self, tree, shape):
2276
"""Compare a tree to a list of expected names.
2008
def check_inventory_shape(self, inv, shape):
2009
"""Compare an inventory to a list of expected names.
2278
2011
Fail if they are not precisely equal.
2281
2014
shape = list(shape) # copy
2282
for path, ie in tree.iter_entries_by_dir():
2015
for path, ie in inv.entries():
2283
2016
name = path.replace('\\', '/')
2284
2017
if ie.kind == 'directory':
2285
2018
name = name + '/'
2287
pass # ignore root entry
2289
2020
shape.remove(name)
2291
2022
extras.append(name)
2381
2112
class TestCaseWithMemoryTransport(TestCase):
2382
2113
"""Common test class for tests that do not need disk resources.
2384
Tests that need disk resources should derive from TestCaseInTempDir
2385
orTestCaseWithTransport.
2115
Tests that need disk resources should derive from TestCaseWithTransport.
2387
2117
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2389
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2119
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2390
2120
a directory which does not exist. This serves to help ensure test isolation
2391
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2392
must exist. However, TestCaseWithMemoryTransport does not offer local file
2393
defaults for the transport in tests, nor does it obey the command line
2121
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2122
must exist. However, TestCaseWithMemoryTransport does not offer local
2123
file defaults for the transport in tests, nor does it obey the command line
2394
2124
override, so tests that accidentally write to the common directory should
2397
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2398
``.bzr`` directory that stops us ascending higher into the filesystem.
2127
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2128
a .bzr directory that stops us ascending higher into the filesystem.
2401
2131
TEST_ROOT = None
2667
2390
test_home_dir = self.test_home_dir
2668
2391
if isinstance(test_home_dir, unicode):
2669
2392
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2670
self.overrideEnv('HOME', test_home_dir)
2671
self.overrideEnv('BZR_HOME', test_home_dir)
2393
os.environ['HOME'] = test_home_dir
2394
os.environ['BZR_HOME'] = test_home_dir
2673
2396
def setUp(self):
2674
2397
super(TestCaseWithMemoryTransport, self).setUp()
2675
# Ensure that ConnectedTransport doesn't leak sockets
2676
def get_transport_from_url_with_cleanup(*args, **kwargs):
2677
t = orig_get_transport_from_url(*args, **kwargs)
2678
if isinstance(t, _mod_transport.ConnectedTransport):
2679
self.addCleanup(t.disconnect)
2682
orig_get_transport_from_url = self.overrideAttr(
2683
_mod_transport, 'get_transport_from_url',
2684
get_transport_from_url_with_cleanup)
2685
2398
self._make_test_root()
2686
2399
self.addCleanup(os.chdir, os.getcwdu())
2687
2400
self.makeAndChdirToTestDir()
3453
3154
def partition_tests(suite, count):
3454
3155
"""Partition suite into count lists of tests."""
3455
# This just assigns tests in a round-robin fashion. On one hand this
3456
# splits up blocks of related tests that might run faster if they shared
3457
# resources, but on the other it avoids assigning blocks of slow tests to
3458
# just one partition. So the slowest partition shouldn't be much slower
3460
partitions = [list() for i in range(count)]
3461
tests = iter_suite_tests(suite)
3462
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3463
partition.append(test)
3467
def workaround_zealous_crypto_random():
3468
"""Crypto.Random want to help us being secure, but we don't care here.
3470
This workaround some test failure related to the sftp server. Once paramiko
3471
stop using the controversial API in Crypto.Random, we may get rid of it.
3474
from Crypto.Random import atfork
3157
tests = list(iter_suite_tests(suite))
3158
tests_per_process = int(math.ceil(float(len(tests)) / count))
3159
for block in range(count):
3160
low_test = block * tests_per_process
3161
high_test = low_test + tests_per_process
3162
process_tests = tests[low_test:high_test]
3163
result.append(process_tests)
3480
3167
def fork_for_tests(suite):
3590
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3274
class ForwardingResult(unittest.TestResult):
3276
def __init__(self, target):
3277
unittest.TestResult.__init__(self)
3278
self.result = target
3280
def startTest(self, test):
3281
self.result.startTest(test)
3283
def stopTest(self, test):
3284
self.result.stopTest(test)
3286
def startTestRun(self):
3287
self.result.startTestRun()
3289
def stopTestRun(self):
3290
self.result.stopTestRun()
3292
def addSkip(self, test, reason):
3293
self.result.addSkip(test, reason)
3295
def addSuccess(self, test):
3296
self.result.addSuccess(test)
3298
def addError(self, test, err):
3299
self.result.addError(test, err)
3301
def addFailure(self, test, err):
3302
self.result.addFailure(test, err)
3303
ForwardingResult = testtools.ExtendedToOriginalDecorator
3306
class ProfileResult(ForwardingResult):
3591
3307
"""Generate profiling data for all activity between start and success.
3593
3309
The profile data is appended to the test's _benchcalls attribute and can
3891
3596
'bzrlib.tests.per_repository',
3892
3597
'bzrlib.tests.per_repository_chk',
3893
3598
'bzrlib.tests.per_repository_reference',
3894
'bzrlib.tests.per_repository_vf',
3895
3599
'bzrlib.tests.per_uifactory',
3896
3600
'bzrlib.tests.per_versionedfile',
3897
3601
'bzrlib.tests.per_workingtree',
3898
3602
'bzrlib.tests.test__annotator',
3899
3603
'bzrlib.tests.test__bencode',
3900
'bzrlib.tests.test__btree_serializer',
3901
3604
'bzrlib.tests.test__chk_map',
3902
3605
'bzrlib.tests.test__dirstate_helpers',
3903
3606
'bzrlib.tests.test__groupcompress',
3925
3628
'bzrlib.tests.test_chunk_writer',
3926
3629
'bzrlib.tests.test_clean_tree',
3927
3630
'bzrlib.tests.test_cleanup',
3928
'bzrlib.tests.test_cmdline',
3929
3631
'bzrlib.tests.test_commands',
3930
3632
'bzrlib.tests.test_commit',
3931
3633
'bzrlib.tests.test_commit_merge',
3932
3634
'bzrlib.tests.test_config',
3933
3635
'bzrlib.tests.test_conflicts',
3934
'bzrlib.tests.test_controldir',
3935
3636
'bzrlib.tests.test_counted_lock',
3936
3637
'bzrlib.tests.test_crash',
3937
3638
'bzrlib.tests.test_decorators',
3938
3639
'bzrlib.tests.test_delta',
3939
3640
'bzrlib.tests.test_debug',
3641
'bzrlib.tests.test_deprecated_graph',
3940
3642
'bzrlib.tests.test_diff',
3941
3643
'bzrlib.tests.test_directory_service',
3942
3644
'bzrlib.tests.test_dirstate',
3944
3646
'bzrlib.tests.test_eol_filters',
3945
3647
'bzrlib.tests.test_errors',
3946
3648
'bzrlib.tests.test_export',
3947
'bzrlib.tests.test_export_pot',
3948
3649
'bzrlib.tests.test_extract',
3949
'bzrlib.tests.test_features',
3950
3650
'bzrlib.tests.test_fetch',
3951
'bzrlib.tests.test_fixtures',
3952
3651
'bzrlib.tests.test_fifo_cache',
3953
3652
'bzrlib.tests.test_filters',
3954
'bzrlib.tests.test_filter_tree',
3955
3653
'bzrlib.tests.test_ftp_transport',
3956
3654
'bzrlib.tests.test_foreign',
3957
3655
'bzrlib.tests.test_generate_docs',
3985
3681
'bzrlib.tests.test_lru_cache',
3986
3682
'bzrlib.tests.test_lsprof',
3987
3683
'bzrlib.tests.test_mail_client',
3988
'bzrlib.tests.test_matchers',
3989
3684
'bzrlib.tests.test_memorytree',
3990
3685
'bzrlib.tests.test_merge',
3991
3686
'bzrlib.tests.test_merge3',
3992
3687
'bzrlib.tests.test_merge_core',
3993
3688
'bzrlib.tests.test_merge_directive',
3994
'bzrlib.tests.test_mergetools',
3995
3689
'bzrlib.tests.test_missing',
3996
3690
'bzrlib.tests.test_msgeditor',
3997
3691
'bzrlib.tests.test_multiparent',
4199
3881
# Some tests mentioned in the list are not in the test suite. The
4200
3882
# list may be out of date, report to the tester.
4201
3883
for id in not_found:
4202
trace.warning('"%s" not found in the test suite', id)
3884
bzrlib.trace.warning('"%s" not found in the test suite', id)
4203
3885
for id in duplicates:
4204
trace.warning('"%s" is used as an id by several tests', id)
3886
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
4209
def multiply_scenarios(*scenarios):
4210
"""Multiply two or more iterables of scenarios.
4212
It is safe to pass scenario generators or iterators.
4214
:returns: A list of compound scenarios: the cross-product of all
4215
scenarios, with the names concatenated and the parameters
4218
return reduce(_multiply_two_scenarios, map(list, scenarios))
4221
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3891
def multiply_scenarios(scenarios_left, scenarios_right):
4222
3892
"""Multiply two sets of scenarios.
4224
3894
:returns: the cartesian product of the two sets of scenarios, that is
4388
4045
if test_id != None:
4389
4046
ui.ui_factory.clear_term()
4390
4047
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4391
# Ugly, but the last thing we want here is fail, so bear with it.
4392
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4393
).encode('ascii', 'replace')
4394
4048
sys.stderr.write('Unable to remove testing dir %s\n%s'
4395
% (os.path.basename(dirname), printable_e))
4049
% (os.path.basename(dirname), e))
4052
class Feature(object):
4053
"""An operating system Feature."""
4056
self._available = None
4058
def available(self):
4059
"""Is the feature available?
4061
:return: True if the feature is available.
4063
if self._available is None:
4064
self._available = self._probe()
4065
return self._available
4068
"""Implement this method in concrete features.
4070
:return: True if the feature is available.
4072
raise NotImplementedError
4075
if getattr(self, 'feature_name', None):
4076
return self.feature_name()
4077
return self.__class__.__name__
4080
class _SymlinkFeature(Feature):
4083
return osutils.has_symlinks()
4085
def feature_name(self):
4088
SymlinkFeature = _SymlinkFeature()
4091
class _HardlinkFeature(Feature):
4094
return osutils.has_hardlinks()
4096
def feature_name(self):
4099
HardlinkFeature = _HardlinkFeature()
4102
class _OsFifoFeature(Feature):
4105
return getattr(os, 'mkfifo', None)
4107
def feature_name(self):
4108
return 'filesystem fifos'
4110
OsFifoFeature = _OsFifoFeature()
4113
class _UnicodeFilenameFeature(Feature):
4114
"""Does the filesystem support Unicode filenames?"""
4118
# Check for character combinations unlikely to be covered by any
4119
# single non-unicode encoding. We use the characters
4120
# - greek small letter alpha (U+03B1) and
4121
# - braille pattern dots-123456 (U+283F).
4122
os.stat(u'\u03b1\u283f')
4123
except UnicodeEncodeError:
4125
except (IOError, OSError):
4126
# The filesystem allows the Unicode filename but the file doesn't
4130
# The filesystem allows the Unicode filename and the file exists,
4134
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4137
class _CompatabilityThunkFeature(Feature):
4138
"""This feature is just a thunk to another feature.
4140
It issues a deprecation warning if it is accessed, to let you know that you
4141
should really use a different feature.
4144
def __init__(self, dep_version, module, name,
4145
replacement_name, replacement_module=None):
4146
super(_CompatabilityThunkFeature, self).__init__()
4147
self._module = module
4148
if replacement_module is None:
4149
replacement_module = module
4150
self._replacement_module = replacement_module
4152
self._replacement_name = replacement_name
4153
self._dep_version = dep_version
4154
self._feature = None
4157
if self._feature is None:
4158
depr_msg = self._dep_version % ('%s.%s'
4159
% (self._module, self._name))
4160
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4161
self._replacement_name)
4162
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4163
# Import the new feature and use it as a replacement for the
4165
mod = __import__(self._replacement_module, {}, {},
4166
[self._replacement_name])
4167
self._feature = getattr(mod, self._replacement_name)
4171
return self._feature._probe()
4174
class ModuleAvailableFeature(Feature):
4175
"""This is a feature than describes a module we want to be available.
4177
Declare the name of the module in __init__(), and then after probing, the
4178
module will be available as 'self.module'.
4180
:ivar module: The module if it is available, else None.
4183
def __init__(self, module_name):
4184
super(ModuleAvailableFeature, self).__init__()
4185
self.module_name = module_name
4189
self._module = __import__(self.module_name, {}, {}, [''])
4196
if self.available(): # Make sure the probe has been done
4200
def feature_name(self):
4201
return self.module_name
4204
# This is kept here for compatibility, it is recommended to use
4205
# 'bzrlib.tests.feature.paramiko' instead
4206
ParamikoFeature = _CompatabilityThunkFeature(
4207
deprecated_in((2,1,0)),
4208
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4398
4211
def probe_unicode_in_user_encoding():
4244
class _HTTPSServerFeature(Feature):
4245
"""Some tests want an https Server, check if one is available.
4247
Right now, the only way this is available is under python2.6 which provides
4258
def feature_name(self):
4259
return 'HTTPSServer'
4262
HTTPSServerFeature = _HTTPSServerFeature()
4265
class _UnicodeFilename(Feature):
4266
"""Does the filesystem support Unicode filenames?"""
4271
except UnicodeEncodeError:
4273
except (IOError, OSError):
4274
# The filesystem allows the Unicode filename but the file doesn't
4278
# The filesystem allows the Unicode filename and the file exists,
4282
UnicodeFilename = _UnicodeFilename()
4285
class _UTF8Filesystem(Feature):
4286
"""Is the filesystem UTF-8?"""
4289
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4293
UTF8Filesystem = _UTF8Filesystem()
4296
class _BreakinFeature(Feature):
4297
"""Does this platform support the breakin feature?"""
4300
from bzrlib import breakin
4301
if breakin.determine_signal() is None:
4303
if sys.platform == 'win32':
4304
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4305
# We trigger SIGBREAK via a Console api so we need ctypes to
4306
# access the function
4313
def feature_name(self):
4314
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4317
BreakinFeature = _BreakinFeature()
4320
class _CaseInsCasePresFilenameFeature(Feature):
4321
"""Is the file-system case insensitive, but case-preserving?"""
4324
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4326
# first check truly case-preserving for created files, then check
4327
# case insensitive when opening existing files.
4328
name = osutils.normpath(name)
4329
base, rel = osutils.split(name)
4330
found_rel = osutils.canonical_relpath(base, name)
4331
return (found_rel == rel
4332
and os.path.isfile(name.upper())
4333
and os.path.isfile(name.lower()))
4338
def feature_name(self):
4339
return "case-insensitive case-preserving filesystem"
4341
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4344
class _CaseInsensitiveFilesystemFeature(Feature):
4345
"""Check if underlying filesystem is case-insensitive but *not* case
4348
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4349
# more likely to be case preserving, so this case is rare.
4352
if CaseInsCasePresFilenameFeature.available():
4355
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4356
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4357
TestCaseWithMemoryTransport.TEST_ROOT = root
4359
root = TestCaseWithMemoryTransport.TEST_ROOT
4360
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4362
name_a = osutils.pathjoin(tdir, 'a')
4363
name_A = osutils.pathjoin(tdir, 'A')
4365
result = osutils.isdir(name_A)
4366
_rmtree_temp_dir(tdir)
4369
def feature_name(self):
4370
return 'case-insensitive filesystem'
4372
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4375
class _CaseSensitiveFilesystemFeature(Feature):
4378
if CaseInsCasePresFilenameFeature.available():
4380
elif CaseInsensitiveFilesystemFeature.available():
4385
def feature_name(self):
4386
return 'case-sensitive filesystem'
4388
# new coding style is for feature instances to be lowercase
4389
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4392
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4393
SubUnitFeature = _CompatabilityThunkFeature(
4394
deprecated_in((2,1,0)),
4395
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4431
4396
# Only define SubUnitBzrRunner if subunit is available.
4433
4398
from subunit import TestProtocolClient
4434
4399
from subunit.test_results import AutoTimingTestResultDecorator
4435
class SubUnitBzrProtocolClient(TestProtocolClient):
4437
def addSuccess(self, test, details=None):
4438
# The subunit client always includes the details in the subunit
4439
# stream, but we don't want to include it in ours.
4440
if details is not None and 'log' in details:
4442
return super(SubUnitBzrProtocolClient, self).addSuccess(
4445
4400
class SubUnitBzrRunner(TextTestRunner):
4446
4401
def run(self, test):
4447
4402
result = AutoTimingTestResultDecorator(
4448
SubUnitBzrProtocolClient(self.stream))
4403
TestProtocolClient(self.stream))
4449
4404
test.run(result)
4451
4406
except ImportError:
4455
@deprecated_function(deprecated_in((2, 5, 0)))
4456
def ModuleAvailableFeature(name):
4457
from bzrlib.tests import features
4458
return features.ModuleAvailableFeature(name)